// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package storage

import (
	"encoding/binary"
	"errors"
	"io"
	"sync"
	"time"
)
/*
   The main idea of the pyramid chunker is to process the input data without knowing the entire size a priori.
   To achieve this, the chunker tree is built from the ground up until the data is exhausted.
   This opens up new avenues such as easy appends and other modifications to the tree, thereby avoiding
   duplication of data chunks.

   Below is an example of a two-level chunk tree. The leaf chunks are called data chunks and all the
   chunks above them are called tree chunks. The tree chunks directly above the data chunks are level 0,
   and so on up to the root tree chunk.

                                            T10                                        <- Tree chunk lvl1
                                            |
                  __________________________|_____________________________
                 /                  |                   |                  \
                /                   |                   \                   \
            __T00__             ___T01__           ___T02__           ___T03__         <- Tree chunks lvl 0
           / /     \           / /      \         / /      \         / /      \
          / /       \         / /        \       / /       \        / /        \
         D1 D2 ... D128     D1 D2 ... D128     D1 D2 ... D128     D1 D2 ... D128      <-  Data chunks

   The split function continuously reads the data, creates data chunks and sends them to storage.
   When a certain number of data chunks have been created (defaultBranches), a signal is sent to create a tree
   entry. When the number of level 0 tree entries reaches a certain threshold (defaultBranches), another signal
   is sent to create a tree entry one level up... and so on... until the data is exhausted AND only one
   tree entry is present at some level. The key of that tree entry is given out as the rootKey of the file.
*/
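
// For example, with the default parameters below (128 branches, 32-byte SHA3
// hashes) a data chunk carries 128*32 = 4096 bytes of payload, so the
// two-level tree drawn above covers up to 128*128*4096 bytes = 64 MiB of data.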
var (
	errLoadingTreeRootChunk = errors.New("LoadTree Error: Could not load root chunk")
	errLoadingTreeChunk     = errors.New("LoadTree Error: Could not load chunk")
)

const (
	ChunkProcessors       = 8
	DefaultBranches int64 = 128
	splitTimeout          = time.Minute * 5
)

const (
	DataChunk = 0
	TreeChunk = 1
)

type ChunkerParams struct {
	Branches int64
	Hash     string
}

func NewChunkerParams() *ChunkerParams {
	return &ChunkerParams{
		Branches: DefaultBranches,
		Hash:     SHA3Hash,
	}
}

// TreeEntry is an entry used to create a tree node.
type TreeEntry struct {
	level         int
	branchCount   int64
	subtreeSize   uint64
	chunk         []byte
	key           []byte
	index         int  // used in append to indicate the index of the existing tree entry
	updatePending bool // indicates if the entry is loaded from an existing tree
}

func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry {
	return &TreeEntry{
		level:         0,
		branchCount:   0,
		subtreeSize:   0,
		chunk:         make([]byte, pyramid.chunkSize+8),
		key:           make([]byte, pyramid.hashSize),
		index:         0,
		updatePending: false,
	}
}

// chunkJob is used by the hash workers to create a data/tree chunk and send it to storage.
type chunkJob struct {
	key       Key
	chunk     []byte
	size      int64
	parentWg  *sync.WaitGroup
	chunkType int // used to identify the tree related chunks for debugging
	chunkLvl  int // leaf-1 is level 0 and goes upwards until it reaches root
}

type PyramidChunker struct {
	hashFunc    SwarmHasher
	chunkSize   int64
	hashSize    int64
	branches    int64
	workerCount int64
	workerLock  sync.RWMutex
}

func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
	self = &PyramidChunker{}
	self.hashFunc = MakeHashFunc(params.Hash)
	self.branches = params.Branches
	self.hashSize = int64(self.hashFunc().Size())
	self.chunkSize = self.hashSize * self.branches
	self.workerCount = 0
	return
}
func (self *PyramidChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
	return &LazyChunkReader{
		key:       key,
		chunkC:    chunkC,
		chunkSize: self.chunkSize,
		branches:  self.branches,
		hashSize:  self.hashSize,
	}
}
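
// readBack is a hypothetical usage sketch (not part of the original file): it
// joins the chunk tree rooted at key and reads its first n bytes through the
// lazy reader's ReadAt, as implemented by LazyChunkReader in the companion
// chunker code. It assumes the chunkC consumer answers retrieval requests
// (e.g. a DPA-style retrieve loop, which lives outside this file).
func readBack(chunker *PyramidChunker, key Key, chunkC chan *Chunk, n int64) ([]byte, error) {
	reader := chunker.Join(key, chunkC)
	buf := make([]byte, n)
	read, err := reader.ReadAt(buf, 0)
	if err != nil && err != io.EOF {
		return nil, err
	}
	return buf[:read], nil
}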
func (self *PyramidChunker) incrementWorkerCount() {
	self.workerLock.Lock()
	defer self.workerLock.Unlock()
	self.workerCount += 1
}

// getWorkerCount only reads the counter, so a read lock is sufficient.
func (self *PyramidChunker) getWorkerCount() int64 {
	self.workerLock.RLock()
	defer self.workerLock.RUnlock()
	return self.workerCount
}

func (self *PyramidChunker) decrementWorkerCount() {
	self.workerLock.Lock()
	defer self.workerLock.Unlock()
	self.workerCount -= 1
}
func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) {
	jobC := make(chan *chunkJob, 2*ChunkProcessors)
	wg := &sync.WaitGroup{}
	errC := make(chan error)
	quitC := make(chan bool)
	rootKey := make([]byte, self.hashSize)
	chunkLevel := make([][]*TreeEntry, self.branches)

	wg.Add(1)
	go self.prepareChunks(false, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG)

	// closes the internal error channel once all subprocesses in the workgroup have finished
	go func() {
		// wait for all chunks to finish
		wg.Wait()

		// if the storage waitgroup is non-nil, wait for storage to finish too
		if storageWG != nil {
			storageWG.Wait()
		}

		// We close errC here because it is shared by the parallel worker routines underneath:
		// if an error happens in one of them, that routine reports it on the channel; once
		// they have all completed successfully, control comes back here and we can safely close it.
		close(errC)
	}()

	defer close(quitC)

	select {
	case err := <-errC:
		if err != nil {
			return nil, err
		}
	case <-time.NewTimer(splitTimeout).C:
	}
	return rootKey, nil
}
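
// splitFile is a hypothetical usage sketch (not part of the original API): it
// wires a PyramidChunker to a chunk channel whose consumer persists each chunk
// and releases the storage waitgroup, mirroring the DPA store loop pattern.
// A real caller would also close chunkC once it is done with the chunker.
func splitFile(data io.Reader, size int64) (Key, error) {
	chunker := NewPyramidChunker(NewChunkerParams())
	chunkC := make(chan *Chunk)
	storageWG := &sync.WaitGroup{}
	go func() {
		for chunk := range chunkC {
			// a real consumer would persist chunk.SData under chunk.Key here
			if chunk.wg != nil {
				chunk.wg.Done()
			}
		}
	}()
	return chunker.Split(data, size, chunkC, storageWG, nil)
}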
func (self *PyramidChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) {
	quitC := make(chan bool)
	rootKey := make([]byte, self.hashSize)
	chunkLevel := make([][]*TreeEntry, self.branches)

	// load the right-most unfinished tree chunks on every level
	if err := self.loadTree(chunkLevel, key, chunkC, quitC); err != nil {
		return nil, err
	}

	jobC := make(chan *chunkJob, 2*ChunkProcessors)
	wg := &sync.WaitGroup{}
	errC := make(chan error)

	wg.Add(1)
	go self.prepareChunks(true, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG)

	// closes the internal error channel once all subprocesses in the workgroup have finished
	go func() {
		// wait for all chunks to finish
		wg.Wait()

		// if the storage waitgroup is non-nil, wait for storage to finish too
		if storageWG != nil {
			storageWG.Wait()
		}
		close(errC)
	}()

	defer close(quitC)

	select {
	case err := <-errC:
		if err != nil {
			return nil, err
		}
	case <-time.NewTimer(splitTimeout).C:
	}
	return rootKey, nil
}
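
// appendData is a hypothetical usage sketch (not part of the original API):
// extending an existing file uses the same wiring as splitFile above, except
// that the chunkC consumer must also answer the retrieve requests that
// loadTree issues for the tree's unfinished right edge.
func appendData(key Key, more io.Reader, chunkC chan *Chunk) (Key, error) {
	chunker := NewPyramidChunker(NewChunkerParams())
	storageWG := &sync.WaitGroup{}
	return chunker.Append(key, more, chunkC, storageWG, nil)
}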
func (self *PyramidChunker) processor(id int64, jobC chan *chunkJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
	defer self.decrementWorkerCount()

	hasher := self.hashFunc()
	if wwg != nil {
		defer wwg.Done()
	}
	for {
		select {
		case job, ok := <-jobC:
			if !ok {
				return
			}
			self.processChunk(id, hasher, job, chunkC, swg)
		case <-quitC:
			return
		}
	}
}
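
// processChunk hashes a single job's payload (prefixed with its 8-byte
// little-endian size), records the hash in the parent entry's key slice and
// forwards the finished chunk to storage.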
func (self *PyramidChunker) processChunk(id int64, hasher SwarmHash, job *chunkJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
	hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
	hasher.Write(job.chunk[8:])           // the payload, minus the 8-byte length prefix
	h := hasher.Sum(nil)

	newChunk := &Chunk{
		Key:   h,
		SData: job.chunk,
		Size:  job.size,
		wg:    swg,
	}

	// report hash of this chunk one level up (the key corresponds to the proper subslice of the parent chunk)
	copy(job.key, h)

	// send off new chunk to storage
	if chunkC != nil {
		if swg != nil {
			swg.Add(1)
		}
	}
	job.parentWg.Done()

	if chunkC != nil {
		chunkC <- newChunk
	}
}
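
// loadTree walks an existing chunk tree from its root and loads the
// right-most unfinished tree entry on every level, so that Append can
// continue exactly where the previous Split or Append left off.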
func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC chan *Chunk, quitC chan bool) error {
	// get the root chunk to get the total size
	chunk := retrieve(key, chunkC, quitC)
	if chunk == nil {
		return errLoadingTreeRootChunk
	}

	// if the data size is less than a chunk... add a parent with update pending
	if chunk.Size <= self.chunkSize {
		newEntry := &TreeEntry{
			level:         0,
			branchCount:   1,
			subtreeSize:   uint64(chunk.Size),
			chunk:         make([]byte, self.chunkSize+8),
			key:           make([]byte, self.hashSize),
			index:         0,
			updatePending: true,
		}
		copy(newEntry.chunk[8:], chunk.Key)
		chunkLevel[0] = append(chunkLevel[0], newEntry)
		return nil
	}

	var treeSize int64
	var depth int
	treeSize = self.chunkSize
	for ; treeSize < chunk.Size; treeSize *= self.branches {
		depth++
	}
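	// e.g. with 4096-byte chunks and 128 branches, a 1 MiB file yields depth 2:
	// 4096 B < 1 MiB (depth 1), 4096 B * 128 = 512 KiB < 1 MiB (depth 2),
	// and 512 KiB * 128 = 64 MiB >= 1 MiB terminates the loop.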
	// add the root chunk entry
	branchCount := int64(len(chunk.SData)-8) / self.hashSize
	newEntry := &TreeEntry{
		level:         depth - 1,
		branchCount:   branchCount,
		subtreeSize:   uint64(chunk.Size),
		chunk:         chunk.SData,
		key:           key,
		index:         0,
		updatePending: true,
	}
	chunkLevel[depth-1] = append(chunkLevel[depth-1], newEntry)

	// add the rest of the tree
	for lvl := depth - 1; lvl >= 1; lvl-- {
		// TODO(jmozah): instead of loading finished branches and then trimming them at the end,
		// avoid loading them in the first place
		for _, ent := range chunkLevel[lvl] {
			branchCount = int64(len(ent.chunk)-8) / self.hashSize
			for i := int64(0); i < branchCount; i++ {
				key := ent.chunk[8+(i*self.hashSize) : 8+((i+1)*self.hashSize)]
				newChunk := retrieve(key, chunkC, quitC)
				if newChunk == nil {
					return errLoadingTreeChunk
				}
				newBranchCount := int64(len(newChunk.SData)-8) / self.hashSize
				newEntry := &TreeEntry{
					level:         lvl - 1,
					branchCount:   newBranchCount,
					subtreeSize:   uint64(newChunk.Size),
					chunk:         newChunk.SData,
					key:           key,
					index:         0,
					updatePending: true,
				}
				chunkLevel[lvl-1] = append(chunkLevel[lvl-1], newEntry)
			}

			// we only need the right-most unfinished branch, so trim all finished branches
			if int64(len(chunkLevel[lvl-1])) >= self.branches {
				chunkLevel[lvl-1] = nil
			}
		}
	}
	return nil
}
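
// prepareChunks reads the input data chunk by chunk, queues each data chunk
// for hashing and, whenever enough branches have accumulated (or the data
// ends), triggers buildTree to emit the tree chunks above them. It also
// scales the hash worker pool up to ChunkProcessors based on queue depth.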
func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEntry, data io.Reader, rootKey []byte, quitC chan bool, wg *sync.WaitGroup, jobC chan *chunkJob, processorWG *sync.WaitGroup, chunkC chan *Chunk, errC chan error, storageWG *sync.WaitGroup) {
	defer wg.Done()

	chunkWG := &sync.WaitGroup{}
	totalDataSize := 0

	// processorWG keeps track of workers spawned for hashing chunks
	if processorWG != nil {
		processorWG.Add(1)
	}
	self.incrementWorkerCount()
	go self.processor(self.getWorkerCount(), jobC, chunkC, errC, quitC, storageWG, processorWG)

	parent := NewTreeEntry(self)
	var unFinishedChunk *Chunk

	if isAppend && len(chunkLevel[0]) != 0 {
		lastIndex := len(chunkLevel[0]) - 1
		ent := chunkLevel[0][lastIndex]

		if ent.branchCount < self.branches {
			parent = &TreeEntry{
				level:         0,
				branchCount:   ent.branchCount,
				subtreeSize:   ent.subtreeSize,
				chunk:         ent.chunk,
				key:           ent.key,
				index:         lastIndex,
				updatePending: true,
			}

			lastBranch := parent.branchCount - 1
			lastKey := parent.chunk[8+lastBranch*self.hashSize : 8+(lastBranch+1)*self.hashSize]

			unFinishedChunk = retrieve(lastKey, chunkC, quitC)
			if unFinishedChunk.Size < self.chunkSize {
				parent.subtreeSize = parent.subtreeSize - uint64(unFinishedChunk.Size)
				parent.branchCount = parent.branchCount - 1
			} else {
				unFinishedChunk = nil
			}
		}
	}

	for index := 0; ; index++ {
		var n int
		var err error
		chunkData := make([]byte, self.chunkSize+8)
		if unFinishedChunk != nil {
			copy(chunkData, unFinishedChunk.SData)
			n, err = data.Read(chunkData[8+unFinishedChunk.Size:])
			n += int(unFinishedChunk.Size)
			unFinishedChunk = nil
		} else {
			n, err = data.Read(chunkData[8:])
		}

		totalDataSize += n
		if err != nil {
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				if parent.branchCount == 1 {
					// data is exactly one chunk.. pick the last chunk key as root
					chunkWG.Wait()
					lastChunksKey := parent.chunk[8 : 8+self.hashSize]
					copy(rootKey, lastChunksKey)
					break
				}
			} else {
				close(quitC)
				break
			}
		}

		// data ended on a chunk boundary.. just signal to start building the tree
		if n == 0 {
			self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey)
			break
		} else {
			pkey := self.enqueueDataChunk(chunkData, uint64(n), parent, chunkWG, jobC, quitC)

			// update tree related parent data structures
			parent.subtreeSize += uint64(n)
			parent.branchCount++

			// data got exhausted... signal to send any parent tree related chunks
			if int64(n) < self.chunkSize {
				// only one data chunk .. so don't add any parent chunk
				if parent.branchCount <= 1 {
					chunkWG.Wait()
					copy(rootKey, pkey)
					break
				}

				self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey)
				break
			}

			if parent.branchCount == self.branches {
				self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, false, rootKey)
				parent = NewTreeEntry(self)
			}
		}

		// if jobs queue up faster than the workers drain them, spawn another
		// hasher, up to the ChunkProcessors cap
		workers := self.getWorkerCount()
		if int64(len(jobC)) > workers && workers < ChunkProcessors {
			if processorWG != nil {
				processorWG.Add(1)
			}
			self.incrementWorkerCount()
			go self.processor(self.getWorkerCount(), jobC, chunkC, errC, quitC, storageWG, processorWG)
		}
	}
}
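
// buildTree flushes the current parent entry as a tree chunk and, if any
// level has accumulated a full set of branches (or this is the last call),
// compresses those entries into tree chunks one level up. On the last call
// it keeps collapsing levels until a single entry remains, whose key becomes
// the rootKey of the file.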
func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool, rootKey []byte) {
	chunkWG.Wait()
	self.enqueueTreeChunk(chunkLevel, ent, chunkWG, jobC, quitC, last)

	compress := false
	endLvl := self.branches
	for lvl := int64(0); lvl < self.branches; lvl++ {
		lvlCount := int64(len(chunkLevel[lvl]))
		if lvlCount >= self.branches {
			endLvl = lvl + 1
			compress = true
			break
		}
	}

	if !compress && !last {
		return
	}

	// wait for all the keys to be processed before compressing the tree
	chunkWG.Wait()

	for lvl := int64(ent.level); lvl < endLvl; lvl++ {
		lvlCount := int64(len(chunkLevel[lvl]))
		if lvlCount == 1 && last {
			copy(rootKey, chunkLevel[lvl][0].key)
			return
		}

		for startCount := int64(0); startCount < lvlCount; startCount += self.branches {
			endCount := startCount + self.branches
			if endCount > lvlCount {
				endCount = lvlCount
			}

			var nextLvlCount int64
			var tempEntry *TreeEntry
			if len(chunkLevel[lvl+1]) > 0 {
				nextLvlCount = int64(len(chunkLevel[lvl+1]) - 1)
				tempEntry = chunkLevel[lvl+1][nextLvlCount]
			}

			if isAppend && tempEntry != nil && tempEntry.updatePending {
				updateEntry := &TreeEntry{
					level:         int(lvl + 1),
					branchCount:   0,
					subtreeSize:   0,
					chunk:         make([]byte, self.chunkSize+8),
					key:           make([]byte, self.hashSize),
					index:         int(nextLvlCount),
					updatePending: true,
				}
				for index := int64(0); index < lvlCount; index++ {
					updateEntry.branchCount++
					updateEntry.subtreeSize += chunkLevel[lvl][index].subtreeSize
					copy(updateEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], chunkLevel[lvl][index].key[:self.hashSize])
				}
				self.enqueueTreeChunk(chunkLevel, updateEntry, chunkWG, jobC, quitC, last)
			} else {
				noOfBranches := endCount - startCount
				newEntry := &TreeEntry{
					level:         int(lvl + 1),
					branchCount:   noOfBranches,
					subtreeSize:   0,
					chunk:         make([]byte, (noOfBranches*self.hashSize)+8),
					key:           make([]byte, self.hashSize),
					index:         int(nextLvlCount),
					updatePending: false,
				}

				index := int64(0)
				for i := startCount; i < endCount; i++ {
					entry := chunkLevel[lvl][i]
					newEntry.subtreeSize += entry.subtreeSize
					copy(newEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], entry.key[:self.hashSize])
					index++
				}

				self.enqueueTreeChunk(chunkLevel, newEntry, chunkWG, jobC, quitC, last)
			}
		}

		if !isAppend {
			chunkWG.Wait()
			if compress {
				chunkLevel[lvl] = nil
			}
		}
	}
}
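
// enqueueTreeChunk writes the subtree size into the entry's 8-byte prefix,
// queues the tree chunk for hashing and records the entry on its level,
// either in place (pending update) or appended as a new entry.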
func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool) {
	if ent != nil {
		// wait for the data chunks to be done before processing the tree chunk
		if last {
			chunkWG.Wait()
		}

		binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize)
		ent.key = make([]byte, self.hashSize)
		chunkWG.Add(1)
		select {
		case jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*self.hashSize+8], int64(ent.subtreeSize), chunkWG, TreeChunk, 0}:
		case <-quitC:
		}

		// update or append, based on whether it is a new entry or one being reused
		if ent.updatePending {
			chunkWG.Wait()
			chunkLevel[ent.level][ent.index] = ent
		} else {
			chunkLevel[ent.level] = append(chunkLevel[ent.level], ent)
		}
	}
}
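
// enqueueDataChunk stamps the data chunk with its size and queues it for
// hashing; the returned key slice aliases the parent chunk, so the hash
// worker fills in the child key directly at the right offset.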
func (self *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool) Key {
	binary.LittleEndian.PutUint64(chunkData[:8], size)
	pkey := parent.chunk[8+parent.branchCount*self.hashSize : 8+(parent.branchCount+1)*self.hashSize]

	chunkWG.Add(1)
	select {
	case jobC <- &chunkJob{pkey, chunkData[:size+8], int64(size), chunkWG, DataChunk, -1}:
	case <-quitC:
	}

	return pkey
}
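
// decodeChunkSize is a hypothetical helper (not part of the original file)
// illustrating the chunk layout used throughout: the first 8 bytes of SData
// hold the little-endian subtree size, and the remainder holds either raw
// data (data chunk) or a run of child keys (tree chunk).
func decodeChunkSize(sdata []byte) uint64 {
	return binary.LittleEndian.Uint64(sdata[:8])
}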