- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package network
- import (
- "encoding/binary"
- "encoding/json"
- "fmt"
- "path/filepath"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/swarm/storage"
- )
- // syncer parameters (global, not peer specific) default values
- const (
- requestDbBatchSize = 512 // size of batch before written to request db
- keyBufferSize = 1024 // size of buffer for unsynced keys
- syncBatchSize = 128 // maximum batchsize for outgoing requests
- syncBufferSize = 128 // size of buffer for delivery requests
- syncCacheSize = 1024 // cache capacity to store request queue in memory
- )
- // priorities
- const (
- Low = iota // 0
- Medium // 1
- High // 2
- priorities // 3 number of priority levels
- )
- // request types
- const (
- DeliverReq = iota // 0
- PushReq // 1
- PropagateReq // 2
- HistoryReq // 3
- BacklogReq // 4
- )
- // json serialisable struct to record the synchronisation state between 2 peers
- type syncState struct {
- *storage.DbSyncState // embeds the following 4 fields:
- // Start Key // lower limit of address space
- // Stop Key // upper limit of address space
- // First uint64 // counter taken from last sync state
- // Last uint64 // counter of remote peer dbStore at the time of last connection
- SessionAt uint64 // set at the time of connection
- LastSeenAt uint64 // set at the time of connection
- Latest storage.Key // cursor of dbstore when last synced (continuously set by syncer)
- Synced bool // true iff Sync is done up to the last disconnect
- synced chan bool // signal that sync stage finished
- }
- // wrapper of the dbs to provide mockable custom local chunk store access to the syncer
- type DbAccess struct {
- db *storage.DbStore
- loc *storage.LocalStore
- }
- func NewDbAccess(loc *storage.LocalStore) *DbAccess {
- return &DbAccess{loc.DbStore.(*storage.DbStore), loc}
- }
- // to obtain the chunks from key or request db entry only
- func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) {
- return self.loc.Get(key)
- }
- // current storage counter of chunk db
- func (self *DbAccess) counter() uint64 {
- return self.db.Counter()
- }
- // implemented by dbStoreSyncIterator
- type keyIterator interface {
- Next() storage.Key
- }
- // generator function for iteration by address range and storage counter
- func (self *DbAccess) iterator(s *syncState) keyIterator {
- it, err := self.db.NewSyncIterator(*(s.DbSyncState))
- if err != nil {
- return nil
- }
- return keyIterator(it)
- }
- func (self syncState) String() string {
- if self.Synced {
- return fmt.Sprintf(
- "session started at: %v, last seen at: %v, latest key: %v",
- self.SessionAt, self.LastSeenAt,
- self.Latest.Log(),
- )
- } else {
- return fmt.Sprintf(
- "address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v",
- self.Start.Log(), self.Stop.Log(),
- self.First, self.Last,
- self.SessionAt, self.LastSeenAt,
- self.Latest.Log(),
- )
- }
- }
- // syncer parameters (global, not peer specific)
- type SyncParams struct {
- RequestDbPath string // path for request db (leveldb)
- RequestDbBatchSize uint // number of items before batch is saved to requestdb
- KeyBufferSize uint // size of key buffer
- SyncBatchSize uint // maximum batchsize for outgoing requests
- SyncBufferSize uint // size of buffer for delivery requests
- SyncCacheSize uint // cache capacity to store request queue in memory
- SyncPriorities []uint // list of priority levels for req types 0-4
- SyncModes []bool // list of sync modes for req types 0-4
- }
- // constructor with default values
- func NewDefaultSyncParams() *SyncParams {
- return &SyncParams{
- RequestDbBatchSize: requestDbBatchSize,
- KeyBufferSize: keyBufferSize,
- SyncBufferSize: syncBufferSize,
- SyncBatchSize: syncBatchSize,
- SyncCacheSize: syncCacheSize,
- SyncPriorities: []uint{High, Medium, Medium, Low, Low},
- SyncModes: []bool{true, true, true, true, false},
- }
- }
- // this can only be set once all config options (file, cmd line, env vars)
- // have been evaluated
- func (self *SyncParams) Init(path string) {
- self.RequestDbPath = filepath.Join(path, "requests")
- }
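- // exampleCustomSyncParams is an illustrative sketch, not part of the original
- // code: it shows one plausible way a caller might start from the defaults,
- // override individual fields and then fix the request db path with Init.
- // The base path and the overridden values are made up for the example.
- func exampleCustomSyncParams(basePath string) *SyncParams {
- params := NewDefaultSyncParams()
- params.SyncBatchSize = 64 // smaller outgoing batches (hypothetical)
- params.SyncPriorities[PropagateReq] = High // prioritise propagation (hypothetical)
- params.Init(basePath) // must run after all config sources are evaluated
- return params
- }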
- // syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding
- type syncer struct {
- *SyncParams // sync parameters
- syncF func() bool // if syncing is needed
- key storage.Key // remote peer's address key
- state *syncState // sync state for our dbStore
- syncStates chan *syncState // different stages of sync
- deliveryRequest chan bool // one of two triggers needed to send unsyncedKeys
- newUnsyncedKeys chan bool // one of two triggers needed to send unsynced keys
- quit chan bool // signal to quit loops
- // DB related fields
- dbAccess *DbAccess // access to dbStore
- // native fields
- queues [priorities]*syncDb // in-memory cache / queues for sync reqs
- keys [priorities]chan interface{} // buffer for unsynced keys
- deliveries [priorities]chan *storeRequestMsgData // delivery
- // bzz protocol instance outgoing message callbacks (mockable for testing)
- unsyncedKeys func([]*syncRequest, *syncState) error // send unsyncedKeysMsg
- store func(*storeRequestMsgData) error // send storeRequestMsg
- }
- // a syncer instance is linked to each peer connection
- // constructor is called from protocol after successful handshake
- // the returned instance is attached to the peer and can be called
- // by the forwarder
- func newSyncer(
- db *storage.LDBDatabase, remotekey storage.Key,
- dbAccess *DbAccess,
- unsyncedKeys func([]*syncRequest, *syncState) error,
- store func(*storeRequestMsgData) error,
- params *SyncParams,
- state *syncState,
- syncF func() bool,
- ) (*syncer, error) {
- syncBufferSize := params.SyncBufferSize
- keyBufferSize := params.KeyBufferSize
- dbBatchSize := params.RequestDbBatchSize
- self := &syncer{
- syncF: syncF,
- key: remotekey,
- dbAccess: dbAccess,
- syncStates: make(chan *syncState, 20),
- deliveryRequest: make(chan bool, 1),
- newUnsyncedKeys: make(chan bool, 1),
- SyncParams: params,
- state: state,
- quit: make(chan bool),
- unsyncedKeys: unsyncedKeys,
- store: store,
- }
- // initialising
- for i := 0; i < priorities; i++ {
- self.keys[i] = make(chan interface{}, keyBufferSize)
- self.deliveries[i] = make(chan *storeRequestMsgData)
- // initialise a syncdb instance for each priority queue
- self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i)))
- }
- log.Info(fmt.Sprintf("syncer started: %v", state))
- // launch chunk delivery service
- go self.syncDeliveries()
- // launch sync task manager
- if self.syncF() {
- go self.sync()
- }
- // process unsynced keys to broadcast
- go self.syncUnsyncedKeys()
- return self, nil
- }
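- // exampleNewSyncer is an illustrative sketch, not part of the original code:
- // it shows how a caller holding the request db, the remote peer key and a
- // DbAccess might wire up a syncer. The two no-op callbacks stand in for the
- // real bzz protocol senders of unsyncedKeysMsg and storeRequestMsg.
- func exampleNewSyncer(db *storage.LDBDatabase, remote storage.Key, dbAccess *DbAccess, state *syncState) (*syncer, error) {
- params := NewDefaultSyncParams()
- params.Init("/tmp/bzz") // hypothetical base path for the request db
- return newSyncer(
- db, remote, dbAccess,
- func(reqs []*syncRequest, s *syncState) error { return nil }, // would send unsyncedKeysMsg
- func(msg *storeRequestMsgData) error { return nil }, // would send storeRequestMsg
- params, state,
- func() bool { return true }, // syncing enabled for this peer
- )
- }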
- // metadata serialisation
- func encodeSync(state *syncState) (*json.RawMessage, error) {
- data, err := json.MarshalIndent(state, "", " ")
- if err != nil {
- return nil, err
- }
- meta := json.RawMessage(data)
- return &meta, nil
- }
- func decodeSync(meta *json.RawMessage) (*syncState, error) {
- if meta == nil {
- return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
- }
- data := []byte(*(meta))
- if len(data) == 0 {
- return nil, fmt.Errorf("unable to deserialise sync state from empty message")
- }
- state := &syncState{DbSyncState: &storage.DbSyncState{}}
- err := json.Unmarshal(data, state)
- return state, err
- }
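- // exampleSyncStateRoundTrip is an illustrative sketch, not part of the
- // original code: it shows a syncState surviving the JSON round trip through
- // encodeSync/decodeSync, as used when peer metadata is persisted. The counter
- // values are made up.
- func exampleSyncStateRoundTrip() error {
- state := &syncState{DbSyncState: &storage.DbSyncState{}}
- state.First, state.Last = 1, 42 // counters taken from the previous session (made up)
- state.SessionAt = 100
- state.LastSeenAt = 90
- meta, err := encodeSync(state)
- if err != nil {
- return err
- }
- // the unexported synced channel is not serialised, only exported fields survive
- restored, err := decodeSync(meta)
- if err != nil {
- return err
- }
- fmt.Printf("restored: session at %v, last seen at %v\n", restored.SessionAt, restored.LastSeenAt)
- return nil
- }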
- /*
- sync implements the syncing script
- * first all items left in the request Db are replayed
- * type = StaleSync
- * Mode: by default once again via confirmation roundtrip
- * Priority: the items are replayed with the priority specified for StaleSync
- * but within that, the order respects the earlier priority level of the request
- * after all items are consumed for a priority level, the respective
- queue for delivery requests is opened (this way new reqs are not written to db)
- (TODO: this should be checked)
- * the sync state provided by the remote peer is used to sync history
- * all the backlog from earlier (aborted) syncing is completed starting from latest
- * if Last < LastSeenAt then process all items in between, i.e. all
- backlog up to the last disconnect
- * if Last > 0 &&
- sync is called from the syncer constructor and is not supposed to be used externally
- */
- func (self *syncer) sync() {
- state := self.state
- // sync finished
- defer close(self.syncStates)
- // 0. first replay stale requests from request db
- if state.SessionAt == 0 {
- log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log()))
- return
- }
- log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log()))
- for p := priorities - 1; p >= 0; p-- {
- self.queues[p].dbRead(false, 0, self.replay())
- }
- log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log()))
- // unless the peer is synced, sync unfinished history
- if !state.Synced {
- start := state.Start
- if !storage.IsZeroKey(state.Latest) {
- // 1. there is unfinished earlier sync
- state.Start = state.Latest
- log.Debug(fmt.Sprintf("syncer[%v]: start synchronising backlog (unfinished sync: %v)", self.key.Log(), state))
- // blocks while the entire history up to state is synced
- self.syncState(state)
- if state.Last < state.SessionAt {
- state.First = state.Last + 1
- }
- }
- state.Latest = storage.ZeroKey
- state.Start = start
- // 2. sync up to last disconnect
- if state.First < state.LastSeenAt {
- state.Last = state.LastSeenAt
- log.Debug(fmt.Sprintf("syncer[%v]: start synchronising history up to last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state))
- self.syncState(state)
- state.First = state.LastSeenAt
- }
- state.Latest = storage.ZeroKey
- } else {
- // synchronisation starts at end of last session
- state.First = state.LastSeenAt
- }
- // 3. sync up to current session start
- // if there have been new chunks since last session
- if state.LastSeenAt < state.SessionAt {
- state.Last = state.SessionAt
- log.Debug(fmt.Sprintf("syncer[%v]: start synchronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state))
- // blocks until state syncing is finished
- self.syncState(state)
- }
- log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log()))
- }
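- // exampleSyncWindows is an illustrative sketch, not part of the original
- // code: for a hypothetical reconnecting peer with no unfinished backlog it
- // shows the two history windows that sync() walks through, expressed in the
- // db storage counters recorded in the sync state. The counter values are made up.
- func exampleSyncWindows() {
- state := &syncState{DbSyncState: &storage.DbSyncState{}}
- state.SessionAt = 300 // local db counter when this session started
- state.LastSeenAt = 200 // local db counter at the previous disconnect
- // history up to the last disconnect (stage 2 in sync())
- fmt.Printf("history window: %v-%v\n", state.First, state.LastSeenAt)
- // history since the last disconnect up to session start (stage 3 in sync())
- fmt.Printf("session window: %v-%v\n", state.LastSeenAt, state.SessionAt)
- }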
- // syncState blocks until history up to the given state is synced
- func (self *syncer) syncState(state *syncState) {
- self.syncStates <- state
- select {
- case <-state.synced:
- case <-self.quit:
- }
- }
- // stop quits the request processing loops and saves the request cache to disk
- func (self *syncer) stop() {
- close(self.quit)
- log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log()))
- for _, db := range self.queues {
- db.stop()
- }
- }
- // rlp serialisable sync request
- type syncRequest struct {
- Key storage.Key
- Priority uint
- }
- func (self *syncRequest) String() string {
- return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority)
- }
- func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) {
- key, _, _, _, err := parseRequest(req)
- // TODO: if req has chunk, it should be put in a cache
- // create
- if err != nil {
- return nil, err
- }
- return &syncRequest{key, uint(p)}, nil
- }
- // serves historical items from the DB
- // * read is on demand, blocking unless history channel is read
- // * accepts sync requests (syncStates) to create new db iterator
- // * closes the channel once iteration finishes
- func (self *syncer) syncHistory(state *syncState) chan interface{} {
- var n uint
- history := make(chan interface{})
- log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop))
- it := self.dbAccess.iterator(state)
- if it != nil {
- go func() {
- // signal that the iteration has ended
- defer close(history)
- IT:
- for {
- key := it.Next()
- if key == nil {
- break IT
- }
- select {
- // blocking until history channel is read from
- case history <- key:
- n++
- log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n))
- state.Latest = key
- case <-self.quit:
- return
- }
- }
- log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n))
- }()
- }
- return history
- }
- // triggers key synchronisation
- func (self *syncer) sendUnsyncedKeys() {
- select {
- case self.deliveryRequest <- true:
- default:
- }
- }
- // assembles a new batch of unsynced keys
- // * keys are drawn from the key buffers in order of priority queue
- // * if the queues of priority for History (HistoryReq) or higher are depleted,
- // historical data is used so historical items are lower priority within
- // their priority group.
- // * Order of historical data is unspecified
- func (self *syncer) syncUnsyncedKeys() {
- // send out new
- var unsynced []*syncRequest
- var more, justSynced bool
- var keyCount, historyCnt int
- var history chan interface{}
- priority := High
- keys := self.keys[priority]
- var newUnsyncedKeys, deliveryRequest chan bool
- keyCounts := make([]int, priorities)
- histPrior := self.SyncPriorities[HistoryReq]
- syncStates := self.syncStates
- state := self.state
- LOOP:
- for {
- var req interface{}
- // select the highest priority channel to read from
- // keys channels are buffered so the highest priority ones
- // are checked first - integrity can only be guaranteed if writing
- // is locked while selecting
- if priority != High || len(keys) == 0 {
- // selection is not needed if the High priority queue has items
- keys = nil
- PRIORITIES:
- for priority = High; priority >= 0; priority-- {
- // the first priority channel that is non-empty will be assigned to keys
- if len(self.keys[priority]) > 0 {
- log.Trace(fmt.Sprintf("syncer[%v]: reading request with priority %v", self.key.Log(), priority))
- keys = self.keys[priority]
- break PRIORITIES
- }
- log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])))
- // if the input queue is empty on this level, resort to history if there is any
- if uint(priority) == histPrior && history != nil {
- log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key))
- keys = history
- break PRIORITIES
- }
- }
- }
- // if the peer is ready to receive but there is nothing to send
- if keys == nil && deliveryRequest == nil {
- // no items left, switch to waiting mode
- log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log()))
- newUnsyncedKeys = self.newUnsyncedKeys
- }
- // send msg iff
- // * peer is ready to receive keys AND (
- // * all queues and history are depleted OR
- // * batch full OR
- // * all history has been consumed and we just synced)
- if deliveryRequest == nil &&
- (justSynced ||
- len(unsynced) > 0 && keys == nil ||
- len(unsynced) == int(self.SyncBatchSize)) {
- justSynced = false
- // listen to requests
- deliveryRequest = self.deliveryRequest
- newUnsyncedKeys = nil // don't care about data until the next req comes in
- // set sync to current counter
- // (all non-historical outgoing traffic scheduled and persisted)
- state.LastSeenAt = self.dbAccess.counter()
- state.Latest = storage.ZeroKey
- log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced))
- // send the unsynced keys
- stateCopy := *state
- err := self.unsyncedKeys(unsynced, &stateCopy)
- if err != nil {
- log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", self.key.Log(), err))
- }
- self.state = state
- log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy))
- unsynced = nil
- keys = nil
- }
- // process item and add it to the batch
- select {
- case <-self.quit:
- break LOOP
- case req, more = <-keys:
- if keys == history && !more {
- log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log()))
- // history channel is closed, waiting for new state (called from sync())
- syncStates = self.syncStates
- state.Synced = true // this signals that the current segment is complete
- select {
- case state.synced <- false:
- case <-self.quit:
- break LOOP
- }
- justSynced = true
- history = nil
- }
- case <-deliveryRequest:
- log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log()))
- // this 1 cap channel can wake up the loop
- // signaling that peer is ready to receive unsynced Keys
- // the channel is set to nil so any further writes will be ignored
- deliveryRequest = nil
- case <-newUnsyncedKeys:
- log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log()))
- // this 1 cap channel can wake up the loop
- // signals that data is available to send if peer is ready to receive
- newUnsyncedKeys = nil
- keys = self.keys[High]
- case state, more = <-syncStates:
- // this resets the state
- if !more {
- state = self.state
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete up to %v", self.key.Log(), priority, state))
- state.Synced = true
- syncStates = nil
- } else {
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history up to %v (priority %v)", self.key.Log(), priority, state, histPrior))
- state.Synced = false
- history = self.syncHistory(state)
- // only one history at a time, only allow another one once the
- // history channel is closed
- syncStates = nil
- }
- }
- if req == nil {
- continue LOOP
- }
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req))
- keyCounts[priority]++
- keyCount++
- if keys == history {
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
- historyCnt++
- }
- if sreq, err := self.newSyncRequest(req, priority); err == nil {
- // extract key from req
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
- unsynced = append(unsynced, sreq)
- } else {
- log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v", self.key.Log(), priority, req, err))
- }
- }
- }
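- // examplePrioritySelect is an illustrative sketch, not part of the original
- // code: it isolates the selection rule used above, returning the
- // highest-priority non-empty key buffer, or nil if all buffers are empty.
- func (self *syncer) examplePrioritySelect() chan interface{} {
- for p := High; p >= 0; p-- {
- if len(self.keys[p]) > 0 {
- return self.keys[p]
- }
- }
- return nil
- }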
- // delivery loop
- // takes into account priority, sends store requests with the chunk (delivery)
- // blocks idle if there are no new deliveries in any of the queues
- func (self *syncer) syncDeliveries() {
- var req *storeRequestMsgData
- p := High
- var deliveries chan *storeRequestMsgData
- var msg *storeRequestMsgData
- var err error
- var c = [priorities]int{}
- var n = [priorities]int{}
- var total, success uint
- for {
- deliveries = self.deliveries[p]
- select {
- case req = <-deliveries:
- n[p]++
- c[p]++
- default:
- if p == Low {
- // blocking, depletion on all channels, no preference for priority
- select {
- case req = <-self.deliveries[High]:
- n[High]++
- case req = <-self.deliveries[Medium]:
- n[Medium]++
- case req = <-self.deliveries[Low]:
- n[Low]++
- case <-self.quit:
- return
- }
- p = High
- } else {
- p--
- continue
- }
- }
- total++
- msg, err = self.newStoreRequestMsgData(req)
- if err != nil {
- log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err))
- } else {
- err = self.store(msg)
- if err != nil {
- log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err))
- } else {
- success++
- log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req))
- }
- }
- if total%self.SyncBatchSize == 0 {
- log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]))
- }
- }
- }
- /*
- addRequest handles requests for delivery
- it accepts 4 types:
- * storeRequestMsgData: coming from netstore propagate response
- * chunk: coming from forwarding (questionable: id?)
- * key: from incoming syncRequest
- * syncDbEntry: key,id encoded in db
- If sync mode is on for the type of request, then
- it sends the request to the keys queue of the correct priority
- channel buffered with capacity (KeyBufferSize)
- If sync mode is off, requests are sent directly to deliveries
- */
- func (self *syncer) addRequest(req interface{}, ty int) {
- // retrieve priority for request type
- priority := self.SyncPriorities[ty]
- // sync mode for this type ON
- if self.syncF() || ty == DeliverReq {
- if self.SyncModes[ty] {
- self.addKey(req, priority, self.quit)
- } else {
- self.addDelivery(req, priority, self.quit)
- }
- }
- }
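- // exampleRequestRouting is an illustrative sketch, not part of the original
- // code: with the default SyncParams it prints, for every request type, the
- // priority queue and the sync mode that addRequest would route it through.
- func exampleRequestRouting() {
- params := NewDefaultSyncParams()
- for _, ty := range []int{DeliverReq, PushReq, PropagateReq, HistoryReq, BacklogReq} {
- priority := params.SyncPriorities[ty]
- viaKeyConfirmation := params.SyncModes[ty]
- fmt.Printf("request type %v -> priority %v, key confirmation roundtrip: %v\n", ty, priority, viaKeyConfirmation)
- }
- }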
- // addKey queues sync request for sync confirmation with given priority
- // ie the key will go out in an unsyncedKeys message
- func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool {
- select {
- case self.keys[priority] <- req:
- // this wakes up the unsynced keys loop if idle
- select {
- case self.newUnsyncedKeys <- true:
- default:
- }
- return true
- case <-quit:
- return false
- }
- }
- // addDelivery queues a delivery request with the given priority
- // ie the chunk will be delivered ASAP mod priority queueing handled by syncdb
- // requests are persisted across sessions for correct sync
- func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool {
- select {
- case self.queues[priority].buffer <- req:
- return true
- case <-quit:
- return false
- }
- }
- // doDelivery delivers the chunk for the request with given priority
- // without queuing
- func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
- msgdata, err := self.newStoreRequestMsgData(req)
- if err != nil {
- log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err))
- return false
- }
- select {
- case self.deliveries[priority] <- msgdata:
- return true
- case <-quit:
- return false
- }
- }
- // returns the delivery function for given priority
- // passed on to syncDb
- func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool {
- return func(req interface{}, quit chan bool) bool {
- return self.doDelivery(req, priority, quit)
- }
- }
- // returns the replay function passed on to syncDb
- // depending on sync mode settings for BacklogReq,
- // replay of the request db backlog sends items via confirmation
- // or directly delivers
- func (self *syncer) replay() func(req interface{}, quit chan bool) bool {
- sync := self.SyncModes[BacklogReq]
- priority := self.SyncPriorities[BacklogReq]
- // sync mode for this type ON
- if sync {
- return func(req interface{}, quit chan bool) bool {
- return self.addKey(req, priority, quit)
- }
- } else {
- return func(req interface{}, quit chan bool) bool {
- return self.doDelivery(req, priority, quit)
- }
- }
- }
- // given a request, extends it to a full storeRequestMsgData
- // polymorphic: see addRequest for the types accepted
- func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) {
- key, id, chunk, sreq, err := parseRequest(req)
- if err != nil {
- return nil, err
- }
- if sreq == nil {
- if chunk == nil {
- var err error
- chunk, err = self.dbAccess.get(key)
- if err != nil {
- return nil, err
- }
- }
- sreq = &storeRequestMsgData{
- Id: id,
- Key: chunk.Key,
- SData: chunk.SData,
- }
- }
- return sreq, nil
- }
- // parses the request type and extracts key, id, chunk and request if available
- // does not do chunk lookup!
- func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) {
- var key storage.Key
- var entry *syncDbEntry
- var chunk *storage.Chunk
- var id uint64
- var ok bool
- var sreq *storeRequestMsgData
- var err error
- if key, ok = req.(storage.Key); ok {
- id = generateId()
- } else if entry, ok = req.(*syncDbEntry); ok {
- id = binary.BigEndian.Uint64(entry.val[32:])
- key = storage.Key(entry.val[:32])
- } else if chunk, ok = req.(*storage.Chunk); ok {
- key = chunk.Key
- id = generateId()
- } else if sreq, ok = req.(*storeRequestMsgData); ok {
- key = sreq.Key
- } else {
- err = fmt.Errorf("type not allowed: %v (%T)", req, req)
- }
- return key, id, chunk, sreq, err
- }
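- // exampleParseRequest is an illustrative sketch, not part of the original
- // code: it feeds parseRequest the request types accepted by addRequest and
- // prints what it extracts from each.
- func exampleParseRequest(chunk *storage.Chunk) {
- for _, req := range []interface{}{
- chunk.Key, // storage.Key, as carried by an incoming syncRequest
- chunk, // *storage.Chunk coming from forwarding
- &storeRequestMsgData{Key: chunk.Key, SData: chunk.SData}, // netstore propagate response
- } {
- key, id, _, _, err := parseRequest(req)
- if err != nil {
- fmt.Printf("rejected %T: %v\n", req, err)
- continue
- }
- fmt.Printf("accepted %T: key %v, id %v\n", req, key.Log(), id)
- }
- }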