syncdb_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package network
  17. import (
  18. "bytes"
  19. "fmt"
  20. "io/ioutil"
  21. "os"
  22. "path/filepath"
  23. "testing"
  24. "time"
  25. "github.com/ethereum/go-ethereum/crypto"
  26. "github.com/ethereum/go-ethereum/log"
  27. "github.com/ethereum/go-ethereum/swarm/storage"
  28. )
  29. func init() {
  30. log.Root().SetHandler(log.LvlFilterHandler(log.LvlCrit, log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
  31. }
  32. type testSyncDb struct {
  33. *syncDb
  34. c int
  35. t *testing.T
  36. fromDb chan bool
  37. delivered [][]byte
  38. sent []int
  39. dbdir string
  40. at int
  41. }
  42. func newTestSyncDb(priority, bufferSize, batchSize int, dbdir string, t *testing.T) *testSyncDb {
  43. if len(dbdir) == 0 {
  44. tmp, err := ioutil.TempDir(os.TempDir(), "syncdb-test")
  45. if err != nil {
  46. t.Fatalf("unable to create temporary direcory %v: %v", tmp, err)
  47. }
  48. dbdir = tmp
  49. }
  50. db, err := storage.NewLDBDatabase(filepath.Join(dbdir, "requestdb"))
  51. if err != nil {
  52. t.Fatalf("unable to create db: %v", err)
  53. }
  54. self := &testSyncDb{
  55. fromDb: make(chan bool),
  56. dbdir: dbdir,
  57. t: t,
  58. }
  59. h := crypto.Keccak256Hash([]byte{0})
  60. key := storage.Key(h[:])
  61. self.syncDb = newSyncDb(db, key, uint(priority), uint(bufferSize), uint(batchSize), self.deliver)
  62. // kick off db iterator right away, if no items on db this will allow
  63. // reading from the buffer
  64. return self
  65. }
  66. func (self *testSyncDb) close() {
  67. self.db.Close()
  68. os.RemoveAll(self.dbdir)
  69. }
  70. func (self *testSyncDb) push(n int) {
  71. for i := 0; i < n; i++ {
  72. self.buffer <- storage.Key(crypto.Keccak256([]byte{byte(self.c)}))
  73. self.sent = append(self.sent, self.c)
  74. self.c++
  75. }
  76. log.Debug(fmt.Sprintf("pushed %v requests", n))
  77. }
  78. func (self *testSyncDb) draindb() {
  79. it := self.db.NewIterator()
  80. defer it.Release()
  81. for {
  82. it.Seek(self.start)
  83. if !it.Valid() {
  84. return
  85. }
  86. k := it.Key()
  87. if len(k) == 0 || k[0] == 1 {
  88. return
  89. }
  90. it.Release()
  91. it = self.db.NewIterator()
  92. }
  93. }
  94. func (self *testSyncDb) deliver(req interface{}, quit chan bool) bool {
  95. _, db := req.(*syncDbEntry)
  96. key, _, _, _, err := parseRequest(req)
  97. if err != nil {
  98. self.t.Fatalf("unexpected error of key %v: %v", key, err)
  99. }
  100. self.delivered = append(self.delivered, key)
  101. select {
  102. case self.fromDb <- db:
  103. return true
  104. case <-quit:
  105. return false
  106. }
  107. }
  108. func (self *testSyncDb) expect(n int, db bool) {
  109. var ok bool
  110. // for n items
  111. for i := 0; i < n; i++ {
  112. ok = <-self.fromDb
  113. if self.at+1 > len(self.delivered) {
  114. self.t.Fatalf("expected %v, got %v", self.at+1, len(self.delivered))
  115. }
  116. if len(self.sent) > self.at && !bytes.Equal(crypto.Keccak256([]byte{byte(self.sent[self.at])}), self.delivered[self.at]) {
  117. self.t.Fatalf("expected delivery %v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
  118. log.Debug(fmt.Sprintf("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db))
  119. }
  120. if !ok && db {
  121. self.t.Fatalf("expected delivery %v/%v/%v from db", i, n, self.at)
  122. }
  123. if ok && !db {
  124. self.t.Fatalf("expected delivery %v/%v/%v from cache", i, n, self.at)
  125. }
  126. self.at++
  127. }
  128. }
  129. func TestSyncDb(t *testing.T) {
  130. t.Skip("fails randomly on all platforms")
  131. priority := High
  132. bufferSize := 5
  133. batchSize := 2 * bufferSize
  134. s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
  135. defer s.close()
  136. defer s.stop()
  137. s.dbRead(false, 0, s.deliver)
  138. s.draindb()
  139. s.push(4)
  140. s.expect(1, false)
  141. // 3 in buffer
  142. time.Sleep(100 * time.Millisecond)
  143. s.push(3)
  144. // push over limit
  145. s.expect(1, false)
  146. // one popped from the buffer, then contention detected
  147. s.expect(4, true)
  148. s.push(4)
  149. s.expect(5, true)
  150. // depleted db, switch back to buffer
  151. s.draindb()
  152. s.push(5)
  153. s.expect(4, false)
  154. s.push(3)
  155. s.expect(4, false)
  156. // buffer depleted
  157. time.Sleep(100 * time.Millisecond)
  158. s.push(6)
  159. s.expect(1, false)
  160. // push into buffer full, switch to db
  161. s.expect(5, true)
  162. s.draindb()
  163. s.push(1)
  164. s.expect(1, false)
  165. }
  166. func TestSaveSyncDb(t *testing.T) {
  167. amount := 30
  168. priority := High
  169. bufferSize := amount
  170. batchSize := 10
  171. s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
  172. go s.dbRead(false, 0, s.deliver)
  173. s.push(amount)
  174. s.stop()
  175. s.db.Close()
  176. s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
  177. go s.dbRead(false, 0, s.deliver)
  178. s.expect(amount, true)
  179. for i, key := range s.delivered {
  180. expKey := crypto.Keccak256([]byte{byte(i)})
  181. if !bytes.Equal(key, expKey) {
  182. t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
  183. }
  184. }
  185. s.push(amount)
  186. s.expect(amount, false)
  187. for i := amount; i < 2*amount; i++ {
  188. key := s.delivered[i]
  189. expKey := crypto.Keccak256([]byte{byte(i - amount)})
  190. if !bytes.Equal(key, expKey) {
  191. t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
  192. }
  193. }
  194. s.stop()
  195. s.db.Close()
  196. s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
  197. defer s.close()
  198. defer s.stop()
  199. go s.dbRead(false, 0, s.deliver)
  200. s.push(1)
  201. s.expect(1, false)
  202. }