observer_test.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package connection
  2. import (
  3. "strconv"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. )
  9. func TestRegisterServerLocation(t *testing.T) {
  10. m := newTunnelMetrics()
  11. tunnels := 20
  12. var wg sync.WaitGroup
  13. wg.Add(tunnels)
  14. for i := 0; i < tunnels; i++ {
  15. go func(i int) {
  16. id := strconv.Itoa(i)
  17. m.registerServerLocation(id, "LHR")
  18. wg.Done()
  19. }(i)
  20. }
  21. wg.Wait()
  22. for i := 0; i < tunnels; i++ {
  23. id := strconv.Itoa(i)
  24. assert.Equal(t, "LHR", m.oldServerLocations[id])
  25. }
  26. wg.Add(tunnels)
  27. for i := 0; i < tunnels; i++ {
  28. go func(i int) {
  29. id := strconv.Itoa(i)
  30. m.registerServerLocation(id, "AUS")
  31. wg.Done()
  32. }(i)
  33. }
  34. wg.Wait()
  35. for i := 0; i < tunnels; i++ {
  36. id := strconv.Itoa(i)
  37. assert.Equal(t, "AUS", m.oldServerLocations[id])
  38. }
  39. }
  40. func TestObserverEventsDontBlock(t *testing.T) {
  41. observer := NewObserver(&log, &log, false)
  42. var mu sync.Mutex
  43. observer.RegisterSink(EventSinkFunc(func(_ Event) {
  44. // callback will block if lock is already held
  45. mu.Lock()
  46. mu.Unlock()
  47. }))
  48. timeout := time.AfterFunc(5*time.Second, func() {
  49. mu.Unlock() // release the callback on timer expiration
  50. t.Fatal("observer is blocked")
  51. })
  52. mu.Lock() // block the callback
  53. for i := 0; i < 2*observerChannelBufferSize; i++ {
  54. observer.sendRegisteringEvent(0)
  55. }
  56. if pending := timeout.Stop(); pending {
  57. // release the callback if timer hasn't expired yet
  58. mu.Unlock()
  59. }
  60. }
  61. type eventCollectorSink struct {
  62. observedEvents []Event
  63. mu sync.Mutex
  64. }
  65. func (s *eventCollectorSink) OnTunnelEvent(event Event) {
  66. s.mu.Lock()
  67. defer s.mu.Unlock()
  68. s.observedEvents = append(s.observedEvents, event)
  69. }
  70. func (s *eventCollectorSink) assertSawEvent(t *testing.T, event Event) {
  71. s.mu.Lock()
  72. defer s.mu.Unlock()
  73. assert.Contains(t, s.observedEvents, event)
  74. }