observer_test.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package connection
  2. import (
  3. "strconv"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/prometheus/client_golang/prometheus"
  8. dto "github.com/prometheus/client_model/go"
  9. "github.com/stretchr/testify/assert"
  10. )
  11. func TestSendUrl(t *testing.T) {
  12. observer := NewObserver(&log, &log)
  13. observer.SendURL("my-url.com")
  14. assert.Equal(t, 1.0, getCounterValue(t, observer.metrics.userHostnamesCounts, "https://my-url.com"))
  15. observer.SendURL("https://another-long-one.com")
  16. assert.Equal(t, 1.0, getCounterValue(t, observer.metrics.userHostnamesCounts, "https://another-long-one.com"))
  17. }
  18. func getCounterValue(t *testing.T, metric *prometheus.CounterVec, val string) float64 {
  19. var m = &dto.Metric{}
  20. err := metric.WithLabelValues(val).Write(m)
  21. assert.NoError(t, err)
  22. return m.Counter.GetValue()
  23. }
  24. func TestRegisterServerLocation(t *testing.T) {
  25. m := newTunnelMetrics()
  26. tunnels := 20
  27. var wg sync.WaitGroup
  28. wg.Add(tunnels)
  29. for i := 0; i < tunnels; i++ {
  30. go func(i int) {
  31. id := strconv.Itoa(i)
  32. m.registerServerLocation(id, "LHR")
  33. wg.Done()
  34. }(i)
  35. }
  36. wg.Wait()
  37. for i := 0; i < tunnels; i++ {
  38. id := strconv.Itoa(i)
  39. assert.Equal(t, "LHR", m.oldServerLocations[id])
  40. }
  41. wg.Add(tunnels)
  42. for i := 0; i < tunnels; i++ {
  43. go func(i int) {
  44. id := strconv.Itoa(i)
  45. m.registerServerLocation(id, "AUS")
  46. wg.Done()
  47. }(i)
  48. }
  49. wg.Wait()
  50. for i := 0; i < tunnels; i++ {
  51. id := strconv.Itoa(i)
  52. assert.Equal(t, "AUS", m.oldServerLocations[id])
  53. }
  54. }
  55. func TestObserverEventsDontBlock(t *testing.T) {
  56. observer := NewObserver(&log, &log)
  57. var mu sync.Mutex
  58. observer.RegisterSink(EventSinkFunc(func(_ Event) {
  59. // callback will block if lock is already held
  60. mu.Lock()
  61. mu.Unlock()
  62. }))
  63. timeout := time.AfterFunc(5*time.Second, func() {
  64. mu.Unlock() // release the callback on timer expiration
  65. t.Fatal("observer is blocked")
  66. })
  67. mu.Lock() // block the callback
  68. for i := 0; i < 2*observerChannelBufferSize; i++ {
  69. observer.sendRegisteringEvent(0)
  70. }
  71. if pending := timeout.Stop(); pending {
  72. // release the callback if timer hasn't expired yet
  73. mu.Unlock()
  74. }
  75. }
  76. type eventCollectorSink struct {
  77. observedEvents []Event
  78. mu sync.Mutex
  79. }
  80. func (s *eventCollectorSink) OnTunnelEvent(event Event) {
  81. s.mu.Lock()
  82. defer s.mu.Unlock()
  83. s.observedEvents = append(s.observedEvents, event)
  84. }
  85. func (s *eventCollectorSink) assertSawEvent(t *testing.T, event Event) {
  86. s.mu.Lock()
  87. defer s.mu.Unlock()
  88. assert.Contains(t, s.observedEvents, event)
  89. }