syncdb.go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)
const counterKeyPrefix = 0x01

/*
syncDb is a queueing service for outgoing deliveries.
One instance per priority queue for each peer.

A syncDb instance maintains an in-memory buffer (of capacity bufferSize).
Once its in-memory buffer is full, it switches to persisting items in the db,
and a dbRead iterator iterates through the items, preserving their order.
Once the db read catches up (there are no more items in the db), it switches
back to the in-memory buffer.

When syncDb is stopped, all items in the buffer are saved to the db.
*/
type syncDb struct {
	start          []byte               // this syncDb's starting index in the request db
	key            storage.Key          // remote peer's address key
	counterKey     []byte               // db key to persist the counter
	priority       uint                 // priority High|Medium|Low
	buffer         chan interface{}     // incoming request channel
	db             *storage.LDBDatabase // underlying db (TODO: should be an interface)
	done           chan bool            // chan to signal that goroutines have finished quitting
	quit           chan bool            // chan to signal quitting to goroutines
	total, dbTotal int                  // counts for one session
	batch          chan chan int        // channel for batch requests
	dbBatchSize    uint                 // number of items before a batch is saved
}
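
// For orientation: a sketch of the two index keys built in newSyncDb below.
// The byte layouts follow from the code and the db key structure documented
// at dbRead; this is a restatement, not additional behaviour:
//
//	start      (42 bytes): 0x00 | priorities-priority | peer address (32 bytes) | counter (8 bytes, zero here)
//	counterKey (34 bytes): 0x01 | priorities-priority | peer address (32 bytes)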
// constructor needs a shared request db (leveldb)
// priority is used in the index key
// uses a buffer and a leveldb for persistent storage
// bufferSize, dbBatchSize are config parameters
func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
	start := make([]byte, 42)
	start[1] = byte(priorities - priority)
	copy(start[2:34], key)
	counterKey := make([]byte, 34)
	counterKey[0] = counterKeyPrefix
	copy(counterKey[1:], start[1:34])
	syncdb := &syncDb{
		start:       start,
		key:         key,
		counterKey:  counterKey,
		priority:    priority,
		buffer:      make(chan interface{}, bufferSize),
		db:          db,
		done:        make(chan bool),
		quit:        make(chan bool),
		batch:       make(chan chan int),
		dbBatchSize: dbBatchSize,
	}
	log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority))

	// starts the main forever loop reading from buffer
	go syncdb.bufferRead(deliver)
	return syncdb
}
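
// A minimal usage sketch (illustrative only, not taken from the syncer code in
// this package): requestDb, peerKey and send are hypothetical names, and the
// buffer/batch sizes are arbitrary. The deliver callback must return false
// once the quit channel is closed.
//
//	deliver := func(req interface{}, quit chan bool) bool {
//		select {
//		case send <- req: // hypothetical outgoing network queue
//			return true
//		case <-quit:
//			return false
//		}
//	}
//	sdb := newSyncDb(requestDb, peerKey, High, 1000, 64, deliver)
//	// requests are pushed into sdb.buffer by the caller
//	defer sdb.stop()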

/*
bufferRead is a forever loop that takes care of delivering
outgoing store requests read from the incoming buffer.

Its argument is the deliver function, taking the item as first argument
and a quit channel as second. Closing this channel is supposed to abort
all waiting for delivery (typically a network write).

The iteration switches between 2 modes:
* buffer mode reads the in-memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db; in parallel another
  routine is started that reads from the db and delivers the items

If there is buffer contention in buffer mode (slow network, high upload volume),
syncDb switches to db mode and starts dbRead.
Once the db backlog is delivered, it reverts to the in-memory buffer.

It is started automatically when syncDb is initialised.

It saves the buffer to the db upon receiving a quit signal, see syncDb#stop().
*/
func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
	var buffer, db chan interface{} // channels representing the two read modes
	var more bool
	var req interface{}
	var entry *syncDbEntry
	var inBatch, inDb int
	batch := new(leveldb.Batch)
	var dbSize chan int
	quit := self.quit
	counterValue := make([]byte, 8)

	// counter is used for keeping the items in order, persisted to db
	// start counter where db was at, 0 if not found
	data, err := self.db.Get(self.counterKey)
	var counter uint64
	if err == nil {
		counter = binary.BigEndian.Uint64(data)
		log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter))
	} else {
		log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter))
	}

LOOP:
	for {
		// wait for the next item in the buffer, a quit signal or a batch request
		select {
		// buffer only closes when writing to db
		case req = <-buffer:
			// deliver request: this is blocking on network write, so
			// it is passed the quit channel as argument, so that it returns
			// if syncDb is stopped. In this case we need to save the item to the db
			more = deliver(req, self.quit)
			if !more {
				log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
				// received quit signal, save the request currently waiting for delivery
				// by switching to db mode and closing the buffer
				buffer = nil
				db = self.buffer
				close(db)
				quit = nil // needs to block the quit case in select
				break      // break from select, this item will be written to the db
			}
			self.total++
			log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
			// by the time deliver returns, there may have been new writes to the buffer
			// if buffer contention is detected, switch to db mode, which drains
			// the buffer so that no process will block on pushing store requests
			if len(buffer) == cap(buffer) {
				log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total))
				buffer = nil
				db = self.buffer
			}
			continue LOOP

		// incoming entry to put into the db
		case req, more = <-db:
			if !more {
				// the db channel only closes when quit was called and the buffer has been fully saved
				binary.BigEndian.PutUint64(counterValue, counter)
				batch.Put(self.counterKey, counterValue) // persist counter in batch
				self.writeSyncBatch(batch)               // save batch
				log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority))
				break LOOP
			}
			self.dbTotal++
			self.total++
			// otherwise fall through after select and write req to the db

		case dbSize = <-self.batch:
			// explicit request for a batch
			if inBatch == 0 && quit != nil {
				// there have been no writes since the last batch, so the db is depleted:
				// switch to buffer mode
				log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority))
				db = nil
				buffer = self.buffer
				dbSize <- 0 // indicates to 'caller' that batch has been written
				inDb = 0
				continue LOOP
			}
			binary.BigEndian.PutUint64(counterValue, counter)
			batch.Put(self.counterKey, counterValue)
			log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue))
			batch = self.writeSyncBatch(batch)
			dbSize <- inBatch // indicates to 'caller' that batch has been written
			inBatch = 0
			continue LOOP

		// closing the syncDb#quit channel is used to signal all goroutines to quit
		case <-quit:
			// need to save the backlog, so switch to db mode
			db = self.buffer
			buffer = nil
			quit = nil
			log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority))
			close(db)
			continue LOOP
		}

		// only get here if we put req into the db
		entry, err = self.newSyncDbEntry(req, counter)
		if err != nil {
			log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err))
			continue LOOP
		}
		batch.Put(entry.key, entry.val)
		log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter))
		// if we just switched to db mode and are not quitting, launch dbRead
		// in a parallel goroutine to send deliveries from the db
		if inDb == 0 && quit != nil {
			log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority))
			go self.dbRead(true, counter, deliver)
		}
		inDb++
		inBatch++
		counter++
		// need to save the batch if it gets too large (== dbBatchSize)
		if inBatch%int(self.dbBatchSize) == 0 {
			batch = self.writeSyncBatch(batch)
		}
	}
	log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter))
	close(self.done)
}
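
// The mode transitions of bufferRead, summarised for orientation (a restatement
// of the logic above, not additional behaviour):
//
//	buffer mode --(buffer full on return from deliver)---> db mode, dbRead launched
//	db mode     --(dbRead finds the db backlog drained)--> buffer mode
//	any mode    --(quit closed)--------------------------> db mode, flush batch to db, exit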

// writes the batch to the db and returns a new batch object
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
	err := self.db.Write(batch)
	if err != nil {
		log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err))
		return batch
	}
	return new(leveldb.Batch)
}

// abstract type for db entries (TODO: could be a feature of Receipts)
type syncDbEntry struct {
	key, val []byte
}

func (self syncDbEntry) String() string {
	return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
}

/*
dbRead iterates over store requests to be sent over to the peer.
This is mainly to prevent crashes due to network output buffer contention (???)
as well as to make synchronisation resilient to disconnects.
The messages are supposed to be sent in the p2p priority queue.

The request db is shared between peers, but the domains for each syncDb
are disjoint. db keys (42 bytes) are structured:
* 0: 0x00 (0x01 is reserved for the counter key)
* 1: priorities - priority (so that high priority can be replayed first)
* 2-33: peer's address
* 34-41: syncDb counter to preserve order (this field is missing for the counter key)

values (40 bytes) are:
* 0-31: chunk key
* 32-39: request id

dbRead needs a boolean first argument to indicate whether, on the first round,
all the historical records are synced. The second argument indicates the current
db counter. The third is the delivery function to apply.
*/
func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
	key := make([]byte, 42)
	copy(key, self.start)
	binary.BigEndian.PutUint64(key[34:], counter)
	var batches, n, cnt, total int
	var more bool
	var entry *syncDbEntry
	var it iterator.Iterator
	var del *leveldb.Batch
	batchSizes := make(chan int)
	for {
		// if useBatches is false, cnt is not set
		if useBatches {
			// this could be called before all cnt items have been sent out,
			// so that the loop is not blocking while delivering
			// only relevant if cnt is large
			select {
			case self.batch <- batchSizes:
			case <-self.quit:
				return
			}
			// wait for the write to finish and get the item count in the next batch
			cnt = <-batchSizes
			batches++
			if cnt == 0 {
				// empty
				return
			}
		}
		it = self.db.NewIterator()
		it.Seek(key)
		if !it.Valid() {
			copy(key, self.start)
			useBatches = true
			continue
		}
		del = new(leveldb.Batch)
		log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt))

		for n = 0; !useBatches || n < cnt; it.Next() {
			copy(key, it.Key())
			if len(key) == 0 || key[0] != 0 {
				copy(key, self.start)
				useBatches = true
				break
			}
			val := make([]byte, 40)
			copy(val, it.Value())
			entry = &syncDbEntry{key, val}
			// log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
			more = fun(entry, self.quit)
			if !more {
				// quit was received while waiting to deliver the entry; the entry will not be deleted
				log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt))
				break
			}
			// since subsequent batches of the same db session are indexed incrementally,
			// deleting earlier batches can be delayed and parallelised
			// this could be a batch delete when the db is idle (but adds complexity, especially when quitting)
			del.Delete(key)
			n++
			total++
		}
		log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total))
		self.db.Write(del) // this could be called asynchronously, only when the db is idle
		it.Release()
	}
}
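
// Illustrative only (not used by the code above): decoding a stored entry back
// into its components, following the key/value layout documented for dbRead.
// The local variable names are assumptions made for this sketch.
//
//	peerAddr := entry.key[2:34]                            // remote peer's address
//	seq := binary.BigEndian.Uint64(entry.key[34:42])       // per-syncDb counter
//	chunkKey := entry.val[:32]                             // chunk key (storage.Key)
//	requestId := binary.BigEndian.Uint64(entry.val[32:40]) // request id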

// stop closes the quit channel to signal all goroutines to quit, then waits
// until bufferRead has saved the buffer to the db and finished.
func (self *syncDb) stop() {
	close(self.quit)
	<-self.done
}

// calculate a db key for the request so it can be saved in the request db
// see the dbRead doc comment above for the db key structure
// polymorphic: for the accepted types, see syncer#addRequest
func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
	var key storage.Key
	var chunk *storage.Chunk
	var id uint64
	var ok bool
	var sreq *storeRequestMsgData

	if key, ok = req.(storage.Key); ok {
		id = generateId()
	} else if chunk, ok = req.(*storage.Chunk); ok {
		key = chunk.Key
		id = generateId()
	} else if sreq, ok = req.(*storeRequestMsgData); ok {
		key = sreq.Key
		id = sreq.Id
	} else if entry, ok = req.(*syncDbEntry); !ok {
		return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
	}

	// order is by priority > peer > seqid (see the db key layout above)
	// value is the request id if it exists
	if entry == nil {
		dbkey := make([]byte, 42)
		dbval := make([]byte, 40)

		// encode key
		copy(dbkey[:], self.start[:34]) // db peer
		binary.BigEndian.PutUint64(dbkey[34:], counter)

		// encode value
		copy(dbval, key[:])
		binary.BigEndian.PutUint64(dbval[32:], id)

		entry = &syncDbEntry{dbkey, dbval}
	}
	return
}
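
// Illustrative only: how the accepted request types map onto entries, assuming
// chunk, sreq and counter come from the surrounding syncer code; a *syncDbEntry
// passed in is returned unchanged.
//
//	entry, _ := sdb.newSyncDbEntry(chunk.Key, counter) // bare storage.Key, fresh request id via generateId
//	entry, _ = sdb.newSyncDbEntry(chunk, counter)      // *storage.Chunk, key taken from chunk.Key
//	entry, _ = sdb.newSyncDbEntry(sreq, counter)       // *storeRequestMsgData, key and id from the message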