PoolManager.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. /* Copyright (C) 1883 Thomas Edison - All Rights Reserved
  2. * You may use, distribute and modify this code under the
  3. * terms of the GPLv3 license, which unfortunately won't be
  4. * written for another century.
  5. *
  6. * You should have received a copy of the LICENSE file with
  7. * this file.
  8. */
  9. #include <chrono>
  10. #include "PoolManager.h"
  11. using namespace std;
  12. using namespace dev;
  13. using namespace eth;
  14. PoolManager* PoolManager::m_this = nullptr;
  15. PoolManager::PoolManager(PoolSettings _settings)
  16. : m_Settings(move(_settings)), m_io_strand(g_io_service), m_failovertimer(g_io_service),
  17. m_submithrtimer(g_io_service), m_reconnecttimer(g_io_service), m_lastBlock(-1) {
  18. m_this = this;
  19. m_currentWp.header = h256();
  20. Farm::f().onMinerRestart([&]() {
  21. cnote << "Restart miners...";
  22. if (Farm::f().isMining()) {
  23. cnote << "Shutting down miners...";
  24. Farm::f().stop();
  25. }
  26. cnote << "Spinning up miners...";
  27. Farm::f().start();
  28. });
  29. Farm::f().onSolutionFound([&](const Solution& sol) {
  30. // Solution should passthrough only if client is
  31. // properly connected. Otherwise we'll have the bad behavior
  32. // to log nonce submission but receive no response
  33. if (p_client && p_client->isConnected()) {
  34. p_client->submitSolution(sol);
  35. } else {
  36. cnote << string(EthOrange "Solution 0x") + toHex(sol.nonce) << " wasted. Waiting for connection...";
  37. }
  38. return false;
  39. });
  40. }
  41. void PoolManager::setClientHandlers() {
  42. p_client->onConnected([&]() {
  43. {
  44. cnote << "Established connection to " << m_selectedHost;
  45. m_connectionAttempt = 0;
  46. // Reset current WorkPackage
  47. m_currentWp.job.clear();
  48. m_currentWp.header = h256();
  49. // Rough implementation to return to primary pool
  50. // after specified amount of time
  51. if (m_activeConnectionIdx != 0 && m_Settings.poolFailoverTimeout) {
  52. m_failovertimer.expires_from_now(boost::posix_time::minutes(m_Settings.poolFailoverTimeout));
  53. m_failovertimer.async_wait(m_io_strand.wrap(
  54. boost::bind(&PoolManager::failovertimer_elapsed, this, boost::asio::placeholders::error)));
  55. } else
  56. m_failovertimer.cancel();
  57. }
  58. if (!Farm::f().isMining()) {
  59. cnote << "Spinning up miners...";
  60. Farm::f().start();
  61. } else if (Farm::f().paused()) {
  62. cnote << "Resume mining ...";
  63. Farm::f().resume();
  64. }
  65. // Activate timing for HR submission
  66. if (m_Settings.reportHashrate) {
  67. m_submithrtimer.expires_from_now(boost::posix_time::seconds(m_Settings.hashRateInterval));
  68. m_submithrtimer.async_wait(m_io_strand.wrap(
  69. boost::bind(&PoolManager::submithrtimer_elapsed, this, boost::asio::placeholders::error)));
  70. }
  71. // Signal async operations have completed
  72. m_async_pending.store(false, memory_order_relaxed);
  73. });
  74. p_client->onDisconnected([&]() {
  75. cnote << "Disconnected from " << m_selectedHost;
  76. // Clear current connection
  77. p_client->unsetConnection();
  78. m_currentWp.header = h256();
  79. // Stop timing actors
  80. m_failovertimer.cancel();
  81. m_submithrtimer.cancel();
  82. if (m_stopping.load(memory_order_relaxed)) {
  83. if (Farm::f().isMining()) {
  84. cnote << "Shutting down miners...";
  85. Farm::f().stop();
  86. }
  87. m_running.store(false, memory_order_relaxed);
  88. } else {
  89. // Signal we will reconnect async
  90. m_async_pending.store(true, memory_order_relaxed);
  91. // Suspend mining and submit new connection request
  92. cnote << "No connection. Suspend mining ...";
  93. Farm::f().pause();
  94. g_io_service.post(m_io_strand.wrap(boost::bind(&PoolManager::rotateConnect, this)));
  95. }
  96. });
  97. p_client->onWorkReceived([&](WorkPackage const& wp) {
  98. // Should not happen !
  99. if (!wp)
  100. return;
  101. int _currentEpoch = m_currentWp.epoch;
  102. bool newEpoch = (_currentEpoch == -1);
  103. // In EthereumStratum/2.0.0 epoch number is set in session
  104. if (!newEpoch) {
  105. if (p_client->getConnection()->StratumMode() == 3)
  106. newEpoch = (wp.epoch != m_currentWp.epoch);
  107. else
  108. newEpoch = (wp.seed != m_currentWp.seed);
  109. }
  110. bool newDiff = (wp.boundary != m_currentWp.boundary);
  111. m_currentWp.difficulty = wp.difficulty;
  112. m_currentWp = wp;
  113. if (newEpoch) {
  114. m_epochChanges.fetch_add(1, memory_order_relaxed);
  115. // If epoch is valued in workpackage take it
  116. if (wp.epoch == -1) {
  117. if (m_currentWp.block >= 0)
  118. m_currentWp.epoch = m_currentWp.block / 30000;
  119. else
  120. m_currentWp.epoch = ethash::find_epoch_number(ethash::hash256_from_bytes(m_currentWp.seed.data()));
  121. }
  122. } else {
  123. m_currentWp.epoch = _currentEpoch;
  124. }
  125. if (newDiff || newEpoch)
  126. showMiningAt();
  127. cnote << "Job: " EthWhite << m_currentWp.header.abridged() << EthGray
  128. << (m_currentWp.block != -1 ? " blk: " : "") << (m_lastBlock == m_currentWp.block ? EthGray : EthWhite)
  129. << (m_currentWp.block != -1 ? to_string(m_currentWp.block) : "") << EthReset << " " << m_selectedHost;
  130. m_lastBlock = m_currentWp.block;
  131. Farm::f().setWork(m_currentWp);
  132. });
  133. p_client->onSolutionAccepted(
  134. [&](chrono::milliseconds const& _responseDelay, unsigned const& _minerIdx, bool _asStale) {
  135. stringstream ss;
  136. ss << setw(4) << setfill(' ') << _responseDelay.count() << " ms. " << m_selectedHost;
  137. cnote << EthLime "**Accepted" << (_asStale ? " stale" : "") << EthReset << ss.str();
  138. Farm::f().accountSolution(_minerIdx, SolutionAccountingEnum::Accepted);
  139. });
  140. p_client->onSolutionRejected([&](chrono::milliseconds const& _responseDelay, unsigned const& _minerIdx) {
  141. stringstream ss;
  142. ss << setw(4) << setfill(' ') << _responseDelay.count() << " ms. " << m_selectedHost;
  143. cwarn << EthRed "**Rejected" EthReset << ss.str();
  144. Farm::f().accountSolution(_minerIdx, SolutionAccountingEnum::Rejected);
  145. });
  146. }
  147. void PoolManager::stop() {
  148. if (m_running.load(memory_order_relaxed)) {
  149. m_async_pending.store(true, memory_order_relaxed);
  150. m_stopping.store(true, memory_order_relaxed);
  151. if (p_client && p_client->isConnected()) {
  152. p_client->disconnect();
  153. // Wait for async operations to complete
  154. while (m_running.load(memory_order_relaxed))
  155. this_thread::sleep_for(chrono::milliseconds(500));
  156. p_client = nullptr;
  157. } else {
  158. // Stop timing actors
  159. m_failovertimer.cancel();
  160. m_submithrtimer.cancel();
  161. m_reconnecttimer.cancel();
  162. if (Farm::f().isMining()) {
  163. cnote << "Shutting down miners...";
  164. Farm::f().stop();
  165. }
  166. }
  167. }
  168. }
  169. void PoolManager::addConnection(string _connstring) {
  170. m_Settings.connections.push_back(shared_ptr<URI>(new URI(_connstring)));
  171. }
  172. void PoolManager::addConnection(shared_ptr<URI> _uri) { m_Settings.connections.push_back(_uri); }
  173. /*
  174. * Remove a connection
  175. * Returns: 0 on success
  176. * -1 failure (out of bounds)
  177. * -2 failure (active connection should be deleted)
  178. */
  179. void PoolManager::removeConnection(unsigned int idx) {
  180. // Are there any outstanding operations ?
  181. if (m_async_pending.load(memory_order_relaxed))
  182. throw runtime_error("Outstanding operations. Retry ...");
  183. // Check bounds
  184. if (idx >= m_Settings.connections.size())
  185. throw runtime_error("Index out-of bounds.");
  186. // Can't delete active connection
  187. if (idx == m_activeConnectionIdx)
  188. throw runtime_error("Can't remove active connection");
  189. // Remove the selected connection
  190. m_Settings.connections.erase(m_Settings.connections.begin() + idx);
  191. if (m_activeConnectionIdx > idx)
  192. m_activeConnectionIdx--;
  193. }
  194. void PoolManager::setActiveConnectionCommon(unsigned int idx) {
  195. // Are there any outstanding operations ?
  196. bool ex = false;
  197. if (!m_async_pending.compare_exchange_strong(ex, true))
  198. throw runtime_error("Outstanding operations. Retry ...");
  199. if (idx != m_activeConnectionIdx) {
  200. m_connectionSwitches.fetch_add(1, memory_order_relaxed);
  201. m_activeConnectionIdx = idx;
  202. m_connectionAttempt = 0;
  203. p_client->disconnect();
  204. } else {
  205. // Release the flag immediately
  206. m_async_pending.store(false, memory_order_relaxed);
  207. }
  208. }
  209. /*
  210. * Sets the active connection
  211. * Returns: 0 on success, -1 on failure (out of bounds)
  212. */
  213. void PoolManager::setActiveConnection(unsigned int idx) {
  214. // Sets the active connection to the requested index
  215. if (idx >= m_Settings.connections.size())
  216. throw runtime_error("Index out-of bounds.");
  217. setActiveConnectionCommon(idx);
  218. }
  219. void PoolManager::setActiveConnection(string& _connstring) {
  220. for (size_t idx = 0; idx < m_Settings.connections.size(); idx++)
  221. if (boost::iequals(m_Settings.connections[idx]->str(), _connstring)) {
  222. setActiveConnectionCommon(idx);
  223. return;
  224. }
  225. throw runtime_error("Not found.");
  226. }
  227. shared_ptr<URI> PoolManager::getActiveConnection() {
  228. try {
  229. return m_Settings.connections.at(m_activeConnectionIdx);
  230. } catch (const exception&) {
  231. return nullptr;
  232. }
  233. }
  234. Json::Value PoolManager::getConnectionsJson() {
  235. // Returns the list of configured connections
  236. Json::Value jRes;
  237. for (size_t i = 0; i < m_Settings.connections.size(); i++) {
  238. Json::Value JConn;
  239. JConn["index"] = (unsigned)i;
  240. JConn["active"] = (i == m_activeConnectionIdx ? true : false);
  241. JConn["uri"] = m_Settings.connections[i]->str();
  242. jRes.append(JConn);
  243. }
  244. return jRes;
  245. }
  246. void PoolManager::start() {
  247. m_running.store(true, memory_order_relaxed);
  248. m_async_pending.store(true, memory_order_relaxed);
  249. m_connectionSwitches.fetch_add(1, memory_order_relaxed);
  250. g_io_service.post(m_io_strand.wrap(boost::bind(&PoolManager::rotateConnect, this)));
  251. }
  252. void PoolManager::rotateConnect() {
  253. if (p_client && p_client->isConnected())
  254. return;
  255. // Check we're within bounds
  256. if (m_activeConnectionIdx >= m_Settings.connections.size())
  257. m_activeConnectionIdx = 0;
  258. // If this connection is marked Unrecoverable then discard it
  259. if (m_Settings.connections.at(m_activeConnectionIdx)->IsUnrecoverable()) {
  260. m_Settings.connections.erase(m_Settings.connections.begin() + m_activeConnectionIdx);
  261. m_connectionAttempt = 0;
  262. if (m_activeConnectionIdx >= m_Settings.connections.size())
  263. m_activeConnectionIdx = 0;
  264. m_connectionSwitches.fetch_add(1, memory_order_relaxed);
  265. } else if (m_Settings.connections.size() == 1) {
  266. // If this is the only connection we can't rotate
  267. // forever
  268. if (m_Settings.connectionMaxRetries && (m_connectionAttempt >= m_Settings.connectionMaxRetries))
  269. m_Settings.connections.erase(m_Settings.connections.begin() + m_activeConnectionIdx);
  270. }
  271. // Rotate connections if above max attempts threshold
  272. if (!m_Settings.connections.empty() && m_Settings.connectionMaxRetries != 0 &&
  273. (m_connectionAttempt >= m_Settings.connectionMaxRetries)) {
  274. m_connectionAttempt = 0;
  275. m_activeConnectionIdx++;
  276. if (m_activeConnectionIdx >= m_Settings.connections.size())
  277. m_activeConnectionIdx = 0;
  278. m_connectionSwitches.fetch_add(1, memory_order_relaxed);
  279. }
  280. if (!m_Settings.connections.empty() && (m_Settings.connections.at(m_activeConnectionIdx)->Host() != "exit")) {
  281. if (p_client)
  282. p_client = nullptr;
  283. if (m_Settings.connections.at(m_activeConnectionIdx)->Family() == ProtocolFamily::GETWORK)
  284. p_client =
  285. unique_ptr<PoolClient>(new EthGetworkClient(m_Settings.noWorkTimeout, m_Settings.getWorkPollInterval));
  286. if (m_Settings.connections.at(m_activeConnectionIdx)->Family() == ProtocolFamily::STRATUM)
  287. p_client =
  288. unique_ptr<PoolClient>(new EthStratumClient(m_Settings.noWorkTimeout, m_Settings.noResponseTimeout));
  289. if (m_Settings.connections.at(m_activeConnectionIdx)->Family() == ProtocolFamily::SIMULATION)
  290. p_client = unique_ptr<PoolClient>(new SimulateClient(m_Settings.benchmarkBlock));
  291. if (p_client)
  292. setClientHandlers();
  293. // Count connectionAttempts
  294. m_connectionAttempt++;
  295. // Invoke connections
  296. m_selectedHost = m_Settings.connections.at(m_activeConnectionIdx)->Host() + ":" +
  297. to_string(m_Settings.connections.at(m_activeConnectionIdx)->Port());
  298. p_client->setConnection(m_Settings.connections.at(m_activeConnectionIdx));
  299. cnote << "Selected pool " << m_selectedHost;
  300. if ((m_connectionAttempt > 1) && (m_Settings.delayBeforeRetry > 0)) {
  301. cnote << "Next connection attempt in " << m_Settings.delayBeforeRetry << " seconds";
  302. m_reconnecttimer.expires_from_now(boost::posix_time::seconds(m_Settings.delayBeforeRetry));
  303. m_reconnecttimer.async_wait(m_io_strand.wrap(
  304. boost::bind(&PoolManager::reconnecttimer_elapsed, this, boost::asio::placeholders::error)));
  305. } else
  306. p_client->connect();
  307. } else {
  308. if (m_Settings.connections.empty())
  309. cnote << "No more connections to try. Exiting...";
  310. else
  311. cnote << "'exit' failover just got hit. Exiting...";
  312. // Stop mining if applicable
  313. if (Farm::f().isMining()) {
  314. cnote << "Shutting down miners...";
  315. Farm::f().stop();
  316. }
  317. m_running.store(false, memory_order_relaxed);
  318. raise(SIGTERM);
  319. }
  320. }
  321. void PoolManager::showMiningAt() {
  322. // Should not happen
  323. if (!m_currentWp)
  324. return;
  325. double d = dev::getHashesToTarget(m_currentWp.boundary.hex(HexPrefix::Add));
  326. cnote << "Epoch : " EthWhite << m_currentWp.epoch << EthReset << " Difficulty : " EthWhite
  327. << dev::getFormattedHashes(d) << EthReset;
  328. }
  329. void PoolManager::failovertimer_elapsed(const boost::system::error_code& ec) {
  330. if (!ec) {
  331. if (m_running.load(memory_order_relaxed)) {
  332. if (m_activeConnectionIdx != 0) {
  333. m_activeConnectionIdx = 0;
  334. m_connectionAttempt = 0;
  335. m_connectionSwitches.fetch_add(1, memory_order_relaxed);
  336. cnote << "Failover timeout reached, retrying connection to primary pool";
  337. p_client->disconnect();
  338. }
  339. }
  340. }
  341. }
  342. void PoolManager::submithrtimer_elapsed(const boost::system::error_code& ec) {
  343. if (!ec) {
  344. if (m_running.load(memory_order_relaxed)) {
  345. if (p_client && p_client->isConnected())
  346. p_client->submitHashrate((uint32_t)Farm::f().HashRate(), m_Settings.hashRateId);
  347. // Resubmit actor
  348. m_submithrtimer.expires_from_now(boost::posix_time::seconds(m_Settings.hashRateInterval));
  349. m_submithrtimer.async_wait(m_io_strand.wrap(
  350. boost::bind(&PoolManager::submithrtimer_elapsed, this, boost::asio::placeholders::error)));
  351. }
  352. }
  353. }
  354. void PoolManager::reconnecttimer_elapsed(const boost::system::error_code& ec) {
  355. if (ec)
  356. return;
  357. if (m_running.load(memory_order_relaxed)) {
  358. if (p_client && !p_client->isConnected()) {
  359. p_client->connect();
  360. }
  361. }
  362. }
  363. int PoolManager::getCurrentEpoch() { return m_currentWp.epoch; }
  364. double PoolManager::getPoolDifficulty() {
  365. if (!m_currentWp)
  366. return 0.0;
  367. return m_currentWp.difficulty;
  368. }
  369. unsigned PoolManager::getConnectionSwitches() { return m_connectionSwitches.load(memory_order_relaxed); }
  370. unsigned PoolManager::getEpochChanges() { return m_epochChanges.load(memory_order_relaxed); }