types.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package storage
  17. import (
  18. "bytes"
  19. "crypto"
  20. "fmt"
  21. "hash"
  22. "io"
  23. "sync"
  24. "github.com/ethereum/go-ethereum/bmt"
  25. "github.com/ethereum/go-ethereum/common"
  26. "github.com/ethereum/go-ethereum/crypto/sha3"
  27. )
  28. type Hasher func() hash.Hash
  29. type SwarmHasher func() SwarmHash
  30. // Peer is the recorded as Source on the chunk
  31. // should probably not be here? but network should wrap chunk object
  32. type Peer interface{}
  33. type Key []byte
  34. func (x Key) Size() uint {
  35. return uint(len(x))
  36. }
  37. func (x Key) isEqual(y Key) bool {
  38. return bytes.Equal(x, y)
  39. }
  40. func (h Key) bits(i, j uint) uint {
  41. ii := i >> 3
  42. jj := i & 7
  43. if ii >= h.Size() {
  44. return 0
  45. }
  46. if jj+j <= 8 {
  47. return uint((h[ii] >> jj) & ((1 << j) - 1))
  48. }
  49. res := uint(h[ii] >> jj)
  50. jj = 8 - jj
  51. j -= jj
  52. for j != 0 {
  53. ii++
  54. if j < 8 {
  55. res += uint(h[ii]&((1<<j)-1)) << jj
  56. return res
  57. }
  58. res += uint(h[ii]) << jj
  59. jj += 8
  60. j -= 8
  61. }
  62. return res
  63. }
  64. func IsZeroKey(key Key) bool {
  65. return len(key) == 0 || bytes.Equal(key, ZeroKey)
  66. }
  67. var ZeroKey = Key(common.Hash{}.Bytes())
  68. func MakeHashFunc(hash string) SwarmHasher {
  69. switch hash {
  70. case "SHA256":
  71. return func() SwarmHash { return &HashWithLength{crypto.SHA256.New()} }
  72. case "SHA3":
  73. return func() SwarmHash { return &HashWithLength{sha3.NewKeccak256()} }
  74. case "BMT":
  75. return func() SwarmHash {
  76. hasher := sha3.NewKeccak256
  77. pool := bmt.NewTreePool(hasher, bmt.DefaultSegmentCount, bmt.DefaultPoolSize)
  78. return bmt.New(pool)
  79. }
  80. }
  81. return nil
  82. }
  83. func (key Key) Hex() string {
  84. return fmt.Sprintf("%064x", []byte(key[:]))
  85. }
  86. func (key Key) Log() string {
  87. if len(key[:]) < 4 {
  88. return fmt.Sprintf("%x", []byte(key[:]))
  89. }
  90. return fmt.Sprintf("%08x", []byte(key[:4]))
  91. }
  92. func (key Key) String() string {
  93. return fmt.Sprintf("%064x", []byte(key)[:])
  94. }
  95. func (key Key) MarshalJSON() (out []byte, err error) {
  96. return []byte(`"` + key.String() + `"`), nil
  97. }
  98. func (key *Key) UnmarshalJSON(value []byte) error {
  99. s := string(value)
  100. *key = make([]byte, 32)
  101. h := common.Hex2Bytes(s[1 : len(s)-1])
  102. copy(*key, h)
  103. return nil
  104. }
  105. // each chunk when first requested opens a record associated with the request
  106. // next time a request for the same chunk arrives, this record is updated
  107. // this request status keeps track of the request ID-s as well as the requesting
  108. // peers and has a channel that is closed when the chunk is retrieved. Multiple
  109. // local callers can wait on this channel (or combined with a timeout, block with a
  110. // select).
  111. type RequestStatus struct {
  112. Key Key
  113. Source Peer
  114. C chan bool
  115. Requesters map[uint64][]interface{}
  116. }
  117. func newRequestStatus(key Key) *RequestStatus {
  118. return &RequestStatus{
  119. Key: key,
  120. Requesters: make(map[uint64][]interface{}),
  121. C: make(chan bool),
  122. }
  123. }
  124. // Chunk also serves as a request object passed to ChunkStores
  125. // in case it is a retrieval request, Data is nil and Size is 0
  126. // Note that Size is not the size of the data chunk, which is Data.Size()
  127. // but the size of the subtree encoded in the chunk
  128. // 0 if request, to be supplied by the dpa
  129. type Chunk struct {
  130. Key Key // always
  131. SData []byte // nil if request, to be supplied by dpa
  132. Size int64 // size of the data covered by the subtree encoded in this chunk
  133. Source Peer // peer
  134. C chan bool // to signal data delivery by the dpa
  135. Req *RequestStatus // request Status needed by netStore
  136. wg *sync.WaitGroup // wg to synchronize
  137. dbStored chan bool // never remove a chunk from memStore before it is written to dbStore
  138. }
  139. func NewChunk(key Key, rs *RequestStatus) *Chunk {
  140. return &Chunk{Key: key, Req: rs}
  141. }
  142. /*
  143. The ChunkStore interface is implemented by :
  144. - MemStore: a memory cache
  145. - DbStore: local disk/db store
  146. - LocalStore: a combination (sequence of) memStore and dbStore
  147. - NetStore: cloud storage abstraction layer
  148. - DPA: local requests for swarm storage and retrieval
  149. */
  150. type ChunkStore interface {
  151. Put(*Chunk) // effectively there is no error even if there is an error
  152. Get(Key) (*Chunk, error)
  153. Close()
  154. }
/*
Chunker is the interface to a component that is responsible for disassembling and assembling larger data and is intended to be the dependency of a DPA storage system with fixed maximum chunksize.
It relies on the underlying chunking model.
When calling Split, the caller provides a channel (chan *Chunk) on which it receives chunks to store. The DPA delegates to storage layers (implementing ChunkStore interface).
Split returns an error channel, which the caller can monitor.
After getting notified that all the data has been split (the error channel is closed), the caller can safely read or save the root key. Optionally it times out if not all chunks get stored or not the entire stream of data has been processed. By inspecting the errc channel the caller can check if any explicit errors (typically IO read/write failures) occurred during splitting.
When calling Join with a root key, the caller gets returned a seekable lazy reader. The caller again provides a channel on which the caller receives placeholder chunks with missing data. The DPA is supposed to forward this to the chunk stores and notify the chunker if the data has been delivered (i.e. retrieved from memory cache, disk-persisted db or cloud based swarm delivery). As the seekable reader is used, the chunker then puts the relevant parts together on demand.
*/
  163. type Splitter interface {
  164. /*
  165. When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
  166. New chunks to store are coming to caller via the chunk storage channel, which the caller provides.
  167. wg is a Waitgroup (can be nil) that can be used to block until the local storage finishes
  168. The caller gets returned an error channel, if an error is encountered during splitting, it is fed to errC error channel.
  169. A closed error signals process completion at which point the key can be considered final if there were no errors.
  170. */
  171. Split(io.Reader, int64, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
  172. /* This is the first step in making files mutable (not chunks)..
  173. Append allows adding more data chunks to the end of the already existsing file.
  174. The key for the root chunk is supplied to load the respective tree.
  175. Rest of the parameters behave like Split.
  176. */
  177. Append(Key, io.Reader, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
  178. }
  179. type Joiner interface {
  180. /*
  181. Join reconstructs original content based on a root key.
  182. When joining, the caller gets returned a Lazy SectionReader, which is
  183. seekable and implements on-demand fetching of chunks as and where it is read.
  184. New chunks to retrieve are coming to caller via the Chunk channel, which the caller provides.
  185. If an error is encountered during joining, it appears as a reader error.
  186. The SectionReader.
  187. As a result, partial reads from a document are possible even if other parts
  188. are corrupt or lost.
  189. The chunks are not meant to be validated by the chunker when joining. This
  190. is because it is left to the DPA to decide which sources are trusted.
  191. */
  192. Join(key Key, chunkC chan *Chunk) LazySectionReader
  193. }
  194. type Chunker interface {
  195. Joiner
  196. Splitter
  197. // returns the key length
  198. // KeySize() int64
  199. }
  200. // Size, Seek, Read, ReadAt
  201. type LazySectionReader interface {
  202. Size(chan bool) (int64, error)
  203. io.Seeker
  204. io.Reader
  205. io.ReaderAt
  206. }
  207. type LazyTestSectionReader struct {
  208. *io.SectionReader
  209. }
  210. func (self *LazyTestSectionReader) Size(chan bool) (int64, error) {
  211. return self.SectionReader.Size(), nil
  212. }