WhisperHost.cpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. /*
  2. This file is part of cpp-ethereum.
  3. cpp-ethereum is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU General Public License as published by
  5. the Free Software Foundation, either version 3 of the License, or
  6. (at your option) any later version.
  7. cpp-ethereum is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
  13. */
  14. /** @file WhisperHost.cpp
  15. * @author Gav Wood <i@gavwood.com>
  16. * @date 2014
  17. */
  18. #include "WhisperHost.h"
  19. #include <libdevcore/CommonIO.h>
  20. #include <libdevcore/Log.h>
  21. #include <libp2p/All.h>
  22. #include "WhisperDB.h"
  23. using namespace std;
  24. using namespace dev;
  25. using namespace dev::p2p;
  26. using namespace dev::shh;
  27. WhisperHost::WhisperHost(bool _storeMessagesInDB): Worker("shh"), m_storeMessagesInDB(_storeMessagesInDB)
  28. {
  29. loadMessagesFromBD();
  30. }
  31. WhisperHost::~WhisperHost()
  32. {
  33. saveMessagesToBD();
  34. }
  35. void WhisperHost::streamMessage(h256 _m, RLPStream& _s) const
  36. {
  37. UpgradableGuard l(x_messages);
  38. if (m_messages.count(_m))
  39. {
  40. UpgradeGuard ll(l);
  41. auto const& m = m_messages.at(_m);
  42. // cnote << "streamRLP: " << m.expiry() << m.ttl() << m.topic() << toHex(m.data());
  43. m.streamRLP(_s);
  44. }
  45. }
  46. void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
  47. {
  48. // this function processes both outgoing messages originated both by local host (_p == null)
  49. // and incoming messages from remote peers (_p != null)
  50. //cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data());
  51. if (_m.isExpired())
  52. return;
  53. auto h = _m.sha3();
  54. {
  55. UpgradableGuard l(x_messages);
  56. if (m_messages.count(h))
  57. return;
  58. UpgradeGuard ll(l);
  59. m_messages[h] = _m;
  60. m_expiryQueue.insert(make_pair(_m.expiry(), h));
  61. }
  62. // rating of incoming message from remote host is assessed according to the following criteria:
  63. // 1. installed watch match; 2. bloom filter match; 2. ttl; 3. proof of work
  64. int rating = 0;
  65. DEV_GUARDED(m_filterLock)
  66. if (_m.matchesBloomFilter(m_bloom))
  67. {
  68. ++rating;
  69. for (auto const& f: m_filters)
  70. if (f.second.filter.matches(_m))
  71. for (auto& i: m_watches)
  72. if (i.second.id == f.first) // match one of the watches
  73. {
  74. i.second.changes.push_back(h);
  75. rating += 2;
  76. }
  77. }
  78. if (_p) // incoming message from remote peer
  79. {
  80. rating *= 256;
  81. unsigned ttlReward = (256 > _m.ttl() ? 256 - _m.ttl() : 0);
  82. rating += ttlReward;
  83. rating *= 256;
  84. rating += _m.workProved();
  85. }
  86. // TODO p2p: capability-based rating
  87. for (auto i: peerSessions())
  88. {
  89. auto w = i.first->cap<WhisperPeer>().get();
  90. if (w == _p)
  91. w->addRating(rating);
  92. else
  93. w->noteNewMessage(h, _m);
  94. }
  95. }
  96. unsigned WhisperHost::installWatch(shh::Topics const& _t)
  97. {
  98. InstalledFilter f(_t);
  99. h256 h = f.filter.sha3();
  100. unsigned ret = 0;
  101. DEV_GUARDED(m_filterLock)
  102. {
  103. auto it = m_filters.find(h);
  104. if (m_filters.end() == it)
  105. m_filters.insert(make_pair(h, f));
  106. else
  107. it->second.refCount++;
  108. m_bloom.addRaw(f.filter.exportBloom());
  109. ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0;
  110. m_watches[ret] = ClientWatch(h);
  111. cwatshh << "+++" << ret << h;
  112. }
  113. noteAdvertiseTopicsOfInterest();
  114. return ret;
  115. }
  116. void WhisperHost::uninstallWatch(unsigned _i)
  117. {
  118. cwatshh << "XXX" << _i;
  119. DEV_GUARDED(m_filterLock)
  120. {
  121. auto it = m_watches.find(_i);
  122. if (it == m_watches.end())
  123. return;
  124. auto id = it->second.id;
  125. m_watches.erase(it);
  126. auto fit = m_filters.find(id);
  127. if (fit != m_filters.end())
  128. {
  129. m_bloom.removeRaw(fit->second.filter.exportBloom());
  130. if (!--fit->second.refCount)
  131. m_filters.erase(fit);
  132. }
  133. }
  134. noteAdvertiseTopicsOfInterest();
  135. }
  136. h256s WhisperHost::watchMessages(unsigned _watchId)
  137. {
  138. h256s ret;
  139. auto wit = m_watches.find(_watchId);
  140. if (wit == m_watches.end())
  141. return ret;
  142. TopicFilter f;
  143. {
  144. Guard l(m_filterLock);
  145. auto fit = m_filters.find(wit->second.id);
  146. if (fit == m_filters.end())
  147. return ret;
  148. f = fit->second.filter;
  149. }
  150. ReadGuard l(x_messages);
  151. for (auto const& m: m_messages)
  152. if (f.matches(m.second))
  153. ret.push_back(m.first);
  154. return ret;
  155. }
  156. h256s WhisperHost::checkWatch(unsigned _watchId)
  157. {
  158. h256s ret;
  159. cleanup();
  160. dev::Guard l(m_filterLock);
  161. try
  162. {
  163. ret = m_watches.at(_watchId).changes;
  164. m_watches.at(_watchId).changes.clear();
  165. }
  166. catch (...)
  167. {
  168. }
  169. return ret;
  170. }
  171. void WhisperHost::doWork()
  172. {
  173. for (auto i: peerSessions())
  174. i.first->cap<WhisperPeer>()->sendMessages();
  175. cleanup();
  176. }
  177. void WhisperHost::cleanup()
  178. {
  179. // remove old messages.
  180. // should be called every now and again.
  181. uint64_t now = utcTime();
  182. WriteGuard l(x_messages);
  183. for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it))
  184. m_messages.erase(it->second);
  185. }
  186. void WhisperHost::noteAdvertiseTopicsOfInterest()
  187. {
  188. for (auto i: peerSessions())
  189. i.first->cap<WhisperPeer>()->noteAdvertiseTopicsOfInterest();
  190. }
  191. bool WhisperHost::isWatched(Envelope const& _e) const
  192. {
  193. DEV_GUARDED(m_filterLock)
  194. if (_e.matchesBloomFilter(m_bloom))
  195. for (auto const& f: m_filters)
  196. if (f.second.filter.matches(_e))
  197. for (auto const& i: m_watches)
  198. if (i.second.id == f.first)
  199. return true;
  200. return false;
  201. }
  202. void WhisperHost::saveMessagesToBD()
  203. {
  204. if (!m_storeMessagesInDB)
  205. return;
  206. try
  207. {
  208. WhisperMessagesDB db;
  209. ReadGuard g(x_messages);
  210. uint64_t now = utcTime();
  211. for (auto const& m: m_messages)
  212. if (m.second.expiry() > now)
  213. if (isWatched(m.second))
  214. db.saveSingleMessage(m.first, m.second);
  215. }
  216. catch(FailedToOpenLevelDB const& ex)
  217. {
  218. cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to open DB:" << ex.what();
  219. }
  220. catch(Exception const& ex)
  221. {
  222. cwarn << "Exception in WhisperHost::saveMessagesToBD():" << ex.what();
  223. }
  224. catch(...)
  225. {
  226. cwarn << "Unknown Exception in WhisperHost::saveMessagesToBD()";
  227. }
  228. }
  229. void WhisperHost::loadMessagesFromBD()
  230. {
  231. if (!m_storeMessagesInDB)
  232. return;
  233. try
  234. {
  235. map<h256, Envelope> m;
  236. WhisperMessagesDB db;
  237. db.loadAllMessages(m);
  238. WriteGuard g(x_messages);
  239. m_messages.swap(m);
  240. for (auto const& msg: m)
  241. m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first));
  242. }
  243. catch(Exception const& ex)
  244. {
  245. cwarn << "Exception in WhisperHost::loadMessagesFromBD():" << ex.what();
  246. }
  247. catch(...)
  248. {
  249. cwarn << "Unknown Exception in WhisperHost::loadMessagesFromBD()";
  250. }
  251. }