12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package discv5
- import (
- "bytes"
- "encoding/binary"
- "fmt"
- "math"
- "math/rand"
- "sort"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/log"
- )
// Tuning parameters for ticket collection, topic registration and topic
// search, followed by the parameters of topic radius detection.
const (
	ticketTimeBucketLen = time.Minute
	timeWindow          = 10 // expressed in units of ticketTimeBucketLen
	wantTicketsInWindow = 10
	collectFrequency    = 30 * time.Second
	registerFrequency   = 60 * time.Second
	maxCollectDebt      = 10
	maxRegisterDebt     = 5
	keepTicketConst     = 10 * time.Minute
	keepTicketExp       = 5 * time.Minute
	targetWaitTime      = 10 * time.Minute
	topicQueryTimeout   = 5 * time.Second
	topicQueryResend    = time.Minute

	// Topic radius detection.
	maxRadius           = 1<<64 - 1 // largest 64-bit distance prefix
	radiusTC            = 20 * time.Minute
	radiusBucketsPerBit = 8
	minSlope            = 1
	minPeakSize         = 40
	maxNoAdjust         = 20
	lookupWidth         = 8
	minRightSum         = 20
	searchForceQuery    = 4
)
// timeBucket is an absolute monotonic time expressed in minutes. Values of
// this type are the keys of the per-topic ticket buckets.
type timeBucket int
- type ticket struct {
- topics []Topic
- regTime []mclock.AbsTime // Per-topic local absolute time when the ticket can be used.
- // The serial number that was issued by the server.
- serial uint32
- // Used by registrar, tracks absolute time when the ticket was created.
- issueTime mclock.AbsTime
- // Fields used only by registrants
- node *Node // the registrar node that signed this ticket
- refCnt int // tracks number of topics that will be registered using this ticket
- pong []byte // encoded pong packet signed by the registrar
- }
- // ticketRef refers to a single topic in a ticket.
- type ticketRef struct {
- t *ticket
- idx int // index of the topic in t.topics and t.regTime
- }
- func (ref ticketRef) topic() Topic {
- return ref.t.topics[ref.idx]
- }
- func (ref ticketRef) topicRegTime() mclock.AbsTime {
- return ref.t.regTime[ref.idx]
- }
- func pongToTicket(localTime mclock.AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
- wps := p.data.(*pong).WaitPeriods
- if len(topics) != len(wps) {
- return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps))
- }
- if rlpHash(topics) != p.data.(*pong).TopicHash {
- return nil, fmt.Errorf("bad topic hash")
- }
- t := &ticket{
- issueTime: localTime,
- node: node,
- topics: topics,
- pong: p.rawData,
- regTime: make([]mclock.AbsTime, len(wps)),
- }
- // Convert wait periods to local absolute time.
- for i, wp := range wps {
- t.regTime[i] = localTime + mclock.AbsTime(time.Second*time.Duration(wp))
- }
- return t, nil
- }
- func ticketToPong(t *ticket, pong *pong) {
- pong.Expiration = uint64(t.issueTime / mclock.AbsTime(time.Second))
- pong.TopicHash = rlpHash(t.topics)
- pong.TicketSerial = t.serial
- pong.WaitPeriods = make([]uint32, len(t.regTime))
- for i, regTime := range t.regTime {
- pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second)
- }
- }
- type ticketStore struct {
- // radius detector and target address generator
- // exists for both searched and registered topics
- radius map[Topic]*topicRadius
- // Contains buckets (for each absolute minute) of tickets
- // that can be used in that minute.
- // This is only set if the topic is being registered.
- tickets map[Topic]*topicTickets
- regQueue []Topic // Topic registration queue for round robin attempts
- regSet map[Topic]struct{} // Topic registration queue contents for fast filling
- nodes map[*Node]*ticket
- nodeLastReq map[*Node]reqInfo
- lastBucketFetched timeBucket
- nextTicketCached *ticketRef
- nextTicketReg mclock.AbsTime
- searchTopicMap map[Topic]searchTopic
- nextTopicQueryCleanup mclock.AbsTime
- queriesSent map[*Node]map[common.Hash]sentQuery
- }
- type searchTopic struct {
- foundChn chan<- *Node
- }
- type sentQuery struct {
- sent mclock.AbsTime
- lookup lookupInfo
- }
- type topicTickets struct {
- buckets map[timeBucket][]ticketRef
- nextLookup mclock.AbsTime
- nextReg mclock.AbsTime
- }
- func newTicketStore() *ticketStore {
- return &ticketStore{
- radius: make(map[Topic]*topicRadius),
- tickets: make(map[Topic]*topicTickets),
- regSet: make(map[Topic]struct{}),
- nodes: make(map[*Node]*ticket),
- nodeLastReq: make(map[*Node]reqInfo),
- searchTopicMap: make(map[Topic]searchTopic),
- queriesSent: make(map[*Node]map[common.Hash]sentQuery),
- }
- }
- // addTopic starts tracking a topic. If register is true,
- // the local node will register the topic and tickets will be collected.
- func (s *ticketStore) addTopic(topic Topic, register bool) {
- log.Trace("Adding discovery topic", "topic", topic, "register", register)
- if s.radius[topic] == nil {
- s.radius[topic] = newTopicRadius(topic)
- }
- if register && s.tickets[topic] == nil {
- s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)}
- }
- }
- func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
- s.addTopic(t, false)
- if s.searchTopicMap[t].foundChn == nil {
- s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
- }
- }
- func (s *ticketStore) removeSearchTopic(t Topic) {
- if st := s.searchTopicMap[t]; st.foundChn != nil {
- delete(s.searchTopicMap, t)
- }
- }
- // removeRegisterTopic deletes all tickets for the given topic.
- func (s *ticketStore) removeRegisterTopic(topic Topic) {
- log.Trace("Removing discovery topic", "topic", topic)
- if s.tickets[topic] == nil {
- log.Warn("Removing non-existent discovery topic", "topic", topic)
- return
- }
- for _, list := range s.tickets[topic].buckets {
- for _, ref := range list {
- ref.t.refCnt--
- if ref.t.refCnt == 0 {
- delete(s.nodes, ref.t.node)
- delete(s.nodeLastReq, ref.t.node)
- }
- }
- }
- delete(s.tickets, topic)
- }
- func (s *ticketStore) regTopicSet() []Topic {
- topics := make([]Topic, 0, len(s.tickets))
- for topic := range s.tickets {
- topics = append(topics, topic)
- }
- return topics
- }
- // nextRegisterLookup returns the target of the next lookup for ticket collection.
- func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
- // Queue up any new topics (or discarded ones), preserving iteration order
- for topic := range s.tickets {
- if _, ok := s.regSet[topic]; !ok {
- s.regQueue = append(s.regQueue, topic)
- s.regSet[topic] = struct{}{}
- }
- }
- // Iterate over the set of all topics and look up the next suitable one
- for len(s.regQueue) > 0 {
- // Fetch the next topic from the queue, and ensure it still exists
- topic := s.regQueue[0]
- s.regQueue = s.regQueue[1:]
- delete(s.regSet, topic)
- if s.tickets[topic] == nil {
- continue
- }
- // If the topic needs more tickets, return it
- if s.tickets[topic].nextLookup < mclock.Now() {
- next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond
- log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay)
- return next, delay
- }
- }
- // No registration topics found or all exhausted, sleep
- delay := 40 * time.Second
- log.Trace("No topic found to register", "delay", delay)
- return lookupInfo{}, delay
- }
- func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
- tr := s.radius[topic]
- target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
- if target.radiusLookup {
- tr.radiusLookupCnt++
- } else {
- tr.radiusLookupCnt = 0
- }
- return target
- }
- // ticketsInWindow returns the tickets of a given topic in the registration window.
- func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
- // Sanity check that the topic still exists before operating on it
- if s.tickets[topic] == nil {
- log.Warn("Listing non-existing discovery tickets", "topic", topic)
- return nil
- }
- // Gather all the tickers in the next time window
- var tickets []ticketRef
- buckets := s.tickets[topic].buckets
- for idx := timeBucket(0); idx < timeWindow; idx++ {
- tickets = append(tickets, buckets[s.lastBucketFetched+idx]...)
- }
- log.Trace("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(tickets))
- return tickets
- }
- func (s *ticketStore) removeExcessTickets(t Topic) {
- tickets := s.ticketsInWindow(t)
- if len(tickets) <= wantTicketsInWindow {
- return
- }
- sort.Sort(ticketRefByWaitTime(tickets))
- for _, r := range tickets[wantTicketsInWindow:] {
- s.removeTicketRef(r)
- }
- }
- type ticketRefByWaitTime []ticketRef
- // Len is the number of elements in the collection.
- func (s ticketRefByWaitTime) Len() int {
- return len(s)
- }
- func (ref ticketRef) waitTime() mclock.AbsTime {
- return ref.t.regTime[ref.idx] - ref.t.issueTime
- }
- // Less reports whether the element with
- // index i should sort before the element with index j.
- func (s ticketRefByWaitTime) Less(i, j int) bool {
- return s[i].waitTime() < s[j].waitTime()
- }
- // Swap swaps the elements with indexes i and j.
- func (s ticketRefByWaitTime) Swap(i, j int) {
- s[i], s[j] = s[j], s[i]
- }
- func (s *ticketStore) addTicketRef(r ticketRef) {
- topic := r.t.topics[r.idx]
- tickets := s.tickets[topic]
- if tickets == nil {
- log.Warn("Adding ticket to non-existent topic", "topic", topic)
- return
- }
- bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen))
- tickets.buckets[bucket] = append(tickets.buckets[bucket], r)
- r.t.refCnt++
- min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt
- if tickets.nextLookup < min {
- tickets.nextLookup = min
- }
- tickets.nextLookup += mclock.AbsTime(collectFrequency)
- //s.removeExcessTickets(topic)
- }
- func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
- now := mclock.Now()
- for {
- ticket, wait := s.nextRegisterableTicket()
- if ticket == nil {
- return ticket, wait
- }
- log.Trace("Found discovery ticket to register", "node", ticket.t.node, "serial", ticket.t.serial, "wait", wait)
- regTime := now + mclock.AbsTime(wait)
- topic := ticket.t.topics[ticket.idx]
- if s.tickets[topic] != nil && regTime >= s.tickets[topic].nextReg {
- return ticket, wait
- }
- s.removeTicketRef(*ticket)
- }
- }
- func (s *ticketStore) ticketRegistered(ref ticketRef) {
- now := mclock.Now()
- topic := ref.t.topics[ref.idx]
- tickets := s.tickets[topic]
- min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt
- if min > tickets.nextReg {
- tickets.nextReg = min
- }
- tickets.nextReg += mclock.AbsTime(registerFrequency)
- s.tickets[topic] = tickets
- s.removeTicketRef(ref)
- }
- // nextRegisterableTicket returns the next ticket that can be used
- // to register.
- //
- // If the returned wait time <= zero the ticket can be used. For a positive
- // wait time, the caller should requery the next ticket later.
- //
- // A ticket can be returned more than once with <= zero wait time in case
- // the ticket contains multiple topics.
- func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
- now := mclock.Now()
- if s.nextTicketCached != nil {
- return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
- }
- for bucket := s.lastBucketFetched; ; bucket++ {
- var (
- empty = true // true if there are no tickets
- nextTicket ticketRef // uninitialized if this bucket is empty
- )
- for _, tickets := range s.tickets {
- //s.removeExcessTickets(topic)
- if len(tickets.buckets) != 0 {
- empty = false
- list := tickets.buckets[bucket]
- for _, ref := range list {
- //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
- if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
- nextTicket = ref
- }
- }
- }
- }
- if empty {
- return nil, 0
- }
- if nextTicket.t != nil {
- s.nextTicketCached = &nextTicket
- return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
- }
- s.lastBucketFetched = bucket
- }
- }
- // removeTicket removes a ticket from the ticket store
- func (s *ticketStore) removeTicketRef(ref ticketRef) {
- log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial)
- // Make nextRegisterableTicket return the next available ticket.
- s.nextTicketCached = nil
- topic := ref.topic()
- tickets := s.tickets[topic]
- if tickets == nil {
- log.Trace("Removing tickets from unknown topic", "topic", topic)
- return
- }
- bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen))
- list := tickets.buckets[bucket]
- idx := -1
- for i, bt := range list {
- if bt.t == ref.t {
- idx = i
- break
- }
- }
- if idx == -1 {
- panic(nil)
- }
- list = append(list[:idx], list[idx+1:]...)
- if len(list) != 0 {
- tickets.buckets[bucket] = list
- } else {
- delete(tickets.buckets, bucket)
- }
- ref.t.refCnt--
- if ref.t.refCnt == 0 {
- delete(s.nodes, ref.t.node)
- delete(s.nodeLastReq, ref.t.node)
- }
- }
- type lookupInfo struct {
- target common.Hash
- topic Topic
- radiusLookup bool
- }
- type reqInfo struct {
- pingHash []byte
- lookup lookupInfo
- time mclock.AbsTime
- }
- // returns -1 if not found
- func (t *ticket) findIdx(topic Topic) int {
- for i, tt := range t.topics {
- if tt == topic {
- return i
- }
- }
- return -1
- }
- func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
- now := mclock.Now()
- for i, n := range nodes {
- if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
- if lookup.radiusLookup {
- if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
- s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
- }
- } else {
- if s.nodes[n] == nil {
- s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
- }
- }
- }
- }
- }
- func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) {
- now := mclock.Now()
- for i, n := range nodes {
- if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
- if lookup.radiusLookup {
- if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
- s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now}
- }
- } // else {
- if s.canQueryTopic(n, lookup.topic) {
- hash := query(n, lookup.topic)
- if hash != nil {
- s.addTopicQuery(common.BytesToHash(hash), n, lookup)
- }
- }
- //}
- }
- }
- }
- func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t *ticket) {
- for i, topic := range t.topics {
- if tt, ok := s.radius[topic]; ok {
- tt.adjustWithTicket(now, targetHash, ticketRef{t, i})
- }
- }
- }
- func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) {
- log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial)
- lastReq, ok := s.nodeLastReq[ticket.node]
- if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
- return
- }
- s.adjustWithTicket(localTime, lastReq.lookup.target, ticket)
- if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil {
- return
- }
- topic := lastReq.lookup.topic
- topicIdx := ticket.findIdx(topic)
- if topicIdx == -1 {
- return
- }
- bucket := timeBucket(localTime / mclock.AbsTime(ticketTimeBucketLen))
- if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched {
- s.lastBucketFetched = bucket
- }
- if _, ok := s.tickets[topic]; ok {
- wait := ticket.regTime[topicIdx] - localTime
- rnd := rand.ExpFloat64()
- if rnd > 10 {
- rnd = 10
- }
- if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
- // use the ticket to register this topic
- //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong)
- s.addTicketRef(ticketRef{ticket, topicIdx})
- }
- }
- if ticket.refCnt > 0 {
- s.nextTicketCached = nil
- s.nodes[ticket.node] = ticket
- }
- }
- func (s *ticketStore) getNodeTicket(node *Node) *ticket {
- if s.nodes[node] == nil {
- log.Trace("Retrieving node ticket", "node", node.ID, "serial", nil)
- } else {
- log.Trace("Retrieving node ticket", "node", node.ID, "serial", s.nodes[node].serial)
- }
- return s.nodes[node]
- }
- func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
- qq := s.queriesSent[node]
- if qq != nil {
- now := mclock.Now()
- for _, sq := range qq {
- if sq.lookup.topic == topic && sq.sent > now-mclock.AbsTime(topicQueryResend) {
- return false
- }
- }
- }
- return true
- }
- func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
- now := mclock.Now()
- qq := s.queriesSent[node]
- if qq == nil {
- qq = make(map[common.Hash]sentQuery)
- s.queriesSent[node] = qq
- }
- qq[hash] = sentQuery{sent: now, lookup: lookup}
- s.cleanupTopicQueries(now)
- }
- func (s *ticketStore) cleanupTopicQueries(now mclock.AbsTime) {
- if s.nextTopicQueryCleanup > now {
- return
- }
- exp := now - mclock.AbsTime(topicQueryResend)
- for n, qq := range s.queriesSent {
- for h, q := range qq {
- if q.sent < exp {
- delete(qq, h)
- }
- }
- if len(qq) == 0 {
- delete(s.queriesSent, n)
- }
- }
- s.nextTopicQueryCleanup = now + mclock.AbsTime(topicQueryTimeout)
- }
- func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
- now := mclock.Now()
- //fmt.Println("got", from.addr().String(), hash, len(nodes))
- qq := s.queriesSent[from]
- if qq == nil {
- return true
- }
- q, ok := qq[hash]
- if !ok || now > q.sent+mclock.AbsTime(topicQueryTimeout) {
- return true
- }
- inside := float64(0)
- if len(nodes) > 0 {
- inside = 1
- }
- s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)
- chn := s.searchTopicMap[q.lookup.topic].foundChn
- if chn == nil {
- //fmt.Println("no channel")
- return false
- }
- for _, node := range nodes {
- ip := node.IP
- if ip.IsUnspecified() || ip.IsLoopback() {
- ip = from.IP
- }
- n := NewNode(node.ID, ip, node.UDP, node.TCP)
- select {
- case chn <- n:
- default:
- return false
- }
- }
- return false
- }
- type topicRadius struct {
- topic Topic
- topicHashPrefix uint64
- radius, minRadius uint64
- buckets []topicRadiusBucket
- converged bool
- radiusLookupCnt int
- }
// topicRadiusEvent enumerates the kinds of weight kept in a topicRadiusBucket.
type topicRadiusEvent int

// The values index topicRadiusBucket.weights; trCount is the number of kinds.
const (
	trOutside topicRadiusEvent = iota
	trInside
	trNoAdjust
	trCount
)
- type topicRadiusBucket struct {
- weights [trCount]float64
- lastTime mclock.AbsTime
- value float64
- lookupSent map[common.Hash]mclock.AbsTime
- }
- func (b *topicRadiusBucket) update(now mclock.AbsTime) {
- if now == b.lastTime {
- return
- }
- exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
- for i, w := range b.weights {
- b.weights[i] = w * exp
- }
- b.lastTime = now
- for target, tm := range b.lookupSent {
- if now-tm > mclock.AbsTime(respTimeout) {
- b.weights[trNoAdjust] += 1
- delete(b.lookupSent, target)
- }
- }
- }
- func (b *topicRadiusBucket) adjust(now mclock.AbsTime, inside float64) {
- b.update(now)
- if inside <= 0 {
- b.weights[trOutside] += 1
- } else {
- if inside >= 1 {
- b.weights[trInside] += 1
- } else {
- b.weights[trInside] += inside
- b.weights[trOutside] += 1 - inside
- }
- }
- }
- func newTopicRadius(t Topic) *topicRadius {
- topicHash := crypto.Keccak256Hash([]byte(t))
- topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8])
- return &topicRadius{
- topic: t,
- topicHashPrefix: topicHashPrefix,
- radius: maxRadius,
- minRadius: maxRadius,
- }
- }
- func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
- prefix := binary.BigEndian.Uint64(addrHash[0:8])
- var log2 float64
- if prefix != r.topicHashPrefix {
- log2 = math.Log2(float64(prefix ^ r.topicHashPrefix))
- }
- bucket := int((64 - log2) * radiusBucketsPerBit)
- max := 64*radiusBucketsPerBit - 1
- if bucket > max {
- return max
- }
- if bucket < 0 {
- return 0
- }
- return bucket
- }
- func (r *topicRadius) targetForBucket(bucket int) common.Hash {
- min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
- max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)
- a := uint64(min)
- b := randUint64n(uint64(max - min))
- xor := a + b
- if xor < a {
- xor = ^uint64(0)
- }
- prefix := r.topicHashPrefix ^ xor
- var target common.Hash
- binary.BigEndian.PutUint64(target[0:8], prefix)
- globalRandRead(target[8:])
- return target
- }
// globalRandRead fills b with pseudo-random bytes taken from the global
// math/rand source. It is a hand-rolled replacement for rand.Read, which
// the original authors avoided in order to keep supporting Go 1.5.
//
// Each draw from the generator supplies 7 bytes. The draw uses rand.Int63,
// which always yields 63 random bits. rand.Int only yields 31 bits on
// platforms with a 32-bit int, which would leave the upper bytes of every
// 7-byte group zero there.
func globalRandRead(b []byte) {
	var (
		val  int64
		left int // random bytes still available in val
	)
	for i := range b {
		if left == 0 {
			val = rand.Int63()
			left = 7
		}
		b[i] = byte(val)
		val >>= 8
		left--
	}
}
- func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
- nodePrefix := binary.BigEndian.Uint64(addrHash[0:8])
- dist := nodePrefix ^ r.topicHashPrefix
- return dist < r.radius
- }
- func (r *topicRadius) chooseLookupBucket(a, b int) int {
- if a < 0 {
- a = 0
- }
- if a > b {
- return -1
- }
- c := 0
- for i := a; i <= b; i++ {
- if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
- c++
- }
- }
- if c == 0 {
- return -1
- }
- rnd := randUint(uint32(c))
- for i := a; i <= b; i++ {
- if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
- if rnd == 0 {
- return i
- }
- rnd--
- }
- }
- panic(nil) // should never happen
- }
- func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
- var max float64
- if a < 0 {
- a = 0
- }
- if b >= len(r.buckets) {
- b = len(r.buckets) - 1
- if r.buckets[b].value > max {
- max = r.buckets[b].value
- }
- }
- if b >= a {
- for i := a; i <= b; i++ {
- if r.buckets[i].value > max {
- max = r.buckets[i].value
- }
- }
- }
- return maxValue-max < minPeakSize
- }
- func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
- maxBucket := 0
- maxValue := float64(0)
- now := mclock.Now()
- v := float64(0)
- for i := range r.buckets {
- r.buckets[i].update(now)
- v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
- r.buckets[i].value = v
- //fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
- }
- //fmt.Println()
- slopeCross := -1
- for i, b := range r.buckets {
- v := b.value
- if v < float64(i)*minSlope {
- slopeCross = i
- break
- }
- if v > maxValue {
- maxValue = v
- maxBucket = i + 1
- }
- }
- minRadBucket := len(r.buckets)
- sum := float64(0)
- for minRadBucket > 0 && sum < minRightSum {
- minRadBucket--
- b := r.buckets[minRadBucket]
- sum += b.weights[trInside] + b.weights[trOutside]
- }
- r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
- lookupLeft := -1
- if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
- lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
- }
- lookupRight := -1
- if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
- for len(r.buckets) <= maxBucket+lookupWidth {
- r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]mclock.AbsTime)})
- }
- lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
- }
- if lookupLeft == -1 {
- radiusLookup = lookupRight
- } else {
- if lookupRight == -1 {
- radiusLookup = lookupLeft
- } else {
- if randUint(2) == 0 {
- radiusLookup = lookupLeft
- } else {
- radiusLookup = lookupRight
- }
- }
- }
- //fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
- if radiusLookup == -1 {
- // no more radius lookups needed at the moment, return a radius
- r.converged = true
- rad := maxBucket
- if minRadBucket < rad {
- rad = minRadBucket
- }
- radius = ^uint64(0)
- if rad > 0 {
- radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
- }
- r.radius = radius
- }
- return
- }
- func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
- if !forceRegular {
- _, radiusLookup := r.recalcRadius()
- if radiusLookup != -1 {
- target := r.targetForBucket(radiusLookup)
- r.buckets[radiusLookup].lookupSent[target] = mclock.Now()
- return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
- }
- }
- radExt := r.radius / 2
- if radExt > maxRadius-r.radius {
- radExt = maxRadius - r.radius
- }
- rnd := randUint64n(r.radius) + randUint64n(2*radExt)
- if rnd > radExt {
- rnd -= radExt
- } else {
- rnd = radExt - rnd
- }
- prefix := r.topicHashPrefix ^ rnd
- var target common.Hash
- binary.BigEndian.PutUint64(target[0:8], prefix)
- globalRandRead(target[8:])
- return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
- }
- func (r *topicRadius) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t ticketRef) {
- wait := t.t.regTime[t.idx] - t.t.issueTime
- inside := float64(wait)/float64(targetWaitTime) - 0.5
- if inside > 1 {
- inside = 1
- }
- if inside < 0 {
- inside = 0
- }
- r.adjust(now, targetHash, t.t.node.sha, inside)
- }
- func (r *topicRadius) adjust(now mclock.AbsTime, targetHash, addrHash common.Hash, inside float64) {
- bucket := r.getBucketIdx(addrHash)
- //fmt.Println("adjust", bucket, len(r.buckets), inside)
- if bucket >= len(r.buckets) {
- return
- }
- r.buckets[bucket].adjust(now, inside)
- delete(r.buckets[bucket].lookupSent, targetHash)
- }
|