observer.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package connection
  2. import (
  3. "net"
  4. "strings"
  5. "github.com/google/uuid"
  6. "github.com/rs/zerolog"
  7. "github.com/cloudflare/cloudflared/management"
  8. )
  // Keys of the structured fields that logConnected attaches to its
  // "Registered tunnel connection" log entry.
  const (
  	// LogFieldConnectionID keys the connection's UUID, rendered as a string.
  	LogFieldConnectionID = "connection"
  	// LogFieldLocation keys the location string reported for the connection.
  	LogFieldLocation = "location"
  	// LogFieldIPAddress keys the IP address passed to logConnected.
  	LogFieldIPAddress = "ip"
  	// LogFieldProtocol keys the protocol name, i.e. the result of protocol.String().
  	LogFieldProtocol = "protocol"
  )

  // observerChannelBufferSize is the buffer capacity NewObserver gives to both
  // the tunnel-event channel and the sink-registration channel.
  const observerChannelBufferSize = 16
  16. type Observer struct {
  17. log *zerolog.Logger
  18. logTransport *zerolog.Logger
  19. metrics *tunnelMetrics
  20. tunnelEventChan chan Event
  21. addSinkChan chan EventSink
  22. }
  23. type EventSink interface {
  24. OnTunnelEvent(event Event)
  25. }
  26. func NewObserver(log, logTransport *zerolog.Logger) *Observer {
  27. o := &Observer{
  28. log: log,
  29. logTransport: logTransport,
  30. metrics: newTunnelMetrics(),
  31. tunnelEventChan: make(chan Event, observerChannelBufferSize),
  32. addSinkChan: make(chan EventSink, observerChannelBufferSize),
  33. }
  34. go o.dispatchEvents()
  35. return o
  36. }
  37. func (o *Observer) RegisterSink(sink EventSink) {
  38. o.addSinkChan <- sink
  39. }
  40. func (o *Observer) logConnected(connectionID uuid.UUID, connIndex uint8, location string, address net.IP, protocol Protocol) {
  41. o.log.Info().
  42. Int(management.EventTypeKey, int(management.Cloudflared)).
  43. Str(LogFieldConnectionID, connectionID.String()).
  44. Uint8(LogFieldConnIndex, connIndex).
  45. Str(LogFieldLocation, location).
  46. IPAddr(LogFieldIPAddress, address).
  47. Str(LogFieldProtocol, protocol.String()).
  48. Msg("Registered tunnel connection")
  49. o.metrics.registerServerLocation(uint8ToString(connIndex), location)
  50. }
  51. func (o *Observer) sendRegisteringEvent(connIndex uint8) {
  52. o.sendEvent(Event{Index: connIndex, EventType: RegisteringTunnel})
  53. }
  54. func (o *Observer) sendConnectedEvent(connIndex uint8, protocol Protocol, location string, edgeAddress net.IP) {
  55. o.sendEvent(Event{Index: connIndex, EventType: Connected, Protocol: protocol, Location: location, EdgeAddress: edgeAddress})
  56. }
  57. func (o *Observer) SendURL(url string) {
  58. o.sendEvent(Event{EventType: SetURL, URL: url})
  59. if !strings.HasPrefix(url, "https://") {
  60. // We add https:// in the prefix for backwards compatibility as we used to do that with the old free tunnels
  61. // and some tools (like `wrangler tail`) are regexp-ing for that specifically.
  62. url = "https://" + url
  63. }
  64. o.metrics.userHostnamesCounts.WithLabelValues(url).Inc()
  65. }
  66. func (o *Observer) SendReconnect(connIndex uint8) {
  67. o.sendEvent(Event{Index: connIndex, EventType: Reconnecting})
  68. }
  69. func (o *Observer) sendUnregisteringEvent(connIndex uint8) {
  70. o.sendEvent(Event{Index: connIndex, EventType: Unregistering})
  71. }
  72. func (o *Observer) SendDisconnect(connIndex uint8) {
  73. o.sendEvent(Event{Index: connIndex, EventType: Disconnected})
  74. }
  75. func (o *Observer) sendEvent(e Event) {
  76. select {
  77. case o.tunnelEventChan <- e:
  78. break
  79. default:
  80. o.log.Warn().Msg("observer channel buffer is full")
  81. }
  82. }
  83. func (o *Observer) dispatchEvents() {
  84. var sinks []EventSink
  85. for {
  86. select {
  87. case sink := <-o.addSinkChan:
  88. sinks = append(sinks, sink)
  89. case evt := <-o.tunnelEventChan:
  90. for _, sink := range sinks {
  91. sink.OnTunnelEvent(evt)
  92. }
  93. }
  94. }
  95. }
  96. type EventSinkFunc func(event Event)
  97. func (f EventSinkFunc) OnTunnelEvent(event Event) {
  98. f(event)
  99. }