depo.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "bytes"
  19. "encoding/binary"
  20. "fmt"
  21. "time"
  22. "github.com/ethereum/go-ethereum/log"
  23. "github.com/ethereum/go-ethereum/metrics"
  24. "github.com/ethereum/go-ethereum/swarm/storage"
  25. )
  26. //metrics variables
  27. var (
  28. syncReceiveCount = metrics.NewRegisteredCounter("network.sync.recv.count", nil)
  29. syncReceiveIgnore = metrics.NewRegisteredCounter("network.sync.recv.ignore", nil)
  30. syncSendCount = metrics.NewRegisteredCounter("network.sync.send.count", nil)
  31. syncSendRefused = metrics.NewRegisteredCounter("network.sync.send.refused", nil)
  32. syncSendNotFound = metrics.NewRegisteredCounter("network.sync.send.notfound", nil)
  33. )
  34. // Handler for storage/retrieval related protocol requests
  35. // implements the StorageHandler interface used by the bzz protocol
  36. type Depo struct {
  37. hashfunc storage.SwarmHasher
  38. localStore storage.ChunkStore
  39. netStore storage.ChunkStore
  40. }
  41. func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo {
  42. return &Depo{
  43. hashfunc: hash,
  44. localStore: localStore,
  45. netStore: remoteStore, // entrypoint internal
  46. }
  47. }
  48. // Handles UnsyncedKeysMsg after msg decoding - unsynced hashes upto sync state
  49. // * the remote sync state is just stored and handled in protocol
  50. // * filters through the new syncRequests and send the ones missing
  51. // * back immediately as a deliveryRequest message
  52. // * empty message just pings back for more (is this needed?)
  53. // * strict signed sync states may be needed.
  54. func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error {
  55. unsynced := req.Unsynced
  56. var missing []*syncRequest
  57. var chunk *storage.Chunk
  58. var err error
  59. for _, req := range unsynced {
  60. // skip keys that are found,
  61. chunk, err = self.localStore.Get(req.Key[:])
  62. if err != nil || chunk.SData == nil {
  63. missing = append(missing, req)
  64. }
  65. }
  66. log.Debug(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State))
  67. log.Trace(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v", unsynced))
  68. // send delivery request with missing keys
  69. err = p.deliveryRequest(missing)
  70. if err != nil {
  71. return err
  72. }
  73. // set peers state to persist
  74. p.syncState = req.State
  75. return nil
  76. }
  77. // Handles deliveryRequestMsg
  78. // * serves actual chunks asked by the remote peer
  79. // by pushing to the delivery queue (sync db) of the correct priority
  80. // (remote peer is free to reprioritize)
  81. // * the message implies remote peer wants more, so trigger for
  82. // * new outgoing unsynced keys message is fired
  83. func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error {
  84. deliver := req.Deliver
  85. // queue the actual delivery of a chunk ()
  86. log.Trace(fmt.Sprintf("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver))
  87. for _, sreq := range deliver {
  88. // TODO: look up in cache here or in deliveries
  89. // priorities are taken from the message so the remote party can
  90. // reprioritise to at their leisure
  91. // r = self.pullCached(sreq.Key) // pulls and deletes from cache
  92. Push(p, sreq.Key, sreq.Priority)
  93. }
  94. // sends it out as unsyncedKeysMsg
  95. p.syncer.sendUnsyncedKeys()
  96. return nil
  97. }
  98. // the entrypoint for store requests coming from the bzz wire protocol
  99. // if key found locally, return. otherwise
  100. // remote is untrusted, so hash is verified and chunk passed on to NetStore
  101. func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
  102. var islocal bool
  103. req.from = p
  104. chunk, err := self.localStore.Get(req.Key)
  105. switch {
  106. case err != nil:
  107. log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key))
  108. // not found in memory cache, ie., a genuine store request
  109. // create chunk
  110. syncReceiveCount.Inc(1)
  111. chunk = storage.NewChunk(req.Key, nil)
  112. case chunk.SData == nil:
  113. // found chunk in memory store, needs the data, validate now
  114. log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v. request entry found", req))
  115. default:
  116. // data is found, store request ignored
  117. // this should update access count?
  118. syncReceiveIgnore.Inc(1)
  119. log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req))
  120. islocal = true
  121. //return
  122. }
  123. hasher := self.hashfunc()
  124. hasher.Write(req.SData)
  125. if !bytes.Equal(hasher.Sum(nil), req.Key) {
  126. // data does not validate, ignore
  127. // TODO: peer should be penalised/dropped?
  128. log.Warn(fmt.Sprintf("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req))
  129. return
  130. }
  131. if islocal {
  132. return
  133. }
  134. // update chunk with size and data
  135. chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
  136. chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
  137. log.Trace(fmt.Sprintf("delivery of %v from %v", chunk, p))
  138. chunk.Source = p
  139. self.netStore.Put(chunk)
  140. }
  141. // entrypoint for retrieve requests coming from the bzz wire protocol
  142. // checks swap balance - return if peer has no credit
  143. func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) {
  144. req.from = p
  145. // swap - record credit for 1 request
  146. // note that only charge actual reqsearches
  147. var err error
  148. if p.swap != nil {
  149. err = p.swap.Add(1)
  150. }
  151. if err != nil {
  152. log.Warn(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err))
  153. return
  154. }
  155. // call storage.NetStore#Get which
  156. // blocks until local retrieval finished
  157. // launches cloud retrieval
  158. chunk, _ := self.netStore.Get(req.Key)
  159. req = self.strategyUpdateRequest(chunk.Req, req)
  160. // check if we can immediately deliver
  161. if chunk.SData != nil {
  162. log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log()))
  163. if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size {
  164. sreq := &storeRequestMsgData{
  165. Id: req.Id,
  166. Key: chunk.Key,
  167. SData: chunk.SData,
  168. requestTimeout: req.timeout, //
  169. }
  170. syncSendCount.Inc(1)
  171. p.syncer.addRequest(sreq, DeliverReq)
  172. } else {
  173. syncSendRefused.Inc(1)
  174. log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()))
  175. }
  176. } else {
  177. syncSendNotFound.Inc(1)
  178. log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()))
  179. }
  180. }
  181. // add peer request the chunk and decides the timeout for the response if still searching
  182. func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) {
  183. log.Trace(fmt.Sprintf("Depo.strategyUpdateRequest: key %v", origReq.Key.Log()))
  184. // we do not create an alternative one
  185. req = origReq
  186. if rs != nil {
  187. self.addRequester(rs, req)
  188. req.setTimeout(self.searchTimeout(rs, req))
  189. }
  190. return
  191. }
  192. // decides the timeout promise sent with the immediate peers response to a retrieve request
  193. // if timeout is explicitly set and expired
  194. func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) {
  195. reqt := req.getTimeout()
  196. t := time.Now().Add(searchTimeout)
  197. if reqt != nil && reqt.Before(t) {
  198. return reqt
  199. } else {
  200. return &t
  201. }
  202. }
  203. /*
  204. adds a new peer to an existing open request
  205. only add if less than requesterCount peers forwarded the same request id so far
  206. note this is done irrespective of status (searching or found)
  207. */
  208. func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) {
  209. log.Trace(fmt.Sprintf("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.Id))
  210. list := rs.Requesters[req.Id]
  211. rs.Requesters[req.Id] = append(list, req)
  212. }