BlockChainSync.cpp 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file BlockChainSync.cpp
  15. * @author Gav Wood <i@gavwood.com>
  16. * @date 2014
  17. */
  18. #include "BlockChainSync.h"
  19. #include <chrono>
  20. #include <libdevcore/Common.h>
  21. #include <libdevcore/TrieHash.h>
  22. #include <libp2p/Host.h>
  23. #include <libp2p/Session.h>
  24. #include <libethcore/Exceptions.h>
  25. #include "BlockChain.h"
  26. #include "BlockQueue.h"
  27. #include "EthereumPeer.h"
  28. #include "EthereumHost.h"
  29. using namespace std;
  30. using namespace dev;
  31. using namespace dev::eth;
  32. using namespace p2p;
  33. unsigned const c_maxPeerUknownNewBlocks = 1024; /// Max number of unknown new blocks peer can give us
  34. unsigned const c_maxRequestHeaders = 1024;
  35. unsigned const c_maxRequestBodies = 1024;
  36. std::ostream& dev::eth::operator<<(std::ostream& _out, SyncStatus const& _sync)
  37. {
  38. _out << "protocol: " << _sync.protocolVersion << endl;
  39. _out << "state: " << EthereumHost::stateName(_sync.state) << " ";
  40. if (_sync.state == SyncState::Blocks || _sync.state == SyncState::NewBlocks)
  41. _out << _sync.currentBlockNumber << "/" << _sync.highestBlockNumber;
  42. return _out;
  43. }
  44. namespace // Helper functions.
  45. {
  46. template<typename T> bool haveItem(std::map<unsigned, T>& _container, unsigned _number)
  47. {
  48. if (_container.empty())
  49. return false;
  50. auto lower = _container.lower_bound(_number);
  51. if (lower != _container.end() && lower->first == _number)
  52. return true;
  53. if (lower == _container.begin())
  54. return false;
  55. --lower;
  56. return lower->first <= _number && (lower->first + lower->second.size()) > _number;
  57. }
  58. template<typename T> T const* findItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
  59. {
  60. if (_container.empty())
  61. return nullptr;
  62. auto lower = _container.lower_bound(_number);
  63. if (lower != _container.end() && lower->first == _number)
  64. return &(*lower->second.begin());
  65. if (lower == _container.begin())
  66. return nullptr;
  67. --lower;
  68. if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
  69. return &lower->second.at(_number - lower->first);
  70. return nullptr;
  71. }
  72. template<typename T> void removeItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
  73. {
  74. if (_container.empty())
  75. return;
  76. auto lower = _container.lower_bound(_number);
  77. if (lower != _container.end() && lower->first == _number)
  78. {
  79. _container.erase(lower);
  80. return;
  81. }
  82. if (lower == _container.begin())
  83. return;
  84. --lower;
  85. if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
  86. lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
  87. }
  88. template<typename T> void removeAllStartingWith(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
  89. {
  90. if (_container.empty())
  91. return;
  92. auto lower = _container.lower_bound(_number);
  93. if (lower != _container.end() && lower->first == _number)
  94. {
  95. _container.erase(lower, _container.end());
  96. return;
  97. }
  98. if (lower == _container.begin())
  99. {
  100. _container.clear();
  101. return;
  102. }
  103. --lower;
  104. if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
  105. lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
  106. _container.erase(++lower, _container.end());
  107. }
  108. template<typename T> void mergeInto(std::map<unsigned, std::vector<T>>& _container, unsigned _number, T&& _data)
  109. {
  110. assert(!haveItem(_container, _number));
  111. auto lower = _container.lower_bound(_number);
  112. if (!_container.empty() && lower != _container.begin())
  113. --lower;
  114. if (lower != _container.end() && (lower->first + lower->second.size() == _number))
  115. {
  116. // extend existing chunk
  117. lower->second.emplace_back(_data);
  118. auto next = lower;
  119. ++next;
  120. if (next != _container.end() && (lower->first + lower->second.size() == next->first))
  121. {
  122. // merge with the next chunk
  123. std::move(next->second.begin(), next->second.end(), std::back_inserter(lower->second));
  124. _container.erase(next);
  125. }
  126. }
  127. else
  128. {
  129. // insert a new chunk
  130. auto inserted = _container.insert(lower, std::make_pair(_number, std::vector<T> { _data }));
  131. auto next = inserted;
  132. ++next;
  133. if (next != _container.end() && next->first == _number + 1)
  134. {
  135. std::move(next->second.begin(), next->second.end(), std::back_inserter(inserted->second));
  136. _container.erase(next);
  137. }
  138. }
  139. }
  140. } // Anonymous namespace -- helper functions.
  141. BlockChainSync::BlockChainSync(EthereumHost& _host):
  142. m_host(_host),
  143. m_startingBlock(_host.chain().number()),
  144. m_lastImportedBlock(m_startingBlock),
  145. m_lastImportedBlockHash(_host.chain().currentHash())
  146. {
  147. m_bqRoomAvailable = host().bq().onRoomAvailable([this]()
  148. {
  149. RecursiveGuard l(x_sync);
  150. m_state = SyncState::Blocks;
  151. continueSync();
  152. });
  153. }
  154. BlockChainSync::~BlockChainSync()
  155. {
  156. RecursiveGuard l(x_sync);
  157. abortSync();
  158. }
  159. void BlockChainSync::abortSync()
  160. {
  161. resetSync();
  162. host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
  163. {
  164. _p->abortSync();
  165. return true;
  166. });
  167. }
  168. void BlockChainSync::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
  169. {
  170. RecursiveGuard l(x_sync);
  171. DEV_INVARIANT_CHECK;
  172. std::shared_ptr<Session> session = _peer->session();
  173. if (!session)
  174. return; // Expired
  175. if (_peer->m_genesisHash != host().chain().genesisHash())
  176. _peer->disable("Invalid genesis hash");
  177. else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion)
  178. _peer->disable("Invalid protocol version.");
  179. else if (_peer->m_networkId != host().networkId())
  180. _peer->disable("Invalid network identifier.");
  181. else if (session->info().clientVersion.find("/v0.7.0/") != string::npos)
  182. _peer->disable("Blacklisted client version.");
  183. else if (host().isBanned(session->id()))
  184. _peer->disable("Peer banned for previous bad behaviour.");
  185. else if (_peer->m_asking != Asking::State && _peer->m_asking != Asking::Nothing)
  186. _peer->disable("Peer banned for unexpected status message.");
  187. else
  188. syncPeer(_peer, false);
  189. }
  190. void BlockChainSync::syncPeer(std::shared_ptr<EthereumPeer> _peer, bool _force)
  191. {
  192. if (_peer->m_asking != Asking::Nothing)
  193. {
  194. clog(NetAllDetail) << "Can't sync with this peer - outstanding asks.";
  195. return;
  196. }
  197. if (m_state == SyncState::Waiting)
  198. return;
  199. u256 td = host().chain().details().totalDifficulty;
  200. if (host().bq().isActive())
  201. td += host().bq().difficulty();
  202. u256 syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
  203. if (_force || _peer->m_totalDifficulty > syncingDifficulty)
  204. {
  205. // start sync
  206. m_syncingTotalDifficulty = _peer->m_totalDifficulty;
  207. if (m_state == SyncState::Idle || m_state == SyncState::NotSynced)
  208. m_state = SyncState::Blocks;
  209. _peer->requestBlockHeaders(_peer->m_latestHash, 1, 0, false);
  210. _peer->m_requireTransactions = true;
  211. return;
  212. }
  213. if (m_state == SyncState::Blocks)
  214. {
  215. requestBlocks(_peer);
  216. return;
  217. }
  218. }
  219. void BlockChainSync::continueSync()
  220. {
  221. host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p)
  222. {
  223. syncPeer(_p, false);
  224. return true;
  225. });
  226. }
  227. void BlockChainSync::requestBlocks(std::shared_ptr<EthereumPeer> _peer)
  228. {
  229. clearPeerDownload(_peer);
  230. if (host().bq().knownFull())
  231. {
  232. clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
  233. pauseSync();
  234. return;
  235. }
  236. // check to see if we need to download any block bodies first
  237. auto header = m_headers.begin();
  238. h256s neededBodies;
  239. vector<unsigned> neededNumbers;
  240. unsigned index = 0;
  241. if (m_haveCommonHeader && !m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1)
  242. {
  243. while (header != m_headers.end() && neededBodies.size() < c_maxRequestBodies && index < header->second.size())
  244. {
  245. unsigned block = header->first + index;
  246. if (m_downloadingBodies.count(block) == 0 && !haveItem(m_bodies, block))
  247. {
  248. neededBodies.push_back(header->second[index].hash);
  249. neededNumbers.push_back(block);
  250. m_downloadingBodies.insert(block);
  251. }
  252. ++index;
  253. if (index >= header->second.size())
  254. break; // Download bodies only for validated header chain
  255. }
  256. }
  257. if (neededBodies.size() > 0)
  258. {
  259. m_bodySyncPeers[_peer] = neededNumbers;
  260. _peer->requestBlockBodies(neededBodies);
  261. }
  262. else
  263. {
  264. // check if need to download headers
  265. unsigned start = 0;
  266. if (!m_haveCommonHeader)
  267. {
  268. // download backwards until common block is found 1 header at a time
  269. start = m_lastImportedBlock;
  270. if (!m_headers.empty())
  271. start = std::min(start, m_headers.begin()->first - 1);
  272. m_lastImportedBlock = start;
  273. m_lastImportedBlockHash = host().chain().numberHash(start);
  274. if (start <= 1)
  275. m_haveCommonHeader = true; //reached genesis
  276. }
  277. if (m_haveCommonHeader)
  278. {
  279. start = m_lastImportedBlock + 1;
  280. auto next = m_headers.begin();
  281. unsigned count = 0;
  282. if (!m_headers.empty() && start >= m_headers.begin()->first)
  283. {
  284. start = m_headers.begin()->first + m_headers.begin()->second.size();
  285. ++next;
  286. }
  287. while (count == 0 && next != m_headers.end())
  288. {
  289. count = std::min(c_maxRequestHeaders, next->first - start);
  290. while(count > 0 && m_downloadingHeaders.count(start) != 0)
  291. {
  292. start++;
  293. count--;
  294. }
  295. std::vector<unsigned> headers;
  296. for (unsigned block = start; block < start + count; block++)
  297. if (m_downloadingHeaders.count(block) == 0)
  298. {
  299. headers.push_back(block);
  300. m_downloadingHeaders.insert(block);
  301. }
  302. count = headers.size();
  303. if (count > 0)
  304. {
  305. m_headerSyncPeers[_peer] = headers;
  306. assert(!haveItem(m_headers, start));
  307. _peer->requestBlockHeaders(start, count, 0, false);
  308. }
  309. else if (start >= next->first)
  310. {
  311. start = next->first + next->second.size();
  312. ++next;
  313. }
  314. }
  315. }
  316. else
  317. _peer->requestBlockHeaders(start, 1, 0, false);
  318. }
  319. }
  320. void BlockChainSync::clearPeerDownload(std::shared_ptr<EthereumPeer> _peer)
  321. {
  322. auto syncPeer = m_headerSyncPeers.find(_peer);
  323. if (syncPeer != m_headerSyncPeers.end())
  324. {
  325. for (unsigned block : syncPeer->second)
  326. m_downloadingHeaders.erase(block);
  327. m_headerSyncPeers.erase(syncPeer);
  328. }
  329. syncPeer = m_bodySyncPeers.find(_peer);
  330. if (syncPeer != m_bodySyncPeers.end())
  331. {
  332. for (unsigned block : syncPeer->second)
  333. m_downloadingBodies.erase(block);
  334. m_bodySyncPeers.erase(syncPeer);
  335. }
  336. }
  337. void BlockChainSync::clearPeerDownload()
  338. {
  339. for (auto s = m_headerSyncPeers.begin(); s != m_headerSyncPeers.end();)
  340. {
  341. if (s->first.expired())
  342. {
  343. for (unsigned block : s->second)
  344. m_downloadingHeaders.erase(block);
  345. m_headerSyncPeers.erase(s++);
  346. }
  347. else
  348. ++s;
  349. }
  350. for (auto s = m_bodySyncPeers.begin(); s != m_bodySyncPeers.end();)
  351. {
  352. if (s->first.expired())
  353. {
  354. for (unsigned block : s->second)
  355. m_downloadingBodies.erase(block);
  356. m_bodySyncPeers.erase(s++);
  357. }
  358. else
  359. ++s;
  360. }
  361. }
  362. void BlockChainSync::logNewBlock(h256 const& _h)
  363. {
  364. if (m_state == SyncState::NewBlocks)
  365. clog(NetNote) << "NewBlock: " << _h;
  366. m_knownNewHashes.erase(_h);
  367. }
  368. void BlockChainSync::onPeerBlockHeaders(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
  369. {
  370. RecursiveGuard l(x_sync);
  371. DEV_INVARIANT_CHECK;
  372. size_t itemCount = _r.itemCount();
  373. clog(NetMessageSummary) << "BlocksHeaders (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreHeaders");
  374. clearPeerDownload(_peer);
  375. if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting)
  376. {
  377. clog(NetMessageSummary) << "Ignoring unexpected blocks";
  378. return;
  379. }
  380. if (m_state == SyncState::Waiting)
  381. {
  382. clog(NetAllDetail) << "Ignored blocks while waiting";
  383. return;
  384. }
  385. if (itemCount == 0)
  386. {
  387. clog(NetAllDetail) << "Peer does not have the blocks requested";
  388. _peer->addRating(-1);
  389. }
  390. for (unsigned i = 0; i < itemCount; i++)
  391. {
  392. BlockHeader info(_r[i].data(), HeaderData);
  393. unsigned blockNumber = static_cast<unsigned>(info.number());
  394. if (haveItem(m_headers, blockNumber))
  395. {
  396. clog(NetMessageSummary) << "Skipping header " << blockNumber;
  397. continue;
  398. }
  399. if (blockNumber <= m_lastImportedBlock && m_haveCommonHeader)
  400. {
  401. clog(NetMessageSummary) << "Skipping header " << blockNumber;
  402. continue;
  403. }
  404. if (blockNumber > m_highestBlock)
  405. m_highestBlock = blockNumber;
  406. auto status = host().bq().blockStatus(info.hash());
  407. if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(info.hash()))
  408. {
  409. m_haveCommonHeader = true;
  410. m_lastImportedBlock = (unsigned)info.number();
  411. m_lastImportedBlockHash = info.hash();
  412. }
  413. else
  414. {
  415. Header hdr { _r[i].data().toBytes(), info.hash(), info.parentHash() };
  416. // validate chain
  417. HeaderId headerId { info.transactionsRoot(), info.sha3Uncles() };
  418. if (m_haveCommonHeader)
  419. {
  420. Header const* prevBlock = findItem(m_headers, blockNumber - 1);
  421. if ((prevBlock && prevBlock->hash != info.parentHash()) || (blockNumber == m_lastImportedBlock + 1 && info.parentHash() != m_lastImportedBlockHash))
  422. {
  423. // mismatching parent id, delete the previous block and don't add this one
  424. clog(NetImpolite) << "Unknown block header " << blockNumber << " " << info.hash() << " (Restart syncing)";
  425. _peer->addRating(-1);
  426. restartSync();
  427. return ;
  428. }
  429. Header const* nextBlock = findItem(m_headers, blockNumber + 1);
  430. if (nextBlock && nextBlock->parent != info.hash())
  431. {
  432. clog(NetImpolite) << "Unknown block header " << blockNumber + 1 << " " << nextBlock->hash;
  433. // clear following headers
  434. unsigned n = blockNumber + 1;
  435. auto headers = m_headers.at(n);
  436. for (auto const& h : headers)
  437. {
  438. BlockHeader deletingInfo(h.data, HeaderData);
  439. m_headerIdToNumber.erase(headerId);
  440. m_downloadingBodies.erase(n);
  441. m_downloadingHeaders.erase(n);
  442. ++n;
  443. }
  444. removeAllStartingWith(m_headers, blockNumber + 1);
  445. removeAllStartingWith(m_bodies, blockNumber + 1);
  446. }
  447. }
  448. mergeInto(m_headers, blockNumber, std::move(hdr));
  449. if (headerId.transactionsRoot == EmptyTrie && headerId.uncles == EmptyListSHA3)
  450. {
  451. //empty body, just mark as downloaded
  452. RLPStream r(2);
  453. r.appendRaw(RLPEmptyList);
  454. r.appendRaw(RLPEmptyList);
  455. bytes body;
  456. r.swapOut(body);
  457. mergeInto(m_bodies, blockNumber, std::move(body));
  458. }
  459. else
  460. m_headerIdToNumber[headerId] = blockNumber;
  461. }
  462. }
  463. collectBlocks();
  464. continueSync();
  465. }
  466. void BlockChainSync::onPeerBlockBodies(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
  467. {
  468. RecursiveGuard l(x_sync);
  469. DEV_INVARIANT_CHECK;
  470. size_t itemCount = _r.itemCount();
  471. clog(NetMessageSummary) << "BlocksBodies (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBodies");
  472. clearPeerDownload(_peer);
  473. if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting) {
  474. clog(NetMessageSummary) << "Ignoring unexpected blocks";
  475. return;
  476. }
  477. if (m_state == SyncState::Waiting)
  478. {
  479. clog(NetAllDetail) << "Ignored blocks while waiting";
  480. return;
  481. }
  482. if (itemCount == 0)
  483. {
  484. clog(NetAllDetail) << "Peer does not have the blocks requested";
  485. _peer->addRating(-1);
  486. }
  487. for (unsigned i = 0; i < itemCount; i++)
  488. {
  489. RLP body(_r[i]);
  490. auto txList = body[0];
  491. h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); });
  492. h256 uncles = sha3(body[1].data());
  493. HeaderId id { transactionRoot, uncles };
  494. auto iter = m_headerIdToNumber.find(id);
  495. if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
  496. {
  497. clog(NetAllDetail) << "Ignored unknown block body";
  498. continue;
  499. }
  500. unsigned blockNumber = iter->second;
  501. m_headerIdToNumber.erase(id);
  502. mergeInto(m_bodies, blockNumber, body.data().toBytes());
  503. }
  504. collectBlocks();
  505. continueSync();
  506. }
  507. void BlockChainSync::collectBlocks()
  508. {
  509. if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
  510. return;
  511. // merge headers and bodies
  512. auto& headers = *m_headers.begin();
  513. auto& bodies = *m_bodies.begin();
  514. if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
  515. return;
  516. unsigned success = 0;
  517. unsigned future = 0;
  518. unsigned got = 0;
  519. unsigned unknown = 0;
  520. size_t i = 0;
  521. for (; i < headers.second.size() && i < bodies.second.size(); i++)
  522. {
  523. RLPStream blockStream(3);
  524. blockStream.appendRaw(headers.second[i].data);
  525. RLP body(bodies.second[i]);
  526. blockStream.appendRaw(body[0].data());
  527. blockStream.appendRaw(body[1].data());
  528. bytes block;
  529. blockStream.swapOut(block);
  530. switch (host().bq().import(&block))
  531. {
  532. case ImportResult::Success:
  533. success++;
  534. if (headers.first + i > m_lastImportedBlock)
  535. {
  536. m_lastImportedBlock = headers.first + (unsigned)i;
  537. m_lastImportedBlockHash = headers.second[i].hash;
  538. }
  539. break;
  540. case ImportResult::Malformed:
  541. case ImportResult::BadChain:
  542. restartSync();
  543. return;
  544. case ImportResult::FutureTimeKnown:
  545. future++;
  546. break;
  547. case ImportResult::AlreadyInChain:
  548. break;
  549. case ImportResult::AlreadyKnown:
  550. case ImportResult::FutureTimeUnknown:
  551. case ImportResult::UnknownParent:
  552. if (headers.first + i > m_lastImportedBlock)
  553. {
  554. resetSync();
  555. m_haveCommonHeader = false; // fork detected, search for common header again
  556. }
  557. return;
  558. default:;
  559. }
  560. }
  561. clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known received.";
  562. if (host().bq().unknownFull())
  563. {
  564. clog(NetWarn) << "Too many unknown blocks, restarting sync";
  565. restartSync();
  566. return;
  567. }
  568. auto newHeaders = std::move(headers.second);
  569. newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
  570. unsigned newHeaderHead = headers.first + i;
  571. auto newBodies = std::move(bodies.second);
  572. newBodies.erase(newBodies.begin(), newBodies.begin() + i);
  573. unsigned newBodiesHead = bodies.first + i;
  574. m_headers.erase(m_headers.begin());
  575. m_bodies.erase(m_bodies.begin());
  576. if (!newHeaders.empty())
  577. m_headers[newHeaderHead] = newHeaders;
  578. if (!newBodies.empty())
  579. m_bodies[newBodiesHead] = newBodies;
  580. if (m_headers.empty())
  581. {
  582. assert(m_bodies.empty());
  583. completeSync();
  584. }
  585. DEV_INVARIANT_CHECK_HERE;
  586. }
  587. void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
  588. {
  589. RecursiveGuard l(x_sync);
  590. DEV_INVARIANT_CHECK;
  591. if (_r.itemCount() != 2)
  592. {
  593. _peer->disable("NewBlock without 2 data fields.");
  594. return;
  595. }
  596. BlockHeader info(_r[0][0].data(), HeaderData);
  597. auto h = info.hash();
  598. DEV_GUARDED(_peer->x_knownBlocks)
  599. _peer->m_knownBlocks.insert(h);
  600. unsigned blockNumber = static_cast<unsigned>(info.number());
  601. if (blockNumber > (m_lastImportedBlock + 1))
  602. {
  603. clog(NetAllDetail) << "Received unknown new block";
  604. syncPeer(_peer, true);
  605. return;
  606. }
  607. switch (host().bq().import(_r[0].data()))
  608. {
  609. case ImportResult::Success:
  610. _peer->addRating(100);
  611. logNewBlock(h);
  612. if (blockNumber > m_lastImportedBlock)
  613. {
  614. m_lastImportedBlock = max(m_lastImportedBlock, blockNumber);
  615. m_lastImportedBlockHash = h;
  616. }
  617. m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
  618. m_downloadingBodies.erase(blockNumber);
  619. m_downloadingHeaders.erase(blockNumber);
  620. removeItem(m_headers, blockNumber);
  621. removeItem(m_bodies, blockNumber);
  622. if (m_headers.empty())
  623. {
  624. if (!m_bodies.empty())
  625. {
  626. clog(NetMessageDetail) << "Block headers map is empty, but block bodies map is not. Force-clearing.";
  627. m_bodies.clear();
  628. }
  629. completeSync();
  630. }
  631. break;
  632. case ImportResult::FutureTimeKnown:
  633. //TODO: Rating dependent on how far in future it is.
  634. break;
  635. case ImportResult::Malformed:
  636. case ImportResult::BadChain:
  637. logNewBlock(h);
  638. _peer->disable("Malformed block received.");
  639. return;
  640. case ImportResult::AlreadyInChain:
  641. case ImportResult::AlreadyKnown:
  642. break;
  643. case ImportResult::FutureTimeUnknown:
  644. case ImportResult::UnknownParent:
  645. {
  646. _peer->m_unknownNewBlocks++;
  647. if (_peer->m_unknownNewBlocks > c_maxPeerUknownNewBlocks)
  648. {
  649. _peer->disable("Too many uknown new blocks");
  650. restartSync();
  651. }
  652. logNewBlock(h);
  653. u256 totalDifficulty = _r[1].toInt<u256>();
  654. if (totalDifficulty > _peer->m_totalDifficulty)
  655. {
  656. clog(NetMessageDetail) << "Received block with no known parent. Peer needs syncing...";
  657. syncPeer(_peer, true);
  658. }
  659. break;
  660. }
  661. default:;
  662. }
  663. }
  664. SyncStatus BlockChainSync::status() const
  665. {
  666. RecursiveGuard l(x_sync);
  667. SyncStatus res;
  668. res.state = m_state;
  669. res.protocolVersion = 62;
  670. res.startBlockNumber = m_startingBlock;
  671. res.currentBlockNumber = host().chain().number();
  672. res.highestBlockNumber = m_highestBlock;
  673. return res;
  674. }
  675. void BlockChainSync::resetSync()
  676. {
  677. m_downloadingHeaders.clear();
  678. m_downloadingBodies.clear();
  679. m_headers.clear();
  680. m_bodies.clear();
  681. m_headerSyncPeers.clear();
  682. m_bodySyncPeers.clear();
  683. m_headerIdToNumber.clear();
  684. m_syncingTotalDifficulty = 0;
  685. m_state = SyncState::Idle;
  686. }
  687. void BlockChainSync::restartSync()
  688. {
  689. RecursiveGuard l(x_sync);
  690. resetSync();
  691. m_highestBlock = 0;
  692. m_haveCommonHeader = false;
  693. host().bq().clear();
  694. m_startingBlock = host().chain().number();
  695. m_lastImportedBlock = m_startingBlock;
  696. m_lastImportedBlockHash = host().chain().currentHash();
  697. m_state = SyncState::NotSynced;
  698. }
  699. void BlockChainSync::completeSync()
  700. {
  701. resetSync();
  702. m_state = SyncState::Idle;
  703. }
  704. void BlockChainSync::pauseSync()
  705. {
  706. m_state = SyncState::Waiting;
  707. }
  708. bool BlockChainSync::isSyncing() const
  709. {
  710. return m_state != SyncState::Idle;
  711. }
  712. void BlockChainSync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, std::vector<std::pair<h256, u256>> const& _hashes)
  713. {
  714. RecursiveGuard l(x_sync);
  715. DEV_INVARIANT_CHECK;
  716. if (_peer->isConversing())
  717. {
  718. clog(NetMessageDetail) << "Ignoring new hashes since we're already downloading.";
  719. return;
  720. }
  721. clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing.";
  722. unsigned knowns = 0;
  723. unsigned unknowns = 0;
  724. unsigned maxHeight = 0;
  725. for (auto const& p: _hashes)
  726. {
  727. h256 const& h = p.first;
  728. _peer->addRating(1);
  729. DEV_GUARDED(_peer->x_knownBlocks)
  730. _peer->m_knownBlocks.insert(h);
  731. auto status = host().bq().blockStatus(h);
  732. if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h))
  733. knowns++;
  734. else if (status == QueueStatus::Bad)
  735. {
  736. cwarn << "block hash bad!" << h << ". Bailing...";
  737. return;
  738. }
  739. else if (status == QueueStatus::Unknown)
  740. {
  741. unknowns++;
  742. if (p.second > maxHeight)
  743. {
  744. maxHeight = (unsigned)p.second;
  745. _peer->m_latestHash = h;
  746. }
  747. }
  748. else
  749. knowns++;
  750. }
  751. clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
  752. if (unknowns > 0)
  753. {
  754. clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing.";
  755. syncPeer(_peer, true);
  756. }
  757. }
  758. void BlockChainSync::onPeerAborting()
  759. {
  760. RecursiveGuard l(x_sync);
  761. // Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
  762. clearPeerDownload();
  763. continueSync();
  764. DEV_INVARIANT_CHECK_HERE;
  765. }
  766. bool BlockChainSync::invariants() const
  767. {
  768. if (!isSyncing() && !m_headers.empty())
  769. BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got headers while not syncing"));
  770. if (!isSyncing() && !m_bodies.empty())
  771. BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got bodies while not syncing"));
  772. if (isSyncing() && m_host.chain().number() > 0 && m_haveCommonHeader && m_lastImportedBlock == 0)
  773. BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Common block not found"));
  774. if (isSyncing() && !m_headers.empty() && m_lastImportedBlock >= m_headers.begin()->first)
  775. BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header is too old"));
  776. if (m_headerSyncPeers.empty() != m_downloadingHeaders.empty())
  777. BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header download map mismatch"));
  778. if (m_bodySyncPeers.empty() != m_downloadingBodies.empty() && m_downloadingBodies.size() <= m_headerIdToNumber.size())
  779. BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Body download map mismatch"));
  780. return true;
  781. }