123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- // Copyright 2015 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package downloader
- import (
- "context"
- "sync"
- ethereum "github.com/ethereum/go-ethereum"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/rpc"
- )
- // PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
- // It offers only methods that operates on data that can be available to anyone without security risks.
- type PublicDownloaderAPI struct {
- d *Downloader
- mux *event.TypeMux
- installSyncSubscription chan chan interface{}
- uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
- }
- // NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
- // listens for events from the downloader through the global event mux. In case it receives one of
- // these events it broadcasts it to all syncing subscriptions that are installed through the
- // installSyncSubscription channel.
- func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
- api := &PublicDownloaderAPI{
- d: d,
- mux: m,
- installSyncSubscription: make(chan chan interface{}),
- uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
- }
- go api.eventLoop()
- return api
- }
- // eventLoop runs a loop until the event mux closes. It will install and uninstall new
- // sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
- func (api *PublicDownloaderAPI) eventLoop() {
- var (
- sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
- syncSubscriptions = make(map[chan interface{}]struct{})
- )
- for {
- select {
- case i := <-api.installSyncSubscription:
- syncSubscriptions[i] = struct{}{}
- case u := <-api.uninstallSyncSubscription:
- delete(syncSubscriptions, u.c)
- close(u.uninstalled)
- case event := <-sub.Chan():
- if event == nil {
- return
- }
- var notification interface{}
- switch event.Data.(type) {
- case StartEvent:
- notification = &SyncingResult{
- Syncing: true,
- Status: api.d.Progress(),
- }
- case DoneEvent, FailedEvent:
- notification = false
- }
- // broadcast
- for c := range syncSubscriptions {
- c <- notification
- }
- }
- }
- }
- // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
- func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
- notifier, supported := rpc.NotifierFromContext(ctx)
- if !supported {
- return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
- }
- rpcSub := notifier.CreateSubscription()
- go func() {
- statuses := make(chan interface{})
- sub := api.SubscribeSyncStatus(statuses)
- for {
- select {
- case status := <-statuses:
- notifier.Notify(rpcSub.ID, status)
- case <-rpcSub.Err():
- sub.Unsubscribe()
- return
- case <-notifier.Closed():
- sub.Unsubscribe()
- return
- }
- }
- }()
- return rpcSub, nil
- }
- // SyncingResult provides information about the current synchronisation status for this node.
- type SyncingResult struct {
- Syncing bool `json:"syncing"`
- Status ethereum.SyncProgress `json:"status"`
- }
// uninstallSyncSubscriptionRequest asks the API event loop to uninstall a
// syncing subscription. The field order matters: the request is built with a
// positional composite literal.
type uninstallSyncSubscriptionRequest struct {
	// c is the status channel of the subscription to remove.
	c chan interface{}
	// uninstalled is closed by the event loop once the removal is done.
	uninstalled chan interface{}
}
- // SyncStatusSubscription represents a syncing subscription.
- type SyncStatusSubscription struct {
- api *PublicDownloaderAPI // register subscription in event loop of this api instance
- c chan interface{} // channel where events are broadcasted to
- unsubOnce sync.Once // make sure unsubscribe logic is executed once
- }
- // Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
- // The status channel that was passed to subscribeSyncStatus isn't used anymore
- // after this method returns.
- func (s *SyncStatusSubscription) Unsubscribe() {
- s.unsubOnce.Do(func() {
- req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
- s.api.uninstallSyncSubscription <- &req
- for {
- select {
- case <-s.c:
- // drop new status events until uninstall confirmation
- continue
- case <-req.uninstalled:
- return
- }
- }
- })
- }
- // SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
- // The given channel must receive interface values, the result can either
- func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
- api.installSyncSubscription <- status
- return &SyncStatusSubscription{api: api, c: status}
- }
|