22 #include <boost/fiber/all.hpp> 31 static size_t const c_freePeerBufferSize = 32;
32 static int const c_backroundWorkPeriodMs = 1000;
34 bool validateManifest(RLP
const& _manifestRlp)
36 if (!_manifestRlp.isList() || _manifestRlp.itemCount() != 1)
39 RLP
const manifest = _manifestRlp[0];
41 u256 const version = manifest[0].toInt<
u256>();
45 h256 snapshotBlockHash(RLP
const& _manifestRlp)
47 RLP
const manifest = _manifestRlp[0];
48 return manifest[5].toHash<
h256>();
51 class WarpPeerObserver :
public WarpPeerObserverFace
54 WarpPeerObserver(WarpCapability& _host, BlockChain
const& _blockChain,
55 boost::filesystem::path
const& _snapshotPath)
57 m_hostProtocolVersion(_host.protocolVersion()),
58 m_hostNetworkId(_host.networkId()),
59 m_hostGenesisHash(_blockChain.genesisHash()),
60 m_daoForkBlock(_blockChain.sealEngine()->chainParams().daoHardforkBlock),
61 m_freePeers(c_freePeerBufferSize),
62 m_snapshotDir(_snapshotPath)
67 m_downloadFiber->join();
70 void onPeerStatus(
NodeID const& _peerID)
override 72 boost::fibers::fiber checkPeerFiber(&WarpPeerObserver::validatePeer,
this, _peerID);
73 checkPeerFiber.detach();
77 m_downloadFiber.reset(
78 new boost::fibers::fiber(&WarpPeerObserver::downloadChunks,
this));
80 boost::this_fiber::yield();
83 void onPeerManifest(
NodeID const& _peerID, RLP
const& _r)
override 85 m_manifests[_peerID].set_value(_r.data().toBytes());
86 boost::this_fiber::yield();
89 void onPeerBlockHeaders(
NodeID const& _peerID, RLP
const& _r)
override 91 m_daoForkHeaders[_peerID].set_value(_r.data().toBytes());
92 boost::this_fiber::yield();
95 void onPeerData(
NodeID const& _peerID, RLP
const& _r)
override 97 if (!_r.isList() || _r.itemCount() != 1)
100 RLP
const data = _r[0];
102 h256 const hash =
sha3(data.toBytesConstRef());
104 auto it = m_requestedChunks.find(_peerID);
105 if (it == m_requestedChunks.end())
108 h256 const askedHash = it->second;
109 m_requestedChunks.erase(it);
111 if (hash == askedHash)
114 writeFile((boost::filesystem::path(m_snapshotDir) /
toHex(hash)).
string(),
115 data.toBytesConstRef());
117 LOG(m_logger) <<
"Saved chunk " << hash <<
" Chunks left: " << m_neededChunks.size()
118 <<
" Requested chunks: " << m_requestedChunks.size();
119 if (m_neededChunks.empty() && m_requestedChunks.empty())
120 LOG(m_logger) <<
"Snapshot download complete!";
123 m_neededChunks.push_back(askedHash);
125 m_freePeers.push(_peerID);
126 boost::this_fiber::yield();
129 void onPeerDisconnect(
NodeID const& _peerID,
Asking _asking)
override 133 auto it = m_manifests.find(_peerID);
134 if (it != m_manifests.end())
135 it->second.set_exception(std::make_exception_ptr(FailedToDownloadManifest()));
139 auto it = m_daoForkHeaders.find(_peerID);
140 if (it != m_daoForkHeaders.end())
141 it->second.set_exception(
142 std::make_exception_ptr(FailedToDownloadDaoForkBlockHeader()));
146 auto it = m_requestedChunks.find(_peerID);
147 if (it != m_requestedChunks.end())
149 m_neededChunks.push_back(it->second);
150 m_requestedChunks.erase(it);
153 boost::this_fiber::yield();
157 void validatePeer(
NodeID _peerID)
159 if (!m_host.validateStatus(
160 _peerID, m_hostGenesisHash, {m_hostProtocolVersion}, m_hostNetworkId))
163 m_host.requestManifest(_peerID);
165 bytes const manifestBytes = waitForManifestResponse(_peerID);
166 if (manifestBytes.empty())
169 RLP manifestRlp(manifestBytes);
170 if (!validateManifest(manifestRlp))
173 m_host.disablePeer(_peerID,
"Invalid snapshot manifest.");
177 u256 const snapshotHash = snapshotBlockHash(manifestRlp);
178 if (m_syncingSnapshotHash)
180 if (snapshotHash == m_syncingSnapshotHash)
181 m_freePeers.push(_peerID);
183 m_host.disablePeer(_peerID,
"Another snapshot.");
189 m_host.requestBlockHeaders(_peerID, m_daoForkBlock, 1, 0,
false);
191 bytes const headerBytes = waitForDaoForkBlockResponse(_peerID);
192 if (headerBytes.empty())
195 RLP headerRlp(headerBytes);
196 if (!verifyDaoChallengeResponse(headerRlp))
198 m_host.disablePeer(_peerID,
"Peer from another fork.");
203 m_syncingSnapshotHash = snapshotHash;
204 m_manifest.set_value(manifestBytes);
205 m_freePeers.push(_peerID);
209 bytes waitForManifestResponse(
NodeID const& _peerID)
213 bytes const result = m_manifests[_peerID].get_future().get();
214 m_manifests.erase(_peerID);
217 catch (Exception
const&)
219 m_manifests.erase(_peerID);
224 bytes waitForDaoForkBlockResponse(
NodeID const& _peerID)
228 bytes const result = m_daoForkHeaders[_peerID].get_future().get();
229 m_daoForkHeaders.erase(_peerID);
232 catch (Exception
const&)
234 m_daoForkHeaders.erase(_peerID);
239 bool verifyDaoChallengeResponse(RLP
const& _r)
241 if (_r.itemCount() != 1)
245 return info.number() == m_daoForkBlock &&
246 info.extraData() ==
fromHex(
"0x64616f2d686172642d666f726b");
249 void downloadChunks()
251 bytes const manifestBytes = m_manifest.get_future().get();
253 RLP manifestRlp(manifestBytes);
254 RLP manifest(manifestRlp[0]);
256 u256 const version = manifest[0].toInt<
u256>();
257 h256s const stateHashes = manifest[1].toVector<
h256>();
258 h256s const blockHashes = manifest[2].toVector<
h256>();
259 h256 const stateRoot = manifest[3].toHash<
h256>();
260 u256 const blockNumber = manifest[4].toInt<
u256>();
261 h256 const blockHash = manifest[5].toHash<
h256>();
263 LOG(m_logger) <<
"MANIFEST: " 264 <<
"version " << version <<
" state root " << stateRoot <<
" block number " 265 << blockNumber <<
" block hash " << blockHash;
268 writeFile((boost::filesystem::path(m_snapshotDir) /
"MANIFEST").
string(), manifest.data());
270 m_neededChunks.assign(stateHashes.begin(), stateHashes.end());
271 m_neededChunks.insert(m_neededChunks.end(), blockHashes.begin(), blockHashes.end());
273 while (!m_neededChunks.empty())
275 h256 const chunkHash(m_neededChunks.front());
280 peerID = m_freePeers.value_pop();
281 }
while (!m_host.requestData(peerID, chunkHash));
283 LOG(m_logger) <<
"Requested chunk " << chunkHash;
285 m_requestedChunks[peerID] = chunkHash;
286 m_neededChunks.pop_front();
290 WarpCapability& m_host;
291 unsigned const m_hostProtocolVersion;
292 u256 const m_hostNetworkId;
293 h256 const m_hostGenesisHash;
294 unsigned const m_daoForkBlock;
295 boost::fibers::promise<bytes> m_manifest;
296 h256 m_syncingSnapshotHash;
297 std::deque<h256> m_neededChunks;
298 boost::fibers::buffered_channel<NodeID> m_freePeers;
299 boost::filesystem::path
const m_snapshotDir;
300 std::map<NodeID, boost::fibers::promise<bytes>> m_manifests;
301 std::map<NodeID, boost::fibers::promise<bytes>> m_daoForkHeaders;
302 std::map<NodeID, h256> m_requestedChunks;
304 std::unique_ptr<boost::fibers::fiber> m_downloadFiber;
314 boost::filesystem::path
const& _snapshotDownloadPath,
315 std::shared_ptr<SnapshotStorageFace> _snapshotStorage)
316 : m_host(
std::move(_host)),
317 m_blockChain(_blockChain),
318 m_networkId(_networkId),
319 m_snapshot(_snapshotStorage),
322 _snapshotDownloadPath.empty() ? nullptr : createPeerObserver(_snapshotDownloadPath))
328 m_backgroundWorkEnabled =
true;
329 m_host->scheduleExecution(c_backroundWorkPeriodMs, [
this]() { doBackgroundWork(); });
334 m_backgroundWorkEnabled =
false;
337 std::shared_ptr<WarpPeerObserverFace> WarpCapability::createPeerObserver(
338 boost::filesystem::path
const& _snapshotDownloadPath)
340 return std::make_shared<WarpPeerObserver>(*
this, m_blockChain, _snapshotDownloadPath);
343 void WarpCapability::doBackgroundWork()
345 for (
auto const& peer : m_peers)
347 time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
348 auto const& status = peer.second;
352 m_host->disconnect(peer.first, p2p::PingTimeout);
356 if (m_backgroundWorkEnabled)
357 m_host->scheduleExecution(c_backroundWorkPeriodMs, [
this]() { doBackgroundWork(); });
364 u256 snapshotBlockNumber;
365 h256 snapshotBlockHash;
368 bytes const snapshotManifest(m_snapshot->readManifest());
369 RLP manifest(snapshotManifest);
370 if (manifest.itemCount() != 6)
371 BOOST_THROW_EXCEPTION(InvalidSnapshotManifest());
378 m_blockChain.
genesisHash(), snapshotBlockHash, snapshotBlockNumber);
383 auto& peerStatus = m_peers[_peerID];
384 peerStatus.m_lastAsk = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
393 BOOST_THROW_EXCEPTION(InvalidWarpStatusPacket());
399 peerStatus.m_protocolVersion = _r[0].
toInt<
unsigned>();
400 peerStatus.m_networkId = _r[1].
toInt<
u256>();
401 peerStatus.m_totalDifficulty = _r[2].
toInt<
u256>();
402 peerStatus.m_latestHash = _r[3].
toHash<
h256>();
403 peerStatus.m_genesisHash = _r[4].
toHash<
h256>();
404 peerStatus.m_snapshotHash = _r[5].
toHash<
h256>();
405 peerStatus.m_snapshotNumber = _r[6].
toInt<
u256>();
407 cnetlog <<
"Status: " 408 <<
" protocol version " << peerStatus.m_protocolVersion <<
" networkId " 409 << peerStatus.m_networkId <<
" genesis hash " << peerStatus.m_genesisHash
410 <<
" total difficulty " << peerStatus.m_totalDifficulty <<
" latest hash " 411 << peerStatus.m_latestHash <<
" snapshot hash " << peerStatus.m_snapshotHash
412 <<
" snapshot number " << peerStatus.m_snapshotNumber;
414 m_peerObserver->onPeerStatus(_peerID);
424 .appendRaw(m_snapshot->readManifest());
425 m_host->sealAndSend(_peerID, s);
437 .append(m_snapshot->readCompressedChunk(chunkHash));
438 m_host->sealAndSend(_peerID, s);
446 m_host->sealAndSend(_peerID, s);
452 m_peerObserver->onPeerBlockHeaders(_peerID, _r);
458 m_peerObserver->onPeerManifest(_peerID, _r);
464 m_peerObserver->onPeerData(_peerID, _r);
473 cnetlog <<
"Warp Peer causing an Exception: " 474 << boost::current_exception_diagnostic_information() <<
" " << _r;
476 catch (std::exception
const& _e)
478 cnetlog <<
"Warp Peer causing an exception: " << _e.what() <<
" " << _r;
486 m_peerObserver->onPeerDisconnect(_peerID, m_peers[_peerID].m_asking);
487 m_peers.erase(_peerID);
492 u256 const& _hostNetworkId,
u256 const& _chainTotalDifficulty,
h256 const& _chainCurrentHash,
493 h256 const& _chainGenesisHash,
h256 const& _snapshotBlockHash,
u256 const& _snapshotBlockNumber)
497 << _hostProtocolVersion << _hostNetworkId << _chainTotalDifficulty << _chainCurrentHash
498 << _chainGenesisHash << _snapshotBlockHash << _snapshotBlockNumber;
499 m_host->sealAndSend(_peerID, s);
504 NodeID const& _peerID,
unsigned _startNumber,
unsigned _count,
unsigned _skip,
bool _reverse)
506 auto itPeerStatus = m_peers.find(_peerID);
507 if (itPeerStatus == m_peers.end())
514 << _startNumber << _count << _skip << (_reverse ? 1 : 0);
515 m_host->sealAndSend(_peerID, s);
520 auto itPeerStatus = m_peers.find(_peerID);
521 if (itPeerStatus == m_peers.end())
528 m_host->sealAndSend(_peerID, s);
533 auto itPeerStatus = m_peers.find(_peerID);
534 if (itPeerStatus == m_peers.end())
542 m_host->sealAndSend(_peerID, s);
546 void WarpCapability::setAsking(
NodeID const& _peerID,
Asking _a)
548 auto itPeerStatus = m_peers.find(_peerID);
549 if (itPeerStatus == m_peers.end())
552 auto& peerStatus = itPeerStatus->second;
554 peerStatus.m_asking = _a;
555 peerStatus.m_lastAsk = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
560 std::vector<unsigned>
const& _protocolVersions,
u256 const& _networkId)
562 auto itPeerStatus = m_peers.find(_peerID);
563 if (itPeerStatus == m_peers.end())
566 auto const& peerStatus = itPeerStatus->second;
568 if (peerStatus.m_genesisHash != _genesisHash)
573 if (find(_protocolVersions.begin(), _protocolVersions.end(), peerStatus.m_protocolVersion) ==
574 _protocolVersions.end())
579 if (peerStatus.m_networkId != _networkId)
581 disablePeer(_peerID,
"Invalid network identifier.");
586 disablePeer(_peerID,
"Peer banned for unexpected status message.");
595 m_host->disableCapability(_peerID,
name(), _problem);
_T toInt(int _flags=Strict) const
Converts to int of type given; if isData(), decodes as big-endian bytestream.
void disablePeer(NodeID const &_peerID, std::string const &_problem)
bool interpretCapabilityPacket(NodeID const &_peerID, unsigned _id, RLP const &) override
WarpCapability(std::shared_ptr< p2p::CapabilityHostFace > _host, BlockChain const &_blockChain, u256 const &_networkId, boost::filesystem::path const &_snapshotDownloadPath, std::shared_ptr< SnapshotStorageFace > _snapshotStorage)
Implements the blockchain database. All data this gives is disk-backed. .
BlockDetails details(h256 const &_hash) const
Get the familial details concerning a block (or the most recent mined if none given). Thread-safe.
void onStarting() override
void requestBlockHeaders(NodeID const &_peerID, unsigned _startNumber, unsigned _count, unsigned _skip, bool _reverse)
bool sha3(bytesConstRef _input, bytesRef o_output) noexcept
void onStopping() override
void requestManifest(NodeID const &_peerID)
bytes fromHex(std::string const &_s, WhenError _throw=WhenError::DontThrow)
Base class for all exceptions.
void onConnect(NodeID const &_peerID, u256 const &_peerCapabilityVersion) override
unsigned const c_WarpProtocolVersion
void onDisconnect(NodeID const &_peerID) override
Logger createLogger(int _severity, std::string const &_channel)
std::vector< byte > bytes
void requestStatus(NodeID const &_peerID, unsigned _hostProtocolVersion, u256 const &_hostNetworkId, u256 const &_chainTotalDifficulty, h256 const &_chainCurrentHash, h256 const &_chainGenesisHash, h256 const &_snapshotBlockHash, u256 const &_snapshotBlockNumber)
bool validateStatus(NodeID const &_peerID, h256 const &_genesisHash, std::vector< unsigned > const &_protocolVersions, u256 const &_networkId)
Validates whether peer is able to communicate with the host, disables peer if not.
h256 genesisHash() const
Get the hash of the genesis block. Thread-safe.
h256 currentHash() const
Get a given block (RLP format). Thread-safe.
std::string name() const override
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > > u256
std::string toHex(Iterator _it, Iterator _end, std::string const &_prefix)
std::vector< h256 > h256s
void writeFile(boost::filesystem::path const &_file, bytesConstRef _data, bool _writeDeleteRename)
Class for writing to an RLP bytestream.
_N toHash(int _flags=Strict) const
bool requestData(NodeID const &_peerID, h256 const &_chunkHash)
boost::log::sources::severity_channel_logger<> Logger