30 #include <libp2p/Host.h> 31 #include <libp2p/Session.h> 46 _out <<
"state: " << EthereumCapability::stateName(_sync.
state) <<
" ";
47 if (_sync.
state == SyncState::Blocks)
55 template<
typename T>
bool haveItem(std::map<unsigned, T>& _container,
unsigned _number)
57 if (_container.empty())
59 auto lower = _container.lower_bound(_number);
60 if (lower != _container.end() && lower->first == _number)
62 if (lower == _container.begin())
65 return lower->first <= _number && (lower->first + lower->second.size()) > _number;
68 template<
typename T> T
const* findItem(std::map<
unsigned, std::vector<T>>& _container,
unsigned _number)
70 if (_container.empty())
72 auto lower = _container.lower_bound(_number);
73 if (lower != _container.end() && lower->first == _number)
74 return &(*lower->second.begin());
75 if (lower == _container.begin())
78 if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
79 return &lower->second.at(_number - lower->first);
83 template<
typename T>
void removeItem(std::map<
unsigned, std::vector<T>>& _container,
unsigned _number)
85 if (_container.empty())
87 auto lower = _container.lower_bound(_number);
88 if (lower != _container.end() && lower->first == _number)
90 _container.erase(lower);
93 if (lower == _container.begin())
96 if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
97 lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
100 template<
typename T>
void removeAllStartingWith(std::map<
unsigned, std::vector<T>>& _container,
unsigned _number)
102 if (_container.empty())
104 auto lower = _container.lower_bound(_number);
105 if (lower != _container.end() && lower->first == _number)
107 _container.erase(lower, _container.end());
110 if (lower == _container.begin())
116 if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
117 lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
118 _container.erase(++lower, _container.end());
121 template<
typename T>
void mergeInto(std::map<
unsigned, std::vector<T>>& _container,
unsigned _number, T&& _data)
123 assert(!haveItem(_container, _number));
124 auto lower = _container.lower_bound(_number);
125 if (!_container.empty() && lower != _container.begin())
127 if (lower != _container.end() && (lower->first + lower->second.size() == _number))
130 lower->second.emplace_back(_data);
134 if (next != _container.end() && (lower->first + lower->second.size() == next->first))
137 std::move(next->second.begin(), next->second.end(), std::back_inserter(lower->second));
138 _container.erase(next);
145 auto inserted = _container.insert(lower, std::make_pair(_number, std::vector<T> { _data }));
146 auto next = inserted;
148 if (next != _container.end() && next->first == _number + 1)
150 std::move(next->second.begin(), next->second.end(), std::back_inserter(inserted->second));
151 _container.erase(next);
160 m_chainStartBlock(_host.chain().chainStartBlockNumber()),
161 m_startingBlock(_host.chain().number()),
162 m_lastImportedBlock(m_startingBlock),
163 m_lastImportedBlockHash(_host.chain().currentHash())
184 if (_info.
number() > m_lastImportedBlock)
186 m_lastImportedBlock =
static_cast<unsigned>(_info.
number());
187 m_lastImportedBlockHash = _info.
hash();
188 m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
205 if (!peerSessionInfo)
208 std::string disconnectReason;
209 if (peerSessionInfo->clientVersion.find(
"/v0.7.0/") != string::npos)
210 disconnectReason =
"Blacklisted client version.";
213 host().chain().genesisHash(), host().protocolVersion(), host().networkId());
215 if (!disconnectReason.empty())
217 LOG(m_logger) <<
"Peer not suitable for sync: " << disconnectReason;
223 if (!requestDaoForkBlockHeader(_peer.
id()))
226 syncPeer(_peer.
id(),
false);
230 bool BlockChainSync::requestDaoForkBlockHeader(
NodeID const& _peerID)
234 if (daoHardfork == 0)
237 m_daoChallengedPeers.insert(_peerID);
242 void BlockChainSync::syncPeer(
NodeID const& _peerID,
bool _force)
246 LOG(m_loggerDetail) <<
"Can't sync with this peer - outstanding asks.";
254 if (host().bq().isActive())
257 u256 syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
259 auto& peer = m_host.
peer(_peerID);
262 if (_force || peerTotalDifficulty > syncingDifficulty)
264 if (peerTotalDifficulty > syncingDifficulty)
265 LOG(m_logger) <<
"Discovered new highest difficulty";
268 m_syncingTotalDifficulty = peerTotalDifficulty;
271 LOG(m_loggerInfo) <<
"Starting full sync";
274 peer.requestBlockHeaders(peer.latestHash(), 1, 0,
false);
275 peer.setWaitingForTransactions(
true);
281 requestBlocks(_peerID);
286 void BlockChainSync::continueSync()
289 syncPeer(_peerID,
false);
294 void BlockChainSync::requestBlocks(
NodeID const& _peerID)
296 clearPeerDownload(_peerID);
297 if (host().bq().knownFull())
299 LOG(m_loggerDetail) <<
"Waiting for block queue before downloading blocks";
304 auto header = m_headers.begin();
306 vector<unsigned> neededNumbers;
308 if (m_haveCommonHeader && !m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1)
310 while (header != m_headers.end() && neededBodies.size() <
c_maxRequestBodies && index < header->second.size())
312 unsigned block = header->first + index;
313 if (m_downloadingBodies.count(block) == 0 && !haveItem(m_bodies, block))
315 neededBodies.push_back(header->second[index].hash);
316 neededNumbers.push_back(block);
317 m_downloadingBodies.insert(block);
321 if (index >= header->second.size())
325 if (neededBodies.size() > 0)
327 m_bodySyncPeers[_peerID] = neededNumbers;
334 if (!m_haveCommonHeader)
337 start = m_lastImportedBlock;
338 if (!m_headers.empty())
339 start = std::min(start, m_headers.begin()->first - 1);
340 m_lastImportedBlock = start;
343 if (start <= m_chainStartBlock + 1)
344 m_haveCommonHeader =
true;
346 if (m_haveCommonHeader)
348 start = m_lastImportedBlock + 1;
349 auto next = m_headers.begin();
351 if (!m_headers.empty() && start >= m_headers.begin()->first)
353 start = m_headers.begin()->first + m_headers.begin()->second.size();
357 while (count == 0 && next != m_headers.end())
360 while(count > 0 && m_downloadingHeaders.count(start) != 0)
365 std::vector<unsigned> headers;
366 for (
unsigned block = start; block < start + count; block++)
367 if (m_downloadingHeaders.count(block) == 0)
369 headers.push_back(block);
370 m_downloadingHeaders.insert(block);
372 count = headers.size();
375 m_headerSyncPeers[_peerID] = headers;
376 assert(!haveItem(m_headers, start));
379 else if (start >= next->first)
381 start = next->first + next->second.size();
391 void BlockChainSync::clearPeerDownload(
NodeID const& _peerID)
393 auto syncPeer = m_headerSyncPeers.find(_peerID);
394 if (syncPeer != m_headerSyncPeers.end())
396 for (
unsigned block : syncPeer->second)
397 m_downloadingHeaders.erase(block);
398 m_headerSyncPeers.erase(syncPeer);
400 syncPeer = m_bodySyncPeers.find(_peerID);
401 if (syncPeer != m_bodySyncPeers.end())
403 for (
unsigned block : syncPeer->second)
404 m_downloadingBodies.erase(block);
405 m_bodySyncPeers.erase(syncPeer);
407 m_daoChallengedPeers.erase(_peerID);
410 void BlockChainSync::clearPeerDownload()
412 for (
auto s = m_headerSyncPeers.begin(); s != m_headerSyncPeers.end();)
416 for (
unsigned block : s->second)
417 m_downloadingHeaders.erase(block);
418 m_headerSyncPeers.erase(s++);
423 for (
auto s = m_bodySyncPeers.begin(); s != m_bodySyncPeers.end();)
427 for (
unsigned block : s->second)
428 m_downloadingBodies.erase(block);
429 m_bodySyncPeers.erase(s++);
434 for (
auto s = m_daoChallengedPeers.begin(); s != m_daoChallengedPeers.end();)
437 m_daoChallengedPeers.erase(s++);
443 void BlockChainSync::logNewBlock(
h256 const& _h)
445 m_knownNewHashes.erase(_h);
453 LOG(m_logger) <<
"BlocksHeaders (" << dec << itemCount <<
" entries) " 454 << (itemCount ?
"" :
": NoMoreHeaders");
456 if (m_daoChallengedPeers.find(_peerID) != m_daoChallengedPeers.end())
458 if (verifyDaoChallengeResponse(_r))
459 syncPeer(_peerID,
false);
461 m_host.
disablePeer(_peerID,
"Peer from another fork.");
463 m_daoChallengedPeers.erase(_peerID);
467 clearPeerDownload(_peerID);
470 LOG(m_logger) <<
"Ignoring unexpected blocks";
475 LOG(m_loggerDetail) <<
"Ignored blocks while waiting";
480 LOG(m_loggerDetail) <<
"Peer does not have the blocks requested";
483 for (
unsigned i = 0; i < itemCount; i++)
486 unsigned blockNumber =
static_cast<unsigned>(info.
number());
487 if (blockNumber < m_chainStartBlock)
489 LOG(m_logger) <<
"Skipping too old header " << blockNumber;
492 if (haveItem(m_headers, blockNumber))
494 LOG(m_logger) <<
"Skipping header " << blockNumber <<
" (already downloaded)";
497 if (blockNumber <= m_lastImportedBlock && m_haveCommonHeader)
499 LOG(m_logger) <<
"Skipping header " << blockNumber <<
" (already imported)";
502 if (blockNumber > m_highestBlock)
503 m_highestBlock = blockNumber;
508 m_haveCommonHeader =
true;
509 m_lastImportedBlock = (unsigned)info.
number();
510 m_lastImportedBlockHash = info.
hash();
512 if (!m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1 &&
513 m_headers.begin()->second[0].parent != m_lastImportedBlockHash)
518 <<
"Unknown parent of the downloaded headers, restarting sync";
529 if (m_haveCommonHeader)
531 Header
const* prevBlock = findItem(m_headers, blockNumber - 1);
532 if ((prevBlock && prevBlock->hash != info.
parentHash()) || (blockNumber == m_lastImportedBlock + 1 && info.
parentHash() != m_lastImportedBlockHash))
536 << info.
hash() <<
" (Restart syncing)";
542 Header
const* nextBlock = findItem(m_headers, blockNumber + 1);
543 if (nextBlock && nextBlock->parent != info.
hash())
546 <<
"Unknown block header " << blockNumber + 1 <<
" " << nextBlock->hash;
548 unsigned n = blockNumber + 1;
549 auto headers = m_headers.at(n);
550 for (
auto const& h : headers)
553 m_headerIdToNumber.erase(headerId);
554 m_downloadingBodies.erase(n);
555 m_downloadingHeaders.erase(n);
558 removeAllStartingWith(m_headers, blockNumber + 1);
559 removeAllStartingWith(m_bodies, blockNumber + 1);
563 mergeInto(m_headers, blockNumber, std::move(hdr));
572 mergeInto(m_bodies, blockNumber, std::move(body));
575 m_headerIdToNumber[headerId] = blockNumber;
582 bool BlockChainSync::verifyDaoChallengeResponse(
RLP const& _r)
589 info.extraData() ==
fromHex(
"0x64616f2d686172642d666f726b");
597 LOG(m_logger) <<
"BlocksBodies (" << dec << itemCount <<
" entries) " 598 << (itemCount ?
"" :
": NoMoreBodies");
599 clearPeerDownload(_peerID);
601 LOG(m_logger) <<
"Ignoring unexpected blocks";
606 LOG(m_loggerDetail) <<
"Ignored blocks while waiting";
611 LOG(m_loggerDetail) <<
"Peer does not have the blocks requested";
614 for (
unsigned i = 0; i < itemCount; i++)
618 auto txList = body[0];
619 h256 transactionRoot =
trieRootOver(txList.itemCount(), [&](
unsigned i){
return rlp(i); }, [&](
unsigned i){
return txList[i].data().toBytes(); });
621 HeaderId
id { transactionRoot, uncles };
622 auto iter = m_headerIdToNumber.find(
id);
623 if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
625 LOG(m_loggerDetail) <<
"Ignored unknown block body";
628 unsigned blockNumber = iter->second;
629 if (haveItem(m_bodies, blockNumber))
631 LOG(m_logger) <<
"Skipping already downloaded block body " << blockNumber;
634 m_headerIdToNumber.erase(
id);
635 mergeInto(m_bodies, blockNumber, body.
data().
toBytes());
641 void BlockChainSync::collectBlocks()
643 if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
647 auto& headers = *m_headers.begin();
648 auto& bodies = *m_bodies.begin();
649 if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
652 unsigned success = 0;
655 unsigned unknown = 0;
657 for (; i < headers.second.size() && i < bodies.second.size(); i++)
660 blockStream.appendRaw(headers.second[i].data);
661 RLP body(bodies.second[i]);
662 blockStream.appendRaw(body[0].data());
663 blockStream.appendRaw(body[1].data());
665 blockStream.swapOut(block);
666 switch (host().bq().
import(&block))
670 if (headers.first + i > m_lastImportedBlock)
672 m_lastImportedBlock = headers.first + (unsigned)i;
673 m_lastImportedBlockHash = headers.second[i].hash;
677 LOG(m_logger) <<
"Malformed block #" << headers.first + i <<
". Restarting sync.";
681 LOG(m_logger) <<
"Block from the bad chain, block #" << headers.first + i
682 <<
". Restarting sync.";
694 if (headers.first + i > m_lastImportedBlock)
696 logImported(success, future, got, unknown);
698 <<
"Already known or future time & unknown parent or unknown parent, block #" 699 << headers.first + i <<
". Resetting sync.";
701 m_haveCommonHeader =
false;
709 logImported(success, future, got, unknown);
711 if (host().bq().unknownFull())
718 auto newHeaders = std::move(headers.second);
719 newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
720 unsigned newHeaderHead = headers.first + i;
721 auto newBodies = std::move(bodies.second);
722 newBodies.erase(newBodies.begin(), newBodies.begin() + i);
723 unsigned newBodiesHead = bodies.first + i;
724 m_headers.erase(m_headers.begin());
725 m_bodies.erase(m_bodies.begin());
726 if (!newHeaders.empty())
727 m_headers[newHeaderHead] = newHeaders;
728 if (!newBodies.empty())
729 m_bodies[newBodiesHead] = newBodies;
731 if (m_headers.empty())
733 assert(m_bodies.empty());
739 void BlockChainSync::logImported(
740 unsigned _success,
unsigned _future,
unsigned _got,
unsigned _unknown)
742 LOG(m_logger) << dec << _success <<
" imported OK, " << _unknown <<
" with unknown parents, " 743 << _future <<
" with future timestamps, " << _got <<
" already known received.";
753 m_host.
disablePeer(_peerID,
"NewBlock without 2 data fields.");
757 auto h = info.
hash();
758 auto& peer = m_host.
peer(_peerID);
760 unsigned blockNumber =
static_cast<unsigned>(info.
number());
761 if (blockNumber > (m_lastImportedBlock + 1))
763 LOG(m_loggerDetail) <<
"Received unknown new block";
766 peer.setLatestHash(h);
767 syncPeer(_peerID,
true);
770 switch (host().bq().
import(_r[0].data()))
775 if (blockNumber > m_lastImportedBlock)
777 m_lastImportedBlock = max(m_lastImportedBlock, blockNumber);
778 m_lastImportedBlockHash = h;
780 m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
781 m_downloadingBodies.erase(blockNumber);
782 m_downloadingHeaders.erase(blockNumber);
783 removeItem(m_headers, blockNumber);
784 removeItem(m_bodies, blockNumber);
785 if (m_headers.empty())
787 if (!m_bodies.empty())
790 <<
"Block headers map is empty, but block bodies map is not. Force-clearing.";
803 m_host.
disablePeer(_peerID,
"Malformed block received.");
813 peer.incrementUnknownNewBlocks();
816 m_host.
disablePeer(_peerID,
"Too many uknown new blocks");
821 if (totalDifficulty > peer.totalDifficulty())
823 LOG(m_loggerDetail) <<
"Received block with no known parent. Peer needs syncing...";
824 syncPeer(_peerID,
true);
844 void BlockChainSync::resetSync()
846 m_downloadingHeaders.clear();
847 m_downloadingBodies.clear();
850 m_headerSyncPeers.clear();
851 m_bodySyncPeers.clear();
852 m_headerIdToNumber.clear();
853 m_syncingTotalDifficulty = 0;
862 m_haveCommonHeader =
false;
865 m_lastImportedBlock = m_startingBlock;
876 void BlockChainSync::pauseSync()
887 NodeID const& _peerID, std::vector<std::pair<h256, u256>>
const& _hashes)
892 auto& peer = m_host.
peer(_peerID);
893 if (peer.isConversing())
895 LOG(m_loggerDetail) <<
"Ignoring new hashes since we're already downloading.";
898 LOG(m_loggerDetail) <<
"Not syncing and new block hash discovered: syncing.";
900 unsigned unknowns = 0;
901 unsigned maxHeight = 0;
902 for (
auto const& p: _hashes)
904 h256 const& h = p.first;
906 peer.markBlockAsKnown(h);
912 cwarn <<
"block hash bad!" << h <<
". Bailing...";
918 if (p.second > maxHeight)
920 maxHeight = (unsigned)p.second;
921 peer.setLatestHash(h);
927 LOG(m_logger) << knowns <<
" knowns, " << unknowns <<
" unknowns";
930 LOG(m_loggerDetail) <<
"Not syncing and new block hash discovered: syncing.";
931 syncPeer(_peerID,
true);
944 bool BlockChainSync::invariants()
const 947 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Got headers while not syncing"));
949 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Got bodies while not syncing"));
950 if (
isSyncing() && m_host.
chain().
number() > 0 && m_haveCommonHeader && m_lastImportedBlock == 0)
951 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Common block not found"));
952 if (
isSyncing() && !m_headers.empty() && m_lastImportedBlock >= m_headers.begin()->first)
953 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Header is too old"));
954 if (m_headerSyncPeers.empty() != m_downloadingHeaders.empty())
955 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Header download map mismatch"));
956 if (m_bodySyncPeers.empty() != m_downloadingBodies.empty() && m_downloadingBodies.size() <= m_headerIdToNumber.size())
957 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Body download map mismatch"));
void onPeerBlockHeaders(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block headers during sync.
_T toInt(int _flags=Strict) const
Converts to int of type given; if isData(), decodes as big-endian bytestream.
unsigned startBlockNumber
void restartSync()
Restart sync.
SealEngineFace * sealEngine() const
bytes rlp(_T _t)
Export a single item in RLP format, returning a byte array.
BlockDetails details(h256 const &_hash) const
Get the familial details concerning a block (or the most recent mined if none given). Thread-safe.
void onPeerNewHashes(NodeID const &_peerID, std::vector< std::pair< h256, u256 >> const &_hashes)
void abortSync()
Abort all sync activity.
std::ostream & operator<<(std::ostream &_out, BlockHeader const &_bi)
std::vector< unsigned char > toBytes() const
QueueStatus blockStatus(h256 const &_h) const
Get some infomration on the given block's status regarding us.
u256 totalDifficulty() const
The EthereumCapability class.
unsigned number(h256 const &_hash) const
Get a number for the given hash (or the most recent mined if none given). Thread-safe.
EthereumPeer const & peer(NodeID const &_peerID) const
void disablePeer(NodeID const &_peerID, std::string const &_problem)
void clear()
Clear everything.
#define DEV_INVARIANT_CHECK_HERE
bool isConversing() const
bool sha3(bytesConstRef _input, bytesRef o_output) noexcept
Initial chain sync complete. Waiting for new packets.
void requestBlockHeaders(h256 const &_startHash, unsigned _count, unsigned _skip, bool _reverse)
Request hashes for given parent hash.
bytes fromHex(std::string const &_s, WhenError _throw=WhenError::DontThrow)
bool isKnown(h256 const &_hash, bool _isCurrent=true) const
Returns true if the given block is known (though not necessarily a part of the canon chain)...
ChainOperationParams const & chainParams() const
unsigned const c_maxRequestBodies
#define DEV_INVARIANT_CHECK
Scope guard for invariant check in a class derived from HasInvariants.
Initial chain sync has not started yet.
bytesConstRef data() const
The bare data of the RLP.
void requestBlockBodies(h256s const &_blocks)
Request specified blocks from peer.
void onPeerBlockBodies(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block bodies.
std::vector< byte > bytes
SyncStatus status() const
h256 currentHash() const
Get a given block (RLP format). Thread-safe.
void onPeerAborting()
Called by peer when it is disconnecting.
Handler onRoomAvailable(std::function< void(void)> _t)
void onPeerStatus(EthereumPeer const &_peer)
Called by peer to report status.
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > > u256
unsigned currentBlockNumber
boost::error_info< struct tag_comment, std::string > errinfo_comment
h256 trieRootOver(unsigned _itemCount, T const &_getKey, U const &_getValue)
#define clog(SEVERITY, CHANNEL)
std::string name() const override
Block downloading paused. Waiting for block queue to process blocks and free space.
std::lock_guard< std::recursive_mutex > RecursiveGuard
h256 numberHash(unsigned _i) const
Get the hash for a given block's number.
void swapOut(bytes &_dest)
Swap the contents of the output stream out for some other byte array.
std::string validate(h256 const &_hostGenesisHash, unsigned _hostProtocolVersion, u256 const &_hostNetworkId) const
unsigned highestBlockNumber
std::vector< h256 > h256s
Class for writing to an RLP bytestream.
void markBlockAsKnown(h256 const &_hash)
void onBlockImported(BlockHeader const &_info)
Called when a blockchain has imported a new block onto the DB.
RLPStream & appendRaw(bytesConstRef _rlp, size_t _itemCount=1)
Appends raw (pre-serialised) RLP data. Use with caution.
unsigned const c_maxPeerUknownNewBlocks
unsigned const c_maxRequestHeaders
Max number of unknown new blocks peer can give us.
void onPeerNewBlock(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block bodies.
bytes RLPEmptyList
The empty list in RLP format.
p2p::CapabilityHostFace & capabilityHost()
BlockChain const & chain() const