protocol.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. /*
  18. bzz implements the swarm wire protocol [bzz] (sister of eth and shh)
  19. the protocol instance is launched on each peer by the network layer if the
  20. bzz protocol handler is registered on the p2p server.
  21. The bzz protocol component speaks the bzz protocol
  22. * handle the protocol handshake
  23. * register peers in the KΛÐΞMLIΛ table via the hive logistic manager
  24. * dispatch to hive for handling the DHT logic
  25. * encode and decode requests for storage and retrieval
  26. * handle sync protocol messages via the syncer
  27. * talks the SWAP payment protocol (swap accounting is done within NetStore)
  28. */
  29. import (
  30. "errors"
  31. "fmt"
  32. "net"
  33. "strconv"
  34. "time"
  35. "github.com/ethereum/go-ethereum/contracts/chequebook"
  36. "github.com/ethereum/go-ethereum/log"
  37. "github.com/ethereum/go-ethereum/metrics"
  38. "github.com/ethereum/go-ethereum/p2p"
  39. bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
  40. "github.com/ethereum/go-ethereum/swarm/services/swap/swap"
  41. "github.com/ethereum/go-ethereum/swarm/storage"
  42. )
// metrics variables
//
// One counter per incoming bzz message type (incremented in (*bzz).handle),
// a counter for message codes that are not part of the protocol, and a
// counter for remote status messages received during the handshake
// (incremented in (*bzz).handleStatus).
var (
	storeRequestMsgCounter    = metrics.NewRegisteredCounter("network.protocol.msg.storerequest.count", nil)
	retrieveRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.retrieverequest.count", nil)
	peersMsgCounter           = metrics.NewRegisteredCounter("network.protocol.msg.peers.count", nil)
	syncRequestMsgCounter     = metrics.NewRegisteredCounter("network.protocol.msg.syncrequest.count", nil)
	unsyncedKeysMsgCounter    = metrics.NewRegisteredCounter("network.protocol.msg.unsyncedkeys.count", nil)
	deliverRequestMsgCounter  = metrics.NewRegisteredCounter("network.protocol.msg.deliverrequest.count", nil)
	paymentMsgCounter         = metrics.NewRegisteredCounter("network.protocol.msg.payment.count", nil)
	invalidMsgCounter         = metrics.NewRegisteredCounter("network.protocol.msg.invalid.count", nil)
	handleStatusMsgCounter    = metrics.NewRegisteredCounter("network.protocol.msg.handlestatus.count", nil)
)
// Protocol-level constants of the bzz wire protocol.
const (
	// Version is the protocol version advertised in the p2p.Protocol
	// descriptor and in the handshake; a peer reporting a different version
	// is rejected by handleStatus.
	Version = 0
	// ProtocolLength is the value used for the Length field of the
	// p2p.Protocol descriptor returned by Bzz.
	ProtocolLength = uint64(8)
	// ProtocolMaxMsgSize is the largest message size accepted; handle and
	// handleStatus return an error for anything bigger.
	ProtocolMaxMsgSize = 10 * 1024 * 1024
	// NetworkId is the default network id, used by Bzz when the caller
	// passes a network id of 0.
	NetworkId = 3
)
// bzz represents the swarm wire protocol
// an instance is running on each peer
//
// A bzz value is created by run for every peer connection and is wrapped in
// a peer{bzz: ...} whenever it is handed to the hive or the storage handler.
type bzz struct {
	storage     StorageHandler       // handler storage/retrieval related requests coming via the bzz wire protocol
	hive        *Hive                // the logistic manager, peerPool, routing service and peer handler
	dbAccess    *DbAccess            // access to db storage counter and iterator for syncing
	requestDb   *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing
	remoteAddr  *peerAddr            // remote peers address
	peer        *p2p.Peer            // the p2p peer object
	rw          p2p.MsgReadWriter    // messageReadWriter to send messages to
	backend     chequebook.Backend   // passed on to bzzswap.NewSwap when SWAP is enabled
	lastActive  time.Time            // updated in handle on store request, sync request, unsynced keys and delivery request messages
	NetworkId   uint64               // sent in the handshake; a remote status with a different id is rejected
	swap        *swap.Swap           // swap instance for the peer connection
	swapParams  *bzzswap.SwapParams  // swap settings both local and remote
	swapEnabled bool                 // flag to enable SWAP (will be set via Caps in handshake)
	syncEnabled bool                 // flag to enable SYNC (will be set via Caps in handshake)
	syncer      *syncer              // syncer instance for the peer connection
	syncParams  *SyncParams          // syncer params
	syncState   *syncState           // outgoing syncronisation state (contains reference to remote peers db counter)
}
// interface type for handler of storage/retrieval related requests coming
// via the bzz wire protocol
// messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest
//
// The protocol loop (handle) decodes each of these messages and passes the
// decoded request together with a peer wrapping the receiving bzz instance.
// An error returned from HandleUnsyncedKeysMsg or HandleDeliveryRequestMsg
// is returned by handle; the store and retrieve handlers cannot report
// errors to the loop.
type StorageHandler interface {
	HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
	HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error
	HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)
	HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
}
  91. /*
  92. main entrypoint, wrappers starting a server that will run the bzz protocol
  93. use this constructor to attach the protocol ("class") to server caps
  94. This is done by node.Node#Register(func(node.ServiceContext) (Service, error))
  95. Service implements Protocols() which is an array of protocol constructors
  96. at node startup the protocols are initialised
  97. the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error
  98. on each peer connection
  99. The Run function of the Bzz protocol class creates a bzz instance
  100. which will represent the peer for the swarm hive and all peer-aware components
  101. */
  102. func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64) (p2p.Protocol, error) {
  103. // a single global request db is created for all peer connections
  104. // this is to persist delivery backlog and aid syncronisation
  105. requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath)
  106. if err != nil {
  107. return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err)
  108. }
  109. if networkId == 0 {
  110. networkId = NetworkId
  111. }
  112. return p2p.Protocol{
  113. Name: "bzz",
  114. Version: Version,
  115. Length: ProtocolLength,
  116. Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
  117. return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, networkId, p, rw)
  118. },
  119. }, nil
  120. }
  121. /*
  122. the main protocol loop that
  123. * does the handshake by exchanging statusMsg
  124. * if peer is valid and accepted, registers with the hive
  125. * then enters into a forever loop handling incoming messages
  126. * storage and retrieval related queries coming via bzz are dispatched to StorageHandler
  127. * peer-related messages are dispatched to the hive
  128. * payment related messages are relayed to SWAP service
  129. * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook)
  130. * whenever the loop terminates, the peer will disconnect with Subprotocol error
  131. * whenever handlers return an error the loop terminates
  132. */
  133. func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
  134. self := &bzz{
  135. storage: depo,
  136. backend: backend,
  137. hive: hive,
  138. dbAccess: dbaccess,
  139. requestDb: requestDb,
  140. peer: p,
  141. rw: rw,
  142. swapParams: sp,
  143. syncParams: sy,
  144. swapEnabled: hive.swapEnabled,
  145. syncEnabled: true,
  146. NetworkId: networkId,
  147. }
  148. // handle handshake
  149. err = self.handleStatus()
  150. if err != nil {
  151. return err
  152. }
  153. defer func() {
  154. // if the handler loop exits, the peer is disconnecting
  155. // deregister the peer in the hive
  156. self.hive.removePeer(&peer{bzz: self})
  157. if self.syncer != nil {
  158. self.syncer.stop() // quits request db and delivery loops, save requests
  159. }
  160. if self.swap != nil {
  161. self.swap.Stop() // quits chequebox autocash etc
  162. }
  163. }()
  164. // the main forever loop that handles incoming requests
  165. for {
  166. if self.hive.blockRead {
  167. log.Warn(fmt.Sprintf("Cannot read network"))
  168. time.Sleep(100 * time.Millisecond)
  169. continue
  170. }
  171. err = self.handle()
  172. if err != nil {
  173. return
  174. }
  175. }
  176. }
// Drop disconnects the underlying p2p peer with a subprotocol error.
// It is called by send when writing a message to the peer fails.
//
// TODO: may need to implement protocol drop only? don't want to kick off the peer
// if they are useful for other protocols
func (self *bzz) Drop() {
	self.peer.Disconnect(p2p.DiscSubprotocolError)
}
  182. // one cycle of the main forever loop that handles and dispatches incoming messages
  183. func (self *bzz) handle() error {
  184. msg, err := self.rw.ReadMsg()
  185. log.Debug(fmt.Sprintf("<- %v", msg))
  186. if err != nil {
  187. return err
  188. }
  189. if msg.Size > ProtocolMaxMsgSize {
  190. return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize)
  191. }
  192. // make sure that the payload has been fully consumed
  193. defer msg.Discard()
  194. switch msg.Code {
  195. case statusMsg:
  196. // no extra status message allowed. The one needed already handled by
  197. // handleStatus
  198. log.Debug(fmt.Sprintf("Status message: %v", msg))
  199. return errors.New("extra status message")
  200. case storeRequestMsg:
  201. // store requests are dispatched to netStore
  202. storeRequestMsgCounter.Inc(1)
  203. var req storeRequestMsgData
  204. if err := msg.Decode(&req); err != nil {
  205. return fmt.Errorf("<- %v: %v", msg, err)
  206. }
  207. if n := len(req.SData); n < 9 {
  208. return fmt.Errorf("<- %v: Data too short (%v)", msg, n)
  209. }
  210. // last Active time is set only when receiving chunks
  211. self.lastActive = time.Now()
  212. log.Trace(fmt.Sprintf("incoming store request: %s", req.String()))
  213. // swap accounting is done within forwarding
  214. self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})
  215. case retrieveRequestMsg:
  216. // retrieve Requests are dispatched to netStore
  217. retrieveRequestMsgCounter.Inc(1)
  218. var req retrieveRequestMsgData
  219. if err := msg.Decode(&req); err != nil {
  220. return fmt.Errorf("<- %v: %v", msg, err)
  221. }
  222. req.from = &peer{bzz: self}
  223. // if request is lookup and not to be delivered
  224. if req.isLookup() {
  225. log.Trace(fmt.Sprintf("self lookup for %v: responding with peers only...", req.from))
  226. } else if req.Key == nil {
  227. return fmt.Errorf("protocol handler: req.Key == nil || req.Timeout == nil")
  228. } else {
  229. // swap accounting is done within netStore
  230. self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self})
  231. }
  232. // direct response with peers, TODO: sort this out
  233. self.hive.peers(&req)
  234. case peersMsg:
  235. // response to lookups and immediate response to retrieve requests
  236. // dispatches new peer data to the hive that adds them to KADDB
  237. peersMsgCounter.Inc(1)
  238. var req peersMsgData
  239. if err := msg.Decode(&req); err != nil {
  240. return fmt.Errorf("<- %v: %v", msg, err)
  241. }
  242. req.from = &peer{bzz: self}
  243. log.Trace(fmt.Sprintf("<- peer addresses: %v", req))
  244. self.hive.HandlePeersMsg(&req, &peer{bzz: self})
  245. case syncRequestMsg:
  246. syncRequestMsgCounter.Inc(1)
  247. var req syncRequestMsgData
  248. if err := msg.Decode(&req); err != nil {
  249. return fmt.Errorf("<- %v: %v", msg, err)
  250. }
  251. log.Debug(fmt.Sprintf("<- sync request: %v", req))
  252. self.lastActive = time.Now()
  253. self.sync(req.SyncState)
  254. case unsyncedKeysMsg:
  255. // coming from parent node offering
  256. unsyncedKeysMsgCounter.Inc(1)
  257. var req unsyncedKeysMsgData
  258. if err := msg.Decode(&req); err != nil {
  259. return fmt.Errorf("<- %v: %v", msg, err)
  260. }
  261. log.Debug(fmt.Sprintf("<- unsynced keys : %s", req.String()))
  262. err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
  263. self.lastActive = time.Now()
  264. if err != nil {
  265. return fmt.Errorf("<- %v: %v", msg, err)
  266. }
  267. case deliveryRequestMsg:
  268. // response to syncKeysMsg hashes filtered not existing in db
  269. // also relays the last synced state to the source
  270. deliverRequestMsgCounter.Inc(1)
  271. var req deliveryRequestMsgData
  272. if err := msg.Decode(&req); err != nil {
  273. return fmt.Errorf("<-msg %v: %v", msg, err)
  274. }
  275. log.Debug(fmt.Sprintf("<- delivery request: %s", req.String()))
  276. err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
  277. self.lastActive = time.Now()
  278. if err != nil {
  279. return fmt.Errorf("<- %v: %v", msg, err)
  280. }
  281. case paymentMsg:
  282. // swap protocol message for payment, Units paid for, Cheque paid with
  283. paymentMsgCounter.Inc(1)
  284. if self.swapEnabled {
  285. var req paymentMsgData
  286. if err := msg.Decode(&req); err != nil {
  287. return fmt.Errorf("<- %v: %v", msg, err)
  288. }
  289. log.Debug(fmt.Sprintf("<- payment: %s", req.String()))
  290. self.swap.Receive(int(req.Units), req.Promise)
  291. }
  292. default:
  293. // no other message is allowed
  294. invalidMsgCounter.Inc(1)
  295. return fmt.Errorf("invalid message code: %v", msg.Code)
  296. }
  297. return nil
  298. }
  299. func (self *bzz) handleStatus() (err error) {
  300. handshake := &statusMsgData{
  301. Version: uint64(Version),
  302. ID: "honey",
  303. Addr: self.selfAddr(),
  304. NetworkId: self.NetworkId,
  305. Swap: &bzzswap.SwapProfile{
  306. Profile: self.swapParams.Profile,
  307. PayProfile: self.swapParams.PayProfile,
  308. },
  309. }
  310. err = p2p.Send(self.rw, statusMsg, handshake)
  311. if err != nil {
  312. return err
  313. }
  314. // read and handle remote status
  315. var msg p2p.Msg
  316. msg, err = self.rw.ReadMsg()
  317. if err != nil {
  318. return err
  319. }
  320. if msg.Code != statusMsg {
  321. return fmt.Errorf("first msg has code %x (!= %x)", msg.Code, statusMsg)
  322. }
  323. handleStatusMsgCounter.Inc(1)
  324. if msg.Size > ProtocolMaxMsgSize {
  325. return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize)
  326. }
  327. var status statusMsgData
  328. if err := msg.Decode(&status); err != nil {
  329. return fmt.Errorf("<- %v: %v", msg, err)
  330. }
  331. if status.NetworkId != self.NetworkId {
  332. return fmt.Errorf("network id mismatch: %d (!= %d)", status.NetworkId, self.NetworkId)
  333. }
  334. if Version != status.Version {
  335. return fmt.Errorf("protocol version mismatch: %d (!= %d)", status.Version, Version)
  336. }
  337. self.remoteAddr = self.peerAddr(status.Addr)
  338. log.Trace(fmt.Sprintf("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr()))
  339. if self.swapEnabled {
  340. // set remote profile for accounting
  341. self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self)
  342. if err != nil {
  343. return err
  344. }
  345. }
  346. log.Info(fmt.Sprintf("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId))
  347. err = self.hive.addPeer(&peer{bzz: self})
  348. if err != nil {
  349. return err
  350. }
  351. // hive sets syncstate so sync should start after node added
  352. log.Info(fmt.Sprintf("syncronisation request sent with %v", self.syncState))
  353. self.syncRequest()
  354. return nil
  355. }
  356. func (self *bzz) sync(state *syncState) error {
  357. // syncer setup
  358. if self.syncer != nil {
  359. return errors.New("sync request can only be sent once")
  360. }
  361. cnt := self.dbAccess.counter()
  362. remoteaddr := self.remoteAddr.Addr
  363. start, stop := self.hive.kad.KeyRange(remoteaddr)
  364. // an explicitly received nil syncstate disables syncronisation
  365. if state == nil {
  366. self.syncEnabled = false
  367. log.Warn(fmt.Sprintf("syncronisation disabled for peer %v", self))
  368. state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
  369. } else {
  370. state.synced = make(chan bool)
  371. state.SessionAt = cnt
  372. if storage.IsZeroKey(state.Stop) && state.Synced {
  373. state.Start = storage.Key(start[:])
  374. state.Stop = storage.Key(stop[:])
  375. }
  376. log.Debug(fmt.Sprintf("syncronisation requested by peer %v at state %v", self, state))
  377. }
  378. var err error
  379. self.syncer, err = newSyncer(
  380. self.requestDb,
  381. storage.Key(remoteaddr[:]),
  382. self.dbAccess,
  383. self.unsyncedKeys, self.store,
  384. self.syncParams, state, func() bool { return self.syncEnabled },
  385. )
  386. if err != nil {
  387. return nil
  388. }
  389. log.Trace(fmt.Sprintf("syncer set for peer %v", self))
  390. return nil
  391. }
// String returns the string form of the remote peer's address.
// remoteAddr is only set by handleStatus, so the result before the handshake
// depends on how peerAddr.String treats a nil receiver.
func (self *bzz) String() string {
	return self.remoteAddr.String()
}
  395. // repair reported address if IP missing
  396. func (self *bzz) peerAddr(base *peerAddr) *peerAddr {
  397. if base.IP.IsUnspecified() {
  398. host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String())
  399. base.IP = net.ParseIP(host)
  400. }
  401. return base
  402. }
  403. // returns self advertised node connection info (listening address w enodes)
  404. // IP will get repaired on the other end if missing
  405. // or resolved via ID by discovery at dialout
  406. func (self *bzz) selfAddr() *peerAddr {
  407. id := self.hive.id
  408. host, port, _ := net.SplitHostPort(self.hive.listenAddr())
  409. intport, _ := strconv.Atoi(port)
  410. addr := &peerAddr{
  411. Addr: self.hive.addr,
  412. ID: id[:],
  413. IP: net.ParseIP(host),
  414. Port: uint16(intport),
  415. }
  416. return addr
  417. }
// outgoing messages

// retrieve sends a retrieveRequestMsg carrying req to the peer.
// It returns whatever send returns.
func (self *bzz) retrieve(req *retrieveRequestMsgData) error {
	return self.send(retrieveRequestMsg, req)
}
// store sends a storeRequestMsg carrying req to the peer.
// It is also handed to newSyncer (see sync) together with unsyncedKeys.
// It returns whatever send returns.
func (self *bzz) store(req *storeRequestMsgData) error {
	return self.send(storeRequestMsg, req)
}
  427. func (self *bzz) syncRequest() error {
  428. req := &syncRequestMsgData{}
  429. if self.hive.syncEnabled {
  430. log.Debug(fmt.Sprintf("syncronisation request to peer %v at state %v", self, self.syncState))
  431. req.SyncState = self.syncState
  432. }
  433. if self.syncState == nil {
  434. log.Warn(fmt.Sprintf("syncronisation disabled for peer %v at state %v", self, self.syncState))
  435. }
  436. return self.send(syncRequestMsg, req)
  437. }
  438. // queue storeRequestMsg in request db
  439. func (self *bzz) deliveryRequest(reqs []*syncRequest) error {
  440. req := &deliveryRequestMsgData{
  441. Deliver: reqs,
  442. }
  443. return self.send(deliveryRequestMsg, req)
  444. }
  445. // batch of syncRequests to send off
  446. func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error {
  447. req := &unsyncedKeysMsgData{
  448. Unsynced: reqs,
  449. State: state,
  450. }
  451. return self.send(unsyncedKeysMsg, req)
  452. }
  453. // send paymentMsg
  454. func (self *bzz) Pay(units int, promise swap.Promise) {
  455. req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)}
  456. self.payment(req)
  457. }
// payment sends a paymentMsg carrying req to the peer.
// It returns whatever send returns.
func (self *bzz) payment(req *paymentMsgData) error {
	return self.send(paymentMsg, req)
}
// peers sends a peersMsg carrying req to the peer.
// It returns whatever send returns.
func (self *bzz) peers(req *peersMsgData) error {
	return self.send(peersMsg, req)
}
  466. func (self *bzz) send(msg uint64, data interface{}) error {
  467. if self.hive.blockWrite {
  468. return fmt.Errorf("network write blocked")
  469. }
  470. log.Trace(fmt.Sprintf("-> %v: %v (%T) to %v", msg, data, data, self))
  471. err := p2p.Send(self.rw, msg, data)
  472. if err != nil {
  473. self.Drop()
  474. }
  475. return err
  476. }