Ethereum  PoC-8
The C++ Implementation of Ethereum
BlockChainSync.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16 */
22 #include "BlockChainSync.h"
23 
24 #include "BlockChain.h"
25 #include "BlockQueue.h"
26 #include "EthereumCapability.h"
27 #include <libdevcore/Common.h>
28 #include <libdevcore/TrieHash.h>
29 #include <libethcore/Exceptions.h>
30 #include <libp2p/Host.h>
31 #include <libp2p/Session.h>
32 #include <chrono>
33 
34 using namespace std;
35 using namespace dev;
36 using namespace dev::eth;
37 
38 unsigned const c_maxPeerUknownNewBlocks = 1024;
39 unsigned const c_maxRequestHeaders = 1024;
40 unsigned const c_maxRequestBodies = 1024;
41 
42 
43 std::ostream& dev::eth::operator<<(std::ostream& _out, SyncStatus const& _sync)
44 {
45  _out << "protocol: " << _sync.protocolVersion << endl;
46  _out << "state: " << EthereumCapability::stateName(_sync.state) << " ";
47  if (_sync.state == SyncState::Blocks)
48  _out << _sync.currentBlockNumber << "/" << _sync.highestBlockNumber;
49  return _out;
50 }
51 
52 namespace // Helper functions.
53 {
54 
55 template<typename T> bool haveItem(std::map<unsigned, T>& _container, unsigned _number)
56 {
57  if (_container.empty())
58  return false;
59  auto lower = _container.lower_bound(_number);
60  if (lower != _container.end() && lower->first == _number)
61  return true;
62  if (lower == _container.begin())
63  return false;
64  --lower;
65  return lower->first <= _number && (lower->first + lower->second.size()) > _number;
66 }
67 
68 template<typename T> T const* findItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
69 {
70  if (_container.empty())
71  return nullptr;
72  auto lower = _container.lower_bound(_number);
73  if (lower != _container.end() && lower->first == _number)
74  return &(*lower->second.begin());
75  if (lower == _container.begin())
76  return nullptr;
77  --lower;
78  if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
79  return &lower->second.at(_number - lower->first);
80  return nullptr;
81 }
82 
83 template<typename T> void removeItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
84 {
85  if (_container.empty())
86  return;
87  auto lower = _container.lower_bound(_number);
88  if (lower != _container.end() && lower->first == _number)
89  {
90  _container.erase(lower);
91  return;
92  }
93  if (lower == _container.begin())
94  return;
95  --lower;
96  if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
97  lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
98 }
99 
100 template<typename T> void removeAllStartingWith(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
101 {
102  if (_container.empty())
103  return;
104  auto lower = _container.lower_bound(_number);
105  if (lower != _container.end() && lower->first == _number)
106  {
107  _container.erase(lower, _container.end());
108  return;
109  }
110  if (lower == _container.begin())
111  {
112  _container.clear();
113  return;
114  }
115  --lower;
116  if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
117  lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
118  _container.erase(++lower, _container.end());
119 }
120 
121 template<typename T> void mergeInto(std::map<unsigned, std::vector<T>>& _container, unsigned _number, T&& _data)
122 {
123  assert(!haveItem(_container, _number));
124  auto lower = _container.lower_bound(_number);
125  if (!_container.empty() && lower != _container.begin())
126  --lower;
127  if (lower != _container.end() && (lower->first + lower->second.size() == _number))
128  {
129  // extend existing chunk
130  lower->second.emplace_back(_data);
131 
132  auto next = lower;
133  ++next;
134  if (next != _container.end() && (lower->first + lower->second.size() == next->first))
135  {
136  // merge with the next chunk
137  std::move(next->second.begin(), next->second.end(), std::back_inserter(lower->second));
138  _container.erase(next);
139  }
140 
141  }
142  else
143  {
144  // insert a new chunk
145  auto inserted = _container.insert(lower, std::make_pair(_number, std::vector<T> { _data }));
146  auto next = inserted;
147  ++next;
148  if (next != _container.end() && next->first == _number + 1)
149  {
150  std::move(next->second.begin(), next->second.end(), std::back_inserter(inserted->second));
151  _container.erase(next);
152  }
153  }
154 }
155 
156 } // Anonymous namespace -- helper functions.
157 
158 BlockChainSync::BlockChainSync(EthereumCapability& _host)
159  : m_host(_host),
160  m_chainStartBlock(_host.chain().chainStartBlockNumber()),
161  m_startingBlock(_host.chain().number()),
162  m_lastImportedBlock(m_startingBlock),
163  m_lastImportedBlockHash(_host.chain().currentHash())
164 {
165  m_bqRoomAvailable = host().bq().onRoomAvailable([this]()
166  {
167  RecursiveGuard l(x_sync);
168  m_state = SyncState::Blocks;
169  continueSync();
170  });
171 }
172 
174 {
175  RecursiveGuard l(x_sync);
176  abortSync();
177 }
178 
180 {
181  //if a block has been added via mining or other block import function
182  //through RPC, then we should count it as a last imported block
183  RecursiveGuard l(x_sync);
184  if (_info.number() > m_lastImportedBlock)
185  {
186  m_lastImportedBlock = static_cast<unsigned>(_info.number());
187  m_lastImportedBlockHash = _info.hash();
188  m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
189  }
190 }
191 
193 {
194  RecursiveGuard l(x_sync);
195  resetSync();
196  onPeerAborting();
197 }
198 
200 {
201  RecursiveGuard l(x_sync);
203 
204  auto peerSessionInfo = m_host.capabilityHost().peerSessionInfo(_peer.id());
205  if (!peerSessionInfo)
206  return; // Expired
207 
208  std::string disconnectReason;
209  if (peerSessionInfo->clientVersion.find("/v0.7.0/") != string::npos)
210  disconnectReason = "Blacklisted client version.";
211  else
212  disconnectReason = _peer.validate(
213  host().chain().genesisHash(), host().protocolVersion(), host().networkId());
214 
215  if (!disconnectReason.empty())
216  {
217  LOG(m_logger) << "Peer not suitable for sync: " << disconnectReason;
218  m_host.capabilityHost().disconnect(_peer.id(), p2p::UserReason);
219  return;
220  }
221 
222  // Before starting to exchange the data with the node, let's verify that it's on our chain
223  if (!requestDaoForkBlockHeader(_peer.id()))
224  {
225  // DAO challenge not needed
226  syncPeer(_peer.id(), false);
227  }
228 }
229 
230 bool BlockChainSync::requestDaoForkBlockHeader(NodeID const& _peerID)
231 {
232  // DAO challenge
233  unsigned const daoHardfork = static_cast<unsigned>(host().chain().sealEngine()->chainParams().daoHardforkBlock);
234  if (daoHardfork == 0)
235  return false;
236 
237  m_daoChallengedPeers.insert(_peerID);
238  m_host.peer(_peerID).requestBlockHeaders(daoHardfork, 1, 0, false);
239  return true;
240 }
241 
242 void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
243 {
244  if (m_host.peer(_peerID).isConversing())
245  {
246  LOG(m_loggerDetail) << "Can't sync with this peer - outstanding asks.";
247  return;
248  }
249 
250  if (m_state == SyncState::Waiting)
251  return;
252 
253  u256 td = host().chain().details().totalDifficulty;
254  if (host().bq().isActive())
255  td += host().bq().difficulty();
256 
257  u256 syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
258 
259  auto& peer = m_host.peer(_peerID);
260  u256 peerTotalDifficulty = peer.totalDifficulty();
261 
262  if (_force || peerTotalDifficulty > syncingDifficulty)
263  {
264  if (peerTotalDifficulty > syncingDifficulty)
265  LOG(m_logger) << "Discovered new highest difficulty";
266 
267  // start sync
268  m_syncingTotalDifficulty = peerTotalDifficulty;
269  if (m_state == SyncState::Idle || m_state == SyncState::NotSynced)
270  {
271  LOG(m_loggerInfo) << "Starting full sync";
272  m_state = SyncState::Blocks;
273  }
274  peer.requestBlockHeaders(peer.latestHash(), 1, 0, false);
275  peer.setWaitingForTransactions(true);
276  return;
277  }
278 
279  if (m_state == SyncState::Blocks)
280  {
281  requestBlocks(_peerID);
282  return;
283  }
284 }
285 
286 void BlockChainSync::continueSync()
287 {
288  host().capabilityHost().foreachPeer(m_host.name(), [this](NodeID const& _peerID) {
289  syncPeer(_peerID, false);
290  return true;
291  });
292 }
293 
294 void BlockChainSync::requestBlocks(NodeID const& _peerID)
295 {
296  clearPeerDownload(_peerID);
297  if (host().bq().knownFull())
298  {
299  LOG(m_loggerDetail) << "Waiting for block queue before downloading blocks";
300  pauseSync();
301  return;
302  }
303  // check to see if we need to download any block bodies first
304  auto header = m_headers.begin();
305  h256s neededBodies;
306  vector<unsigned> neededNumbers;
307  unsigned index = 0;
308  if (m_haveCommonHeader && !m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1)
309  {
310  while (header != m_headers.end() && neededBodies.size() < c_maxRequestBodies && index < header->second.size())
311  {
312  unsigned block = header->first + index;
313  if (m_downloadingBodies.count(block) == 0 && !haveItem(m_bodies, block))
314  {
315  neededBodies.push_back(header->second[index].hash);
316  neededNumbers.push_back(block);
317  m_downloadingBodies.insert(block);
318  }
319 
320  ++index;
321  if (index >= header->second.size())
322  break; // Download bodies only for validated header chain
323  }
324  }
325  if (neededBodies.size() > 0)
326  {
327  m_bodySyncPeers[_peerID] = neededNumbers;
328  m_host.peer(_peerID).requestBlockBodies(neededBodies);
329  }
330  else
331  {
332  // check if need to download headers
333  unsigned start = 0;
334  if (!m_haveCommonHeader)
335  {
336  // download backwards until common block is found 1 header at a time
337  start = m_lastImportedBlock;
338  if (!m_headers.empty())
339  start = std::min(start, m_headers.begin()->first - 1);
340  m_lastImportedBlock = start;
341  m_lastImportedBlockHash = host().chain().numberHash(start);
342 
343  if (start <= m_chainStartBlock + 1)
344  m_haveCommonHeader = true; //reached chain start
345  }
346  if (m_haveCommonHeader)
347  {
348  start = m_lastImportedBlock + 1;
349  auto next = m_headers.begin();
350  unsigned count = 0;
351  if (!m_headers.empty() && start >= m_headers.begin()->first)
352  {
353  start = m_headers.begin()->first + m_headers.begin()->second.size();
354  ++next;
355  }
356 
357  while (count == 0 && next != m_headers.end())
358  {
359  count = std::min(c_maxRequestHeaders, next->first - start);
360  while(count > 0 && m_downloadingHeaders.count(start) != 0)
361  {
362  start++;
363  count--;
364  }
365  std::vector<unsigned> headers;
366  for (unsigned block = start; block < start + count; block++)
367  if (m_downloadingHeaders.count(block) == 0)
368  {
369  headers.push_back(block);
370  m_downloadingHeaders.insert(block);
371  }
372  count = headers.size();
373  if (count > 0)
374  {
375  m_headerSyncPeers[_peerID] = headers;
376  assert(!haveItem(m_headers, start));
377  m_host.peer(_peerID).requestBlockHeaders(start, count, 0, false);
378  }
379  else if (start >= next->first)
380  {
381  start = next->first + next->second.size();
382  ++next;
383  }
384  }
385  }
386  else
387  m_host.peer(_peerID).requestBlockHeaders(start, 1, 0, false);
388  }
389 }
390 
391 void BlockChainSync::clearPeerDownload(NodeID const& _peerID)
392 {
393  auto syncPeer = m_headerSyncPeers.find(_peerID);
394  if (syncPeer != m_headerSyncPeers.end())
395  {
396  for (unsigned block : syncPeer->second)
397  m_downloadingHeaders.erase(block);
398  m_headerSyncPeers.erase(syncPeer);
399  }
400  syncPeer = m_bodySyncPeers.find(_peerID);
401  if (syncPeer != m_bodySyncPeers.end())
402  {
403  for (unsigned block : syncPeer->second)
404  m_downloadingBodies.erase(block);
405  m_bodySyncPeers.erase(syncPeer);
406  }
407  m_daoChallengedPeers.erase(_peerID);
408 }
409 
410 void BlockChainSync::clearPeerDownload()
411 {
412  for (auto s = m_headerSyncPeers.begin(); s != m_headerSyncPeers.end();)
413  {
414  if (!m_host.capabilityHost().peerSessionInfo(s->first))
415  {
416  for (unsigned block : s->second)
417  m_downloadingHeaders.erase(block);
418  m_headerSyncPeers.erase(s++);
419  }
420  else
421  ++s;
422  }
423  for (auto s = m_bodySyncPeers.begin(); s != m_bodySyncPeers.end();)
424  {
425  if (!m_host.capabilityHost().peerSessionInfo(s->first))
426  {
427  for (unsigned block : s->second)
428  m_downloadingBodies.erase(block);
429  m_bodySyncPeers.erase(s++);
430  }
431  else
432  ++s;
433  }
434  for (auto s = m_daoChallengedPeers.begin(); s != m_daoChallengedPeers.end();)
435  {
436  if (!m_host.capabilityHost().peerSessionInfo(*s))
437  m_daoChallengedPeers.erase(s++);
438  else
439  ++s;
440  }
441 }
442 
443 void BlockChainSync::logNewBlock(h256 const& _h)
444 {
445  m_knownNewHashes.erase(_h);
446 }
447 
448 void BlockChainSync::onPeerBlockHeaders(NodeID const& _peerID, RLP const& _r)
449 {
450  RecursiveGuard l(x_sync);
452  size_t itemCount = _r.itemCount();
453  LOG(m_logger) << "BlocksHeaders (" << dec << itemCount << " entries) "
454  << (itemCount ? "" : ": NoMoreHeaders");
455 
456  if (m_daoChallengedPeers.find(_peerID) != m_daoChallengedPeers.end())
457  {
458  if (verifyDaoChallengeResponse(_r))
459  syncPeer(_peerID, false);
460  else
461  m_host.disablePeer(_peerID, "Peer from another fork.");
462 
463  m_daoChallengedPeers.erase(_peerID);
464  return;
465  }
466 
467  clearPeerDownload(_peerID);
468  if (m_state != SyncState::Blocks && m_state != SyncState::Waiting)
469  {
470  LOG(m_logger) << "Ignoring unexpected blocks";
471  return;
472  }
473  if (m_state == SyncState::Waiting)
474  {
475  LOG(m_loggerDetail) << "Ignored blocks while waiting";
476  return;
477  }
478  if (itemCount == 0)
479  {
480  LOG(m_loggerDetail) << "Peer does not have the blocks requested";
481  m_host.capabilityHost().updateRating(_peerID, -1);
482  }
483  for (unsigned i = 0; i < itemCount; i++)
484  {
485  BlockHeader info(_r[i].data(), HeaderData);
486  unsigned blockNumber = static_cast<unsigned>(info.number());
487  if (blockNumber < m_chainStartBlock)
488  {
489  LOG(m_logger) << "Skipping too old header " << blockNumber;
490  continue;
491  }
492  if (haveItem(m_headers, blockNumber))
493  {
494  LOG(m_logger) << "Skipping header " << blockNumber << " (already downloaded)";
495  continue;
496  }
497  if (blockNumber <= m_lastImportedBlock && m_haveCommonHeader)
498  {
499  LOG(m_logger) << "Skipping header " << blockNumber << " (already imported)";
500  continue;
501  }
502  if (blockNumber > m_highestBlock)
503  m_highestBlock = blockNumber;
504 
505  auto status = host().bq().blockStatus(info.hash());
506  if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(info.hash()))
507  {
508  m_haveCommonHeader = true;
509  m_lastImportedBlock = (unsigned)info.number();
510  m_lastImportedBlockHash = info.hash();
511 
512  if (!m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1 &&
513  m_headers.begin()->second[0].parent != m_lastImportedBlockHash)
514  {
515  // Start of the header chain in m_headers doesn't match our known chain,
516  // probably we've downloaded other fork
517  clog(VerbosityWarning, "sync")
518  << "Unknown parent of the downloaded headers, restarting sync";
519  restartSync();
520  return;
521  }
522  }
523  else
524  {
525  Header hdr { _r[i].data().toBytes(), info.hash(), info.parentHash() };
526 
527  // validate chain
528  HeaderId headerId { info.transactionsRoot(), info.sha3Uncles() };
529  if (m_haveCommonHeader)
530  {
531  Header const* prevBlock = findItem(m_headers, blockNumber - 1);
532  if ((prevBlock && prevBlock->hash != info.parentHash()) || (blockNumber == m_lastImportedBlock + 1 && info.parentHash() != m_lastImportedBlockHash))
533  {
534  // mismatching parent id, delete the previous block and don't add this one
535  clog(VerbosityWarning, "sync") << "Unknown block header " << blockNumber << " "
536  << info.hash() << " (Restart syncing)";
537  m_host.capabilityHost().updateRating(_peerID, -1);
538  restartSync();
539  return ;
540  }
541 
542  Header const* nextBlock = findItem(m_headers, blockNumber + 1);
543  if (nextBlock && nextBlock->parent != info.hash())
544  {
545  LOG(m_loggerDetail)
546  << "Unknown block header " << blockNumber + 1 << " " << nextBlock->hash;
547  // clear following headers
548  unsigned n = blockNumber + 1;
549  auto headers = m_headers.at(n);
550  for (auto const& h : headers)
551  {
552  BlockHeader deletingInfo(h.data, HeaderData);
553  m_headerIdToNumber.erase(headerId);
554  m_downloadingBodies.erase(n);
555  m_downloadingHeaders.erase(n);
556  ++n;
557  }
558  removeAllStartingWith(m_headers, blockNumber + 1);
559  removeAllStartingWith(m_bodies, blockNumber + 1);
560  }
561  }
562 
563  mergeInto(m_headers, blockNumber, std::move(hdr));
564  if (headerId.transactionsRoot == EmptyTrie && headerId.uncles == EmptyListSHA3)
565  {
566  //empty body, just mark as downloaded
567  RLPStream r(2);
570  bytes body;
571  r.swapOut(body);
572  mergeInto(m_bodies, blockNumber, std::move(body));
573  }
574  else
575  m_headerIdToNumber[headerId] = blockNumber;
576  }
577  }
578  collectBlocks();
579  continueSync();
580 }
581 
582 bool BlockChainSync::verifyDaoChallengeResponse(RLP const& _r)
583 {
584  if (_r.itemCount() != 1)
585  return false;
586 
587  BlockHeader info(_r[0].data(), HeaderData);
588  return info.number() == host().chain().sealEngine()->chainParams().daoHardforkBlock &&
589  info.extraData() == fromHex("0x64616f2d686172642d666f726b");
590 }
591 
592 void BlockChainSync::onPeerBlockBodies(NodeID const& _peerID, RLP const& _r)
593 {
594  RecursiveGuard l(x_sync);
596  size_t itemCount = _r.itemCount();
597  LOG(m_logger) << "BlocksBodies (" << dec << itemCount << " entries) "
598  << (itemCount ? "" : ": NoMoreBodies");
599  clearPeerDownload(_peerID);
600  if (m_state != SyncState::Blocks && m_state != SyncState::Waiting) {
601  LOG(m_logger) << "Ignoring unexpected blocks";
602  return;
603  }
604  if (m_state == SyncState::Waiting)
605  {
606  LOG(m_loggerDetail) << "Ignored blocks while waiting";
607  return;
608  }
609  if (itemCount == 0)
610  {
611  LOG(m_loggerDetail) << "Peer does not have the blocks requested";
612  m_host.capabilityHost().updateRating(_peerID, -1);
613  }
614  for (unsigned i = 0; i < itemCount; i++)
615  {
616  RLP body(_r[i]);
617 
618  auto txList = body[0];
619  h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); });
620  h256 uncles = sha3(body[1].data());
621  HeaderId id { transactionRoot, uncles };
622  auto iter = m_headerIdToNumber.find(id);
623  if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
624  {
625  LOG(m_loggerDetail) << "Ignored unknown block body";
626  continue;
627  }
628  unsigned blockNumber = iter->second;
629  if (haveItem(m_bodies, blockNumber))
630  {
631  LOG(m_logger) << "Skipping already downloaded block body " << blockNumber;
632  continue;
633  }
634  m_headerIdToNumber.erase(id);
635  mergeInto(m_bodies, blockNumber, body.data().toBytes());
636  }
637  collectBlocks();
638  continueSync();
639 }
640 
641 void BlockChainSync::collectBlocks()
642 {
643  if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
644  return;
645 
646  // merge headers and bodies
647  auto& headers = *m_headers.begin();
648  auto& bodies = *m_bodies.begin();
649  if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
650  return;
651 
652  unsigned success = 0;
653  unsigned future = 0;
654  unsigned got = 0;
655  unsigned unknown = 0;
656  size_t i = 0;
657  for (; i < headers.second.size() && i < bodies.second.size(); i++)
658  {
659  RLPStream blockStream(3);
660  blockStream.appendRaw(headers.second[i].data);
661  RLP body(bodies.second[i]);
662  blockStream.appendRaw(body[0].data());
663  blockStream.appendRaw(body[1].data());
664  bytes block;
665  blockStream.swapOut(block);
666  switch (host().bq().import(&block))
667  {
669  success++;
670  if (headers.first + i > m_lastImportedBlock)
671  {
672  m_lastImportedBlock = headers.first + (unsigned)i;
673  m_lastImportedBlockHash = headers.second[i].hash;
674  }
675  break;
677  LOG(m_logger) << "Malformed block #" << headers.first + i << ". Restarting sync.";
678  restartSync();
679  return;
681  LOG(m_logger) << "Block from the bad chain, block #" << headers.first + i
682  << ". Restarting sync.";
683  restartSync();
684  return;
685 
687  future++;
688  break;
690  break;
694  if (headers.first + i > m_lastImportedBlock)
695  {
696  logImported(success, future, got, unknown);
697  LOG(m_logger)
698  << "Already known or future time & unknown parent or unknown parent, block #"
699  << headers.first + i << ". Resetting sync.";
700  resetSync();
701  m_haveCommonHeader = false; // fork detected, search for common header again
702  }
703  return;
704 
705  default:;
706  }
707  }
708 
709  logImported(success, future, got, unknown);
710 
711  if (host().bq().unknownFull())
712  {
713  clog(VerbosityWarning, "sync") << "Too many unknown blocks, restarting sync";
714  restartSync();
715  return;
716  }
717 
718  auto newHeaders = std::move(headers.second);
719  newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
720  unsigned newHeaderHead = headers.first + i;
721  auto newBodies = std::move(bodies.second);
722  newBodies.erase(newBodies.begin(), newBodies.begin() + i);
723  unsigned newBodiesHead = bodies.first + i;
724  m_headers.erase(m_headers.begin());
725  m_bodies.erase(m_bodies.begin());
726  if (!newHeaders.empty())
727  m_headers[newHeaderHead] = newHeaders;
728  if (!newBodies.empty())
729  m_bodies[newBodiesHead] = newBodies;
730 
731  if (m_headers.empty())
732  {
733  assert(m_bodies.empty());
734  completeSync();
735  }
737 }
738 
739 void BlockChainSync::logImported(
740  unsigned _success, unsigned _future, unsigned _got, unsigned _unknown)
741 {
742  LOG(m_logger) << dec << _success << " imported OK, " << _unknown << " with unknown parents, "
743  << _future << " with future timestamps, " << _got << " already known received.";
744 }
745 
746 void BlockChainSync::onPeerNewBlock(NodeID const& _peerID, RLP const& _r)
747 {
748  RecursiveGuard l(x_sync);
750 
751  if (_r.itemCount() != 2)
752  {
753  m_host.disablePeer(_peerID, "NewBlock without 2 data fields.");
754  return;
755  }
756  BlockHeader info(_r[0][0].data(), HeaderData);
757  auto h = info.hash();
758  auto& peer = m_host.peer(_peerID);
759  peer.markBlockAsKnown(h);
760  unsigned blockNumber = static_cast<unsigned>(info.number());
761  if (blockNumber > (m_lastImportedBlock + 1))
762  {
763  LOG(m_loggerDetail) << "Received unknown new block";
764  // Update the hash of highest known block of the peer.
765  // syncPeer will then request the highest block header to properly restart syncing
766  peer.setLatestHash(h);
767  syncPeer(_peerID, true);
768  return;
769  }
770  switch (host().bq().import(_r[0].data()))
771  {
773  m_host.capabilityHost().updateRating(_peerID, 100);
774  logNewBlock(h);
775  if (blockNumber > m_lastImportedBlock)
776  {
777  m_lastImportedBlock = max(m_lastImportedBlock, blockNumber);
778  m_lastImportedBlockHash = h;
779  }
780  m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
781  m_downloadingBodies.erase(blockNumber);
782  m_downloadingHeaders.erase(blockNumber);
783  removeItem(m_headers, blockNumber);
784  removeItem(m_bodies, blockNumber);
785  if (m_headers.empty())
786  {
787  if (!m_bodies.empty())
788  {
789  LOG(m_loggerDetail)
790  << "Block headers map is empty, but block bodies map is not. Force-clearing.";
791  m_bodies.clear();
792  }
793  completeSync();
794  }
795  break;
797  //TODO: Rating dependent on how far in future it is.
798  break;
799 
802  logNewBlock(h);
803  m_host.disablePeer(_peerID, "Malformed block received.");
804  return;
805 
808  break;
809 
812  {
813  peer.incrementUnknownNewBlocks();
814  if (peer.unknownNewBlocks() > c_maxPeerUknownNewBlocks)
815  {
816  m_host.disablePeer(_peerID, "Too many uknown new blocks");
817  restartSync();
818  }
819  logNewBlock(h);
820  u256 totalDifficulty = _r[1].toInt<u256>();
821  if (totalDifficulty > peer.totalDifficulty())
822  {
823  LOG(m_loggerDetail) << "Received block with no known parent. Peer needs syncing...";
824  syncPeer(_peerID, true);
825  }
826  break;
827  }
828  default:;
829  }
830 }
831 
833 {
834  RecursiveGuard l(x_sync);
835  SyncStatus res;
836  res.state = m_state;
837  res.protocolVersion = 62;
838  res.startBlockNumber = m_startingBlock;
839  res.currentBlockNumber = host().chain().number();
840  res.highestBlockNumber = m_highestBlock;
841  return res;
842 }
843 
844 void BlockChainSync::resetSync()
845 {
846  m_downloadingHeaders.clear();
847  m_downloadingBodies.clear();
848  m_headers.clear();
849  m_bodies.clear();
850  m_headerSyncPeers.clear();
851  m_bodySyncPeers.clear();
852  m_headerIdToNumber.clear();
853  m_syncingTotalDifficulty = 0;
854  m_state = SyncState::NotSynced;
855 }
856 
858 {
859  RecursiveGuard l(x_sync);
860  resetSync();
861  m_highestBlock = 0;
862  m_haveCommonHeader = false;
863  host().bq().clear();
864  m_startingBlock = host().chain().number();
865  m_lastImportedBlock = m_startingBlock;
866  m_lastImportedBlockHash = host().chain().currentHash();
867 }
868 
870 {
871  RecursiveGuard l(x_sync);
872  resetSync();
873  m_state = SyncState::Idle;
874 }
875 
876 void BlockChainSync::pauseSync()
877 {
878  m_state = SyncState::Waiting;
879 }
880 
882 {
883  return m_state != SyncState::Idle;
884 }
885 
887  NodeID const& _peerID, std::vector<std::pair<h256, u256>> const& _hashes)
888 {
889  RecursiveGuard l(x_sync);
891 
892  auto& peer = m_host.peer(_peerID);
893  if (peer.isConversing())
894  {
895  LOG(m_loggerDetail) << "Ignoring new hashes since we're already downloading.";
896  return;
897  }
898  LOG(m_loggerDetail) << "Not syncing and new block hash discovered: syncing.";
899  unsigned knowns = 0;
900  unsigned unknowns = 0;
901  unsigned maxHeight = 0;
902  for (auto const& p: _hashes)
903  {
904  h256 const& h = p.first;
905  m_host.capabilityHost().updateRating(_peerID, 1);
906  peer.markBlockAsKnown(h);
907  auto status = host().bq().blockStatus(h);
908  if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h))
909  knowns++;
910  else if (status == QueueStatus::Bad)
911  {
912  cwarn << "block hash bad!" << h << ". Bailing...";
913  return;
914  }
915  else if (status == QueueStatus::Unknown)
916  {
917  unknowns++;
918  if (p.second > maxHeight)
919  {
920  maxHeight = (unsigned)p.second;
921  peer.setLatestHash(h);
922  }
923  }
924  else
925  knowns++;
926  }
927  LOG(m_logger) << knowns << " knowns, " << unknowns << " unknowns";
928  if (unknowns > 0)
929  {
930  LOG(m_loggerDetail) << "Not syncing and new block hash discovered: syncing.";
931  syncPeer(_peerID, true);
932  }
933 }
934 
936 {
937  RecursiveGuard l(x_sync);
938  // Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
939  clearPeerDownload();
940  continueSync();
942 }
943 
944 bool BlockChainSync::invariants() const
945 {
946  if (!isSyncing() && !m_headers.empty())
947  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got headers while not syncing"));
948  if (!isSyncing() && !m_bodies.empty())
949  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got bodies while not syncing"));
950  if (isSyncing() && m_host.chain().number() > 0 && m_haveCommonHeader && m_lastImportedBlock == 0)
951  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Common block not found"));
952  if (isSyncing() && !m_headers.empty() && m_lastImportedBlock >= m_headers.begin()->first)
953  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header is too old"));
954  if (m_headerSyncPeers.empty() != m_downloadingHeaders.empty())
955  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header download map mismatch"));
956  if (m_bodySyncPeers.empty() != m_downloadingBodies.empty() && m_downloadingBodies.size() <= m_headerIdToNumber.size())
957  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Body download map mismatch"));
958  return true;
959 }
void onPeerBlockHeaders(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block headers during sync.
_T toInt(int _flags=Strict) const
Converts to int of type given; if isData(), decodes as big-endian bytestream.
Definition: RLP.h:257
Definition: Address.cpp:20
unsigned startBlockNumber
Definition: CommonNet.h:99
void restartSync()
Restart sync.
SealEngineFace * sealEngine() const
Definition: BlockChain.h:308
bytes rlp(_T _t)
Export a single item in RLP format, returning a byte array.
Definition: RLP.h:453
BlockDetails details(h256 const &_hash) const
Get the familial details concerning a block (or the most recent mined if none given). Thread-safe.
Definition: BlockChain.h:157
Encapsulation of a block header. Class to contain all of a block header&#39;s data. It is able to parse a...
Definition: BlockHeader.h:96
Downloading blocks.
void onPeerNewHashes(NodeID const &_peerID, std::vector< std::pair< h256, u256 >> const &_hashes)
void abortSync()
Abort all sync activity.
std::ostream & operator<<(std::ostream &_out, BlockHeader const &_bi)
Definition: BlockHeader.h:217
std::vector< unsigned char > toBytes() const
Definition: vector_ref.h:43
u256 difficulty() const
Definition: BlockQueue.cpp:530
QueueStatus blockStatus(h256 const &_h) const
Get some infomration on the given block&#39;s status regarding us.
Definition: BlockQueue.cpp:387
u256 totalDifficulty() const
Definition: EthereumPeer.h:48
h256 const EmptyTrie
Definition: TrieCommon.cpp:28
The EthereumCapability class.
Definition: FixedHash.h:390
unsigned number(h256 const &_hash) const
Get a number for the given hash (or the most recent mined if none given). Thread-safe.
Definition: BlockChain.h:224
unsigned protocolVersion
Definition: CommonNet.h:98
EthereumPeer const & peer(NodeID const &_peerID) const
NodeID id() const
Definition: EthereumPeer.h:46
void disablePeer(NodeID const &_peerID, std::string const &_problem)
int64_t number() const
Definition: BlockHeader.h:166
void clear()
Clear everything.
Definition: BlockQueue.cpp:67
#define DEV_INVARIANT_CHECK_HERE
Definition: Common.h:239
bool isConversing() const
Definition: EthereumPeer.h:54
bool sha3(bytesConstRef _input, bytesRef o_output) noexcept
Definition: SHA3.cpp:28
Initial chain sync complete. Waiting for new packets.
void requestBlockHeaders(h256 const &_startHash, unsigned _count, unsigned _skip, bool _reverse)
Request hashes for given parent hash.
bytes fromHex(std::string const &_s, WhenError _throw=WhenError::DontThrow)
Definition: CommonData.cpp:81
bool isKnown(h256 const &_hash, bool _isCurrent=true) const
Returns true if the given block is known (though not necessarily a part of the canon chain)...
ChainOperationParams const & chainParams() const
Definition: SealEngine.h:74
unsigned const c_maxRequestBodies
#define DEV_INVARIANT_CHECK
Scope guard for invariant check in a class derived from HasInvariants.
Definition: Common.h:238
h256 const & parentHash() const
Definition: BlockHeader.h:157
Initial chain sync has not started yet.
p2p::NodeID NodeID
Definition: CommonNet.h:105
bytesConstRef data() const
The bare data of the RLP.
Definition: RLP.h:80
void requestBlockBodies(h256s const &_blocks)
Request specified blocks from peer.
h256 const & transactionsRoot() const
Definition: BlockHeader.h:163
void onPeerBlockBodies(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block bodies.
std::vector< byte > bytes
Definition: Common.h:72
h256 hash(IncludeSeal _i=WithSeal) const
#define cwarn
h256 const EmptyListSHA3
Definition: SHA3.cpp:26
SyncStatus status() const
h256 currentHash() const
Get a given block (RLP format). Thread-safe.
Definition: BlockChain.h:228
void onPeerAborting()
Called by peer when it is disconnecting.
Handler onRoomAvailable(std::function< void(void)> _t)
Definition: BlockQueue.h:270
void onPeerStatus(EthereumPeer const &_peer)
Called by peer to report status.
h256 const & sha3Uncles() const
Definition: BlockHeader.h:158
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > > u256
Definition: Common.h:121
unsigned currentBlockNumber
Definition: CommonNet.h:100
#define LOG
Definition: Log.h:63
boost::error_info< struct tag_comment, std::string > errinfo_comment
Definition: Assertions.h:69
h256 trieRootOver(unsigned _itemCount, T const &_getKey, U const &_getValue)
Definition: TrieHash.h:36
#define clog(SEVERITY, CHANNEL)
std::string name() const override
Block downloading paused. Waiting for block queue to process blocks and free space.
size_t itemCount() const
Definition: RLP.h:101
std::lock_guard< std::recursive_mutex > RecursiveGuard
Definition: Guards.h:43
h256 numberHash(unsigned _i) const
Get the hash for a given block&#39;s number.
Definition: BlockChain.h:184
void swapOut(bytes &_dest)
Swap the contents of the output stream out for some other byte array.
Definition: RLP.h:425
std::string validate(h256 const &_hostGenesisHash, unsigned _hostProtocolVersion, u256 const &_hostNetworkId) const
unsigned highestBlockNumber
Definition: CommonNet.h:101
std::vector< h256 > h256s
Definition: FixedHash.h:359
Class for writing to an RLP bytestream.
Definition: RLP.h:369
void markBlockAsKnown(h256 const &_hash)
Definition: EthereumPeer.h:67
void onBlockImported(BlockHeader const &_info)
Called when a blockchain has imported a new block onto the DB.
RLPStream & appendRaw(bytesConstRef _rlp, size_t _itemCount=1)
Appends raw (pre-serialised) RLP data. Use with caution.
Definition: RLP.cpp:222
Definition: RLP.h:47
unsigned const c_maxPeerUknownNewBlocks
unsigned const c_maxRequestHeaders
Max number of unknown new blocks peer can give us.
void onPeerNewBlock(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block bodies.
bytes RLPEmptyList
The empty list in RLP format.
Definition: RLP.cpp:23
p2p::CapabilityHostFace & capabilityHost()
BlockChain const & chain() const