dbstore.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // disk storage layer for the package bzz
  17. // DbStore implements the ChunkStore interface and is used by the DPA as
  18. // persistent storage of chunks
  19. // it implements purging based on access count allowing for external control of
  20. // max capacity
  21. package storage
  22. import (
  23. "archive/tar"
  24. "bytes"
  25. "encoding/binary"
  26. "encoding/hex"
  27. "fmt"
  28. "io"
  29. "io/ioutil"
  30. "sync"
  31. "github.com/ethereum/go-ethereum/log"
  32. "github.com/ethereum/go-ethereum/metrics"
  33. "github.com/ethereum/go-ethereum/rlp"
  34. "github.com/syndtr/goleveldb/leveldb"
  35. "github.com/syndtr/goleveldb/leveldb/iterator"
  36. )
// Metrics counters tracking garbage-collected and explicitly removed chunks,
// registered in the global metrics registry.
var (
	gcCounter            = metrics.NewRegisteredCounter("storage.db.dbstore.gc.count", nil)
	dbStoreDeleteCounter = metrics.NewRegisteredCounter("storage.db.dbstore.rm.count", nil)
)
const (
	defaultDbCapacity = 5000000 // default maximum number of chunks held
	defaultRadius     = 0       // not yet used
	// gcArraySize is the number of index entries sampled per gc round;
	// gcArrayFreeRatio is the fraction of that sample to free.
	gcArraySize      = 10000
	gcArrayFreeRatio = 0.1
	// key prefixes for leveldb storage
	kpIndex = 0
)
// Singleton leveldb keys under which the persistent counters and the gc
// cursor are stored (prefixes 0 and 1 are used for index and data entries).
var (
	keyAccessCnt = []byte{2}
	keyEntryCnt  = []byte{3}
	keyDataIdx   = []byte{4}
	keyGCPos     = []byte{5}
)
// gcItem is one sampled index entry considered for garbage collection.
type gcItem struct {
	idx    uint64 // storage index (data key) of the chunk
	value  uint64 // gc priority (access counter); smaller is collected first
	idxKey []byte // raw leveldb index key of the entry
}
// DbStore implements the ChunkStore interface as leveldb-backed persistent
// storage of chunks for the DPA. It purges entries based on access count,
// allowing external control of the maximum capacity.
type DbStore struct {
	db *LDBDatabase
	// this should be stored in db, accessed transactionally
	entryCnt, accessCnt, dataIdx, capacity uint64
	// gcPos is the persisted scan cursor; gcStartPos marks the start of the
	// index keyspace (single kpIndex byte).
	gcPos, gcStartPos []byte
	gcArray           []*gcItem // scratch sample buffer reused by collectGarbage
	hashfunc          SwarmHasher
	lock              sync.Mutex // guards counters and db mutation
}
// NewDbStore opens (or creates) the leveldb database at path and restores
// the persisted entry/access/data counters and the gc cursor from it.
// radius is currently unused. The returned store must be released with Close.
func NewDbStore(path string, hash SwarmHasher, capacity uint64, radius int) (s *DbStore, err error) {
	s = new(DbStore)
	s.hashfunc = hash
	s.db, err = NewLDBDatabase(path)
	if err != nil {
		return
	}
	s.setCapacity(capacity)
	s.gcStartPos = make([]byte, 1)
	s.gcStartPos[0] = kpIndex
	s.gcArray = make([]*gcItem, gcArraySize)
	// Missing keys yield empty values, which BytesToU64 maps to 0, so a
	// fresh database starts with zeroed counters.
	data, _ := s.db.Get(keyEntryCnt)
	s.entryCnt = BytesToU64(data)
	data, _ = s.db.Get(keyAccessCnt)
	s.accessCnt = BytesToU64(data)
	data, _ = s.db.Get(keyDataIdx)
	s.dataIdx = BytesToU64(data)
	s.gcPos, _ = s.db.Get(keyGCPos)
	if s.gcPos == nil {
		s.gcPos = s.gcStartPos
	}
	return
}
// dpaDBIndex is the RLP-encoded value stored under an index key: the chunk's
// storage index (its data key) and the access counter stamped at last touch.
type dpaDBIndex struct {
	Idx    uint64
	Access uint64
}
  97. func BytesToU64(data []byte) uint64 {
  98. if len(data) < 8 {
  99. return 0
  100. }
  101. return binary.LittleEndian.Uint64(data)
  102. }
  103. func U64ToBytes(val uint64) []byte {
  104. data := make([]byte, 8)
  105. binary.LittleEndian.PutUint64(data, val)
  106. return data
  107. }
// getIndexGCValue returns the gc priority of an index entry: its last-access
// counter value. Entries with smaller values are collected first.
func getIndexGCValue(index *dpaDBIndex) uint64 {
	return index.Access
}
// updateIndexAccess stamps index with the store's current access counter.
// The caller is responsible for advancing s.accessCnt and persisting both.
func (s *DbStore) updateIndexAccess(index *dpaDBIndex) {
	index.Access = s.accessCnt
}
  114. func getIndexKey(hash Key) []byte {
  115. HashSize := len(hash)
  116. key := make([]byte, HashSize+1)
  117. key[0] = 0
  118. copy(key[1:], hash[:])
  119. return key
  120. }
  121. func getDataKey(idx uint64) []byte {
  122. key := make([]byte, 9)
  123. key[0] = 1
  124. binary.BigEndian.PutUint64(key[1:9], idx)
  125. return key
  126. }
// encodeIndex RLP-encodes an index entry for storage under its index key.
// The encoding error is deliberately dropped: RLP encoding of a struct of
// two unsigned integers cannot fail.
func encodeIndex(index *dpaDBIndex) []byte {
	data, _ := rlp.EncodeToBytes(index)
	return data
}
// encodeData returns the chunk payload exactly as stored on disk.
// NOTE(review): decodeData reads the size from the first 8 bytes, so SData
// presumably already carries the little-endian size prefix — confirm with
// the Chunk producer.
func encodeData(chunk *Chunk) []byte {
	return chunk.SData
}
// decodeIndex RLP-decodes data into index. The decode error is ignored;
// callers assume the stored entry is well-formed.
func decodeIndex(data []byte, index *dpaDBIndex) {
	dec := rlp.NewStream(bytes.NewReader(data), 0)
	dec.Decode(index)
}
// decodeData fills chunk from a stored payload: SData is the raw bytes and
// Size is read from the first 8 bytes as a little-endian integer. Panics if
// data is shorter than 8 bytes.
func decodeData(data []byte, chunk *Chunk) {
	chunk.SData = data
	chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8]))
}
  142. func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int {
  143. pivotValue := list[pivotIndex].value
  144. dd := list[pivotIndex]
  145. list[pivotIndex] = list[right]
  146. list[right] = dd
  147. storeIndex := left
  148. for i := left; i < right; i++ {
  149. if list[i].value < pivotValue {
  150. dd = list[storeIndex]
  151. list[storeIndex] = list[i]
  152. list[i] = dd
  153. storeIndex++
  154. }
  155. }
  156. dd = list[storeIndex]
  157. list[storeIndex] = list[right]
  158. list[right] = dd
  159. return storeIndex
  160. }
  161. func gcListSelect(list []*gcItem, left int, right int, n int) int {
  162. if left == right {
  163. return left
  164. }
  165. pivotIndex := (left + right) / 2
  166. pivotIndex = gcListPartition(list, left, right, pivotIndex)
  167. if n == pivotIndex {
  168. return n
  169. } else {
  170. if n < pivotIndex {
  171. return gcListSelect(list, left, pivotIndex-1, n)
  172. } else {
  173. return gcListSelect(list, pivotIndex+1, right, n)
  174. }
  175. }
  176. }
// collectGarbage deletes the least recently accessed chunks from the store.
// It samples up to gcArraySize index entries starting at the persisted scan
// cursor (s.gcPos), selects an access-count cutoff at the ratio quantile via
// quickselect, and deletes every sampled entry at or below that cutoff.
// The cursor is persisted so the scan resumes where it left off.
// Callers must hold s.lock.
func (s *DbStore) collectGarbage(ratio float32) {
	it := s.db.NewIterator()
	it.Seek(s.gcPos)
	if it.Valid() {
		s.gcPos = it.Key()
	} else {
		s.gcPos = nil
	}
	gcnt := 0
	for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) {
		// Wrap around to the start of the index keyspace when the cursor
		// runs past the last index entry.
		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
			it.Seek(s.gcStartPos)
			if it.Valid() {
				s.gcPos = it.Key()
			} else {
				s.gcPos = nil
			}
		}
		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
			break
		}
		gci := new(gcItem)
		gci.idxKey = s.gcPos
		var index dpaDBIndex
		decodeIndex(it.Value(), &index)
		gci.idx = index.Idx
		// the smaller, the more likely to be gc'd
		gci.value = getIndexGCValue(&index)
		s.gcArray[gcnt] = gci
		gcnt++
		it.Next()
		if it.Valid() {
			s.gcPos = it.Key()
		} else {
			s.gcPos = nil
		}
	}
	it.Release()
	// Quickselect the cutoff value at the ratio quantile of the sample.
	cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio))
	cutval := s.gcArray[cutidx].value
	// fmt.Print(gcnt, " ", s.entryCnt, " ")
	// actual gc
	for i := 0; i < gcnt; i++ {
		if s.gcArray[i].value <= cutval {
			gcCounter.Inc(1)
			s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey)
		}
	}
	// fmt.Println(s.entryCnt)
	// Persist the cursor so the next round continues from here.
	s.db.Put(keyGCPos, s.gcPos)
}
  228. // Export writes all chunks from the store to a tar archive, returning the
  229. // number of chunks written.
  230. func (s *DbStore) Export(out io.Writer) (int64, error) {
  231. tw := tar.NewWriter(out)
  232. defer tw.Close()
  233. it := s.db.NewIterator()
  234. defer it.Release()
  235. var count int64
  236. for ok := it.Seek([]byte{kpIndex}); ok; ok = it.Next() {
  237. key := it.Key()
  238. if (key == nil) || (key[0] != kpIndex) {
  239. break
  240. }
  241. var index dpaDBIndex
  242. decodeIndex(it.Value(), &index)
  243. data, err := s.db.Get(getDataKey(index.Idx))
  244. if err != nil {
  245. log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
  246. continue
  247. }
  248. hdr := &tar.Header{
  249. Name: hex.EncodeToString(key[1:]),
  250. Mode: 0644,
  251. Size: int64(len(data)),
  252. }
  253. if err := tw.WriteHeader(hdr); err != nil {
  254. return count, err
  255. }
  256. if _, err := tw.Write(data); err != nil {
  257. return count, err
  258. }
  259. count++
  260. }
  261. return count, nil
  262. }
  263. // Import reads chunks into the store from a tar archive, returning the number
  264. // of chunks read.
  265. func (s *DbStore) Import(in io.Reader) (int64, error) {
  266. tr := tar.NewReader(in)
  267. var count int64
  268. for {
  269. hdr, err := tr.Next()
  270. if err == io.EOF {
  271. break
  272. } else if err != nil {
  273. return count, err
  274. }
  275. if len(hdr.Name) != 64 {
  276. log.Warn("ignoring non-chunk file", "name", hdr.Name)
  277. continue
  278. }
  279. key, err := hex.DecodeString(hdr.Name)
  280. if err != nil {
  281. log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
  282. continue
  283. }
  284. data, err := ioutil.ReadAll(tr)
  285. if err != nil {
  286. return count, err
  287. }
  288. s.Put(&Chunk{Key: key, SData: data})
  289. count++
  290. }
  291. return count, nil
  292. }
// Cleanup scans every index entry and deletes chunks whose data record is
// missing or whose stored payload no longer hashes to the chunk key,
// logging a summary of the errors found.
func (s *DbStore) Cleanup() {
	//Iterates over the database and checks that there are no faulty chunks
	it := s.db.NewIterator()
	startPosition := []byte{kpIndex}
	it.Seek(startPosition)
	var key []byte
	var errorsFound, total int
	for it.Valid() {
		key = it.Key()
		// Stop at the first key outside the index prefix.
		if (key == nil) || (key[0] != kpIndex) {
			break
		}
		total++
		var index dpaDBIndex
		decodeIndex(it.Value(), &index)
		data, err := s.db.Get(getDataKey(index.Idx))
		if err != nil {
			// Index entry without a data record: remove the dangling entry.
			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
			s.delete(index.Idx, getIndexKey(key[1:]))
			errorsFound++
		} else {
			// Verify that the stored payload still hashes to the chunk key.
			hasher := s.hashfunc()
			hasher.Write(data)
			hash := hasher.Sum(nil)
			if !bytes.Equal(hash, key[1:]) {
				log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]))
				s.delete(index.Idx, getIndexKey(key[1:]))
				errorsFound++
			}
		}
		it.Next()
	}
	it.Release()
	log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
}
// delete removes the index entry at idxKey and the data record at storage
// index idx in a single batch, decrementing and persisting the entry count.
// Callers must hold s.lock.
func (s *DbStore) delete(idx uint64, idxKey []byte) {
	batch := new(leveldb.Batch)
	batch.Delete(idxKey)
	batch.Delete(getDataKey(idx))
	dbStoreDeleteCounter.Inc(1)
	s.entryCnt--
	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
	s.db.Write(batch)
}
// Counter returns the monotonically increasing data index — the number of
// chunks ever stored — not the current number of live entries.
func (s *DbStore) Counter() uint64 {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.dataIdx
}
// Put stores chunk under its index key unless it already exists, in which
// case only the access counter is refreshed. When the store is at capacity
// a garbage-collection round runs first. The chunk.dbStored channel, if
// set, is closed once the chunk is accounted for.
func (s *DbStore) Put(chunk *Chunk) {
	s.lock.Lock()
	defer s.lock.Unlock()
	ikey := getIndexKey(chunk.Key)
	var index dpaDBIndex
	if s.tryAccessIdx(ikey, &index) {
		if chunk.dbStored != nil {
			close(chunk.dbStored)
		}
		log.Trace(fmt.Sprintf("Storing to DB: chunk already exists, only update access"))
		return // already exists, only update access
	}
	data := encodeData(chunk)
	//data := ethutil.Encode([]interface{}{entry})
	if s.entryCnt >= s.capacity {
		s.collectGarbage(gcArrayFreeRatio)
	}
	batch := new(leveldb.Batch)
	// Data record goes under the next free storage index.
	batch.Put(getDataKey(s.dataIdx), data)
	index.Idx = s.dataIdx
	s.updateIndexAccess(&index)
	idata := encodeIndex(&index)
	batch.Put(ikey, idata)
	// NOTE(review): each counter is persisted before being incremented, so
	// the stored values lag the in-memory ones by one; presumably NewDbStore's
	// recovery depends on this — confirm before reordering.
	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
	s.entryCnt++
	batch.Put(keyDataIdx, U64ToBytes(s.dataIdx))
	s.dataIdx++
	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	s.accessCnt++
	s.db.Write(batch)
	if chunk.dbStored != nil {
		close(chunk.dbStored)
	}
	log.Trace(fmt.Sprintf("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx))
}
// tryAccessIdx tries to find the index entry at ikey; if found it decodes
// the entry into index, re-stamps it with a fresh access counter value,
// persists the updated entry and counter, and returns true. Returns false
// when no entry exists. Callers must hold s.lock.
func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
	idata, err := s.db.Get(ikey)
	if err != nil {
		return false
	}
	decodeIndex(idata, index)
	batch := new(leveldb.Batch)
	// The pre-increment counter value is persisted, matching Put's scheme.
	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	s.accessCnt++
	s.updateIndexAccess(index)
	idata = encodeIndex(index)
	batch.Put(ikey, idata)
	s.db.Write(batch)
	return true
}
  393. func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
  394. s.lock.Lock()
  395. defer s.lock.Unlock()
  396. var index dpaDBIndex
  397. if s.tryAccessIdx(getIndexKey(key), &index) {
  398. var data []byte
  399. data, err = s.db.Get(getDataKey(index.Idx))
  400. if err != nil {
  401. log.Trace(fmt.Sprintf("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err))
  402. s.delete(index.Idx, getIndexKey(key))
  403. return
  404. }
  405. hasher := s.hashfunc()
  406. hasher.Write(data)
  407. hash := hasher.Sum(nil)
  408. if !bytes.Equal(hash, key) {
  409. s.delete(index.Idx, getIndexKey(key))
  410. log.Warn("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
  411. }
  412. chunk = &Chunk{
  413. Key: key,
  414. }
  415. decodeData(data, chunk)
  416. } else {
  417. err = notFound
  418. }
  419. return
  420. }
// updateAccessCnt refreshes the access counter of the chunk at key, if it
// exists, without fetching or returning its data.
func (s *DbStore) updateAccessCnt(key Key) {
	s.lock.Lock()
	defer s.lock.Unlock()
	var index dpaDBIndex
	s.tryAccessIdx(getIndexKey(key), &index) // result_chn == nil, only update access cnt
}
// setCapacity sets the maximum number of entries and, when the store already
// exceeds it, runs garbage-collection rounds until the entry count drops to
// the new capacity.
func (s *DbStore) setCapacity(c uint64) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.capacity = c
	if s.entryCnt > c {
		// Free the excess fraction (plus 1% slack) per round, bounded to
		// [gcArrayFreeRatio, 1].
		ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
		if ratio < gcArrayFreeRatio {
			ratio = gcArrayFreeRatio
		}
		if ratio > 1 {
			ratio = 1
		}
		for s.entryCnt > c {
			s.collectGarbage(ratio)
		}
	}
}
// Close releases the underlying leveldb database.
func (s *DbStore) Close() {
	s.db.Close()
}
// DbSyncState describes a section of the DbStore representing the unsynced
// domain relevant to a peer.
// Start - Stop designate a continuous area of Keys in an address space,
// typically the addresses closer to us than to the peer but not closer to
// another closer peer in between.
// First - Last designates a storage-index interval, typically from the last
// disconnect till the latest connection (real time traffic is relayed).
type DbSyncState struct {
	Start, Stop Key
	First, Last uint64
}

// dbSyncIterator implements the syncer iterator interface; it iterates by
// storage index (~ time of storage = first entry to db).
type dbSyncIterator struct {
	it iterator.Iterator
	DbSyncState
}
// NewSyncIterator initialises a sync iterator from a syncToken (passed in
// with the handshake), positioned at the index key of state.Start. It fails
// when the storage-index interval [First, Last] is empty.
func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) {
	if state.First > state.Last {
		return nil, fmt.Errorf("no entries found")
	}
	si = &dbSyncIterator{
		it:          self.db.NewIterator(),
		DbSyncState: state,
	}
	si.it.Seek(getIndexKey(state.Start))
	return si, nil
}
// Next walks the area from Start to Stop and returns the next chunk key
// whose storage index falls within [First, Last); it returns nil and
// releases the underlying iterator once the area is exhausted.
func (self *dbSyncIterator) Next() (key Key) {
	for self.it.Valid() {
		dbkey := self.it.Key()
		// Stop at the first non-index key (index keys are prefixed with 0).
		if dbkey[0] != 0 {
			break
		}
		// Strip the prefix byte to recover the chunk key.
		key = Key(make([]byte, len(dbkey)-1))
		copy(key[:], dbkey[1:])
		// Keys at or before Start are outside the area: skip forward.
		if bytes.Compare(key[:], self.Start) <= 0 {
			self.it.Next()
			continue
		}
		// Keys past Stop end the walk.
		if bytes.Compare(key[:], self.Stop) > 0 {
			break
		}
		var index dpaDBIndex
		decodeIndex(self.it.Value(), &index)
		self.it.Next()
		// Only yield entries stored within the [First, Last) interval.
		if (index.Idx >= self.First) && (index.Idx < self.Last) {
			return
		}
	}
	self.it.Release()
	return nil
}