Session.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file Session.cpp
  15. * @author Gav Wood <i@gavwood.com>
  16. * @author Alex Leverington <nessence@gmail.com>
  17. * @date 2014
  18. */
  19. #include "Session.h"
  20. #include <chrono>
  21. #include <libdevcore/Common.h>
  22. #include <libdevcore/CommonIO.h>
  23. #include <libdevcore/Exceptions.h>
  24. #include "Host.h"
  25. #include "Capability.h"
  26. using namespace std;
  27. using namespace dev;
  28. using namespace dev::p2p;
  29. Session::Session(Host* _h, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info):
  30. m_server(_h),
  31. m_io(move(_io)),
  32. m_socket(_s),
  33. m_peer(_n),
  34. m_info(_info),
  35. m_ping(chrono::steady_clock::time_point::max())
  36. {
  37. registerFraming(0);
  38. m_peer->m_lastDisconnect = NoDisconnect;
  39. m_lastReceived = m_connect = chrono::steady_clock::now();
  40. DEV_GUARDED(x_info)
  41. m_info.socketId = m_socket->ref().native_handle();
  42. }
  43. Session::~Session()
  44. {
  45. ThreadContext tc(info().id.abridged());
  46. ThreadContext tc2(info().clientVersion);
  47. clog(NetMessageSummary) << "Closing peer session :-(";
  48. m_peer->m_lastConnected = m_peer->m_lastAttempted - chrono::seconds(1);
  49. // Read-chain finished for one reason or another.
  50. for (auto& i: m_capabilities)
  51. i.second.reset();
  52. try
  53. {
  54. bi::tcp::socket& socket = m_socket->ref();
  55. if (socket.is_open())
  56. {
  57. boost::system::error_code ec;
  58. socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  59. socket.close();
  60. }
  61. }
  62. catch (...){}
  63. }
  64. ReputationManager& Session::repMan() const
  65. {
  66. return m_server->repMan();
  67. }
  68. NodeID Session::id() const
  69. {
  70. return m_peer ? m_peer->id : NodeID();
  71. }
  72. void Session::addRating(int _r)
  73. {
  74. if (m_peer)
  75. {
  76. m_peer->m_rating += _r;
  77. m_peer->m_score += _r;
  78. if (_r >= 0)
  79. m_peer->noteSessionGood();
  80. }
  81. }
  82. int Session::rating() const
  83. {
  84. return m_peer->m_rating;
  85. }
  /// Picks at most @a _n elements of @a _t at random.
  /// When @a _t holds no more than @a _n elements it is returned unchanged. Otherwise randomly
  /// chosen elements are discarded (using rand()) until exactly @a _n remain; the survivors keep
  /// their relative order from @a _t.
  template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
  {
      vector<T> survivors(_t);
      // rand() is only consulted while there is still something to discard.
      while (survivors.size() > _n)
          survivors.erase(survivors.begin() + rand() % survivors.size());
      return survivors;
  }
  99. // TODO: P2P integration: replace w/asio post -> serviceNodesRequest()
  100. void Session::ensureNodesRequested()
  101. {
  102. if (isConnected() && !m_weRequestedNodes)
  103. {
  104. m_weRequestedNodes = true;
  105. RLPStream s;
  106. sealAndSend(prep(s, GetPeersPacket), 0);
  107. }
  108. }
  109. void Session::serviceNodesRequest()
  110. {
  111. ThreadContext tc(info().id.abridged() + "/" + info().clientVersion);
  112. if (!m_theyRequestedNodes)
  113. return;
  114. // TODO: P2P reimplement, as per TCP "close nodes" gossip specifications (WiP)
  115. // auto peers = m_server->potentialPeers(m_knownNodes);
  116. Peers peers;
  117. if (peers.empty())
  118. {
  119. addNote("peers", "requested");
  120. return;
  121. }
  122. // note this should cost them...
  123. RLPStream s;
  124. prep(s, PeersPacket, min<unsigned>(10, peers.size()));
  125. auto rs = randomSelection(peers, 10);
  126. for (auto const& i: rs)
  127. {
  128. clog(NetTriviaDetail) << "Sending peer " << i.id << i.endpoint;
  129. if (i.endpoint.address.is_v4())
  130. s.appendList(3) << bytesConstRef(i.endpoint.address.to_v4().to_bytes().data(), 4) << i.endpoint.tcpPort << i.id;
  131. else// if (i.second.address().is_v6()) - assumed
  132. s.appendList(3) << bytesConstRef(i.endpoint.address.to_v6().to_bytes().data(), 16) << i.endpoint.tcpPort << i.id;
  133. }
  134. sealAndSend(s, 0);
  135. m_theyRequestedNodes = false;
  136. addNote("peers", "done");
  137. }
  138. bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
  139. {
  140. m_lastReceived = chrono::steady_clock::now();
  141. clog(NetRight) << _t << _r;
  142. try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
  143. {
  144. // v4 frame headers are useless, offset packet type used
  145. // v5 protocol type is in header, packet type not offset
  146. if (_capId == 0 && _t < UserPacket)
  147. return interpret(_t, _r);
  148. if (isFramingEnabled())
  149. {
  150. for (auto const& i: m_capabilities)
  151. if (i.second->c_protocolID == _capId)
  152. return i.second->m_enabled ? i.second->interpret(_t, _r) : true;
  153. }
  154. else
  155. {
  156. for (auto const& i: m_capabilities)
  157. if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
  158. return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;
  159. }
  160. return false;
  161. }
  162. catch (std::exception const& _e)
  163. {
  164. clog(NetWarn) << "Exception caught in p2p::Session::interpret(): " << _e.what() << ". PacketType: " << _t << ". RLP: " << _r;
  165. disconnect(BadProtocol);
  166. return true;
  167. }
  168. return true;
  169. }
  170. bool Session::interpret(PacketType _t, RLP const& _r)
  171. {
  172. switch (_t)
  173. {
  174. case DisconnectPacket:
  175. {
  176. string reason = "Unspecified";
  177. auto r = (DisconnectReason)_r[0].toInt<int>();
  178. if (!_r[0].isInt())
  179. drop(BadProtocol);
  180. else
  181. {
  182. reason = reasonOf(r);
  183. clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
  184. drop(DisconnectRequested);
  185. }
  186. break;
  187. }
  188. case PingPacket:
  189. {
  190. clog(NetTriviaSummary) << "Ping" << m_info.id;
  191. RLPStream s;
  192. sealAndSend(prep(s, PongPacket), 0);
  193. break;
  194. }
  195. case PongPacket:
  196. DEV_GUARDED(x_info)
  197. {
  198. m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
  199. clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
  200. }
  201. break;
  202. case GetPeersPacket:
  203. case PeersPacket:
  204. break;
  205. default:
  206. return false;
  207. }
  208. return true;
  209. }
  210. void Session::ping()
  211. {
  212. RLPStream s;
  213. sealAndSend(prep(s, PingPacket), 0);
  214. m_ping = std::chrono::steady_clock::now();
  215. }
  216. RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args)
  217. {
  218. return _s.append((unsigned)_id).appendList(_args);
  219. }
  220. void Session::sealAndSend(RLPStream& _s, uint16_t _protocolID)
  221. {
  222. bytes b;
  223. _s.swapOut(b);
  224. send(move(b), _protocolID);
  225. }
  226. bool Session::checkPacket(bytesConstRef _msg)
  227. {
  228. if (_msg[0] > 0x7f || _msg.size() < 2)
  229. return false;
  230. if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size())
  231. return false;
  232. return true;
  233. }
  234. void Session::send(bytes&& _msg, uint16_t _protocolID)
  235. {
  236. bytesConstRef msg(&_msg);
  237. clog(NetLeft) << RLP(msg.cropped(1));
  238. if (!checkPacket(msg))
  239. clog(NetWarn) << "INVALID PACKET CONSTRUCTED!";
  240. if (!m_socket->ref().is_open())
  241. return;
  242. bool doWrite = false;
  243. if (isFramingEnabled())
  244. {
  245. DEV_GUARDED(x_framing)
  246. {
  247. doWrite = m_encFrames.empty();
  248. auto f = getFraming(_protocolID);
  249. if (!f)
  250. return;
  251. f->writer.enque(RLPXPacket(_protocolID, msg));
  252. multiplexAll();
  253. }
  254. if (doWrite)
  255. writeFrames();
  256. }
  257. else
  258. {
  259. DEV_GUARDED(x_framing)
  260. {
  261. m_writeQueue.push_back(std::move(_msg));
  262. doWrite = (m_writeQueue.size() == 1);
  263. }
  264. if (doWrite)
  265. write();
  266. }
  267. }
  268. void Session::write()
  269. {
  270. bytes const* out = nullptr;
  271. DEV_GUARDED(x_framing)
  272. {
  273. m_io->writeSingleFramePacket(&m_writeQueue[0], m_writeQueue[0]);
  274. out = &m_writeQueue[0];
  275. }
  276. auto self(shared_from_this());
  277. ba::async_write(m_socket->ref(), ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/)
  278. {
  279. ThreadContext tc(info().id.abridged());
  280. ThreadContext tc2(info().clientVersion);
  281. // must check queue, as write callback can occur following dropped()
  282. if (ec)
  283. {
  284. clog(NetWarn) << "Error sending: " << ec.message();
  285. drop(TCPError);
  286. return;
  287. }
  288. DEV_GUARDED(x_framing)
  289. {
  290. m_writeQueue.pop_front();
  291. if (m_writeQueue.empty())
  292. return;
  293. }
  294. write();
  295. });
  296. }
  297. void Session::writeFrames()
  298. {
  299. bytes const* out = nullptr;
  300. DEV_GUARDED(x_framing)
  301. {
  302. if (m_encFrames.empty())
  303. return;
  304. else
  305. out = &m_encFrames[0];
  306. }
  307. auto self(shared_from_this());
  308. ba::async_write(m_socket->ref(), ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/)
  309. {
  310. ThreadContext tc(info().id.abridged());
  311. ThreadContext tc2(info().clientVersion);
  312. // must check queue, as write callback can occur following dropped()
  313. if (ec)
  314. {
  315. clog(NetWarn) << "Error sending: " << ec.message();
  316. drop(TCPError);
  317. return;
  318. }
  319. DEV_GUARDED(x_framing)
  320. {
  321. if (!m_encFrames.empty())
  322. m_encFrames.pop_front();
  323. multiplexAll();
  324. if (m_encFrames.empty())
  325. return;
  326. }
  327. writeFrames();
  328. });
  329. }
  330. void Session::drop(DisconnectReason _reason)
  331. {
  332. if (m_dropped)
  333. return;
  334. bi::tcp::socket& socket = m_socket->ref();
  335. if (socket.is_open())
  336. try
  337. {
  338. boost::system::error_code ec;
  339. clog(NetConnect) << "Closing " << socket.remote_endpoint(ec) << "(" << reasonOf(_reason) << ")";
  340. socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
  341. socket.close();
  342. }
  343. catch (...) {}
  344. m_peer->m_lastDisconnect = _reason;
  345. if (_reason == BadProtocol)
  346. {
  347. m_peer->m_rating /= 2;
  348. m_peer->m_score /= 2;
  349. }
  350. m_dropped = true;
  351. }
  352. void Session::disconnect(DisconnectReason _reason)
  353. {
  354. clog(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")";
  355. if (m_socket->ref().is_open())
  356. {
  357. RLPStream s;
  358. prep(s, DisconnectPacket, 1) << (int)_reason;
  359. sealAndSend(s, 0);
  360. }
  361. drop(_reason);
  362. }
  363. void Session::start()
  364. {
  365. ping();
  366. if (isFramingEnabled())
  367. doReadFrames();
  368. else
  369. doRead();
  370. }
  371. void Session::doRead()
  372. {
  373. // ignore packets received while waiting to disconnect.
  374. if (m_dropped)
  375. return;
  376. auto self(shared_from_this());
  377. m_data.resize(h256::size);
  378. ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
  379. {
  380. ThreadContext tc(info().id.abridged());
  381. ThreadContext tc2(info().clientVersion);
  382. if (!checkRead(h256::size, ec, length))
  383. return;
  384. else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
  385. {
  386. clog(NetWarn) << "header decrypt failed";
  387. drop(BadProtocol); // todo: better error
  388. return;
  389. }
  390. uint16_t hProtocolId;
  391. uint32_t hLength;
  392. uint8_t hPadding;
  393. try
  394. {
  395. RLPXFrameInfo header(bytesConstRef(m_data.data(), length));
  396. hProtocolId = header.protocolId;
  397. hLength = header.length;
  398. hPadding = header.padding;
  399. }
  400. catch (std::exception const& _e)
  401. {
  402. clog(NetWarn) << "Exception decoding frame header RLP:" << _e.what() << bytesConstRef(m_data.data(), h128::size).cropped(3);
  403. drop(BadProtocol);
  404. return;
  405. }
  406. /// read padded frame and mac
  407. auto tlen = hLength + hPadding + h128::size;
  408. m_data.resize(tlen);
  409. ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, hLength, hProtocolId, tlen](boost::system::error_code ec, std::size_t length)
  410. {
  411. ThreadContext tc(info().id.abridged());
  412. ThreadContext tc2(info().clientVersion);
  413. if (!checkRead(tlen, ec, length))
  414. return;
  415. else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
  416. {
  417. clog(NetWarn) << "frame decrypt failed";
  418. drop(BadProtocol); // todo: better error
  419. return;
  420. }
  421. bytesConstRef frame(m_data.data(), hLength);
  422. if (!checkPacket(frame))
  423. {
  424. cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
  425. clog(NetWarn) << "INVALID MESSAGE RECEIVED";
  426. disconnect(BadProtocol);
  427. return;
  428. }
  429. else
  430. {
  431. auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
  432. RLP r(frame.cropped(1));
  433. bool ok = readPacket(hProtocolId, packetType, r);
  434. (void)ok;
  435. #if ETH_DEBUG
  436. if (!ok)
  437. clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
  438. #endif
  439. }
  440. doRead();
  441. });
  442. });
  443. }
  444. bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
  445. {
  446. if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof)
  447. {
  448. clog(NetConnect) << "Error reading: " << _ec.message();
  449. drop(TCPError);
  450. return false;
  451. }
  452. else if (_ec && _length < _expected)
  453. {
  454. clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << _ec.message();
  455. repMan().noteRude(*this);
  456. drop(TCPError);
  457. return false;
  458. }
  459. else if (_length != _expected)
  460. {
  461. // with static m_data-sized buffer this shouldn't happen unless there's a regression
  462. // sec recommends checking anyways (instead of assert)
  463. clog(NetWarn) << "Error reading - TCP read buffer length differs from expected frame size.";
  464. disconnect(UserReason);
  465. return false;
  466. }
  467. return true;
  468. }
  469. void Session::doReadFrames()
  470. {
  471. if (m_dropped)
  472. return; // ignore packets received while waiting to disconnect
  473. auto self(shared_from_this());
  474. m_data.resize(h256::size);
  475. ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size), [this, self](boost::system::error_code ec, std::size_t length)
  476. {
  477. ThreadContext tc(info().id.abridged());
  478. ThreadContext tc2(info().clientVersion);
  479. if (!checkRead(h256::size, ec, length))
  480. return;
  481. DEV_GUARDED(x_framing)
  482. {
  483. if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
  484. {
  485. clog(NetWarn) << "header decrypt failed";
  486. drop(BadProtocol); // todo: better error
  487. return;
  488. }
  489. }
  490. bytesConstRef rawHeader(m_data.data(), length);
  491. try
  492. {
  493. RLPXFrameInfo tmpHeader(rawHeader);
  494. }
  495. catch (std::exception const& _e)
  496. {
  497. clog(NetWarn) << "Exception decoding frame header RLP:" << _e.what() << bytesConstRef(m_data.data(), h128::size).cropped(3);
  498. drop(BadProtocol);
  499. return;
  500. }
  501. RLPXFrameInfo header(rawHeader);
  502. auto tlen = header.length + header.padding + h128::size; // padded frame and mac
  503. m_data.resize(tlen);
  504. ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, tlen, header](boost::system::error_code ec, std::size_t length)
  505. {
  506. ThreadContext tc(info().id.abridged());
  507. ThreadContext tc2(info().clientVersion);
  508. if (!checkRead(tlen, ec, length))
  509. return;
  510. bytesRef frame(m_data.data(), tlen);
  511. vector<RLPXPacket> px;
  512. DEV_GUARDED(x_framing)
  513. {
  514. auto f = getFraming(header.protocolId);
  515. if (!f)
  516. {
  517. clog(NetWarn) << "Unknown subprotocol " << header.protocolId;
  518. drop(BadProtocol);
  519. return;
  520. }
  521. auto v = f->reader.demux(*m_io, header, frame);
  522. px.swap(v);
  523. }
  524. for (RLPXPacket& p: px)
  525. {
  526. PacketType packetType = (PacketType)RLP(p.type()).toInt<unsigned>(RLP::AllowNonCanon);
  527. bool ok = readPacket(header.protocolId, packetType, RLP(p.data()));
  528. #if ETH_DEBUG
  529. if (!ok)
  530. clog(NetWarn) << "Couldn't interpret packet." << RLP(p.data());
  531. #endif
  532. ok = true;
  533. (void)ok;
  534. }
  535. doReadFrames();
  536. });
  537. });
  538. }
  539. std::shared_ptr<Session::Framing> Session::getFraming(uint16_t _protocolID)
  540. {
  541. if (m_framing.find(_protocolID) == m_framing.end())
  542. return nullptr;
  543. else
  544. return m_framing[_protocolID];
  545. }
  546. void Session::registerCapability(CapDesc const& _desc, std::shared_ptr<Capability> _p)
  547. {
  548. DEV_GUARDED(x_framing)
  549. {
  550. m_capabilities[_desc] = _p;
  551. }
  552. }
  553. void Session::registerFraming(uint16_t _id)
  554. {
  555. DEV_GUARDED(x_framing)
  556. {
  557. if (m_framing.find(_id) == m_framing.end())
  558. {
  559. std::shared_ptr<Session::Framing> f(new Session::Framing(_id));
  560. m_framing[_id] = f;
  561. }
  562. }
  563. }
  564. void Session::multiplexAll()
  565. {
  566. for (auto& f: m_framing)
  567. f.second->writer.mux(*m_io, maxFrameSize(), m_encFrames);
  568. }