kademlia.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package kademlia
  17. import (
  18. "fmt"
  19. "sort"
  20. "strings"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/log"
  24. "github.com/ethereum/go-ethereum/metrics"
  25. )
  26. //metrics variables
  27. //For metrics, we want to count how many times peers are added/removed
  28. //at a certain index. Thus we do that with an array of counters with
  29. //entry for each index
  30. var (
  31. bucketAddIndexCount []metrics.Counter
  32. bucketRmIndexCount []metrics.Counter
  33. )
  34. const (
  35. bucketSize = 4
  36. proxBinSize = 2
  37. maxProx = 8
  38. connRetryExp = 2
  39. maxPeers = 100
  40. )
  41. var (
  42. purgeInterval = 42 * time.Hour
  43. initialRetryInterval = 42 * time.Millisecond
  44. maxIdleInterval = 42 * 1000 * time.Millisecond
  45. // maxIdleInterval = 42 * 10 0 * time.Millisecond
  46. )
  47. type KadParams struct {
  48. // adjustable parameters
  49. MaxProx int
  50. ProxBinSize int
  51. BucketSize int
  52. PurgeInterval time.Duration
  53. InitialRetryInterval time.Duration
  54. MaxIdleInterval time.Duration
  55. ConnRetryExp int
  56. }
  57. func NewDefaultKadParams() *KadParams {
  58. return &KadParams{
  59. MaxProx: maxProx,
  60. ProxBinSize: proxBinSize,
  61. BucketSize: bucketSize,
  62. PurgeInterval: purgeInterval,
  63. InitialRetryInterval: initialRetryInterval,
  64. MaxIdleInterval: maxIdleInterval,
  65. ConnRetryExp: connRetryExp,
  66. }
  67. }
  68. // Kademlia is a table of active nodes
  69. type Kademlia struct {
  70. addr Address // immutable baseaddress of the table
  71. *KadParams // Kademlia configuration parameters
  72. proxLimit int // state, the PO of the first row of the most proximate bin
  73. proxSize int // state, the number of peers in the most proximate bin
  74. count int // number of active peers (w live connection)
  75. buckets [][]Node // the actual bins
  76. db *KadDb // kaddb, node record database
  77. lock sync.RWMutex // mutex to access buckets
  78. }
  79. type Node interface {
  80. Addr() Address
  81. Url() string
  82. LastActive() time.Time
  83. Drop()
  84. }
  85. // public constructor
  86. // add is the base address of the table
  87. // params is KadParams configuration
  88. func New(addr Address, params *KadParams) *Kademlia {
  89. buckets := make([][]Node, params.MaxProx+1)
  90. kad := &Kademlia{
  91. addr: addr,
  92. KadParams: params,
  93. buckets: buckets,
  94. db: newKadDb(addr, params),
  95. }
  96. kad.initMetricsVariables()
  97. return kad
  98. }
  99. // accessor for KAD base address
  100. func (self *Kademlia) Addr() Address {
  101. return self.addr
  102. }
  103. // accessor for KAD active node count
  104. func (self *Kademlia) Count() int {
  105. defer self.lock.Unlock()
  106. self.lock.Lock()
  107. return self.count
  108. }
  109. // accessor for KAD active node count
  110. func (self *Kademlia) DBCount() int {
  111. return self.db.count()
  112. }
  113. // On is the entry point called when a new nodes is added
  114. // unsafe in that node is not checked to be already active node (to be called once)
  115. func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) {
  116. log.Debug(fmt.Sprintf("%v", self))
  117. defer self.lock.Unlock()
  118. self.lock.Lock()
  119. index := self.proximityBin(node.Addr())
  120. record := self.db.findOrCreate(index, node.Addr(), node.Url())
  121. if cb != nil {
  122. err = cb(record, node)
  123. log.Trace(fmt.Sprintf("cb(%v, %v) ->%v", record, node, err))
  124. if err != nil {
  125. return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err)
  126. }
  127. log.Debug(fmt.Sprintf("add node record %v with node %v", record, node))
  128. }
  129. // insert in kademlia table of active nodes
  130. bucket := self.buckets[index]
  131. // if bucket is full insertion replaces the worst node
  132. // TODO: give priority to peers with active traffic
  133. if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation
  134. self.buckets[index] = append(bucket, node)
  135. bucketAddIndexCount[index].Inc(1)
  136. log.Debug(fmt.Sprintf("add node %v to table", node))
  137. self.setProxLimit(index, true)
  138. record.node = node
  139. self.count++
  140. return nil
  141. }
  142. // always rotate peers
  143. idle := self.MaxIdleInterval
  144. var pos int
  145. var replaced Node
  146. for i, p := range bucket {
  147. idleInt := time.Since(p.LastActive())
  148. if idleInt > idle {
  149. idle = idleInt
  150. pos = i
  151. replaced = p
  152. }
  153. }
  154. if replaced == nil {
  155. log.Debug(fmt.Sprintf("all peers wanted, PO%03d bucket full", index))
  156. return fmt.Errorf("bucket full")
  157. }
  158. log.Debug(fmt.Sprintf("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval))
  159. replaced.Drop()
  160. // actually replace in the row. When off(node) is called, the peer is no longer in the row
  161. bucket[pos] = node
  162. // there is no change in bucket cardinalities so no prox limit adjustment is needed
  163. record.node = node
  164. self.count++
  165. return nil
  166. }
  167. // Off is the called when a node is taken offline (from the protocol main loop exit)
  168. func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) {
  169. self.lock.Lock()
  170. defer self.lock.Unlock()
  171. index := self.proximityBin(node.Addr())
  172. bucketRmIndexCount[index].Inc(1)
  173. bucket := self.buckets[index]
  174. for i := 0; i < len(bucket); i++ {
  175. if node.Addr() == bucket[i].Addr() {
  176. self.buckets[index] = append(bucket[:i], bucket[(i+1):]...)
  177. self.setProxLimit(index, false)
  178. break
  179. }
  180. }
  181. record := self.db.index[node.Addr()]
  182. // callback on remove
  183. if cb != nil {
  184. cb(record, record.node)
  185. }
  186. record.node = nil
  187. self.count--
  188. log.Debug(fmt.Sprintf("remove node %v from table, population now is %v", node, self.count))
  189. return
  190. }
  191. // proxLimit is dynamically adjusted so that
  192. // 1) there is no empty buckets in bin < proxLimit and
  193. // 2) the sum of all items are the minimum possible but higher than ProxBinSize
  194. // adjust Prox (proxLimit and proxSize after an insertion/removal of nodes)
  195. // caller holds the lock
  196. func (self *Kademlia) setProxLimit(r int, on bool) {
  197. // if the change is outside the core (PO lower)
  198. // and the change does not leave a bucket empty then
  199. // no adjustment needed
  200. if r < self.proxLimit && len(self.buckets[r]) > 0 {
  201. return
  202. }
  203. // if on=a node was added, then r must be within prox limit so increment cardinality
  204. if on {
  205. self.proxSize++
  206. curr := len(self.buckets[self.proxLimit])
  207. // if now core is big enough without the furthest bucket, then contract
  208. // this can result in more than one bucket change
  209. for self.proxSize >= self.ProxBinSize+curr && curr > 0 {
  210. self.proxSize -= curr
  211. self.proxLimit++
  212. curr = len(self.buckets[self.proxLimit])
  213. log.Trace(fmt.Sprintf("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r))
  214. }
  215. return
  216. }
  217. // otherwise
  218. if r >= self.proxLimit {
  219. self.proxSize--
  220. }
  221. // expand core by lowering prox limit until hit zero or cover the empty bucket or reached target cardinality
  222. for (self.proxSize < self.ProxBinSize || r < self.proxLimit) &&
  223. self.proxLimit > 0 {
  224. //
  225. self.proxLimit--
  226. self.proxSize += len(self.buckets[self.proxLimit])
  227. log.Trace(fmt.Sprintf("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r))
  228. }
  229. }
  230. /*
  231. returns the list of nodes belonging to the same proximity bin
  232. as the target. The most proximate bin will be the union of the bins between
  233. proxLimit and MaxProx.
  234. */
  235. func (self *Kademlia) FindClosest(target Address, max int) []Node {
  236. self.lock.Lock()
  237. defer self.lock.Unlock()
  238. r := nodesByDistance{
  239. target: target,
  240. }
  241. po := self.proximityBin(target)
  242. index := po
  243. step := 1
  244. log.Trace(fmt.Sprintf("serving %v nodes at %v (PO%02d)", max, index, po))
  245. // if max is set to 0, just want a full bucket, dynamic number
  246. min := max
  247. // set limit to max
  248. limit := max
  249. if max == 0 {
  250. min = 1
  251. limit = maxPeers
  252. }
  253. var n int
  254. for index >= 0 {
  255. // add entire bucket
  256. for _, p := range self.buckets[index] {
  257. r.push(p, limit)
  258. n++
  259. }
  260. // terminate if index reached the bottom or enough peers > min
  261. log.Trace(fmt.Sprintf("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po))
  262. if n >= min && (step < 0 || max == 0) {
  263. break
  264. }
  265. // reach top most non-empty PO bucket, turn around
  266. if index == self.MaxProx {
  267. index = po
  268. step = -1
  269. }
  270. index += step
  271. }
  272. log.Trace(fmt.Sprintf("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po))
  273. return r.nodes
  274. }
  275. func (self *Kademlia) Suggest() (*NodeRecord, bool, int) {
  276. defer self.lock.RUnlock()
  277. self.lock.RLock()
  278. return self.db.findBest(self.BucketSize, func(i int) int { return len(self.buckets[i]) })
  279. }
  280. // adds node records to kaddb (persisted node record db)
  281. func (self *Kademlia) Add(nrs []*NodeRecord) {
  282. self.db.add(nrs, self.proximityBin)
  283. }
  284. // nodesByDistance is a list of nodes, ordered by distance to target.
  285. type nodesByDistance struct {
  286. nodes []Node
  287. target Address
  288. }
  289. func sortedByDistanceTo(target Address, slice []Node) bool {
  290. var last Address
  291. for i, node := range slice {
  292. if i > 0 {
  293. if target.ProxCmp(node.Addr(), last) < 0 {
  294. return false
  295. }
  296. }
  297. last = node.Addr()
  298. }
  299. return true
  300. }
  301. // push(node, max) adds the given node to the list, keeping the total size
  302. // below max elements.
  303. func (h *nodesByDistance) push(node Node, max int) {
  304. // returns the firt index ix such that func(i) returns true
  305. ix := sort.Search(len(h.nodes), func(i int) bool {
  306. return h.target.ProxCmp(h.nodes[i].Addr(), node.Addr()) >= 0
  307. })
  308. if len(h.nodes) < max {
  309. h.nodes = append(h.nodes, node)
  310. }
  311. if ix < len(h.nodes) {
  312. copy(h.nodes[ix+1:], h.nodes[ix:])
  313. h.nodes[ix] = node
  314. }
  315. }
  316. /*
  317. Taking the proximity order relative to a fix point x classifies the points in
  318. the space (n byte long byte sequences) into bins. Items in each are at
  319. most half as distant from x as items in the previous bin. Given a sample of
  320. uniformly distributed items (a hash function over arbitrary sequence) the
  321. proximity scale maps onto series of subsets with cardinalities on a negative
  322. exponential scale.
  323. It also has the property that any two item belonging to the same bin are at
  324. most half as distant from each other as they are from x.
  325. If we think of random sample of items in the bins as connections in a network of interconnected nodes than relative proximity can serve as the basis for local
  326. decisions for graph traversal where the task is to find a route between two
  327. points. Since in every hop, the finite distance halves, there is
  328. a guaranteed constant maximum limit on the number of hops needed to reach one
  329. node from the other.
  330. */
  331. func (self *Kademlia) proximityBin(other Address) (ret int) {
  332. ret = proximity(self.addr, other)
  333. if ret > self.MaxProx {
  334. ret = self.MaxProx
  335. }
  336. return
  337. }
  338. // provides keyrange for chunk db iteration
  339. func (self *Kademlia) KeyRange(other Address) (start, stop Address) {
  340. defer self.lock.RUnlock()
  341. self.lock.RLock()
  342. return KeyRange(self.addr, other, self.proxLimit)
  343. }
  344. // save persists kaddb on disk (written to file on path in json format.
  345. func (self *Kademlia) Save(path string, cb func(*NodeRecord, Node)) error {
  346. return self.db.save(path, cb)
  347. }
  348. // Load(path) loads the node record database (kaddb) from file on path.
  349. func (self *Kademlia) Load(path string, cb func(*NodeRecord, Node) error) (err error) {
  350. return self.db.load(path, cb)
  351. }
  352. // kademlia table + kaddb table displayed with ascii
  353. func (self *Kademlia) String() string {
  354. defer self.lock.RUnlock()
  355. self.lock.RLock()
  356. defer self.db.lock.RUnlock()
  357. self.db.lock.RLock()
  358. var rows []string
  359. rows = append(rows, "=========================================================================")
  360. rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %v", time.Now().UTC().Format(time.UnixDate), self.addr.String()[:6]))
  361. rows = append(rows, fmt.Sprintf("population: %d (%d), proxLimit: %d, proxSize: %d", self.count, len(self.db.index), self.proxLimit, self.proxSize))
  362. rows = append(rows, fmt.Sprintf("MaxProx: %d, ProxBinSize: %d, BucketSize: %d", self.MaxProx, self.ProxBinSize, self.BucketSize))
  363. for i, bucket := range self.buckets {
  364. if i == self.proxLimit {
  365. rows = append(rows, fmt.Sprintf("============ PROX LIMIT: %d ==========================================", i))
  366. }
  367. row := []string{fmt.Sprintf("%03d", i), fmt.Sprintf("%2d", len(bucket))}
  368. var k int
  369. c := self.db.cursors[i]
  370. for ; k < len(bucket); k++ {
  371. p := bucket[(c+k)%len(bucket)]
  372. row = append(row, p.Addr().String()[:6])
  373. if k == 4 {
  374. break
  375. }
  376. }
  377. for ; k < 4; k++ {
  378. row = append(row, " ")
  379. }
  380. row = append(row, fmt.Sprintf("| %2d %2d", len(self.db.Nodes[i]), self.db.cursors[i]))
  381. for j, p := range self.db.Nodes[i] {
  382. row = append(row, p.Addr.String()[:6])
  383. if j == 3 {
  384. break
  385. }
  386. }
  387. rows = append(rows, strings.Join(row, " "))
  388. if i == self.MaxProx {
  389. }
  390. }
  391. rows = append(rows, "=========================================================================")
  392. return strings.Join(rows, "\n")
  393. }
  394. //We have to build up the array of counters for each index
  395. func (self *Kademlia) initMetricsVariables() {
  396. //create the arrays
  397. bucketAddIndexCount = make([]metrics.Counter, self.MaxProx+1)
  398. bucketRmIndexCount = make([]metrics.Counter, self.MaxProx+1)
  399. //at each index create a metrics counter
  400. for i := 0; i < (self.KadParams.MaxProx + 1); i++ {
  401. bucketAddIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.add.%d.index", i), nil)
  402. bucketRmIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.rm.%d.index", i), nil)
  403. }
  404. }