// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Disk storage layer for the bzz package. DbStore implements the ChunkStore
// interface and is used by the DPA as persistent storage of chunks.
// It implements purging based on access count, which allows external control
// of the maximum capacity.
package storage

import (
	"archive/tar"
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"io"
	"io/ioutil"
	"sync"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

// metrics variables
var (
	gcCounter            = metrics.NewRegisteredCounter("storage.db.dbstore.gc.count", nil)
	dbStoreDeleteCounter = metrics.NewRegisteredCounter("storage.db.dbstore.rm.count", nil)
)

const (
	defaultDbCapacity = 5000000
	defaultRadius     = 0 // not yet used
	gcArraySize       = 10000
	gcArrayFreeRatio  = 0.1

	// key prefixes for leveldb storage
	kpIndex = 0
)

var (
	keyAccessCnt = []byte{2}
	keyEntryCnt  = []byte{3}
	keyDataIdx   = []byte{4}
	keyGCPos     = []byte{5}
)

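// gcItem holds what is needed to garbage-collect one chunk: the storage index
// of its data record, its GC value (currently the access count, where smaller
// means more likely to be collected) and its index key.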
type gcItem struct {
	idx    uint64
	value  uint64
	idxKey []byte
}

type DbStore struct {
	db *LDBDatabase

	// this should be stored in db, accessed transactionally
	entryCnt, accessCnt, dataIdx, capacity uint64

	gcPos, gcStartPos []byte
	gcArray           []*gcItem

	hashfunc SwarmHasher

	lock sync.Mutex
}

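// NewDbStore opens (or creates) a LevelDB-backed chunk store at path and
// restores the persisted entry, access and data-index counters. The radius
// parameter is accepted but not yet used (see defaultRadius above).
//
// A minimal usage sketch, assuming MakeHashFunc is the package's SwarmHasher
// constructor:
//
//	store, err := NewDbStore("/tmp/chunks", MakeHashFunc("SHA3"), defaultDbCapacity, defaultRadius)
//	if err != nil {
//		// handle error
//	}
//	defer store.Close()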
func NewDbStore(path string, hash SwarmHasher, capacity uint64, radius int) (s *DbStore, err error) {
	s = new(DbStore)

	s.hashfunc = hash

	s.db, err = NewLDBDatabase(path)
	if err != nil {
		return
	}

	s.setCapacity(capacity)

	s.gcStartPos = make([]byte, 1)
	s.gcStartPos[0] = kpIndex
	s.gcArray = make([]*gcItem, gcArraySize)

	data, _ := s.db.Get(keyEntryCnt)
	s.entryCnt = BytesToU64(data)
	data, _ = s.db.Get(keyAccessCnt)
	s.accessCnt = BytesToU64(data)
	data, _ = s.db.Get(keyDataIdx)
	s.dataIdx = BytesToU64(data)
	s.gcPos, _ = s.db.Get(keyGCPos)
	if s.gcPos == nil {
		s.gcPos = s.gcStartPos
	}
	return
}

type dpaDBIndex struct {
	Idx    uint64
	Access uint64
}

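// BytesToU64 decodes a little-endian uint64 from data, returning 0 when the
// input is shorter than 8 bytes (e.g. for a counter key that is not yet set).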
func BytesToU64(data []byte) uint64 {
	if len(data) < 8 {
		return 0
	}
	return binary.LittleEndian.Uint64(data)
}

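// U64ToBytes encodes val as an 8-byte little-endian slice, the inverse of
// BytesToU64.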
func U64ToBytes(val uint64) []byte {
	data := make([]byte, 8)
	binary.LittleEndian.PutUint64(data, val)
	return data
}

func getIndexGCValue(index *dpaDBIndex) uint64 {
	return index.Access
}

func (s *DbStore) updateIndexAccess(index *dpaDBIndex) {
	index.Access = s.accessCnt
}

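// On-disk key layout: an index entry lives under the kpIndex prefix (0x00)
// followed by the chunk hash and maps the hash to a storage index plus access
// counter; the chunk payload lives under prefix 0x01 followed by the
// big-endian storage index, so data records iterate in insertion order.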
func getIndexKey(hash Key) []byte {
	hashSize := len(hash)
	key := make([]byte, hashSize+1)
	key[0] = kpIndex
	copy(key[1:], hash[:])
	return key
}

func getDataKey(idx uint64) []byte {
	key := make([]byte, 9)
	key[0] = 1
	binary.BigEndian.PutUint64(key[1:9], idx)
	return key
}

func encodeIndex(index *dpaDBIndex) []byte {
	data, _ := rlp.EncodeToBytes(index)
	return data
}

func encodeData(chunk *Chunk) []byte {
	return chunk.SData
}

func decodeIndex(data []byte, index *dpaDBIndex) {
	dec := rlp.NewStream(bytes.NewReader(data), 0)
	dec.Decode(index)
}

func decodeData(data []byte, chunk *Chunk) {
	chunk.SData = data
	// the first 8 bytes of the stored payload encode the chunk size
	chunk.Size = int64(binary.LittleEndian.Uint64(data[0:8]))
}

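// gcListPartition and gcListSelect implement the classic quickselect
// algorithm: gcListSelect rearranges list[left..right] so that the n-th
// smallest element (by gcItem.value) ends up at position n. collectGarbage
// uses this to find the access-count cutoff below which chunks are deleted.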
func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int {
	pivotValue := list[pivotIndex].value
	// move the pivot out of the way
	list[pivotIndex], list[right] = list[right], list[pivotIndex]
	storeIndex := left
	for i := left; i < right; i++ {
		if list[i].value < pivotValue {
			list[storeIndex], list[i] = list[i], list[storeIndex]
			storeIndex++
		}
	}
	// move the pivot to its final position
	list[storeIndex], list[right] = list[right], list[storeIndex]
	return storeIndex
}

func gcListSelect(list []*gcItem, left int, right int, n int) int {
	if left == right {
		return left
	}
	pivotIndex := (left + right) / 2
	pivotIndex = gcListPartition(list, left, right, pivotIndex)
	if n == pivotIndex {
		return n
	}
	if n < pivotIndex {
		return gcListSelect(list, left, pivotIndex-1, n)
	}
	return gcListSelect(list, pivotIndex+1, right, n)
}

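// collectGarbage scans up to gcArraySize index entries starting at the
// persisted gcPos cursor (wrapping around to the start of the index key range
// when necessary), selects the access-count value at the ratio quantile via
// quickselect, and deletes every scanned chunk at or below that cutoff. The
// cursor is persisted so subsequent rounds resume where this one left off.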
func (s *DbStore) collectGarbage(ratio float32) {
	it := s.db.NewIterator()
	it.Seek(s.gcPos)
	if it.Valid() {
		s.gcPos = it.Key()
	} else {
		s.gcPos = nil
	}
	gcnt := 0
	for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) {
		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
			// ran off the end of the index key range: wrap around
			it.Seek(s.gcStartPos)
			if it.Valid() {
				s.gcPos = it.Key()
			} else {
				s.gcPos = nil
			}
		}
		if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) {
			break
		}
		gci := new(gcItem)
		gci.idxKey = s.gcPos
		var index dpaDBIndex
		decodeIndex(it.Value(), &index)
		gci.idx = index.Idx
		// the smaller the value, the more likely the chunk is to be gc'd
		gci.value = getIndexGCValue(&index)
		s.gcArray[gcnt] = gci
		gcnt++
		it.Next()
		if it.Valid() {
			s.gcPos = it.Key()
		} else {
			s.gcPos = nil
		}
	}
	it.Release()

	cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio))
	cutval := s.gcArray[cutidx].value

	// actual gc
	for i := 0; i < gcnt; i++ {
		if s.gcArray[i].value <= cutval {
			gcCounter.Inc(1)
			s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey)
		}
	}

	s.db.Put(keyGCPos, s.gcPos)
}

// Export writes all chunks from the store to a tar archive, returning the
// number of chunks written.
func (s *DbStore) Export(out io.Writer) (int64, error) {
	tw := tar.NewWriter(out)
	defer tw.Close()

	it := s.db.NewIterator()
	defer it.Release()
	var count int64
	for ok := it.Seek([]byte{kpIndex}); ok; ok = it.Next() {
		key := it.Key()
		if (key == nil) || (key[0] != kpIndex) {
			break
		}

		var index dpaDBIndex
		decodeIndex(it.Value(), &index)

		data, err := s.db.Get(getDataKey(index.Idx))
		if err != nil {
			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
			continue
		}

		hdr := &tar.Header{
			Name: hex.EncodeToString(key[1:]),
			Mode: 0644,
			Size: int64(len(data)),
		}
		if err := tw.WriteHeader(hdr); err != nil {
			return count, err
		}
		if _, err := tw.Write(data); err != nil {
			return count, err
		}
		count++
	}

	return count, nil
}

// Import reads chunks into the store from a tar archive, returning the number
// of chunks read.
func (s *DbStore) Import(in io.Reader) (int64, error) {
	tr := tar.NewReader(in)

	var count int64
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		} else if err != nil {
			return count, err
		}

		if len(hdr.Name) != 64 {
			log.Warn("ignoring non-chunk file", "name", hdr.Name)
			continue
		}

		key, err := hex.DecodeString(hdr.Name)
		if err != nil {
			log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
			continue
		}

		data, err := ioutil.ReadAll(tr)
		if err != nil {
			return count, err
		}

		s.Put(&Chunk{Key: key, SData: data})
		count++
	}
	return count, nil
}

// Cleanup iterates over the database and deletes chunks that cannot be read
// back or whose content does not hash to their key.
func (s *DbStore) Cleanup() {
	it := s.db.NewIterator()
	startPosition := []byte{kpIndex}
	it.Seek(startPosition)
	var key []byte
	var errorsFound, total int
	for it.Valid() {
		key = it.Key()
		if (key == nil) || (key[0] != kpIndex) {
			break
		}
		total++
		var index dpaDBIndex
		decodeIndex(it.Value(), &index)
		data, err := s.db.Get(getDataKey(index.Idx))
		if err != nil {
			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
			s.delete(index.Idx, getIndexKey(key[1:]))
			errorsFound++
		} else {
			hasher := s.hashfunc()
			hasher.Write(data)
			hash := hasher.Sum(nil)
			if !bytes.Equal(hash, key[1:]) {
				log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]))
				s.delete(index.Idx, getIndexKey(key[1:]))
				errorsFound++
			}
		}
		it.Next()
	}
	it.Release()
	log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
}

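// delete removes a chunk's index entry and data record in a single batch and
// persists the decremented entry count. It does not take s.lock itself.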
func (s *DbStore) delete(idx uint64, idxKey []byte) {
	batch := new(leveldb.Batch)
	batch.Delete(idxKey)
	batch.Delete(getDataKey(idx))
	dbStoreDeleteCounter.Inc(1)
	s.entryCnt--
	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
	s.db.Write(batch)
}

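// Counter returns the current data index, i.e. the number of chunks ever
// stored (deletions do not decrease it).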
func (s *DbStore) Counter() uint64 {
	s.lock.Lock()
	defer s.lock.Unlock()
	return s.dataIdx
}

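// Put stores a chunk unless it is already present, in which case only its
// access counter is updated. When the entry count has reached capacity, a
// round of garbage collection frees roughly gcArrayFreeRatio of the scanned
// chunks before the new chunk is written.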
func (s *DbStore) Put(chunk *Chunk) {
	s.lock.Lock()
	defer s.lock.Unlock()

	ikey := getIndexKey(chunk.Key)
	var index dpaDBIndex

	if s.tryAccessIdx(ikey, &index) {
		if chunk.dbStored != nil {
			close(chunk.dbStored)
		}
		log.Trace("Storing to DB: chunk already exists, only updating access")
		return // already exists, only update access
	}

	data := encodeData(chunk)

	if s.entryCnt >= s.capacity {
		s.collectGarbage(gcArrayFreeRatio)
	}

	batch := new(leveldb.Batch)

	batch.Put(getDataKey(s.dataIdx), data)

	index.Idx = s.dataIdx
	s.updateIndexAccess(&index)

	idata := encodeIndex(&index)
	batch.Put(ikey, idata)

	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
	s.entryCnt++
	batch.Put(keyDataIdx, U64ToBytes(s.dataIdx))
	s.dataIdx++
	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	s.accessCnt++

	s.db.Write(batch)

	if chunk.dbStored != nil {
		close(chunk.dbStored)
	}
	log.Trace(fmt.Sprintf("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx))
}

// tryAccessIdx looks up the index entry for ikey; if found, it bumps the
// global access counter, refreshes the entry's access stamp and returns true.
func (s *DbStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool {
	idata, err := s.db.Get(ikey)
	if err != nil {
		return false
	}
	decodeIndex(idata, index)

	batch := new(leveldb.Batch)
	batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	s.accessCnt++
	s.updateIndexAccess(index)
	idata = encodeIndex(index)
	batch.Put(ikey, idata)

	s.db.Write(batch)

	return true
}

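// Get retrieves a chunk by key, updating its access counter. Chunks whose
// data is missing or fails hash verification are deleted and reported as not
// found.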
func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	var index dpaDBIndex

	if s.tryAccessIdx(getIndexKey(key), &index) {
		var data []byte
		data, err = s.db.Get(getDataKey(index.Idx))
		if err != nil {
			log.Trace(fmt.Sprintf("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err))
			s.delete(index.Idx, getIndexKey(key))
			return
		}

		hasher := s.hashfunc()
		hasher.Write(data)
		hash := hasher.Sum(nil)
		if !bytes.Equal(hash, key) {
			s.delete(index.Idx, getIndexKey(key))
			log.Warn("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'")
			// do not return the corrupt chunk
			err = notFound
			return
		}

		chunk = &Chunk{
			Key: key,
		}
		decodeData(data, chunk)
	} else {
		err = notFound
	}
	return
}

// updateAccessCnt bumps the access counter of the chunk at key without
// retrieving its data.
func (s *DbStore) updateAccessCnt(key Key) {
	s.lock.Lock()
	defer s.lock.Unlock()
	var index dpaDBIndex
	s.tryAccessIdx(getIndexKey(key), &index) // result is ignored, we only update the access counter
}

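// setCapacity sets the maximum number of chunks kept in the store. If the
// store already holds more entries, garbage collection runs with a ratio of
// slightly above 1 - c/entryCnt (clamped to [gcArrayFreeRatio, 1]) until the
// entry count drops to the new capacity.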
func (s *DbStore) setCapacity(c uint64) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.capacity = c

	if s.entryCnt > c {
		ratio := float32(1.01) - float32(c)/float32(s.entryCnt)
		if ratio < gcArrayFreeRatio {
			ratio = gcArrayFreeRatio
		}
		if ratio > 1 {
			ratio = 1
		}
		for s.entryCnt > c {
			s.collectGarbage(ratio)
		}
	}
}

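// Close closes the underlying LevelDB database.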
func (s *DbStore) Close() {
	s.db.Close()
}

// DbSyncState describes a section of the DbStore representing the unsynced
// domain relevant to a peer.
// Start and Stop designate a continuous range of keys in the address space,
// typically the addresses closer to us than to the peer, but not closer to
// another peer in between.
// First and Last designate a storage-time interval, typically from the last
// disconnect until the latest connection (real-time traffic is relayed).
type DbSyncState struct {
	Start, Stop Key
	First, Last uint64
}

// dbSyncIterator implements the syncer iterator interface.
// It iterates by storage index, which approximates time of storage (order of
// first entry to the db).
type dbSyncIterator struct {
	it iterator.Iterator
	DbSyncState
}

// NewSyncIterator initialises a sync iterator from a sync state (passed in
// with the handshake as the syncToken).
func (self *DbStore) NewSyncIterator(state DbSyncState) (si *dbSyncIterator, err error) {
	if state.First > state.Last {
		return nil, fmt.Errorf("no entries found")
	}
	si = &dbSyncIterator{
		it:          self.db.NewIterator(),
		DbSyncState: state,
	}
	si.it.Seek(getIndexKey(state.Start))
	return si, nil
}

// Next walks the key range from Start to Stop and returns the next key whose
// storage index falls within the interval [First, Last); it returns nil once
// the iterator is exhausted.
func (self *dbSyncIterator) Next() (key Key) {
	for self.it.Valid() {
		dbkey := self.it.Key()
		if dbkey[0] != kpIndex {
			break
		}
		key = Key(make([]byte, len(dbkey)-1))
		copy(key[:], dbkey[1:])
		if bytes.Compare(key[:], self.Start) <= 0 {
			self.it.Next()
			continue
		}
		if bytes.Compare(key[:], self.Stop) > 0 {
			break
		}
		var index dpaDBIndex
		decodeIndex(self.it.Value(), &index)
		self.it.Next()
		if (index.Idx >= self.First) && (index.Idx < self.Last) {
			return key
		}
	}
	self.it.Release()
	return nil
}