kaddb.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package kademlia
  17. import (
  18. "encoding/json"
  19. "fmt"
  20. "io/ioutil"
  21. "os"
  22. "sync"
  23. "time"
  24. "github.com/ethereum/go-ethereum/log"
  25. )
  // NodeData groups the two JSON codec interfaces: an implementation must be
  // able to encode itself to JSON and to decode itself from JSON.
  type NodeData interface {
  	json.Marshaler   // MarshalJSON() ([]byte, error)
  	json.Unmarshaler // UnmarshalJSON([]byte) error
  }
  30. // allow inactive peers under
  31. type NodeRecord struct {
  32. Addr Address // address of node
  33. Url string // Url, used to connect to node
  34. After time.Time // next call after time
  35. Seen time.Time // last connected at time
  36. Meta *json.RawMessage // arbitrary metadata saved for a peer
  37. node Node
  38. }
  39. func (self *NodeRecord) setSeen() {
  40. t := time.Now()
  41. self.Seen = t
  42. self.After = t
  43. }
  44. func (self *NodeRecord) String() string {
  45. return fmt.Sprintf("<%v>", self.Addr)
  46. }
  47. // persisted node record database ()
  48. type KadDb struct {
  49. Address Address
  50. Nodes [][]*NodeRecord
  51. index map[Address]*NodeRecord
  52. cursors []int
  53. lock sync.RWMutex
  54. purgeInterval time.Duration
  55. initialRetryInterval time.Duration
  56. connRetryExp int
  57. }
  58. func newKadDb(addr Address, params *KadParams) *KadDb {
  59. return &KadDb{
  60. Address: addr,
  61. Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load
  62. cursors: make([]int, params.MaxProx+1),
  63. index: make(map[Address]*NodeRecord),
  64. purgeInterval: params.PurgeInterval,
  65. initialRetryInterval: params.InitialRetryInterval,
  66. connRetryExp: params.ConnRetryExp,
  67. }
  68. }
  69. func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord {
  70. defer self.lock.Unlock()
  71. self.lock.Lock()
  72. record, found := self.index[a]
  73. if !found {
  74. record = &NodeRecord{
  75. Addr: a,
  76. Url: url,
  77. }
  78. log.Info(fmt.Sprintf("add new record %v to kaddb", record))
  79. // insert in kaddb
  80. self.index[a] = record
  81. self.Nodes[index] = append(self.Nodes[index], record)
  82. } else {
  83. log.Info(fmt.Sprintf("found record %v in kaddb", record))
  84. }
  85. // update last seen time
  86. record.setSeen()
  87. // update with url in case IP/port changes
  88. record.Url = url
  89. return record
  90. }
  91. // add adds node records to kaddb (persisted node record db)
  92. func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) {
  93. defer self.lock.Unlock()
  94. self.lock.Lock()
  95. var n int
  96. var nodes []*NodeRecord
  97. for _, node := range nrs {
  98. _, found := self.index[node.Addr]
  99. if !found && node.Addr != self.Address {
  100. node.setSeen()
  101. self.index[node.Addr] = node
  102. index := proximityBin(node.Addr)
  103. dbcursor := self.cursors[index]
  104. nodes = self.Nodes[index]
  105. // this is inefficient for allocation, need to just append then shift
  106. newnodes := make([]*NodeRecord, len(nodes)+1)
  107. copy(newnodes[:], nodes[:dbcursor])
  108. newnodes[dbcursor] = node
  109. copy(newnodes[dbcursor+1:], nodes[dbcursor:])
  110. log.Trace(fmt.Sprintf("new nodes: %v, nodes: %v", newnodes, nodes))
  111. self.Nodes[index] = newnodes
  112. n++
  113. }
  114. }
  115. if n > 0 {
  116. log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs)))
  117. }
  118. }
  119. /*
  120. next return one node record with the highest priority for desired
  121. connection.
  122. This is used to pick candidates for live nodes that are most wanted for
  123. a higly connected low centrality network structure for Swarm which best suits
  124. for a Kademlia-style routing.
  125. * Starting as naive node with empty db, this implements Kademlia bootstrapping
  126. * As a mature node, it fills short lines. All on demand.
  127. The candidate is chosen using the following strategy:
  128. We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds.
  129. On each round we proceed from the low to high proximity order buckets.
  130. If the number of active nodes (=connected peers) is < rounds, then start looking
  131. for a known candidate. To determine if there is a candidate to recommend the
  132. kaddb node record database row corresponding to the bucket is checked.
  133. If the row cursor is on position i, the ith element in the row is chosen.
  134. If the record is scheduled not to be retried before NOW, the next element is taken.
  135. If the record is scheduled to be retried, it is set as checked, scheduled for
  136. checking and is returned. The time of the next check is in X (duration) such that
  137. X = ConnRetryExp * delta where delta is the time past since the last check and
  138. ConnRetryExp is constant obsoletion factor. (Note that when node records are added
  139. from peer messages, they are marked as checked and placed at the cursor, ie.
  140. given priority over older entries). Entries which were checked more than
  141. purgeInterval ago are deleted from the kaddb row. If no candidate is found after
  142. a full round of checking the next bucket up is considered. If no candidate is
  143. found when we reach the maximum-proximity bucket, the next round starts.
  144. node record a is more favoured to b a > b iff a is a passive node (record of
  145. offline past peer)
  146. |proxBin(a)| < |proxBin(b)|
  147. || (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|)
  148. || (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b))
  149. The second argument returned names the first missing slot found
  150. */
  151. func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) {
  152. // return nil, proxLimit indicates that all buckets are filled
  153. defer self.lock.Unlock()
  154. self.lock.Lock()
  155. var interval time.Duration
  156. var found bool
  157. var purge []bool
  158. var delta time.Duration
  159. var cursor int
  160. var count int
  161. var after time.Time
  162. // iterate over columns maximum bucketsize times
  163. for rounds := 1; rounds <= maxBinSize; rounds++ {
  164. ROUND:
  165. // iterate over rows from PO 0 upto MaxProx
  166. for po, dbrow := range self.Nodes {
  167. // if row has rounds connected peers, then take the next
  168. if binSize(po) >= rounds {
  169. continue ROUND
  170. }
  171. if !need {
  172. // set proxlimit to the PO where the first missing slot is found
  173. proxLimit = po
  174. need = true
  175. }
  176. purge = make([]bool, len(dbrow))
  177. // there is a missing slot - finding a node to connect to
  178. // select a node record from the relavant kaddb row (of identical prox order)
  179. ROW:
  180. for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) {
  181. count++
  182. node = dbrow[cursor]
  183. // skip already connected nodes
  184. if node.node != nil {
  185. log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow)))
  186. continue ROW
  187. }
  188. // if node is scheduled to connect
  189. if node.After.After(time.Now()) {
  190. log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))
  191. continue ROW
  192. }
  193. delta = time.Since(node.Seen)
  194. if delta < self.initialRetryInterval {
  195. delta = self.initialRetryInterval
  196. }
  197. if delta > self.purgeInterval {
  198. // remove node
  199. purge[cursor] = true
  200. log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen))
  201. continue ROW
  202. }
  203. log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))
  204. // scheduling next check
  205. interval = delta * time.Duration(self.connRetryExp)
  206. after = time.Now().Add(interval)
  207. log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval))
  208. node.After = after
  209. found = true
  210. } // ROW
  211. self.cursors[po] = cursor
  212. self.delete(po, purge)
  213. if found {
  214. return node, need, proxLimit
  215. }
  216. } // ROUND
  217. } // ROUNDS
  218. return nil, need, proxLimit
  219. }
  220. // deletes the noderecords of a kaddb row corresponding to the indexes
  221. // caller must hold the dblock
  222. // the call is unsafe, no index checks
  223. func (self *KadDb) delete(row int, purge []bool) {
  224. var nodes []*NodeRecord
  225. dbrow := self.Nodes[row]
  226. for i, del := range purge {
  227. if i == self.cursors[row] {
  228. //reset cursor
  229. self.cursors[row] = len(nodes)
  230. }
  231. // delete the entry to be purged
  232. if del {
  233. delete(self.index, dbrow[i].Addr)
  234. continue
  235. }
  236. // otherwise append to new list
  237. nodes = append(nodes, dbrow[i])
  238. }
  239. self.Nodes[row] = nodes
  240. }
  241. // save persists kaddb on disk (written to file on path in json format.
  242. func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error {
  243. defer self.lock.Unlock()
  244. self.lock.Lock()
  245. var n int
  246. for _, b := range self.Nodes {
  247. for _, node := range b {
  248. n++
  249. node.After = time.Now()
  250. node.Seen = time.Now()
  251. if cb != nil {
  252. cb(node, node.node)
  253. }
  254. }
  255. }
  256. data, err := json.MarshalIndent(self, "", " ")
  257. if err != nil {
  258. return err
  259. }
  260. err = ioutil.WriteFile(path, data, os.ModePerm)
  261. if err != nil {
  262. log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: %v", n, path, err))
  263. } else {
  264. log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path))
  265. }
  266. return err
  267. }
  268. // Load(path) loads the node record database (kaddb) from file on path.
  269. func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) {
  270. defer self.lock.Unlock()
  271. self.lock.Lock()
  272. var data []byte
  273. data, err = ioutil.ReadFile(path)
  274. if err != nil {
  275. return
  276. }
  277. err = json.Unmarshal(data, self)
  278. if err != nil {
  279. return
  280. }
  281. var n int
  282. var purge []bool
  283. for po, b := range self.Nodes {
  284. purge = make([]bool, len(b))
  285. ROW:
  286. for i, node := range b {
  287. if cb != nil {
  288. err = cb(node, node.node)
  289. if err != nil {
  290. purge[i] = true
  291. continue ROW
  292. }
  293. }
  294. n++
  295. if node.After.IsZero() {
  296. node.After = time.Now()
  297. }
  298. self.index[node.Addr] = node
  299. }
  300. self.delete(po, purge)
  301. }
  302. log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path))
  303. return
  304. }
  305. // accessor for KAD offline db count
  306. func (self *KadDb) count() int {
  307. defer self.lock.Unlock()
  308. self.lock.Lock()
  309. return len(self.index)
  310. }