observer.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package connection
  2. import (
  3. "net"
  4. "strings"
  5. "github.com/google/uuid"
  6. "github.com/rs/zerolog"
  7. "github.com/cloudflare/cloudflared/management"
  8. )
  // Keys used for the structured fields attached to connection log entries.
  const (
  	// LogFieldConnectionID is the key under which a connection's ID is logged.
  	LogFieldConnectionID = "connection"
  	// LogFieldLocation is the key under which a connection's location is logged.
  	LogFieldLocation = "location"
  	// LogFieldIPAddress is the key under which a connection's IP address is logged.
  	LogFieldIPAddress = "ip"
  	// LogFieldProtocol is the key under which a connection's protocol is logged.
  	LogFieldProtocol = "protocol"
  )

  // observerChannelBufferSize is the capacity given to each of the Observer's
  // buffered channels (tunnel events and sink registrations).
  const observerChannelBufferSize = 16
  16. type Observer struct {
  17. log *zerolog.Logger
  18. logTransport *zerolog.Logger
  19. metrics *tunnelMetrics
  20. tunnelEventChan chan Event
  21. addSinkChan chan EventSink
  22. }
  23. type EventSink interface {
  24. OnTunnelEvent(event Event)
  25. }
  26. func NewObserver(log, logTransport *zerolog.Logger) *Observer {
  27. o := &Observer{
  28. log: log,
  29. logTransport: logTransport,
  30. metrics: newTunnelMetrics(),
  31. tunnelEventChan: make(chan Event, observerChannelBufferSize),
  32. addSinkChan: make(chan EventSink, observerChannelBufferSize),
  33. }
  34. go o.dispatchEvents()
  35. return o
  36. }
  37. func (o *Observer) RegisterSink(sink EventSink) {
  38. o.addSinkChan <- sink
  39. }
  40. func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) {
  41. o.sendEvent(Event{Index: connIndex, EventType: Connected, Location: location})
  42. o.log.Info().
  43. Int(management.EventTypeKey, int(management.Cloudflared)).
  44. Str(LogFieldConnectionID, connectionID.String()).
  45. Uint8(LogFieldConnIndex, connIndex).
  46. Str(LogFieldLocation, location).
  47. IPAddr(LogFieldIPAddress, address).
  48. Str(LogFieldProtocol, protocol.String()).
  49. Msg("Registered tunnel connection")
  50. o.metrics.registerServerLocation(uint8ToString(connIndex), location)
  51. }
  52. func (o *Observer) sendRegisteringEvent(connIndex uint8) {
  53. o.sendEvent(Event{Index: connIndex, EventType: RegisteringTunnel})
  54. }
  55. func (o *Observer) sendConnectedEvent(connIndex uint8, protocol Protocol, location string) {
  56. o.sendEvent(Event{Index: connIndex, EventType: Connected, Protocol: protocol, Location: location})
  57. }
  58. func (o *Observer) SendURL(url string) {
  59. o.sendEvent(Event{EventType: SetURL, URL: url})
  60. if !strings.HasPrefix(url, "https://") {
  61. // We add https:// in the prefix for backwards compatibility as we used to do that with the old free tunnels
  62. // and some tools (like `wrangler tail`) are regexp-ing for that specifically.
  63. url = "https://" + url
  64. }
  65. o.metrics.userHostnamesCounts.WithLabelValues(url).Inc()
  66. }
  67. func (o *Observer) SendReconnect(connIndex uint8) {
  68. o.sendEvent(Event{Index: connIndex, EventType: Reconnecting})
  69. }
  70. func (o *Observer) sendUnregisteringEvent(connIndex uint8) {
  71. o.sendEvent(Event{Index: connIndex, EventType: Unregistering})
  72. }
  73. func (o *Observer) SendDisconnect(connIndex uint8) {
  74. o.sendEvent(Event{Index: connIndex, EventType: Disconnected})
  75. }
  76. func (o *Observer) sendEvent(e Event) {
  77. select {
  78. case o.tunnelEventChan <- e:
  79. break
  80. default:
  81. o.log.Warn().Msg("observer channel buffer is full")
  82. }
  83. }
  84. func (o *Observer) dispatchEvents() {
  85. var sinks []EventSink
  86. for {
  87. select {
  88. case sink := <-o.addSinkChan:
  89. sinks = append(sinks, sink)
  90. case evt := <-o.tunnelEventChan:
  91. for _, sink := range sinks {
  92. sink.OnTunnelEvent(evt)
  93. }
  94. }
  95. }
  96. }
  97. type EventSinkFunc func(event Event)
  98. func (f EventSinkFunc) OnTunnelEvent(event Event) {
  99. f(event)
  100. }