TransactionQueue.h 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file TransactionQueue.h
  15. * @author Gav Wood <i@gavwood.com>
  16. * @date 2014
  17. */
  18. #pragma once
  19. #include <functional>
  20. #include <condition_variable>
  21. #include <thread>
  22. #include <deque>
  23. #include <libdevcore/Common.h>
  24. #include <libdevcore/Guards.h>
  25. #include <libdevcore/Log.h>
  26. #include <libethcore/Common.h>
  27. #include "Transaction.h"
  28. namespace dev
  29. {
  30. namespace eth
  31. {
  32. struct TransactionQueueChannel: public LogChannel { static const char* name(); static const int verbosity = 4; };
  33. struct TransactionQueueTraceChannel: public LogChannel { static const char* name(); static const int verbosity = 7; };
  34. #define ctxq dev::LogOutputStream<dev::eth::TransactionQueueTraceChannel, true>()
  35. /**
  36. * @brief A queue of Transactions, each stored as RLP.
  37. * Maintains a transaction queue sorted by nonce diff and gas price.
  38. * @threadsafe
  39. */
  40. class TransactionQueue
  41. {
  42. public:
  43. struct Limits { size_t current; size_t future; };
  44. /// @brief TransactionQueue
  45. /// @param _limit Maximum number of pending transactions in the queue.
  46. /// @param _futureLimit Maximum number of future nonce transactions.
  47. TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024);
  48. TransactionQueue(Limits const& _l): TransactionQueue(_l.current, _l.future) {}
  49. ~TransactionQueue();
  50. /// Add transaction to the queue to be verified and imported.
  51. /// @param _data RLP encoded transaction data.
  52. /// @param _nodeId Optional network identified of a node transaction comes from.
  53. void enqueue(RLP const& _data, h512 const& _nodeId);
  54. /// Verify and add transaction to the queue synchronously.
  55. /// @param _tx RLP encoded transaction data.
  56. /// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
  57. /// @returns Import result code.
  58. ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); }
  59. /// Verify and add transaction to the queue synchronously.
  60. /// @param _tx Trasnaction data.
  61. /// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
  62. /// @returns Import result code.
  63. ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore);
  64. /// Remove transaction from the queue
  65. /// @param _txHash Trasnaction hash
  66. void drop(h256 const& _txHash);
  67. /// Get number of pending transactions for account.
  68. /// @returns Pending transaction count.
  69. unsigned waiting(Address const& _a) const;
  70. /// Get top transactions from the queue. Returned transactions are not removed from the queue automatically.
  71. /// @param _limit Max number of transactions to return.
  72. /// @param _avoid Transactions to avoid returning.
  73. /// @returns up to _limit transactions ordered by nonce and gas price.
  74. Transactions topTransactions(unsigned _limit, h256Hash const& _avoid = h256Hash()) const;
  75. /// Get a hash set of transactions in the queue
  76. /// @returns A hash set of all transactions in the queue
  77. h256Hash knownTransactions() const;
  78. /// Get max nonce for an account
  79. /// @returns Max transaction nonce for account in the queue
  80. u256 maxNonce(Address const& _a) const;
  81. /// Mark transaction as future. It wont be retured in topTransactions list until a transaction with a preceeding nonce is imported or marked with dropGood
  82. /// @param _t Transaction hash
  83. void setFuture(h256 const& _t);
  84. /// Drop a trasnaction from the list if exists and move following future trasnactions to current (if any)
  85. /// @param _t Transaction hash
  86. void dropGood(Transaction const& _t);
  87. struct Status
  88. {
  89. size_t current;
  90. size_t future;
  91. size_t unverified;
  92. size_t dropped;
  93. };
  94. /// @returns the status of the transaction queue.
  95. Status status() const { Status ret; DEV_GUARDED(x_queue) { ret.unverified = m_unverified.size(); } ReadGuard l(m_lock); ret.dropped = m_dropped.size(); ret.current = m_currentByHash.size(); ret.future = m_future.size(); return ret; }
  96. /// @returns the transacrtion limits on current/future.
  97. Limits limits() const { return Limits{m_limit, m_futureLimit}; }
  98. /// Clear the queue
  99. void clear();
  100. /// Register a handler that will be called once there is a new transaction imported
  101. template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); }
  102. /// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported
  103. template <class T> Handler<ImportResult, h256 const&, h512 const&> onImport(T const& _t) { return m_onImport.add(_t); }
  104. /// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported
  105. template <class T> Handler<h256 const&> onReplaced(T const& _t) { return m_onReplaced.add(_t); }
  106. private:
  107. /// Verified and imported transaction
  108. struct VerifiedTransaction
  109. {
  110. VerifiedTransaction(Transaction const& _t): transaction(_t) {}
  111. VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {}
  112. VerifiedTransaction(VerifiedTransaction const&) = delete;
  113. VerifiedTransaction& operator=(VerifiedTransaction const&) = delete;
  114. Transaction transaction; ///< Transaction data
  115. };
  116. /// Trasnaction pending verification
  117. struct UnverifiedTransaction
  118. {
  119. UnverifiedTransaction() {}
  120. UnverifiedTransaction(bytesConstRef const& _t, h512 const& _nodeId): transaction(_t.toBytes()), nodeId(_nodeId) {}
  121. UnverifiedTransaction(UnverifiedTransaction&& _t): transaction(std::move(_t.transaction)) {}
  122. UnverifiedTransaction& operator=(UnverifiedTransaction&& _other)
  123. {
  124. assert(&_other != this);
  125. transaction = std::move(_other.transaction);
  126. nodeId = std::move(_other.nodeId);
  127. return *this;
  128. }
  129. UnverifiedTransaction(UnverifiedTransaction const&) = delete;
  130. UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete;
  131. bytes transaction; ///< RLP encoded transaction data
  132. h512 nodeId; ///< Network Id of the peer transaction comes from
  133. };
  134. struct PriorityCompare
  135. {
  136. TransactionQueue& queue;
  137. /// Compare transaction by nonce height and gas price.
  138. bool operator()(VerifiedTransaction const& _first, VerifiedTransaction const& _second) const
  139. {
  140. u256 const& height1 = _first.transaction.nonce() - queue.m_currentByAddressAndNonce[_first.transaction.sender()].begin()->first;
  141. u256 const& height2 = _second.transaction.nonce() - queue.m_currentByAddressAndNonce[_second.transaction.sender()].begin()->first;
  142. return height1 < height2 || (height1 == height2 && _first.transaction.gasPrice() > _second.transaction.gasPrice());
  143. }
  144. };
  145. // Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order.
  146. using PriorityQueue = std::multiset<VerifiedTransaction, PriorityCompare>;
  147. ImportResult import(bytesConstRef _tx, IfDropped _ik = IfDropped::Ignore);
  148. ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik);
  149. ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction);
  150. void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p);
  151. void makeCurrent_WITH_LOCK(Transaction const& _t);
  152. bool remove_WITH_LOCK(h256 const& _txHash);
  153. u256 maxNonce_WITH_LOCK(Address const& _a) const;
  154. void verifierBody();
  155. mutable SharedMutex m_lock; ///< General lock.
  156. h256Hash m_known; ///< Headers of transactions in both sets.
  157. std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once.
  158. h256Hash m_dropped; ///< Transactions that have previously been dropped
  159. PriorityQueue m_current;
  160. std::unordered_map<h256, PriorityQueue::iterator> m_currentByHash; ///< Transaction hash to set ref
  161. std::unordered_map<Address, std::map<u256, PriorityQueue::iterator>> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce
  162. std::unordered_map<Address, std::map<u256, VerifiedTransaction>> m_future; /// Future transactions
  163. Signal<> m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast.
  164. Signal<ImportResult, h256 const&, h512 const&> m_onImport; ///< Called for each import attempt. Arguments are result, transaction id an node id. Be nice and exit fast.
  165. Signal<h256 const&> m_onReplaced; ///< Called whan transction is dropped during a call to import() to make room for another transaction.
  166. unsigned m_limit; ///< Max number of pending transactions
  167. unsigned m_futureLimit; ///< Max number of future transactions
  168. unsigned m_futureSize = 0; ///< Current number of future transactions
  169. std::condition_variable m_queueReady; ///< Signaled when m_unverified has a new entry.
  170. std::vector<std::thread> m_verifiers;
  171. std::deque<UnverifiedTransaction> m_unverified; ///< Pending verification queue
  172. mutable Mutex x_queue; ///< Verification queue mutex
  173. bool m_aborting = false; ///< Exit condition for verifier.
  174. };
  175. }
  176. }