// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package trie

import (
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
)

var (
	memcacheFlushTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/flush/time", nil)
	memcacheFlushNodesMeter = metrics.NewRegisteredMeter("trie/memcache/flush/nodes", nil)
	memcacheFlushSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/flush/size", nil)

	memcacheGCTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/gc/time", nil)
	memcacheGCNodesMeter = metrics.NewRegisteredMeter("trie/memcache/gc/nodes", nil)
	memcacheGCSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/gc/size", nil)

	memcacheCommitTimeTimer  = metrics.NewRegisteredResettingTimer("trie/memcache/commit/time", nil)
	memcacheCommitNodesMeter = metrics.NewRegisteredMeter("trie/memcache/commit/nodes", nil)
	memcacheCommitSizeMeter  = metrics.NewRegisteredMeter("trie/memcache/commit/size", nil)
)

// secureKeyPrefix is the database key prefix used to store trie node preimages.
var secureKeyPrefix = []byte("secure-key-")

// secureKeyLength is the length of the above prefix plus the 32-byte hash.
const secureKeyLength = 11 + 32

// DatabaseReader wraps the Get and Has methods of a backing store for the trie.
type DatabaseReader interface {
	// Get retrieves the value associated with key from the database.
	Get(key []byte) (value []byte, err error)

	// Has reports whether a key is present in the database.
	Has(key []byte) (bool, error)
}
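
// For illustration, any key/value store exposing these two methods satisfies the
// interface. A minimal in-memory sketch (the type, field names and the errors
// import are hypothetical, not part of this package) could look like:
//
//	type memReader struct{ kv map[string][]byte }
//
//	func (m memReader) Get(key []byte) ([]byte, error) {
//		if v, ok := m.kv[string(key)]; ok {
//			return common.CopyBytes(v), nil
//		}
//		return nil, errors.New("not found")
//	}
//
//	func (m memReader) Has(key []byte) (bool, error) {
//		_, ok := m.kv[string(key)]
//		return ok, nil
//	}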

// Database is an intermediate write layer between the trie data structures and
// the disk database. The aim is to accumulate trie writes in-memory and only
// periodically flush a couple of tries to disk, garbage collecting the remainder.
type Database struct {
	diskdb ethdb.Database // Persistent storage for matured trie nodes

	nodes  map[common.Hash]*cachedNode // Data and references relationships of a node
	oldest common.Hash                 // Oldest tracked node, flush-list head
	newest common.Hash                 // Newest tracked node, flush-list tail

	preimages map[common.Hash][]byte // Preimages of nodes from the secure trie
	seckeybuf [secureKeyLength]byte  // Ephemeral buffer for calculating preimage keys

	gctime  time.Duration      // Time spent on garbage collection since last commit
	gcnodes uint64             // Nodes garbage collected since last commit
	gcsize  common.StorageSize // Data storage garbage collected since last commit

	flushtime  time.Duration      // Time spent on data flushing since last commit
	flushnodes uint64             // Nodes flushed since last commit
	flushsize  common.StorageSize // Data storage flushed since last commit

	nodesSize     common.StorageSize // Storage size of the nodes cache (exc. flushlist)
	preimagesSize common.StorageSize // Storage size of the preimages cache

	lock sync.RWMutex
}

// cachedNode is all the information we know about a single cached node in the
// memory database write layer.
type cachedNode struct {
	blob     []byte              // Cached data block of the trie node
	parents  int                 // Number of live nodes referencing this one
	children map[common.Hash]int // Children referenced by this node

	flushPrev common.Hash // Previous node in the flush-list
	flushNext common.Hash // Next node in the flush-list
}

// NewDatabase creates a new trie database to store ephemeral trie content before
// it's written out to disk or garbage collected.
func NewDatabase(diskdb ethdb.Database) *Database {
	return &Database{
		diskdb: diskdb,
		nodes: map[common.Hash]*cachedNode{
			{}: {children: make(map[common.Hash]int)},
		},
		preimages: make(map[common.Hash][]byte),
	}
}
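
// A rough sketch of the intended lifecycle (the variable names are placeholders,
// not part of this package): trie commits insert their nodes here, roots are
// anchored with Reference, and the cache is later either capped or committed to
// the disk database.
//
//	triedb := NewDatabase(diskdb)             // diskdb is any ethdb.Database
//	triedb.Insert(nodeHash, nodeBlob)         // usually done internally by trie commits
//	triedb.Reference(rootHash, common.Hash{}) // keep the trie rooted at rootHash alive
//
//	// Either bound memory usage while keeping recent tries cached...
//	_ = triedb.Cap(256 * 1024 * 1024)
//	// ...or persist a specific trie and drop it from the cache.
//	_ = triedb.Commit(rootHash, true)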

// DiskDB retrieves the persistent storage backing the trie database.
func (db *Database) DiskDB() DatabaseReader {
	return db.diskdb
}

// Insert writes a new trie node to the memory database if it's yet unknown. The
// method will make a copy of the slice.
func (db *Database) Insert(hash common.Hash, blob []byte) {
	db.lock.Lock()
	defer db.lock.Unlock()

	db.insert(hash, blob)
}

// insert is the private locked version of Insert.
func (db *Database) insert(hash common.Hash, blob []byte) {
	// If the node's already cached, skip
	if _, ok := db.nodes[hash]; ok {
		return
	}
	db.nodes[hash] = &cachedNode{
		blob:      common.CopyBytes(blob),
		children:  make(map[common.Hash]int),
		flushPrev: db.newest,
	}
	// Update the flush-list endpoints
	if db.oldest == (common.Hash{}) {
		db.oldest, db.newest = hash, hash
	} else {
		db.nodes[db.newest].flushNext, db.newest = hash, hash
	}
	db.nodesSize += common.StorageSize(common.HashLength + len(blob))
}

// insertPreimage writes a new trie node pre-image to the memory database if it's
// yet unknown. The method will make a copy of the slice.
//
// Note, this method assumes that the database's lock is held!
func (db *Database) insertPreimage(hash common.Hash, preimage []byte) {
	if _, ok := db.preimages[hash]; ok {
		return
	}
	db.preimages[hash] = common.CopyBytes(preimage)
	db.preimagesSize += common.StorageSize(common.HashLength + len(preimage))
}

// Node retrieves a cached trie node from memory. If it cannot be found cached,
// the method queries the persistent database for the content.
func (db *Database) Node(hash common.Hash) ([]byte, error) {
	// Retrieve the node from cache if available
	db.lock.RLock()
	node := db.nodes[hash]
	db.lock.RUnlock()

	if node != nil {
		return node.blob, nil
	}
	// Content unavailable in memory, attempt to retrieve from disk
	return db.diskdb.Get(hash[:])
}

// preimage retrieves a cached trie node pre-image from memory. If it cannot be
// found cached, the method queries the persistent database for the content.
func (db *Database) preimage(hash common.Hash) ([]byte, error) {
	// Retrieve the node from cache if available
	db.lock.RLock()
	preimage := db.preimages[hash]
	db.lock.RUnlock()

	if preimage != nil {
		return preimage, nil
	}
	// Content unavailable in memory, attempt to retrieve from disk
	return db.diskdb.Get(db.secureKey(hash[:]))
}

// secureKey returns the database key for the preimage of key, as an ephemeral
// buffer. The caller must not hold onto the return value because it will become
// invalid on the next call.
func (db *Database) secureKey(key []byte) []byte {
	buf := append(db.seckeybuf[:0], secureKeyPrefix...)
	buf = append(buf, key...)
	return buf
}
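
// Because the returned slice aliases db.seckeybuf, a caller that needs to retain
// the key past the next secureKey call should copy it first. An illustrative
// pattern (the variable names are placeholders):
//
//	key := common.CopyBytes(db.secureKey(hash[:])) // safe to keep or store
//	batch.Put(db.secureKey(hash[:]), preimage)     // fine: the key is consumed immediately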

// Nodes retrieves the hashes of all the nodes cached within the memory database.
// This method is extremely expensive and should only be used to validate internal
// states in test code.
func (db *Database) Nodes() []common.Hash {
	db.lock.RLock()
	defer db.lock.RUnlock()

	var hashes = make([]common.Hash, 0, len(db.nodes))
	for hash := range db.nodes {
		if hash != (common.Hash{}) { // Special case for "root" references/nodes
			hashes = append(hashes, hash)
		}
	}
	return hashes
}

// Reference adds a new reference from a parent node to a child node.
func (db *Database) Reference(child common.Hash, parent common.Hash) {
	// Take the write lock: reference mutates the reference counters.
	db.lock.Lock()
	defer db.lock.Unlock()

	db.reference(child, parent)
}

// reference is the private locked version of Reference.
func (db *Database) reference(child common.Hash, parent common.Hash) {
	// If the node does not exist, it's a node pulled from disk, skip
	node, ok := db.nodes[child]
	if !ok {
		return
	}
	// If the reference already exists, only duplicate for roots
	if _, ok = db.nodes[parent].children[child]; ok && parent != (common.Hash{}) {
		return
	}
	node.parents++
	db.nodes[parent].children[child]++
}
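
// A typical caller pins each freshly committed trie root under the meta-root
// (the empty hash) so the whole trie survives garbage collection until it is
// explicitly dereferenced, and links dependent tries to their owner. A hedged
// sketch with placeholder hashes:
//
//	triedb.Reference(accountRoot, common.Hash{}) // pin the account trie root
//	triedb.Reference(storageRoot, accountRoot)   // keep a storage trie alive via its owner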

// Dereference removes an existing reference from a parent node to a child node.
func (db *Database) Dereference(child common.Hash, parent common.Hash) {
	db.lock.Lock()
	defer db.lock.Unlock()

	nodes, storage, start := len(db.nodes), db.nodesSize, time.Now()
	db.dereference(child, parent)

	db.gcnodes += uint64(nodes - len(db.nodes))
	db.gcsize += storage - db.nodesSize
	db.gctime += time.Since(start)

	memcacheGCTimeTimer.Update(time.Since(start))
	memcacheGCSizeMeter.Mark(int64(storage - db.nodesSize))
	memcacheGCNodesMeter.Mark(int64(nodes - len(db.nodes)))

	log.Debug("Dereferenced trie from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start),
		"gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize)
}

// dereference is the private locked version of Dereference.
func (db *Database) dereference(child common.Hash, parent common.Hash) {
	// Dereference the parent-child relationship
	node := db.nodes[parent]

	node.children[child]--
	if node.children[child] == 0 {
		delete(node.children, child)
	}
	// If the child does not exist, it's a previously committed node.
	node, ok := db.nodes[child]
	if !ok {
		return
	}
	// If there are no more references to the child, delete it and cascade
	node.parents--
	if node.parents == 0 {
		// Remove the node from the flush-list
		if child == db.oldest {
			db.oldest = node.flushNext
		} else {
			db.nodes[node.flushPrev].flushNext = node.flushNext
			db.nodes[node.flushNext].flushPrev = node.flushPrev
		}
		// Dereference all children and delete the node
		for hash := range node.children {
			db.dereference(hash, child)
		}
		delete(db.nodes, child)
		db.nodesSize -= common.StorageSize(common.HashLength + len(node.blob))
	}
}
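
// When an old state root falls out of the pruning window, dropping its anchor
// from the meta-root lets the reference counting cascade and reclaim every node
// that is not shared with a newer trie. A hedged sketch with a placeholder hash:
//
//	triedb.Dereference(staleRoot, common.Hash{}) // garbage collect the stale trie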

// Cap iteratively flushes old but still referenced trie nodes until the total
// memory usage goes below the given threshold.
func (db *Database) Cap(limit common.StorageSize) error {
	// Create a database batch to flush persistent data out. It is important that
	// outside code doesn't see an inconsistent state (referenced data removed from
	// memory cache during commit but not yet in persistent storage). This is ensured
	// by only uncaching existing data when the database write finalizes.
	db.lock.RLock()

	nodes, storage, start := len(db.nodes), db.nodesSize, time.Now()
	batch := db.diskdb.NewBatch()

	// db.nodesSize only contains the useful data in the cache, but when reporting
	// the total memory consumption, the maintenance metadata also needs to be
	// counted. For every useful node, we track 2 extra hashes as the flushlist.
	size := db.nodesSize + common.StorageSize(len(db.nodes)*2*common.HashLength)

	// If the preimage cache got large enough, push to disk. If it's still small,
	// leave it for later to deduplicate writes.
	flushPreimages := db.preimagesSize > 4*1024*1024
	if flushPreimages {
		for hash, preimage := range db.preimages {
			if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
				log.Error("Failed to commit preimage from trie database", "err", err)
				db.lock.RUnlock()
				return err
			}
			if batch.ValueSize() > ethdb.IdealBatchSize {
				if err := batch.Write(); err != nil {
					db.lock.RUnlock()
					return err
				}
				batch.Reset()
			}
		}
	}
	// Keep committing nodes from the flush-list until we're below allowance
	oldest := db.oldest
	for size > limit && oldest != (common.Hash{}) {
		// Fetch the oldest referenced node and push into the batch
		node := db.nodes[oldest]
		if err := batch.Put(oldest[:], node.blob); err != nil {
			db.lock.RUnlock()
			return err
		}
		// If we exceeded the ideal batch size, commit and reset
		if batch.ValueSize() >= ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				log.Error("Failed to write flush list to disk", "err", err)
				db.lock.RUnlock()
				return err
			}
			batch.Reset()
		}
		// Iterate to the next flush item, or abort if the size cap was achieved. Size
		// is the total size, including both the useful cached data (hash -> blob), as
		// well as the flushlist metadata (2*hash). When flushing items from the cache,
		// we need to reduce both.
		size -= common.StorageSize(3*common.HashLength + len(node.blob))
		oldest = node.flushNext
	}
	// Flush out any remaining data from the last batch
	if err := batch.Write(); err != nil {
		log.Error("Failed to write flush list to disk", "err", err)
		db.lock.RUnlock()
		return err
	}
	db.lock.RUnlock()

	// Write successful, clear out the flushed data
	db.lock.Lock()
	defer db.lock.Unlock()

	if flushPreimages {
		db.preimages = make(map[common.Hash][]byte)
		db.preimagesSize = 0
	}
	for db.oldest != oldest {
		node := db.nodes[db.oldest]
		delete(db.nodes, db.oldest)
		db.oldest = node.flushNext

		db.nodesSize -= common.StorageSize(common.HashLength + len(node.blob))
	}
	if db.oldest != (common.Hash{}) {
		db.nodes[db.oldest].flushPrev = common.Hash{}
	}
	db.flushnodes += uint64(nodes - len(db.nodes))
	db.flushsize += storage - db.nodesSize
	db.flushtime += time.Since(start)

	memcacheFlushTimeTimer.Update(time.Since(start))
	memcacheFlushSizeMeter.Mark(int64(storage - db.nodesSize))
	memcacheFlushNodesMeter.Mark(int64(nodes - len(db.nodes)))

	log.Debug("Persisted nodes from memory database", "nodes", nodes-len(db.nodes), "size", storage-db.nodesSize, "time", time.Since(start),
		"flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.nodes), "livesize", db.nodesSize)
	return nil
}
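
// A caller that wants to bound memory during block import can periodically cap
// the cache, using Size to decide when to do so. A hedged sketch (the 256MB
// limit is arbitrary and purely for illustration):
//
//	if nodesSize, _ := triedb.Size(); nodesSize > 256*1024*1024 {
//		if err := triedb.Cap(256*1024*1024 - ethdb.IdealBatchSize); err != nil {
//			log.Error("Failed to cap trie cache", "err", err)
//		}
//	}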

// Commit iterates over all the children of a particular node, writes them out
// to disk, forcefully tearing down all references in both directions.
//
// As a side effect, all pre-images accumulated up to this point are also written.
func (db *Database) Commit(node common.Hash, report bool) error {
	// Create a database batch to flush persistent data out. It is important that
	// outside code doesn't see an inconsistent state (referenced data removed from
	// memory cache during commit but not yet in persistent storage). This is ensured
	// by only uncaching existing data when the database write finalizes.
	db.lock.RLock()

	start := time.Now()
	batch := db.diskdb.NewBatch()

	// Move all of the accumulated preimages into a write batch
	for hash, preimage := range db.preimages {
		if err := batch.Put(db.secureKey(hash[:]), preimage); err != nil {
			log.Error("Failed to commit preimage from trie database", "err", err)
			db.lock.RUnlock()
			return err
		}
		if batch.ValueSize() > ethdb.IdealBatchSize {
			if err := batch.Write(); err != nil {
				db.lock.RUnlock()
				return err
			}
			batch.Reset()
		}
	}
	// Move the trie itself into the batch, flushing if enough data is accumulated
	nodes, storage := len(db.nodes), db.nodesSize
	if err := db.commit(node, batch); err != nil {
		log.Error("Failed to commit trie from trie database", "err", err)
		db.lock.RUnlock()
		return err
	}
	// Write batch ready, unlock for readers during persistence
	if err := batch.Write(); err != nil {
		log.Error("Failed to write trie to disk", "err", err)
		db.lock.RUnlock()
		return err
	}
	db.lock.RUnlock()

	// Write successful, clear out the flushed data
	db.lock.Lock()
	defer db.lock.Unlock()

	db.preimages = make(map[common.Hash][]byte)
	db.preimagesSize = 0

	db.uncache(node)

	memcacheCommitTimeTimer.Update(time.Since(start))
	memcacheCommitSizeMeter.Mark(int64(storage - db.nodesSize))
	memcacheCommitNodesMeter.Mark(int64(nodes - len(db.nodes)))

	logger := log.Info
	if !report {
		logger = log.Debug
	}
	logger("Persisted trie from memory database", "nodes", nodes-len(db.nodes)+int(db.flushnodes), "size", storage-db.nodesSize+db.flushsize, "time", time.Since(start)+db.flushtime,
		"gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, "livenodes", len(db.nodes), "livesize", db.nodesSize)

	// Reset the garbage collection statistics
	db.gcnodes, db.gcsize, db.gctime = 0, 0, 0
	db.flushnodes, db.flushsize, db.flushtime = 0, 0, 0

	return nil
}
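
// Archive-style operation or periodic checkpoints commit a whole trie and drop
// it from the cache in one go. A hedged sketch with a placeholder root hash
// (report=true upgrades the completion log from debug to info level):
//
//	if err := triedb.Commit(blockRoot, true); err != nil {
//		log.Crit("Failed to commit trie to disk", "err", err)
//	}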

// commit is the private locked version of Commit.
func (db *Database) commit(hash common.Hash, batch ethdb.Batch) error {
	// If the node does not exist, it's a previously committed node
	node, ok := db.nodes[hash]
	if !ok {
		return nil
	}
	for child := range node.children {
		if err := db.commit(child, batch); err != nil {
			return err
		}
	}
	if err := batch.Put(hash[:], node.blob); err != nil {
		return err
	}
	// If we've reached an optimal batch size, commit and start over
	if batch.ValueSize() >= ethdb.IdealBatchSize {
		if err := batch.Write(); err != nil {
			return err
		}
		batch.Reset()
	}
	return nil
}

// uncache is the post-processing step of a commit operation where the already
// persisted trie is removed from the cache. The reason behind the two-phase
// commit is to ensure consistent data availability while moving from memory
// to disk.
func (db *Database) uncache(hash common.Hash) {
	// If the node does not exist, we're done on this path
	node, ok := db.nodes[hash]
	if !ok {
		return
	}
	// Node still exists, remove it from the flush-list
	if hash == db.oldest {
		db.oldest = node.flushNext
	} else {
		db.nodes[node.flushPrev].flushNext = node.flushNext
		db.nodes[node.flushNext].flushPrev = node.flushPrev
	}
	// Uncache the node's subtries and remove the node itself too
	for child := range node.children {
		db.uncache(child)
	}
	delete(db.nodes, hash)
	db.nodesSize -= common.StorageSize(common.HashLength + len(node.blob))
}

// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
	db.lock.RLock()
	defer db.lock.RUnlock()

	// db.nodesSize only contains the useful data in the cache, but when reporting
	// the total memory consumption, the maintenance metadata also needs to be
	// counted. For every useful node, we track 2 extra hashes as the flushlist.
	var flushlistSize = common.StorageSize(len(db.nodes) * 2 * common.HashLength)
	return db.nodesSize + flushlistSize, db.preimagesSize
}