123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package network
import (
	crand "crypto/rand"
	"encoding/binary"
	"fmt"
	"math/rand"
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/swarm/storage"
)
// requesterCount caps how many requesters per request id forwarder.Deliver
// hands a found chunk to.
const (
	requesterCount = 3
)
- /*
- forwarder implements the CloudStore interface (use by storage.NetStore)
- and serves as the cloud store backend orchestrating storage/retrieval/delivery
- via the native bzz protocol
- which uses an MSB logarithmic distance-based semi-permanent Kademlia table for
- * recursive forwarding style routing for retrieval
- * smart syncronisation
- */
- type forwarder struct {
- hive *Hive
- }
- func NewForwarder(hive *Hive) *forwarder {
- return &forwarder{hive: hive}
- }
// generateId returns a random request id in the range [0, 1<<63).
//
// The id is read from the system's cryptographic random source. Seeding a new
// math/rand generator from the wall clock on every call is avoided on the
// normal path because two calls that observe the same clock reading would
// yield the same id.
func generateId() uint64 {
	var buf [8]byte
	if _, err := crand.Read(buf[:]); err != nil {
		// The system source is unavailable: fall back to a time-seeded
		// generator so that callers still receive an id.
		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		return uint64(r.Int63())
	}
	// Clear the top bit so the result stays a non-negative 63-bit value, the
	// same range rand.Int63 produces.
	return binary.BigEndian.Uint64(buf[:]) &^ (1 << 63)
}
// searchTimeout is a three second duration.
// NOTE(review): nothing in this part of the file reads it; presumably it
// bounds how long a retrieve request keeps searching - confirm against its
// users elsewhere in the package.
var searchTimeout time.Duration = 3 * time.Second
- // forwarding logic
- // logic propagating retrieve requests to peers given by the kademlia hive
- func (self *forwarder) Retrieve(chunk *storage.Chunk) {
- peers := self.hive.getPeers(chunk.Key, 0)
- log.Trace(fmt.Sprintf("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers)))
- OUT:
- for _, p := range peers {
- log.Trace(fmt.Sprintf("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p))
- for _, recipients := range chunk.Req.Requesters {
- for _, recipient := range recipients {
- req := recipient.(*retrieveRequestMsgData)
- if req.from.Addr() == p.Addr() {
- continue OUT
- }
- }
- }
- req := &retrieveRequestMsgData{
- Key: chunk.Key,
- Id: generateId(),
- }
- var err error
- if p.swap != nil {
- err = p.swap.Add(-1)
- }
- if err == nil {
- p.retrieve(req)
- break OUT
- }
- log.Warn(fmt.Sprintf("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err))
- }
- }
- // requests to specific peers given by the kademlia hive
- // except for peers that the store request came from (if any)
- // delivery queueing taken care of by syncer
- func (self *forwarder) Store(chunk *storage.Chunk) {
- var n int
- msg := &storeRequestMsgData{
- Key: chunk.Key,
- SData: chunk.SData,
- }
- var source *peer
- if chunk.Source != nil {
- source = chunk.Source.(*peer)
- }
- for _, p := range self.hive.getPeers(chunk.Key, 0) {
- log.Trace(fmt.Sprintf("forwarder.Store: %v %v", p, chunk))
- if p.syncer != nil && (source == nil || p.Addr() != source.Addr()) {
- n++
- Deliver(p, msg, PropagateReq)
- }
- }
- log.Trace(fmt.Sprintf("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk))
- }
- // once a chunk is found deliver it to its requesters unless timed out
- func (self *forwarder) Deliver(chunk *storage.Chunk) {
- // iterate over request entries
- for id, requesters := range chunk.Req.Requesters {
- counter := requesterCount
- msg := &storeRequestMsgData{
- Key: chunk.Key,
- SData: chunk.SData,
- }
- var n int
- var req *retrieveRequestMsgData
- // iterate over requesters with the same id
- for id, r := range requesters {
- req = r.(*retrieveRequestMsgData)
- if req.timeout == nil || req.timeout.After(time.Now()) {
- log.Trace(fmt.Sprintf("forwarder.Deliver: %v -> %v", req.Id, req.from))
- msg.Id = uint64(id)
- Deliver(req.from, msg, DeliverReq)
- n++
- counter--
- if counter <= 0 {
- break
- }
- }
- }
- log.Trace(fmt.Sprintf("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n))
- }
- }
- // initiate delivery of a chunk to a particular peer via syncer#addRequest
- // depending on syncer mode and priority settings and sync request type
- // this either goes via confirmation roundtrip or queued or pushed directly
- func Deliver(p *peer, req interface{}, ty int) {
- p.syncer.addRequest(req, ty)
- }
- // push chunk over to peer
- func Push(p *peer, key storage.Key, priority uint) {
- p.syncer.doDelivery(key, priority, p.syncer.quit)
- }
|