RLPXSocketIO.cpp 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file RLPXSocketIO.cpp
  15. * @author Alex Leverington <nessence@gmail.com>
  16. * @date 2015
  17. */
  18. #include "RLPXSocketIO.h"
  19. #include <algorithm>
  20. using namespace std;
  21. using namespace dev;
  22. using namespace dev::p2p;
  23. uint32_t const RLPXSocketIO::MinFrameSize = h128::size * 3; // header + block + mac
  24. uint32_t const RLPXSocketIO::MaxPacketSize = 1 << 24;
  25. uint16_t const RLPXSocketIO::DefaultInitialCapacity = 8 << 8;
  26. RLPXSocketIO::RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl, size_t _initialCapacity):
  27. m_flowControl(_flowControl),
  28. m_coder(_coder),
  29. m_socket(_socket),
  30. m_writers(writers(_protCount)),
  31. m_egressCapacity(m_flowControl ? _initialCapacity : MaxPacketSize * m_writers.size())
  32. {}
  33. vector<RLPXFrameWriter> RLPXSocketIO::writers(unsigned _capacity)
  34. {
  35. vector<RLPXFrameWriter> ret;
  36. for (unsigned i = 0; i < _capacity; i++)
  37. ret.push_back(RLPXFrameWriter(i));
  38. return ret;
  39. }
  40. void RLPXSocketIO::send(unsigned _protocolType, unsigned _type, RLPStream& _payload)
  41. {
  42. if (!m_socket.is_open())
  43. return; // TCPSocketNotOpen
  44. m_writers.at(_protocolType).enque(_type, _payload);
  45. bool wasEmtpy = false;
  46. DEV_GUARDED(x_queued)
  47. wasEmtpy = (++m_queued == 1);
  48. if (wasEmtpy)
  49. doWrite();
  50. }
  51. void RLPXSocketIO::doWrite()
  52. {
  53. m_toSend.clear();
  54. size_t capacity = 0;
  55. DEV_GUARDED(x_queued)
  56. capacity = min(m_egressCapacity, MaxPacketSize);
  57. size_t active = 0;
  58. for (auto const& w: m_writers)
  59. if (w.size())
  60. active += 1;
  61. size_t dequed = 0;
  62. size_t protFrameSize = capacity / active;
  63. if (protFrameSize >= MinFrameSize)
  64. for (auto& w: m_writers)
  65. dequed += w.mux(m_coder, protFrameSize, m_toSend);
  66. if (dequed)
  67. write(dequed);
  68. else
  69. deferWrite();
  70. }
  71. void RLPXSocketIO::deferWrite()
  72. {
  73. auto self(shared_from_this());
  74. m_congestion.reset(new ba::deadline_timer(m_socket.get_io_service()));
  75. m_congestion->expires_from_now(boost::posix_time::milliseconds(50));
  76. m_congestion->async_wait([=](boost::system::error_code const& _ec) { m_congestion.reset(); if (!_ec) doWrite(); });
  77. }
  78. void RLPXSocketIO::write(size_t _dequed)
  79. {
  80. auto self(shared_from_this());
  81. if (m_toSend.empty())
  82. return;
  83. ba::async_write(m_socket, ba::buffer(m_toSend[0]), [this, self, _dequed](boost::system::error_code ec, size_t written)
  84. {
  85. if (ec)
  86. return; // TCPSocketWriteError
  87. bool reschedule = false;
  88. DEV_GUARDED(x_queued)
  89. {
  90. if (m_flowControl)
  91. m_egressCapacity -= written;
  92. m_queued -= _dequed;
  93. reschedule = m_queued > 0;
  94. }
  95. if (reschedule)
  96. doWrite();
  97. });
  98. }