- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package network
- import (
- "encoding/binary"
- "fmt"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/iterator"
- )
- const counterKeyPrefix = 0x01
- /*
- syncDb is a queueing service for outgoing deliveries.
- One instance is created per priority queue for each peer.
- A syncDb instance maintains an in-memory buffer (of capacity bufferSize);
- once its in-memory buffer is full it switches to persisting items in the db,
- and the dbRead iterator iterates through the items, keeping their order.
- Once the db read catches up (there are no more items in the db),
- it switches back to the in-memory buffer.
- When syncDb is stopped, all items in the buffer are saved to the db.
- */
- type syncDb struct {
- start []byte // this syncDb's starting index in the request db
- key storage.Key // remote peer's address key
- counterKey []byte // db key to persist counter
- priority uint // priority: High|Medium|Low
- buffer chan interface{} // incoming request channel
- db *storage.LDBDatabase // underlying db (TODO should be interface)
- done chan bool // chan to signal goroutines finished quitting
- quit chan bool // chan to signal quitting to goroutines
- total, dbTotal int // counts for one session
- batch chan chan int // channel for batch requests
- dbBatchSize uint // number of items before batch is saved
- }
- // newSyncDb is the constructor; it needs a shared request db (leveldb)
- // priority is used in the index key
- // it uses an in-memory buffer and the leveldb for persistent storage
- // bufferSize and dbBatchSize are config parameters
- // (see the illustrative usage sketch after this function)
- func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
- start := make([]byte, 42)
- start[1] = byte(priorities - priority)
- copy(start[2:34], key)
- counterKey := make([]byte, 34)
- counterKey[0] = counterKeyPrefix
- copy(counterKey[1:], start[1:34])
- syncdb := &syncDb{
- start: start,
- key: key,
- counterKey: counterKey,
- priority: priority,
- buffer: make(chan interface{}, bufferSize),
- db: db,
- done: make(chan bool),
- quit: make(chan bool),
- batch: make(chan chan int),
- dbBatchSize: dbBatchSize,
- }
- log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority))
- // starts the main forever loop reading from buffer
- go syncdb.bufferRead(deliver)
- return syncdb
- }
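- // Illustrative usage sketch (not part of this file): requestDb, peerAddr and
- // sendStoreRequest are assumed names, and High refers to the priority constants
- // mentioned above. A syncer would create one syncDb per priority per peer,
- // passing a deliver callback that blocks on the network write and honours quit:
- //
- //   syncdb := newSyncDb(requestDb, peerAddr, High, 512, 32,
- //       func(req interface{}, quit chan bool) bool {
- //           // push to the p2p priority queue; return false if quit was closed
- //           return sendStoreRequest(req, quit)
- //       })
- //   defer syncdb.stop() // flushes the in-memory buffer to the db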
- /*
- bufferRead is a forever loop that delivers outgoing store requests
- read from the incoming buffer.
- Its argument is the deliver function, taking the item as first argument
- and a quit channel as second.
- Closing of this channel is supposed to abort all waiting for delivery
- (typically a network write).
- The iteration switches between 2 modes:
- * buffer mode reads the in-memory buffer and delivers the items directly
- * db mode reads from the buffer and writes to the db; in parallel another
- routine is started that reads from the db and delivers items
- If there is buffer contention in buffer mode (slow network, high upload volume),
- syncDb switches to db mode and starts dbRead.
- Once the db backlog is delivered, it reverts to the in-memory buffer.
- bufferRead is started automatically when syncDb is initialised.
- It saves the buffer to the db upon receiving the quit signal (see syncDb#stop()).
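- As a summary of the channel bookkeeping in the loop below (derived from the
- code, not additional behaviour): in buffer mode the local variables are
-   buffer = self.buffer, db = nil
- whereas in db mode they are
-   buffer = nil, db = self.buffer
- so exactly one of the two request-reading select cases is live at a time;
- on quit the channel is additionally closed so remaining items drain into the db.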
- */
- func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
- var buffer, db chan interface{} // channels representing the two read modes
- var more bool
- var req interface{}
- var entry *syncDbEntry
- var inBatch, inDb int
- batch := new(leveldb.Batch)
- var dbSize chan int
- quit := self.quit
- counterValue := make([]byte, 8)
- // counter is used for keeping the items in order, persisted to the db
- // start the counter where the db left off, 0 if not found
- data, err := self.db.Get(self.counterKey)
- var counter uint64
- if err == nil {
- counter = binary.BigEndian.Uint64(data)
- log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter))
- } else {
- log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter))
- }
- LOOP:
- for {
- // waiting for item next in the buffer, or quit signal or batch request
- select {
- // buffer only closes when writing to db
- case req = <-buffer:
- // deliver request: this blocks on network write, so
- // it is passed the quit channel as argument so that it returns
- // if syncDb is stopped. In that case we need to save the item to the db
- more = deliver(req, self.quit)
- if !more {
- log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
- // received quit signal, save request currently waiting delivery
- // by switching to db mode and closing the buffer
- buffer = nil
- db = self.buffer
- close(db)
- quit = nil // needs to block the quit case in select
- break // break from select, this item will be written to the db
- }
- self.total++
- log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
- // by the time deliver returns, there may have been new writes to the buffer
- // if buffer contention is detected, switch to db mode which drains
- // the buffer so no process will block on pushing store requests
- if len(buffer) == cap(buffer) {
- log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total))
- buffer = nil
- db = self.buffer
- }
- continue LOOP
- // incoming entry to put into db
- case req, more = <-db:
- if !more {
- // the db channel only closes on quit, after all of the buffer has been saved
- binary.BigEndian.PutUint64(counterValue, counter)
- batch.Put(self.counterKey, counterValue) // persist counter in batch
- self.writeSyncBatch(batch) // save batch
- log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority))
- break LOOP
- }
- self.dbTotal++
- self.total++
- // otherwise fall through after the select and write req to the db
- case dbSize = <-self.batch:
- // explicit request for batch
- if inBatch == 0 && quit != nil {
- // there were no writes since the last batch, so the db is depleted
- // switch back to buffer mode
- log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority))
- db = nil
- buffer = self.buffer
- dbSize <- 0 // indicates to 'caller' that batch has been written
- inDb = 0
- continue LOOP
- }
- binary.BigEndian.PutUint64(counterValue, counter)
- batch.Put(self.counterKey, counterValue)
- log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue))
- batch = self.writeSyncBatch(batch)
- dbSize <- inBatch // indicates to 'caller' that batch has been written
- inBatch = 0
- continue LOOP
- // closing syncDb#quit channel is used to signal to all goroutines to quit
- case <-quit:
- // need to save backlog, so switch to db mode
- db = self.buffer
- buffer = nil
- quit = nil
- log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority))
- close(db)
- continue LOOP
- }
- // only get here if we put req into db
- entry, err = self.newSyncDbEntry(req, counter)
- if err != nil {
- log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err))
- continue LOOP
- }
- batch.Put(entry.key, entry.val)
- log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter))
- // if just switched to db mode and not quitting, then launch dbRead
- // in a parallel goroutine to send deliveries from the db
- if inDb == 0 && quit != nil {
- log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority))
- go self.dbRead(true, counter, deliver)
- }
- inDb++
- inBatch++
- counter++
- // need to save the batch if it gets too large (== dbBatchSize)
- if inBatch%int(self.dbBatchSize) == 0 {
- batch = self.writeSyncBatch(batch)
- }
- }
- log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter))
- close(self.done)
- }
- // writes the batch to the db and returns a new batch object
- func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
- err := self.db.Write(batch)
- if err != nil {
- log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err))
- return batch
- }
- return new(leveldb.Batch)
- }
- // abstract type for db entries (TODO could be a feature of Receipts)
- type syncDbEntry struct {
- key, val []byte
- }
- func (self syncDbEntry) String() string {
- return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
- }
- /*
- dbRead iterates over store requests to be sent over to the peer.
- This is mainly to prevent crashes due to network output buffer contention (???)
- as well as to make synchronisation resilient to disconnects.
- The messages are supposed to be sent in the p2p priority queue.
- The request DB is shared between peers, but the domains of each syncDb
- are disjoint. dbkeys (42 bytes) are structured:
- * 0: 0x00 (0x01 reserved for the counter key)
- * 1: priorities - priority (so that high priority can be replayed first)
- * 2-33: peer's address
- * 34-41: syncDb counter to preserve order (this field is missing for the counter key)
- values (40 bytes) are:
- * 0-31: key
- * 32-39: request id
- dbRead takes a boolean to indicate whether on the first round all of the
- historical record is synced, a second argument giving the current db counter,
- and, third, the function to apply to each entry.
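- For illustration (an example of the layout above, not additional behaviour):
- an entry for a High priority syncDb of a peer with address A, written with
- counter value 3 and request id R, would be stored as
-   key:   0x00 | byte(priorities-High) | A[0:32] | uint64be(3)
-   value: storageKey[0:32] | uint64be(R)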
- */
- func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
- key := make([]byte, 42)
- copy(key, self.start)
- binary.BigEndian.PutUint64(key[34:], counter)
- var batches, n, cnt, total int
- var more bool
- var entry *syncDbEntry
- var it iterator.Iterator
- var del *leveldb.Batch
- batchSizes := make(chan int)
- for {
- // if useBatches is false, cnt is not set
- if useBatches {
- // this could be called before all cnt items are sent out
- // so that the loop is not blocking while delivering
- // only relevant if cnt is large
- select {
- case self.batch <- batchSizes:
- case <-self.quit:
- return
- }
- // wait for the write to finish and get the item count in the next batch
- cnt = <-batchSizes
- batches++
- if cnt == 0 {
- // empty
- return
- }
- }
- it = self.db.NewIterator()
- it.Seek(key)
- if !it.Valid() {
- copy(key, self.start)
- useBatches = true
- continue
- }
- del = new(leveldb.Batch)
- log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt))
- for n = 0; !useBatches || n < cnt; it.Next() {
- copy(key, it.Key())
- if len(key) == 0 || key[0] != 0 {
- copy(key, self.start)
- useBatches = true
- break
- }
- val := make([]byte, 40)
- copy(val, it.Value())
- entry = &syncDbEntry{key, val}
- // log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
- more = fun(entry, self.quit)
- if !more {
- // quit was received while waiting to deliver the entry; the entry will not be deleted
- log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt))
- break
- }
- // since subsequent batches of the same db session are indexed incrementally,
- // deleting earlier batches can be delayed and parallelised
- // this could be a batch delete when the db is idle (but adds complexity, esp. when quitting)
- del.Delete(key)
- n++
- total++
- }
- log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total))
- self.db.Write(del) // this could be called asynchronously, only when the db is idle
- it.Release()
- }
- }
- // stop signals quitting to the bufferRead loop and waits until the buffer is saved to the db
- func (self *syncDb) stop() {
- close(self.quit)
- <-self.done
- }
- // newSyncDbEntry calculates a dbkey for the request so that it can be stored in the db
- // see the dbRead doc comment for the db key structure
- // polymorphic: for accepted types, see syncer#addRequest
- func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
- var key storage.Key
- var chunk *storage.Chunk
- var id uint64
- var ok bool
- var sreq *storeRequestMsgData
- if key, ok = req.(storage.Key); ok {
- id = generateId()
- } else if chunk, ok = req.(*storage.Chunk); ok {
- key = chunk.Key
- id = generateId()
- } else if sreq, ok = req.(*storeRequestMsgData); ok {
- key = sreq.Key
- id = sreq.Id
- } else if entry, ok = req.(*syncDbEntry); !ok {
- return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
- }
- // key orders entries by priority > peer > seqid
- // value is request id if exists
- if entry == nil {
- dbkey := make([]byte, 42)
- dbval := make([]byte, 40)
- // encode key
- copy(dbkey[:], self.start[:34]) // db peer
- binary.BigEndian.PutUint64(dbkey[34:], counter)
- // encode value
- copy(dbval, key[:])
- binary.BigEndian.PutUint64(dbval[32:], id)
- entry = &syncDbEntry{dbkey, dbval}
- }
- return
- }