Ethereum  PoC-8
The C++ Implementation of Ethereum
EthereumCapability.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16 */
17 
18 #include "EthereumCapability.h"
19 #include "BlockChain.h"
20 #include "BlockChainSync.h"
21 #include "BlockQueue.h"
22 #include "TransactionQueue.h"
23 #include <libdevcore/Common.h>
24 #include <libethcore/Exceptions.h>
25 #include <libp2p/Host.h>
26 #include <libp2p/Session.h>
27 #include <chrono>
28 #include <thread>
29 
30 using namespace std;
31 using namespace dev;
32 using namespace dev::eth;
33 
// Number of top queued transactions considered in one maintainTransactions() pass.
static constexpr unsigned c_maxSendTransactions = 256;
// Cap applied to the header count a peer asks for in a GetBlockHeaders request.
static constexpr unsigned c_maxHeadersToSend = 1024;
// A peer announcing more new block hashes than this in one packet gets disabled.
static constexpr unsigned c_maxIncomingNewHashes = 1024;
// Delay passed to scheduleExecution() between doBackgroundWork() runs.
static constexpr int c_backroundWorkPeriodMs = 1000;
// Lower bound on the number of peers that receive full new blocks (the rest get hashes only).
static constexpr int c_minBlockBroadcastPeers = 4;
39 
40 char const* const EthereumCapability::s_stateNames[static_cast<int>(SyncState::Size)] = {
41  "NotSynced", "Idle", "Waiting", "Blocks", "State"};
42 
43 namespace
44 {
45 string toString(Asking _a)
46 {
47  switch (_a)
48  {
49  case Asking::BlockHeaders:
50  return "BlockHeaders";
51  case Asking::BlockBodies:
52  return "BlockBodies";
53  case Asking::NodeData:
54  return "NodeData";
55  case Asking::Receipts:
56  return "Receipts";
57  case Asking::Nothing:
58  return "Nothing";
59  case Asking::State:
60  return "State";
61  case Asking::WarpManifest:
62  return "WarpManifest";
63  case Asking::WarpData:
64  return "WarpData";
65  }
66  return "?";
67 }
68 
69 class EthereumPeerObserver: public EthereumPeerObserverFace
70 {
71 public:
72  EthereumPeerObserver(shared_ptr<BlockChainSync> _sync, TransactionQueue& _tq): m_sync(_sync), m_tq(_tq) {}
73 
74  void onPeerStatus(EthereumPeer const& _peer) override
75  {
76  try
77  {
78  m_sync->onPeerStatus(_peer);
79  }
80  catch (FailedInvariant const&)
81  {
82  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
83  cwarn << "Failed invariant during sync, restarting sync";
84  m_sync->restartSync();
85  }
86  }
87 
88  void onPeerTransactions(NodeID const& _peerID, RLP const& _r) override
89  {
90  unsigned itemCount = _r.itemCount();
91  LOG(m_logger) << "Transactions (" << dec << itemCount << " entries)";
92  m_tq.enqueue(_r, _peerID);
93  }
94 
95  void onPeerAborting() override
96  {
97  try
98  {
99  m_sync->onPeerAborting();
100  }
101  catch (Exception&)
102  {
103  cwarn << "Exception on peer destruciton: " << boost::current_exception_diagnostic_information();
104  }
105  }
106 
107  void onPeerBlockHeaders(NodeID const& _peerID, RLP const& _headers) override
108  {
109  try
110  {
111  m_sync->onPeerBlockHeaders(_peerID, _headers);
112  }
113  catch (FailedInvariant const&)
114  {
115  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
116  cwarn << "Failed invariant during sync, restarting sync";
117  m_sync->restartSync();
118  }
119  }
120 
121  void onPeerBlockBodies(NodeID const& _peerID, RLP const& _r) override
122  {
123  try
124  {
125  m_sync->onPeerBlockBodies(_peerID, _r);
126  }
127  catch (FailedInvariant const&)
128  {
129  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
130  cwarn << "Failed invariant during sync, restarting sync";
131  m_sync->restartSync();
132  }
133  }
134 
135  void onPeerNewHashes(
136  NodeID const& _peerID, std::vector<std::pair<h256, u256>> const& _hashes) override
137  {
138  try
139  {
140  m_sync->onPeerNewHashes(_peerID, _hashes);
141  }
142  catch (FailedInvariant const&)
143  {
144  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
145  cwarn << "Failed invariant during sync, restarting sync";
146  m_sync->restartSync();
147  }
148  }
149 
150  void onPeerNewBlock(NodeID const& _peerID, RLP const& _r) override
151  {
152  try
153  {
154  m_sync->onPeerNewBlock(_peerID, _r);
155  }
156  catch (FailedInvariant const&)
157  {
158  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
159  cwarn << "Failed invariant during sync, restarting sync";
160  m_sync->restartSync();
161  }
162  }
163 
164  void onPeerNodeData(NodeID const& /* _peerID */, RLP const& _r) override
165  {
166  unsigned itemCount = _r.itemCount();
167  LOG(m_logger) << "Node Data (" << dec << itemCount << " entries)";
168  }
169 
170  void onPeerReceipts(NodeID const& /* _peerID */, RLP const& _r) override
171  {
172  unsigned itemCount = _r.itemCount();
173  LOG(m_logger) << "Receipts (" << dec << itemCount << " entries)";
174  }
175 
176 private:
177  shared_ptr<BlockChainSync> m_sync;
178  TransactionQueue& m_tq;
179 
180  Logger m_logger{createLogger(VerbosityDebug, "host")};
181 };
182 
183 class EthereumHostData: public EthereumHostDataFace
184 {
185 public:
186  EthereumHostData(BlockChain const& _chain, OverlayDB const& _db): m_chain(_chain), m_db(_db) {}
187 
188  pair<bytes, unsigned> blockHeaders(RLP const& _blockId, unsigned _maxHeaders, u256 _skip, bool _reverse) const override
189  {
190  auto numHeadersToSend = _maxHeaders;
191 
192  auto step = static_cast<unsigned>(_skip) + 1;
193  assert(step > 0 && "step must not be 0");
194 
195  h256 blockHash;
196  if (_blockId.size() == 32) // block id is a hash
197  {
198  blockHash = _blockId.toHash<h256>();
199  cnetlog << "GetBlockHeaders (block (hash): " << blockHash
200  << ", maxHeaders: " << _maxHeaders << ", skip: " << _skip
201  << ", reverse: " << _reverse << ")";
202 
203  if (!m_chain.isKnown(blockHash))
204  blockHash = {};
205  else if (!_reverse)
206  {
207  auto n = m_chain.number(blockHash);
208  if (numHeadersToSend == 0)
209  blockHash = {};
210  else if (n != 0 || blockHash == m_chain.genesisHash())
211  {
212  auto top = n + uint64_t(step) * numHeadersToSend - 1;
213  auto lastBlock = m_chain.number();
214  if (top > lastBlock)
215  {
216  numHeadersToSend = (lastBlock - n) / step + 1;
217  top = n + step * (numHeadersToSend - 1);
218  }
219  assert(top <= lastBlock && "invalid top block calculated");
220  blockHash = m_chain.numberHash(static_cast<unsigned>(top)); // override start block hash with the hash of the top block we have
221  }
222  else
223  blockHash = {};
224  }
225  }
226  else // block id is a number
227  {
228  auto n = _blockId.toInt<bigint>();
229  cnetlog << "GetBlockHeaders (" << n << " max: " << _maxHeaders << " skip: " << _skip
230  << (_reverse ? " reverse" : "") << ")";
231 
232  if (!_reverse)
233  {
234  auto lastBlock = m_chain.number();
235  if (n > lastBlock || numHeadersToSend == 0)
236  blockHash = {};
237  else
238  {
239  bigint top = n + uint64_t(step) * (numHeadersToSend - 1);
240  if (top > lastBlock)
241  {
242  numHeadersToSend = (lastBlock - static_cast<unsigned>(n)) / step + 1;
243  top = n + step * (numHeadersToSend - 1);
244  }
245  assert(top <= lastBlock && "invalid top block calculated");
246  blockHash = m_chain.numberHash(static_cast<unsigned>(top)); // override start block hash with the hash of the top block we have
247  }
248  }
249  else if (n <= std::numeric_limits<unsigned>::max())
250  blockHash = m_chain.numberHash(static_cast<unsigned>(n));
251  else
252  blockHash = {};
253  }
254 
255  auto nextHash = [this](h256 _h, unsigned _step)
256  {
257  static const unsigned c_blockNumberUsageLimit = 1000;
258 
259  const auto lastBlock = m_chain.number();
260  const auto limitBlock = lastBlock > c_blockNumberUsageLimit ? lastBlock - c_blockNumberUsageLimit : 0; // find the number of the block below which we don't expect BC changes.
261 
262  while (_step) // parent hash traversal
263  {
264  auto details = m_chain.details(_h);
265  if (details.number < limitBlock)
266  break; // stop using parent hash traversal, fallback to using block numbers
267  _h = details.parent;
268  --_step;
269  }
270 
271  if (_step) // still need lower block
272  {
273  auto n = m_chain.number(_h);
274  if (n >= _step)
275  _h = m_chain.numberHash(n - _step);
276  else
277  _h = {};
278  }
279 
280 
281  return _h;
282  };
283 
284  bytes rlp;
285  unsigned itemCount = 0;
286  vector<h256> hashes;
287  for (unsigned i = 0; i != numHeadersToSend; ++i)
288  {
289  if (!blockHash || !m_chain.isKnown(blockHash))
290  break;
291 
292  hashes.push_back(blockHash);
293  ++itemCount;
294 
295  blockHash = nextHash(blockHash, step);
296  }
297 
298  for (unsigned i = 0; i < hashes.size() && rlp.size() < c_maxPayload; ++i)
299  rlp += m_chain.headerData(hashes[_reverse ? i : hashes.size() - 1 - i]);
300 
301  return make_pair(rlp, itemCount);
302  }
303 
304  pair<bytes, unsigned> blockBodies(RLP const& _blockHashes) const override
305  {
306  unsigned const count = static_cast<unsigned>(_blockHashes.itemCount());
307 
308  bytes rlp;
309  unsigned n = 0;
310  auto numBodiesToSend = std::min(count, c_maxBlocks);
311  for (unsigned i = 0; i < numBodiesToSend && rlp.size() < c_maxPayload; ++i)
312  {
313  auto h = _blockHashes[i].toHash<h256>();
314  if (m_chain.isKnown(h))
315  {
316  bytes blockBytes = m_chain.block(h);
317  RLP block{blockBytes};
318  RLPStream body;
319  body.appendList(2);
320  body.appendRaw(block[1].data()); // transactions
321  body.appendRaw(block[2].data()); // uncles
322  auto bodyBytes = body.out();
323  rlp.insert(rlp.end(), bodyBytes.begin(), bodyBytes.end());
324  ++n;
325  }
326  }
327  if (count > 20 && n == 0)
328  cnetlog << "all " << count << " unknown blocks requested; peer on different chain?";
329  else
330  cnetlog << n << " blocks known and returned; " << (numBodiesToSend - n)
331  << " blocks unknown; " << (count > c_maxBlocks ? count - c_maxBlocks : 0)
332  << " blocks ignored";
333 
334  return make_pair(rlp, n);
335  }
336 
337  strings nodeData(RLP const& _dataHashes) const override
338  {
339  unsigned const count = static_cast<unsigned>(_dataHashes.itemCount());
340 
341  strings data;
342  size_t payloadSize = 0;
343  auto numItemsToSend = std::min(count, c_maxNodes);
344  for (unsigned i = 0; i < numItemsToSend && payloadSize < c_maxPayload; ++i)
345  {
346  auto h = _dataHashes[i].toHash<h256>();
347  auto node = m_db.lookup(h);
348  if (!node.empty())
349  {
350  payloadSize += node.length();
351  data.push_back(move(node));
352  }
353  }
354  cnetlog << data.size() << " nodes known and returned; " << (numItemsToSend - data.size())
355  << " unknown; " << (count > c_maxNodes ? count - c_maxNodes : 0) << " ignored";
356 
357  return data;
358  }
359 
360  pair<bytes, unsigned> receipts(RLP const& _blockHashes) const override
361  {
362  unsigned const count = static_cast<unsigned>(_blockHashes.itemCount());
363 
364  bytes rlp;
365  unsigned n = 0;
366  auto numItemsToSend = std::min(count, c_maxReceipts);
367  for (unsigned i = 0; i < numItemsToSend && rlp.size() < c_maxPayload; ++i)
368  {
369  auto h = _blockHashes[i].toHash<h256>();
370  if (m_chain.isKnown(h))
371  {
372  auto const receipts = m_chain.receipts(h);
373  auto receiptsRlpList = receipts.rlp();
374  rlp.insert(rlp.end(), receiptsRlpList.begin(), receiptsRlpList.end());
375  ++n;
376  }
377  }
378  cnetlog << n << " receipt lists known and returned; " << (numItemsToSend - n)
379  << " unknown; " << (count > c_maxReceipts ? count - c_maxReceipts : 0)
380  << " ignored";
381 
382  return make_pair(rlp, n);
383  }
384 
385 private:
386  BlockChain const& m_chain;
387  OverlayDB const& m_db;
388 };
389 
390 }
391 
392 EthereumCapability::EthereumCapability(shared_ptr<p2p::CapabilityHostFace> _host,
393  BlockChain const& _ch, OverlayDB const& _db, TransactionQueue& _tq, BlockQueue& _bq,
394  u256 _networkId)
395  : m_host(move(_host)),
396  m_chain(_ch),
397  m_db(_db),
398  m_tq(_tq),
399  m_bq(_bq),
400  m_networkId(_networkId),
401  m_hostData(new EthereumHostData(m_chain, m_db))
402 {
403  // TODO: Composition would be better. Left like that to avoid initialization
404  // issues as BlockChainSync accesses other EthereumHost members.
405  m_sync.reset(new BlockChainSync(*this));
406  m_peerObserver.reset(new EthereumPeerObserver(m_sync, m_tq));
407  m_latestBlockSent = _ch.currentHash();
408  m_tq.onImport([this](ImportResult _ir, h256 const& _h, h512 const& _nodeId) { onTransactionImported(_ir, _h, _nodeId); });
409  std::random_device seed;
410  m_urng = std::mt19937_64(seed());
411 }
412 
414 {
415  m_backgroundWorkEnabled = true;
416  m_host->scheduleExecution(c_backroundWorkPeriodMs, [this]() { doBackgroundWork(); });
417 }
418 
420 {
421  m_backgroundWorkEnabled = false;
422 }
423 
424 bool EthereumCapability::ensureInitialised()
425 {
426  if (!m_latestBlockSent)
427  {
428  // First time - just initialise.
429  m_latestBlockSent = m_chain.currentHash();
430  LOG(m_logger) << "Initialising: latest=" << m_latestBlockSent;
431 
432  m_transactionsSent = m_tq.knownTransactions();
433  return true;
434  }
435  return false;
436 }
437 
439 {
440  m_sync->abortSync();
441 
442  // reset() can be called from RPC handling thread,
443  // but we access m_latestBlockSent and m_transactionsSent only from the network thread
444  m_host->scheduleExecution(0, [this]() {
445  m_latestBlockSent = h256();
446  m_transactionsSent.clear();
447  });
448 }
449 
451 {
452  m_sync->completeSync();
453 }
454 
455 void EthereumCapability::doBackgroundWork()
456 {
457  ensureInitialised();
458  auto h = m_chain.currentHash();
459  // If we've finished our initial sync (including getting all the blocks into the chain so as to reduce invalid transactions), start trading transactions & blocks
460  if (!isSyncing() && m_chain.isKnown(m_latestBlockSent))
461  {
462  if (m_newTransactions)
463  {
464  m_newTransactions = false;
465  maintainTransactions();
466  }
467  if (m_newBlocks)
468  {
469  m_newBlocks = false;
470  maintainBlocks(h);
471  }
472  }
473 
474  time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
475  if (now - m_lastTick >= 1)
476  {
477  m_lastTick = now;
478  for (auto const& peer : m_peers)
479  {
480  time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
481 
482  if (now - peer.second.lastAsk() > 10 && peer.second.isConversing())
483  // timeout
484  m_host->disconnect(peer.first, p2p::PingTimeout);
485  }
486  }
487 
488  if (m_backgroundWorkEnabled)
489  m_host->scheduleExecution(c_backroundWorkPeriodMs, [this]() { doBackgroundWork(); });
490 }
491 
492 void EthereumCapability::maintainTransactions()
493 {
494  // Send any new transactions.
495  unordered_map<NodeID, std::vector<size_t>> peerTransactions;
496  auto ts = m_tq.topTransactions(c_maxSendTransactions);
497  {
498  for (size_t i = 0; i < ts.size(); ++i)
499  {
500  auto const& t = ts[i];
501  bool unsent = !m_transactionsSent.count(t.sha3());
502  auto const peers = selectPeers([&](EthereumPeer const& _peer) {
503  return _peer.isWaitingForTransactions() ||
504  (unsent && !_peer.isTransactionKnown(t.sha3()));
505  });
506  for (auto const& p: peers)
507  peerTransactions[p].push_back(i);
508  }
509  for (auto const& t: ts)
510  m_transactionsSent.insert(t.sha3());
511  }
512 
513  for (auto& peer : m_peers)
514  {
515  bytes b;
516  unsigned n = 0;
517  for (auto const& i : peerTransactions[peer.first])
518  {
519  peer.second.markTransactionAsKnown(ts[i].sha3());
520  b += ts[i].rlp();
521  ++n;
522  }
523 
524  if (n || peer.second.isWaitingForTransactions())
525  {
526  RLPStream ts;
527  m_host->prep(peer.first, name(), ts, TransactionsPacket, n).appendRaw(b, n);
528  m_host->sealAndSend(peer.first, ts);
529  LOG(m_logger) << "Sent " << n << " transactions to " << peer.first;
530  }
531  peer.second.setWaitingForTransactions(false);
532  }
533 }
534 
535 vector<NodeID> EthereumCapability::selectPeers(
536  std::function<bool(EthereumPeer const&)> const& _predicate) const
537 {
538  vector<NodeID> allowed;
539  for (auto const& peer : m_peers)
540  {
541  if (_predicate(peer.second))
542  allowed.push_back(peer.first);
543  }
544  return allowed;
545 }
546 
547 std::pair<std::vector<NodeID>, std::vector<NodeID>> EthereumCapability::randomPartitionPeers(
548  std::vector<NodeID> const& _peers, std::size_t _number) const
549 {
550  vector<NodeID> part1(_peers);
551  vector<NodeID> part2;
552 
553  if (_number >= _peers.size())
554  return std::make_pair(part1, part2);
555 
556  std::shuffle(part1.begin(), part1.end(), m_urng);
557 
558  // Remove elements from the end of the shuffled part1 vector and move them to part2.
559  std::move(part1.begin() + _number, part1.end(), std::back_inserter(part2));
560  part1.erase(part1.begin() + _number, part1.end());
561  return std::make_pair(move(part1), move(part2));
562 }
563 
564 void EthereumCapability::maintainBlocks(h256 const& _currentHash)
565 {
566  // Send any new blocks.
567  auto detailsFrom = m_chain.details(m_latestBlockSent);
568  auto detailsTo = m_chain.details(_currentHash);
569  if (detailsFrom.totalDifficulty < detailsTo.totalDifficulty)
570  {
571  if (diff(detailsFrom.number, detailsTo.number) < 20)
572  {
573  // don't be sending more than 20 "new" blocks. if there are any more we were probably waaaay behind.
574  LOG(m_logger) << "Sending new blocks (current is " << _currentHash << ", was "
575  << m_latestBlockSent << ")";
576 
577  h256s blocks = get<0>(m_chain.treeRoute(m_latestBlockSent, _currentHash, false, false, true));
578 
579 
580  auto const peersWithoutBlock = selectPeers(
581  [&](EthereumPeer const& _peer) { return !_peer.isBlockKnown(_currentHash); });
582 
583  auto const peersToSendNumber =
584  std::max<std::size_t>(c_minBlockBroadcastPeers, std::sqrt(m_peers.size()));
585 
586  std::vector<NodeID> peersToSend;
587  std::vector<NodeID> peersToAnnounce;
588  std::tie(peersToSend, peersToAnnounce) =
589  randomPartitionPeers(peersWithoutBlock, peersToSendNumber);
590 
591  for (NodeID const& peerID : peersToSend)
592  for (auto const& b: blocks)
593  {
594  RLPStream ts;
595  m_host->prep(peerID, name(), ts, NewBlockPacket, 2)
596  .appendRaw(m_chain.block(b), 1)
597  .append(m_chain.details(b).totalDifficulty);
598 
599  auto itPeer = m_peers.find(peerID);
600  if (itPeer != m_peers.end())
601  {
602  m_host->sealAndSend(peerID, ts);
603  itPeer->second.clearKnownBlocks();
604  }
605  }
606  if (!peersToSend.empty())
607  LOG(m_logger) << "Sent " << blocks.size() << " block(s) to " << peersToSend.size()
608  << " peers";
609 
610  for (NodeID const& peerID : peersToAnnounce)
611  {
612  RLPStream ts;
613  m_host->prep(peerID, name(), ts, NewBlockHashesPacket, blocks.size());
614  for (auto const& b: blocks)
615  {
616  ts.appendList(2);
617  ts.append(b);
618  ts.append(m_chain.number(b));
619  }
620 
621  auto itPeer = m_peers.find(peerID);
622  if (itPeer != m_peers.end())
623  {
624  m_host->sealAndSend(peerID, ts);
625  itPeer->second.clearKnownBlocks();
626  }
627  }
628  if (!peersToAnnounce.empty())
629  LOG(m_logger) << "Announced " << blocks.size() << " block(s) to "
630  << peersToAnnounce.size() << " peers";
631  }
632  m_latestBlockSent = _currentHash;
633  }
634 }
635 
637 {
638  return m_sync->isSyncing();
639 }
640 
642 {
643  return m_sync->status();
644 }
645 
646 void EthereumCapability::onTransactionImported(
647  ImportResult _ir, h256 const& _h, h512 const& _nodeId)
648 {
649  m_host->scheduleExecution(0, [this, _ir, _h, _nodeId]() {
650  auto itPeerStatus = m_peers.find(_nodeId);
651  if (itPeerStatus == m_peers.end())
652  return;
653 
654  auto& peer = itPeerStatus->second;
655 
657  switch (_ir)
658  {
660  m_host->updateRating(_nodeId, -100);
661  break;
663  // if we already had the transaction, then don't bother sending it on.
664  m_transactionsSent.insert(_h);
665  m_host->updateRating(_nodeId, 0);
666  break;
668  m_host->updateRating(_nodeId, 100);
669  break;
670  default:;
671  }
672  });
673 }
674 
675 void EthereumCapability::onConnect(NodeID const& _peerID, u256 const& _peerCapabilityVersion)
676 {
677  m_host->addNote(_peerID, "manners", m_host->isRude(_peerID, name()) ? "RUDE" : "nice");
678 
679  EthereumPeer peer{m_host, _peerID, _peerCapabilityVersion};
680  m_peers.emplace(_peerID, peer);
681  peer.requestStatus(m_networkId, m_chain.details().totalDifficulty, m_chain.currentHash(),
682  m_chain.genesisHash());
683 }
684 
686 {
687  // TODO lower peer's rating or mark as rude if it disconnects when being asked for something
688  m_peerObserver->onPeerAborting();
689 
690  m_peers.erase(_peerID);
691 }
692 
694  NodeID const& _peerID, unsigned _id, RLP const& _r)
695 {
696  auto& peer = m_peers[_peerID];
697  peer.setLastAsk(std::chrono::system_clock::to_time_t(chrono::system_clock::now()));
698 
699  try
700  {
701  switch (_id)
702  {
703  case StatusPacket:
704  {
705  auto const peerProtocolVersion = _r[0].toInt<unsigned>();
706  auto const networkId = _r[1].toInt<u256>();
707  auto const totalDifficulty = _r[2].toInt<u256>();
708  auto const latestHash = _r[3].toHash<h256>();
709  auto const genesisHash = _r[4].toHash<h256>();
710 
711  LOG(m_logger) << "Status: " << peerProtocolVersion << " / " << networkId << " / "
712  << genesisHash << ", TD: " << totalDifficulty << " = " << latestHash;
713 
714  peer.setStatus(
715  peerProtocolVersion, networkId, totalDifficulty, latestHash, genesisHash);
716  setIdle(_peerID);
717  m_peerObserver->onPeerStatus(peer);
718  break;
719  }
720  case TransactionsPacket:
721  {
722  m_peerObserver->onPeerTransactions(_peerID, _r);
723  break;
724  }
726  {
729  const auto blockId = _r[0];
730  const auto maxHeaders = _r[1].toInt<u256>();
731  const auto skip = _r[2].toInt<u256>();
732  const auto reverse = _r[3].toInt<bool>();
733 
734  auto numHeadersToSend = maxHeaders <= c_maxHeadersToSend ?
735  static_cast<unsigned>(maxHeaders) :
736  c_maxHeadersToSend;
737 
738  if (skip > std::numeric_limits<unsigned>::max() - 1)
739  {
740  cnetdetails << "Requested block skip is too big: " << skip;
741  break;
742  }
743 
744  pair<bytes, unsigned> const rlpAndItemCount =
745  m_hostData->blockHeaders(blockId, numHeadersToSend, skip, reverse);
746 
747  RLPStream s;
748  m_host->prep(_peerID, name(), s, BlockHeadersPacket, rlpAndItemCount.second)
749  .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
750  m_host->sealAndSend(_peerID, s);
751  m_host->updateRating(_peerID, 0);
752  break;
753  }
754  case BlockHeadersPacket:
755  {
757  LOG(m_loggerImpolite)
758  << "Peer giving us block headers when we didn't ask for them.";
759  else
760  {
761  setIdle(_peerID);
762  m_peerObserver->onPeerBlockHeaders(_peerID, _r);
763  }
764  break;
765  }
767  {
768  unsigned count = static_cast<unsigned>(_r.itemCount());
769  cnetlog << "GetBlockBodies (" << dec << count << " entries)";
770 
771  if (!count)
772  {
773  LOG(m_loggerImpolite) << "Zero-entry GetBlockBodies: Not replying.";
774  m_host->updateRating(_peerID, -10);
775  break;
776  }
777 
778  pair<bytes, unsigned> const rlpAndItemCount = m_hostData->blockBodies(_r);
779 
780  m_host->updateRating(_peerID, 0);
781  RLPStream s;
782  m_host->prep(_peerID, name(), s, BlockBodiesPacket, rlpAndItemCount.second)
783  .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
784  m_host->sealAndSend(_peerID, s);
785  break;
786  }
787  case BlockBodiesPacket:
788  {
790  LOG(m_loggerImpolite) << "Peer giving us block bodies when we didn't ask for them.";
791  else
792  {
793  setIdle(_peerID);
794  m_peerObserver->onPeerBlockBodies(_peerID, _r);
795  }
796  break;
797  }
798  case NewBlockPacket:
799  {
800  m_peerObserver->onPeerNewBlock(_peerID, _r);
801  break;
802  }
804  {
805  unsigned itemCount = _r.itemCount();
806 
807  cnetlog << "BlockHashes (" << dec << itemCount << " entries) "
808  << (itemCount ? "" : " : NoMoreHashes");
809 
810  if (itemCount > c_maxIncomingNewHashes)
811  {
812  disablePeer(_peerID, "Too many new hashes");
813  break;
814  }
815 
816  vector<pair<h256, u256>> hashes(itemCount);
817  for (unsigned i = 0; i < itemCount; ++i)
818  hashes[i] = std::make_pair(_r[i][0].toHash<h256>(), _r[i][1].toInt<u256>());
819 
820  m_peerObserver->onPeerNewHashes(_peerID, hashes);
821  break;
822  }
823  case GetNodeDataPacket:
824  {
825  unsigned count = static_cast<unsigned>(_r.itemCount());
826  if (!count)
827  {
828  LOG(m_loggerImpolite) << "Zero-entry GetNodeData: Not replying.";
829  m_host->updateRating(_peerID, -10);
830  break;
831  }
832  cnetlog << "GetNodeData (" << dec << count << " entries)";
833 
834  strings const data = m_hostData->nodeData(_r);
835 
836  m_host->updateRating(_peerID, 0);
837  RLPStream s;
838  m_host->prep(_peerID, name(), s, NodeDataPacket, data.size());
839  for (auto const& element : data)
840  s.append(element);
841  m_host->sealAndSend(_peerID, s);
842  break;
843  }
844  case GetReceiptsPacket:
845  {
846  unsigned count = static_cast<unsigned>(_r.itemCount());
847  if (!count)
848  {
849  LOG(m_loggerImpolite) << "Zero-entry GetReceipts: Not replying.";
850  m_host->updateRating(_peerID, -10);
851  break;
852  }
853  cnetlog << "GetReceipts (" << dec << count << " entries)";
854 
855  pair<bytes, unsigned> const rlpAndItemCount = m_hostData->receipts(_r);
856 
857  m_host->updateRating(_peerID, 0);
858  RLPStream s;
859  m_host->prep(_peerID, name(), s, ReceiptsPacket, rlpAndItemCount.second)
860  .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
861  m_host->sealAndSend(_peerID, s);
862  break;
863  }
864  case NodeDataPacket:
865  {
866  if (peer.asking() != Asking::NodeData)
867  LOG(m_loggerImpolite) << "Peer giving us node data when we didn't ask for them.";
868  else
869  {
870  setIdle(_peerID);
871  m_peerObserver->onPeerNodeData(_peerID, _r);
872  }
873  break;
874  }
875  case ReceiptsPacket:
876  {
877  if (peer.asking() != Asking::Receipts)
878  LOG(m_loggerImpolite) << "Peer giving us receipts when we didn't ask for them.";
879  else
880  {
881  setIdle(_peerID);
882  m_peerObserver->onPeerReceipts(_peerID, _r);
883  }
884  break;
885  }
886  default:
887  return false;
888  }
889  }
890  catch (Exception const&)
891  {
892  cnetlog << "Peer causing an Exception: "
893  << boost::current_exception_diagnostic_information() << " " << _r;
894  }
895  catch (std::exception const& _e)
896  {
897  cnetlog << "Peer causing an exception: " << _e.what() << " " << _r;
898  }
899 
900  return true;
901 }
902 
903 void EthereumCapability::setIdle(NodeID const& _peerID)
904 {
905  setAsking(_peerID, Asking::Nothing);
906 }
907 
908 void EthereumCapability::setAsking(NodeID const& _peerID, Asking _a)
909 {
910  auto itPeerStatus = m_peers.find(_peerID);
911  if (itPeerStatus == m_peers.end())
912  return;
913 
914  auto& peerStatus = itPeerStatus->second;
915 
916  peerStatus.setAsking(_a);
917  peerStatus.setLastAsk(std::chrono::system_clock::to_time_t(chrono::system_clock::now()));
918 
919  m_host->addNote(_peerID, "ask", ::toString(_a));
920  m_host->addNote(_peerID, "sync",
921  string(isCriticalSyncing(_peerID) ? "ONGOING" : "holding") +
922  (needsSyncing(_peerID) ? " & needed" : ""));
923 }
924 
925 bool EthereumCapability::isCriticalSyncing(NodeID const& _peerID) const
926 {
927  auto itPeerStatus = m_peers.find(_peerID);
928  if (itPeerStatus == m_peers.end())
929  return false;
930 
931  auto const& peerStatus = itPeerStatus->second;
932 
933  auto const asking = peerStatus.asking();
934  return asking == Asking::BlockHeaders || asking == Asking::State;
935 }
936 
937 bool EthereumCapability::needsSyncing(NodeID const& _peerID) const
938 {
939  if (m_host->isRude(_peerID, name()))
940  return false;
941 
942  auto peerStatus = m_peers.find(_peerID);
943  return (peerStatus != m_peers.end() && peerStatus->second.latestHash());
944 }
945 
946 void EthereumCapability::disablePeer(NodeID const& _peerID, std::string const& _problem)
947 {
948  m_host->disableCapability(_peerID, name(), _problem);
949 }
950 
951 EthereumPeer const& EthereumCapability::peer(NodeID const& _peerID) const
952 {
953  return const_cast<EthereumCapability*>(this)->peer(_peerID);
954 }
955 
957 {
958  auto peer = m_peers.find(_peerID);
959  if (peer == m_peers.end())
960  BOOST_THROW_EXCEPTION(PeerDisconnected() << errinfo_nodeID(_peerID));
961 
962  return peer->second;
963 }
void requestStatus(u256 _hostNetworkId, u256 _chainTotalDifficulty, h256 _chainCurrentHash, h256 _chainGenesisHash)
_T toInt(int _flags=Strict) const
Converts to int of type given; if isData(), decodes as big-endian bytestream.
Definition: RLP.h:257
Definition: Address.cpp:20
void setWaitingForTransactions(bool _value)
Definition: EthereumPeer.h:61
A queue of Transactions, each stored as RLP. Maintains a transaction queue sorted by nonce diff and g...
Handler< ImportResult, h256 const &, h512 const & > onImport(T const &_t)
Register a handler that will be called once asynchronous verification is complete and transaction has ...
void onDisconnect(NodeID const &_nodeID) override
bool isWaitingForTransactions() const
Definition: EthereumPeer.h:60
Implements the blockchain database. All data this gives is disk-backed. .
Definition: BlockChain.h:104
bytes rlp(_T _t)
Export a single item in RLP format, returning a byte array.
Definition: RLP.h:453
BlockDetails details(h256 const &_hash) const
Get the familial details concerning a block (or the most recent mined if none given). Thread-safe.
Definition: BlockChain.h:157
void setLastAsk(time_t _lastAsk)
Definition: EthereumPeer.h:51
RLPStream & append(unsigned _s)
Append given datum to the byte stream.
Definition: RLP.h:381
void completeSync()
Don't sync further - used only in test mode.
bool interpretCapabilityPacket(NodeID const &_peerID, unsigned _id, RLP const &_r) override
Asking asking() const
Definition: EthereumPeer.h:53
void setStatus(unsigned _protocolVersion, u256 const &_networkId, u256 const &_totalDifficulty, h256 const &_latestHash, h256 const &_genesisHash)
void markTransactionAsKnown(h256 const &_hash)
Definition: EthereumPeer.h:64
The EthereumCapability class.
std::vector< std::string > strings
Definition: Common.h:143
Definition: FixedHash.h:390
unsigned number(h256 const &_hash) const
Get a number for the given hash (or the most recent mined if none given). Thread-safe.
Definition: BlockChain.h:224
EthereumPeer const & peer(NodeID const &_peerID) const
bool isBlockKnown(h256 const &_hash) const
Definition: EthereumPeer.h:66
std::string toString(std::chrono::time_point< T > const &_e, std::string const &_format="%F %T")
Definition: CommonIO.h:86
void disablePeer(NodeID const &_peerID, std::string const &_problem)
bool isConversing() const
Definition: EthereumPeer.h:54
bool sha3(bytesConstRef _input, bytesRef o_output) noexcept
Definition: SHA3.cpp:28
ImportResult
Definition: Common.h:115
boost::multiprecision::number< boost::multiprecision::cpp_int_backend<> > bigint
Definition: Common.h:118
bool isKnown(h256 const &_hash, bool _isCurrent=true) const
Returns true if the given block is known (though not necessarily a part of the canon chain)...
Base class for all exceptions.
Definition: Exceptions.h:38
p2p::NodeID NodeID
Definition: CommonNet.h:105
std::tuple< h256s, h256, unsigned > treeRoute(h256 const &_from, h256 const &_to, bool _common=true, bool _pre=true, bool _post=true) const
Logger createLogger(int _severity, std::string const &_channel)
Definition: Log.h:125
std::vector< byte > bytes
Definition: Common.h:72
time_t lastAsk() const
Definition: EthereumPeer.h:50
RLPStream & appendList(size_t _items)
Appends a list.
Definition: RLP.cpp:268
void onConnect(NodeID const &_nodeID, u256 const &_peerCapabilityVersion) override
FixedHash< 32 > h256
Definition: FixedHash.h:354
#define cwarn
A queue of blocks. Sits between network or other I/O and the BlockChain. Sorts them ready for blockch...
Definition: BlockQueue.h:223
h256 genesisHash() const
Get the hash of the genesis block. Thread-safe.
Definition: BlockChain.h:231
h256Hash knownTransactions() const
boost::error_info< struct tag_nodeID, h512 > errinfo_nodeID
Definition: Exceptions.h:96
h256 currentHash() const
Get a given block (RLP format). Thread-safe.
Definition: BlockChain.h:228
bytes const & out() const
Read the byte stream.
Definition: RLP.h:419
Base BlockChain synchronization strategy class. Syncs to peers and keeps up to date. Base class handles blocks downloading but does not contain any details on state transfer logic.
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > > u256
Definition: Common.h:121
#define LOG
Definition: Log.h:63
size_t size() const
Definition: RLP.h:105
N diff(N const &_a, N const &_b)
Definition: Common.h:193
std::string name() const override
size_t itemCount() const
Definition: RLP.h:101
Transactions topTransactions(unsigned _limit, h256Hash const &_avoid=h256Hash()) const
std::vector< h256 > h256s
Definition: FixedHash.h:359
Class for writing to an RLP bytestream.
Definition: RLP.h:369
_N toHash(int _flags=Strict) const
Definition: RLP.h:288
RLPStream & appendRaw(bytesConstRef _rlp, size_t _itemCount=1)
Appends raw (pre-serialised) RLP data. Use with caution.
Definition: RLP.cpp:222
bool isTransactionKnown(h256 const &_hash) const
Definition: EthereumPeer.h:63
Definition: RLP.h:47
boost::log::sources::severity_channel_logger<> Logger
Definition: Log.h:124
bytes block(h256 const &_hash) const
Get a block (RLP format) for the given hash (or the most recent mined if none given). Thread-safe.