hive.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "fmt"
  19. "math/rand"
  20. "path/filepath"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/log"
  24. "github.com/ethereum/go-ethereum/metrics"
  25. "github.com/ethereum/go-ethereum/p2p/discover"
  26. "github.com/ethereum/go-ethereum/p2p/netutil"
  27. "github.com/ethereum/go-ethereum/swarm/network/kademlia"
  28. "github.com/ethereum/go-ethereum/swarm/storage"
  29. )
// Hive is the logistic manager of the swarm.
// It uses a generic kademlia node table to find the best peer list
// for any target.
// This is used by the netstore to search for content in the swarm.
// The bzz protocol peersMsgData exchange is relayed to Kademlia
// for db storage and filtering.
// Connections and disconnections are reported and relayed
// to keep the node table up to date.
  38. var (
  39. peersNumGauge = metrics.NewRegisteredGauge("network.peers.num", nil)
  40. addPeerCounter = metrics.NewRegisteredCounter("network.addpeer.count", nil)
  41. removePeerCounter = metrics.NewRegisteredCounter("network.removepeer.count", nil)
  42. )
  43. type Hive struct {
  44. listenAddr func() string
  45. callInterval uint64
  46. id discover.NodeID
  47. addr kademlia.Address
  48. kad *kademlia.Kademlia
  49. path string
  50. quit chan bool
  51. toggle chan bool
  52. more chan bool
  53. // for testing only
  54. swapEnabled bool
  55. syncEnabled bool
  56. blockRead bool
  57. blockWrite bool
  58. }
  59. const (
  60. callInterval = 3000000000
  61. // bucketSize = 3
  62. // maxProx = 8
  63. // proxBinSize = 4
  64. )
  65. type HiveParams struct {
  66. CallInterval uint64
  67. KadDbPath string
  68. *kademlia.KadParams
  69. }
  70. //create default params
  71. func NewDefaultHiveParams() *HiveParams {
  72. kad := kademlia.NewDefaultKadParams()
  73. // kad.BucketSize = bucketSize
  74. // kad.MaxProx = maxProx
  75. // kad.ProxBinSize = proxBinSize
  76. return &HiveParams{
  77. CallInterval: callInterval,
  78. KadParams: kad,
  79. }
  80. }
  81. //this can only finally be set after all config options (file, cmd line, env vars)
  82. //have been evaluated
  83. func (self *HiveParams) Init(path string) {
  84. self.KadDbPath = filepath.Join(path, "bzz-peers.json")
  85. }
  86. func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive {
  87. kad := kademlia.New(kademlia.Address(addr), params.KadParams)
  88. return &Hive{
  89. callInterval: params.CallInterval,
  90. kad: kad,
  91. addr: kad.Addr(),
  92. path: params.KadDbPath,
  93. swapEnabled: swapEnabled,
  94. syncEnabled: syncEnabled,
  95. }
  96. }
  97. func (self *Hive) SyncEnabled(on bool) {
  98. self.syncEnabled = on
  99. }
  100. func (self *Hive) SwapEnabled(on bool) {
  101. self.swapEnabled = on
  102. }
  103. func (self *Hive) BlockNetworkRead(on bool) {
  104. self.blockRead = on
  105. }
  106. func (self *Hive) BlockNetworkWrite(on bool) {
  107. self.blockWrite = on
  108. }
  109. // public accessor to the hive base address
  110. func (self *Hive) Addr() kademlia.Address {
  111. return self.addr
  112. }
  113. // Start receives network info only at startup
  114. // listedAddr is a function to retrieve listening address to advertise to peers
  115. // connectPeer is a function to connect to a peer based on its NodeID or enode URL
  116. // there are called on the p2p.Server which runs on the node
  117. func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) {
  118. self.toggle = make(chan bool)
  119. self.more = make(chan bool)
  120. self.quit = make(chan bool)
  121. self.id = id
  122. self.listenAddr = listenAddr
  123. err = self.kad.Load(self.path, nil)
  124. if err != nil {
  125. log.Warn(fmt.Sprintf("Warning: error reading kaddb '%s' (skipping): %v", self.path, err))
  126. err = nil
  127. }
  128. // this loop is doing bootstrapping and maintains a healthy table
  129. go self.keepAlive()
  130. go func() {
  131. // whenever toggled ask kademlia about most preferred peer
  132. for alive := range self.more {
  133. if !alive {
  134. // receiving false closes the loop while allowing parallel routines
  135. // to attempt to write to more (remove Peer when shutting down)
  136. return
  137. }
  138. node, need, proxLimit := self.kad.Suggest()
  139. if node != nil && len(node.Url) > 0 {
  140. log.Trace(fmt.Sprintf("call known bee %v", node.Url))
  141. // enode or any lower level connection address is unnecessary in future
  142. // discovery table is used to look it up.
  143. connectPeer(node.Url)
  144. }
  145. if need {
  146. // a random peer is taken from the table
  147. peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1)
  148. if len(peers) > 0 {
  149. // a random address at prox bin 0 is sent for lookup
  150. randAddr := kademlia.RandomAddressAt(self.addr, proxLimit)
  151. req := &retrieveRequestMsgData{
  152. Key: storage.Key(randAddr[:]),
  153. }
  154. log.Trace(fmt.Sprintf("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]))
  155. peers[0].(*peer).retrieve(req)
  156. } else {
  157. log.Warn(fmt.Sprintf("no peer"))
  158. }
  159. log.Trace(fmt.Sprintf("buzz kept alive"))
  160. } else {
  161. log.Info(fmt.Sprintf("no need for more bees"))
  162. }
  163. select {
  164. case self.toggle <- need:
  165. case <-self.quit:
  166. return
  167. }
  168. log.Debug(fmt.Sprintf("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()))
  169. }
  170. }()
  171. return
  172. }
  173. // keepAlive is a forever loop
  174. // in its awake state it periodically triggers connection attempts
  175. // by writing to self.more until Kademlia Table is saturated
  176. // wake state is toggled by writing to self.toggle
  177. // it restarts if the table becomes non-full again due to disconnections
  178. func (self *Hive) keepAlive() {
  179. alarm := time.NewTicker(time.Duration(self.callInterval)).C
  180. for {
  181. peersNumGauge.Update(int64(self.kad.Count()))
  182. select {
  183. case <-alarm:
  184. if self.kad.DBCount() > 0 {
  185. select {
  186. case self.more <- true:
  187. log.Debug(fmt.Sprintf("buzz wakeup"))
  188. default:
  189. }
  190. }
  191. case need := <-self.toggle:
  192. if alarm == nil && need {
  193. alarm = time.NewTicker(time.Duration(self.callInterval)).C
  194. }
  195. if alarm != nil && !need {
  196. alarm = nil
  197. }
  198. case <-self.quit:
  199. return
  200. }
  201. }
  202. }
  203. func (self *Hive) Stop() error {
  204. // closing toggle channel quits the updateloop
  205. close(self.quit)
  206. return self.kad.Save(self.path, saveSync)
  207. }
  208. // called at the end of a successful protocol handshake
  209. func (self *Hive) addPeer(p *peer) error {
  210. addPeerCounter.Inc(1)
  211. defer func() {
  212. select {
  213. case self.more <- true:
  214. default:
  215. }
  216. }()
  217. log.Trace(fmt.Sprintf("hi new bee %v", p))
  218. err := self.kad.On(p, loadSync)
  219. if err != nil {
  220. return err
  221. }
  222. // self lookup (can be encoded as nil/zero key since peers addr known) + no id ()
  223. // the most common way of saying hi in bzz is initiation of gossip
  224. // let me know about anyone new from my hood , here is the storageradius
  225. // to send the 6 byte self lookup
  226. // we do not record as request or forward it, just reply with peers
  227. p.retrieve(&retrieveRequestMsgData{})
  228. log.Trace(fmt.Sprintf("'whatsup wheresdaparty' sent to %v", p))
  229. return nil
  230. }
  231. // called after peer disconnected
  232. func (self *Hive) removePeer(p *peer) {
  233. removePeerCounter.Inc(1)
  234. log.Debug(fmt.Sprintf("bee %v removed", p))
  235. self.kad.Off(p, saveSync)
  236. select {
  237. case self.more <- true:
  238. default:
  239. }
  240. if self.kad.Count() == 0 {
  241. log.Debug(fmt.Sprintf("empty, all bees gone"))
  242. }
  243. }
  244. // Retrieve a list of live peers that are closer to target than us
  245. func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) {
  246. var addr kademlia.Address
  247. copy(addr[:], target[:])
  248. for _, node := range self.kad.FindClosest(addr, max) {
  249. peers = append(peers, node.(*peer))
  250. }
  251. return
  252. }
  253. // disconnects all the peers
  254. func (self *Hive) DropAll() {
  255. log.Info(fmt.Sprintf("dropping all bees"))
  256. for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) {
  257. node.Drop()
  258. }
  259. }
  260. // contructor for kademlia.NodeRecord based on peer address alone
  261. // TODO: should go away and only addr passed to kademlia
  262. func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord {
  263. now := time.Now()
  264. return &kademlia.NodeRecord{
  265. Addr: addr.Addr,
  266. Url: addr.String(),
  267. Seen: now,
  268. After: now,
  269. }
  270. }
  271. // called by the protocol when receiving peerset (for target address)
  272. // peersMsgData is converted to a slice of NodeRecords for Kademlia
  273. // this is to store all thats needed
  274. func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) {
  275. var nrs []*kademlia.NodeRecord
  276. for _, p := range req.Peers {
  277. if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil {
  278. log.Trace(fmt.Sprintf("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err))
  279. continue
  280. }
  281. nrs = append(nrs, newNodeRecord(p))
  282. }
  283. self.kad.Add(nrs)
  284. }
  285. // peer wraps the protocol instance to represent a connected peer
  286. // it implements kademlia.Node interface
  287. type peer struct {
  288. *bzz // protocol instance running on peer connection
  289. }
  290. // protocol instance implements kademlia.Node interface (embedded peer)
  291. func (self *peer) Addr() kademlia.Address {
  292. return self.remoteAddr.Addr
  293. }
  294. func (self *peer) Url() string {
  295. return self.remoteAddr.String()
  296. }
  297. // TODO take into account traffic
  298. func (self *peer) LastActive() time.Time {
  299. return self.lastActive
  300. }
  301. // reads the serialised form of sync state persisted as the 'Meta' attribute
  302. // and sets the decoded syncState on the online node
  303. func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
  304. p, ok := node.(*peer)
  305. if !ok {
  306. return fmt.Errorf("invalid type")
  307. }
  308. if record.Meta == nil {
  309. log.Debug(fmt.Sprintf("no sync state for node record %v setting default", record))
  310. p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}}
  311. return nil
  312. }
  313. state, err := decodeSync(record.Meta)
  314. if err != nil {
  315. return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err)
  316. }
  317. log.Trace(fmt.Sprintf("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))))
  318. p.syncState = state
  319. return err
  320. }
  321. // callback when saving a sync state
  322. func saveSync(record *kademlia.NodeRecord, node kademlia.Node) {
  323. if p, ok := node.(*peer); ok {
  324. meta, err := encodeSync(p.syncState)
  325. if err != nil {
  326. log.Warn(fmt.Sprintf("error saving sync state for %v: %v", node, err))
  327. return
  328. }
  329. log.Trace(fmt.Sprintf("saved sync state for %v: %s", node, string(*meta)))
  330. record.Meta = meta
  331. }
  332. }
  333. // the immediate response to a retrieve request,
  334. // sends relevant peer data given by the kademlia hive to the requester
  335. // TODO: remember peers sent for duration of the session, only new peers sent
  336. func (self *Hive) peers(req *retrieveRequestMsgData) {
  337. if req != nil {
  338. var addrs []*peerAddr
  339. if req.timeout == nil || time.Now().Before(*(req.timeout)) {
  340. key := req.Key
  341. // self lookup from remote peer
  342. if storage.IsZeroKey(key) {
  343. addr := req.from.Addr()
  344. key = storage.Key(addr[:])
  345. req.Key = nil
  346. }
  347. // get peer addresses from hive
  348. for _, peer := range self.getPeers(key, int(req.MaxPeers)) {
  349. addrs = append(addrs, peer.remoteAddr)
  350. }
  351. log.Debug(fmt.Sprintf("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()))
  352. peersData := &peersMsgData{
  353. Peers: addrs,
  354. Key: req.Key,
  355. Id: req.Id,
  356. }
  357. peersData.setTimeout(req.timeout)
  358. req.from.peers(peersData)
  359. }
  360. }
  361. }
  362. func (self *Hive) String() string {
  363. return self.kad.String()
  364. }