distributor_test.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements on-demand retrieval capable state and chain objects
  17. // for the Ethereum Light Client.
  18. package les
  19. import (
  20. "math/rand"
  21. "sync"
  22. "testing"
  23. "time"
  24. )
  25. type testDistReq struct {
  26. cost, procTime, order uint64
  27. canSendTo map[*testDistPeer]struct{}
  28. }
  29. func (r *testDistReq) getCost(dp distPeer) uint64 {
  30. return r.cost
  31. }
  32. func (r *testDistReq) canSend(dp distPeer) bool {
  33. _, ok := r.canSendTo[dp.(*testDistPeer)]
  34. return ok
  35. }
  36. func (r *testDistReq) request(dp distPeer) func() {
  37. return func() { dp.(*testDistPeer).send(r) }
  38. }
  39. type testDistPeer struct {
  40. sent []*testDistReq
  41. sumCost uint64
  42. lock sync.RWMutex
  43. }
  44. func (p *testDistPeer) send(r *testDistReq) {
  45. p.lock.Lock()
  46. defer p.lock.Unlock()
  47. p.sent = append(p.sent, r)
  48. p.sumCost += r.cost
  49. }
  50. func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) {
  51. var last uint64
  52. for {
  53. wait := time.Millisecond
  54. p.lock.Lock()
  55. if len(p.sent) > 0 {
  56. rq := p.sent[0]
  57. wait = time.Duration(rq.procTime)
  58. p.sumCost -= rq.cost
  59. if checkOrder {
  60. if rq.order <= last {
  61. t.Errorf("Requests processed in wrong order")
  62. }
  63. last = rq.order
  64. }
  65. p.sent = p.sent[1:]
  66. }
  67. p.lock.Unlock()
  68. select {
  69. case <-stop:
  70. return
  71. case <-time.After(wait):
  72. }
  73. }
  74. }
  // Parameters of the request distributor test.
  const (
  	// testDistBufLimit is the summed-cost threshold used by waitBefore:
  	// below it no wait is requested, at or above it the excess is
  	// returned as the wait time.
  	testDistBufLimit = 10000000
  	// testDistMaxCost is the exclusive upper bound of a request's random cost.
  	testDistMaxCost = 1000000
  	// testDistPeerCount is the number of mock peers registered.
  	testDistPeerCount = 5
  	// testDistReqCount is the number of requests generated by the test.
  	testDistReqCount = 50000
  	// testDistMaxResendCount is the maximum number of times a single
  	// request is queued in resend mode.
  	testDistMaxResendCount = 3
  )
  82. func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) {
  83. p.lock.RLock()
  84. sumCost := p.sumCost + cost
  85. p.lock.RUnlock()
  86. if sumCost < testDistBufLimit {
  87. return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit)
  88. }
  89. return time.Duration(sumCost - testDistBufLimit), 0
  90. }
  91. func (p *testDistPeer) canQueue() bool {
  92. return true
  93. }
  94. func (p *testDistPeer) queueSend(f func()) {
  95. f()
  96. }
  97. func TestRequestDistributor(t *testing.T) {
  98. testRequestDistributor(t, false)
  99. }
  100. func TestRequestDistributorResend(t *testing.T) {
  101. testRequestDistributor(t, true)
  102. }
  103. func testRequestDistributor(t *testing.T, resend bool) {
  104. stop := make(chan struct{})
  105. defer close(stop)
  106. dist := newRequestDistributor(nil, stop)
  107. var peers [testDistPeerCount]*testDistPeer
  108. for i := range peers {
  109. peers[i] = &testDistPeer{}
  110. go peers[i].worker(t, !resend, stop)
  111. dist.registerTestPeer(peers[i])
  112. }
  113. var wg sync.WaitGroup
  114. for i := 1; i <= testDistReqCount; i++ {
  115. cost := uint64(rand.Int63n(testDistMaxCost))
  116. procTime := uint64(rand.Int63n(int64(cost + 1)))
  117. rq := &testDistReq{
  118. cost: cost,
  119. procTime: procTime,
  120. order: uint64(i),
  121. canSendTo: make(map[*testDistPeer]struct{}),
  122. }
  123. for _, peer := range peers {
  124. if rand.Intn(2) != 0 {
  125. rq.canSendTo[peer] = struct{}{}
  126. }
  127. }
  128. wg.Add(1)
  129. req := &distReq{
  130. getCost: rq.getCost,
  131. canSend: rq.canSend,
  132. request: rq.request,
  133. }
  134. chn := dist.queue(req)
  135. go func() {
  136. cnt := 1
  137. if resend && len(rq.canSendTo) != 0 {
  138. cnt = rand.Intn(testDistMaxResendCount) + 1
  139. }
  140. for i := 0; i < cnt; i++ {
  141. if i != 0 {
  142. chn = dist.queue(req)
  143. }
  144. p := <-chn
  145. if p == nil {
  146. if len(rq.canSendTo) != 0 {
  147. t.Errorf("Request that could have been sent was dropped")
  148. }
  149. } else {
  150. peer := p.(*testDistPeer)
  151. if _, ok := rq.canSendTo[peer]; !ok {
  152. t.Errorf("Request sent to wrong peer")
  153. }
  154. }
  155. }
  156. wg.Done()
  157. }()
  158. if rand.Intn(1000) == 0 {
  159. time.Sleep(time.Duration(rand.Intn(5000000)))
  160. }
  161. }
  162. wg.Wait()
  163. }