observer.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package connection
  2. import (
  3. "fmt"
  4. "net/url"
  5. "strings"
  6. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  7. "github.com/rs/zerolog"
  8. )
const (
	// LogFieldLocation is the structured-log field key under which
	// logServerInfo records the server location string.
	LogFieldLocation = "location"
	// observerChannelBufferSize is the capacity NewObserver gives to both the
	// event channel and the sink-registration channel.
	observerChannelBufferSize = 16
)
// Observer logs connection activity, records server locations in metrics, and
// queues tunnel events for delivery to registered EventSinks.
type Observer struct {
	// log receives the info, warning and error messages written by the
	// Observer's methods.
	log *zerolog.Logger
	// logTransport is stored by NewObserver; no method visible in this file
	// reads it.
	logTransport *zerolog.Logger
	// metrics records the server location for each connection index.
	metrics *tunnelMetrics
	// tunnelEventChan carries events from sendEvent to dispatchEvents.
	tunnelEventChan chan Event
	// uiEnabled, when true, makes logTrialHostname skip printing the URL box.
	uiEnabled bool
	// addSinkChan carries sinks from RegisterSink to dispatchEvents.
	addSinkChan chan EventSink
}
// EventSink is implemented by anything that wants to receive tunnel events
// from an Observer; register an implementation with Observer.RegisterSink.
type EventSink interface {
	// OnTunnelEvent is invoked by the Observer's dispatch loop once per event,
	// synchronously and in the order events were queued.
	OnTunnelEvent(event Event)
}
  24. func NewObserver(log, logTransport *zerolog.Logger, uiEnabled bool) *Observer {
  25. o := &Observer{
  26. log: log,
  27. logTransport: logTransport,
  28. metrics: newTunnelMetrics(),
  29. uiEnabled: uiEnabled,
  30. tunnelEventChan: make(chan Event, observerChannelBufferSize),
  31. addSinkChan: make(chan EventSink, observerChannelBufferSize),
  32. }
  33. go o.dispatchEvents()
  34. return o
  35. }
  36. func (o *Observer) RegisterSink(sink EventSink) {
  37. o.addSinkChan <- sink
  38. }
  39. func (o *Observer) logServerInfo(connIndex uint8, location, msg string) {
  40. o.sendEvent(Event{Index: connIndex, EventType: Connected, Location: location})
  41. o.log.Info().
  42. Uint8(LogFieldConnIndex, connIndex).
  43. Str(LogFieldLocation, location).
  44. Msg(msg)
  45. o.metrics.registerServerLocation(uint8ToString(connIndex), location)
  46. }
  47. func (o *Observer) logTrialHostname(registration *tunnelpogs.TunnelRegistration) error {
  48. // Print out the user's trial zone URL in a nice box (if they requested and got one and UI flag is not set)
  49. if !o.uiEnabled {
  50. if registrationURL, err := url.Parse(registration.Url); err == nil {
  51. for _, line := range asciiBox(trialZoneMsg(registrationURL.String()), 2) {
  52. o.log.Info().Msg(line)
  53. }
  54. } else {
  55. o.log.Error().Msg("Failed to connect tunnel, please try again.")
  56. return fmt.Errorf("empty URL in response from Cloudflare edge")
  57. }
  58. }
  59. return nil
  60. }
  61. // Print out the given lines in a nice ASCII box.
  62. func asciiBox(lines []string, padding int) (box []string) {
  63. maxLen := maxLen(lines)
  64. spacer := strings.Repeat(" ", padding)
  65. border := "+" + strings.Repeat("-", maxLen+(padding*2)) + "+"
  66. box = append(box, border)
  67. for _, line := range lines {
  68. box = append(box, "|"+spacer+line+strings.Repeat(" ", maxLen-len(line))+spacer+"|")
  69. }
  70. box = append(box, border)
  71. return
  72. }
  73. func maxLen(lines []string) int {
  74. max := 0
  75. for _, line := range lines {
  76. if len(line) > max {
  77. max = len(line)
  78. }
  79. }
  80. return max
  81. }
  82. func trialZoneMsg(url string) []string {
  83. return []string{
  84. "Your free tunnel has started! Visit it:",
  85. " " + url,
  86. }
  87. }
  88. func (o *Observer) sendRegisteringEvent(connIndex uint8) {
  89. o.sendEvent(Event{Index: connIndex, EventType: RegisteringTunnel})
  90. }
  91. func (o *Observer) sendConnectedEvent(connIndex uint8, location string) {
  92. o.sendEvent(Event{Index: connIndex, EventType: Connected, Location: location})
  93. }
  94. func (o *Observer) sendURL(url string) {
  95. o.sendEvent(Event{EventType: SetURL, URL: url})
  96. }
  97. func (o *Observer) SendReconnect(connIndex uint8) {
  98. o.sendEvent(Event{Index: connIndex, EventType: Reconnecting})
  99. }
  100. func (o *Observer) sendUnregisteringEvent(connIndex uint8) {
  101. o.sendEvent(Event{Index: connIndex, EventType: Unregistering})
  102. }
  103. func (o *Observer) SendDisconnect(connIndex uint8) {
  104. o.sendEvent(Event{Index: connIndex, EventType: Disconnected})
  105. }
  106. func (o *Observer) sendEvent(e Event) {
  107. select {
  108. case o.tunnelEventChan <- e:
  109. break
  110. default:
  111. o.log.Warn().Msg("observer channel buffer is full")
  112. }
  113. }
  114. func (o *Observer) dispatchEvents() {
  115. var sinks []EventSink
  116. for {
  117. select {
  118. case sink := <-o.addSinkChan:
  119. sinks = append(sinks, sink)
  120. case evt := <-o.tunnelEventChan:
  121. for _, sink := range sinks {
  122. sink.OnTunnelEvent(evt)
  123. }
  124. }
  125. }
  126. }
  127. type EventSinkFunc func(event Event)
  128. func (f EventSinkFunc) OnTunnelEvent(event Event) {
  129. f(event)
  130. }