123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- /*
- This file is part of cpp-ethereum.
- cpp-ethereum is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
- cpp-ethereum is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
- */
- /** @file WhisperHost.cpp
- * @author Gav Wood <i@gavwood.com>
- * @date 2014
- */
- #include "WhisperHost.h"
- #include <libdevcore/CommonIO.h>
- #include <libdevcore/Log.h>
- #include <libp2p/All.h>
- #include "WhisperDB.h"
- using namespace std;
- using namespace dev;
- using namespace dev::p2p;
- using namespace dev::shh;
- WhisperHost::WhisperHost(bool _storeMessagesInDB): Worker("shh"), m_storeMessagesInDB(_storeMessagesInDB)
- {
- loadMessagesFromBD();
- }
- WhisperHost::~WhisperHost()
- {
- saveMessagesToBD();
- }
- void WhisperHost::streamMessage(h256 _m, RLPStream& _s) const
- {
- UpgradableGuard l(x_messages);
- if (m_messages.count(_m))
- {
- UpgradeGuard ll(l);
- auto const& m = m_messages.at(_m);
- // cnote << "streamRLP: " << m.expiry() << m.ttl() << m.topic() << toHex(m.data());
- m.streamRLP(_s);
- }
- }
- void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
- {
- // this function processes both outgoing messages originated both by local host (_p == null)
- // and incoming messages from remote peers (_p != null)
- //cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data());
- if (_m.isExpired())
- return;
- auto h = _m.sha3();
- {
- UpgradableGuard l(x_messages);
- if (m_messages.count(h))
- return;
- UpgradeGuard ll(l);
- m_messages[h] = _m;
- m_expiryQueue.insert(make_pair(_m.expiry(), h));
- }
- // rating of incoming message from remote host is assessed according to the following criteria:
- // 1. installed watch match; 2. bloom filter match; 2. ttl; 3. proof of work
- int rating = 0;
- DEV_GUARDED(m_filterLock)
- if (_m.matchesBloomFilter(m_bloom))
- {
- ++rating;
- for (auto const& f: m_filters)
- if (f.second.filter.matches(_m))
- for (auto& i: m_watches)
- if (i.second.id == f.first) // match one of the watches
- {
- i.second.changes.push_back(h);
- rating += 2;
- }
- }
- if (_p) // incoming message from remote peer
- {
- rating *= 256;
- unsigned ttlReward = (256 > _m.ttl() ? 256 - _m.ttl() : 0);
- rating += ttlReward;
- rating *= 256;
- rating += _m.workProved();
- }
- // TODO p2p: capability-based rating
- for (auto i: peerSessions())
- {
- auto w = i.first->cap<WhisperPeer>().get();
- if (w == _p)
- w->addRating(rating);
- else
- w->noteNewMessage(h, _m);
- }
- }
- unsigned WhisperHost::installWatch(shh::Topics const& _t)
- {
- InstalledFilter f(_t);
- h256 h = f.filter.sha3();
- unsigned ret = 0;
- DEV_GUARDED(m_filterLock)
- {
- auto it = m_filters.find(h);
- if (m_filters.end() == it)
- m_filters.insert(make_pair(h, f));
- else
- it->second.refCount++;
- m_bloom.addRaw(f.filter.exportBloom());
- ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0;
- m_watches[ret] = ClientWatch(h);
- cwatshh << "+++" << ret << h;
- }
- noteAdvertiseTopicsOfInterest();
- return ret;
- }
- void WhisperHost::uninstallWatch(unsigned _i)
- {
- cwatshh << "XXX" << _i;
- DEV_GUARDED(m_filterLock)
- {
- auto it = m_watches.find(_i);
- if (it == m_watches.end())
- return;
- auto id = it->second.id;
- m_watches.erase(it);
- auto fit = m_filters.find(id);
- if (fit != m_filters.end())
- {
- m_bloom.removeRaw(fit->second.filter.exportBloom());
- if (!--fit->second.refCount)
- m_filters.erase(fit);
- }
- }
- noteAdvertiseTopicsOfInterest();
- }
- h256s WhisperHost::watchMessages(unsigned _watchId)
- {
- h256s ret;
- auto wit = m_watches.find(_watchId);
- if (wit == m_watches.end())
- return ret;
- TopicFilter f;
- {
- Guard l(m_filterLock);
- auto fit = m_filters.find(wit->second.id);
- if (fit == m_filters.end())
- return ret;
- f = fit->second.filter;
- }
- ReadGuard l(x_messages);
- for (auto const& m: m_messages)
- if (f.matches(m.second))
- ret.push_back(m.first);
- return ret;
- }
- h256s WhisperHost::checkWatch(unsigned _watchId)
- {
- h256s ret;
- cleanup();
- dev::Guard l(m_filterLock);
- try
- {
- ret = m_watches.at(_watchId).changes;
- m_watches.at(_watchId).changes.clear();
- }
- catch (...)
- {
- }
- return ret;
- }
- void WhisperHost::doWork()
- {
- for (auto i: peerSessions())
- i.first->cap<WhisperPeer>()->sendMessages();
- cleanup();
- }
- void WhisperHost::cleanup()
- {
- // remove old messages.
- // should be called every now and again.
- uint64_t now = utcTime();
- WriteGuard l(x_messages);
- for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it))
- m_messages.erase(it->second);
- }
- void WhisperHost::noteAdvertiseTopicsOfInterest()
- {
- for (auto i: peerSessions())
- i.first->cap<WhisperPeer>()->noteAdvertiseTopicsOfInterest();
- }
- bool WhisperHost::isWatched(Envelope const& _e) const
- {
- DEV_GUARDED(m_filterLock)
- if (_e.matchesBloomFilter(m_bloom))
- for (auto const& f: m_filters)
- if (f.second.filter.matches(_e))
- for (auto const& i: m_watches)
- if (i.second.id == f.first)
- return true;
- return false;
- }
- void WhisperHost::saveMessagesToBD()
- {
- if (!m_storeMessagesInDB)
- return;
- try
- {
- WhisperMessagesDB db;
- ReadGuard g(x_messages);
- uint64_t now = utcTime();
- for (auto const& m: m_messages)
- if (m.second.expiry() > now)
- if (isWatched(m.second))
- db.saveSingleMessage(m.first, m.second);
- }
- catch(FailedToOpenLevelDB const& ex)
- {
- cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to open DB:" << ex.what();
- }
- catch(Exception const& ex)
- {
- cwarn << "Exception in WhisperHost::saveMessagesToBD():" << ex.what();
- }
- catch(...)
- {
- cwarn << "Unknown Exception in WhisperHost::saveMessagesToBD()";
- }
- }
- void WhisperHost::loadMessagesFromBD()
- {
- if (!m_storeMessagesInDB)
- return;
- try
- {
- map<h256, Envelope> m;
- WhisperMessagesDB db;
- db.loadAllMessages(m);
- WriteGuard g(x_messages);
- m_messages.swap(m);
- for (auto const& msg: m)
- m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first));
- }
- catch(Exception const& ex)
- {
- cwarn << "Exception in WhisperHost::loadMessagesFromBD():" << ex.what();
- }
- catch(...)
- {
- cwarn << "Unknown Exception in WhisperHost::loadMessagesFromBD()";
- }
- }
|