manager_test.go 6.4 KB


  1. package datagramsession
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sync"
  9. "testing"
  10. "time"
  11. "github.com/google/uuid"
  12. "github.com/rs/zerolog"
  13. "github.com/stretchr/testify/require"
  14. "golang.org/x/sync/errgroup"
  15. "github.com/cloudflare/cloudflared/packet"
  16. )
  17. var (
  18. nopLogger = zerolog.Nop()
  19. )
  20. func TestManagerServe(t *testing.T) {
  21. const (
  22. sessions = 2
  23. msgs = 5
  24. remoteUnregisterMsg = "eyeball closed connection"
  25. )
  26. requestChan := make(chan *packet.Session)
  27. transport := mockQUICTransport{
  28. sessions: make(map[uuid.UUID]chan []byte),
  29. }
  30. for i := 0; i < sessions; i++ {
  31. transport.sessions[uuid.New()] = make(chan []byte)
  32. }
  33. mg := NewManager(&nopLogger, transport.MuxSession, requestChan)
  34. ctx, cancel := context.WithCancel(context.Background())
  35. serveDone := make(chan struct{})
  36. go func(ctx context.Context) {
  37. mg.Serve(ctx)
  38. close(serveDone)
  39. }(ctx)
  40. errGroup, ctx := errgroup.WithContext(ctx)
  41. for sessionID, eyeballRespChan := range transport.sessions {
  42. // Assign loop variables to local variables
  43. sID := sessionID
  44. payload := testPayload(sID)
  45. expectResp := testResponse(payload)
  46. cfdConn, originConn := net.Pipe()
  47. origin := mockOrigin{
  48. expectMsgCount: msgs,
  49. expectedMsg: payload,
  50. expectedResp: expectResp,
  51. conn: originConn,
  52. }
  53. eyeball := mockEyeballSession{
  54. id: sID,
  55. expectedMsgCount: msgs,
  56. expectedMsg: payload,
  57. expectedResponse: expectResp,
  58. respReceiver: eyeballRespChan,
  59. }
  60. // Assign loop variables to local variables
  61. errGroup.Go(func() error {
  62. session, err := mg.RegisterSession(ctx, sID, cfdConn)
  63. require.NoError(t, err)
  64. reqErrGroup, reqCtx := errgroup.WithContext(ctx)
  65. reqErrGroup.Go(func() error {
  66. return origin.serve()
  67. })
  68. reqErrGroup.Go(func() error {
  69. return eyeball.serve(reqCtx, requestChan)
  70. })
  71. sessionDone := make(chan struct{})
  72. go func() {
  73. closedByRemote, err := session.Serve(ctx, time.Minute*2)
  74. closeSession := &errClosedSession{
  75. message: remoteUnregisterMsg,
  76. byRemote: true,
  77. }
  78. require.Equal(t, closeSession, err)
  79. require.True(t, closedByRemote)
  80. close(sessionDone)
  81. }()
  82. // Make sure eyeball and origin have received all messages before unregistering the session
  83. require.NoError(t, reqErrGroup.Wait())
  84. require.NoError(t, mg.UnregisterSession(ctx, sID, remoteUnregisterMsg, true))
  85. <-sessionDone
  86. return nil
  87. })
  88. }
  89. require.NoError(t, errGroup.Wait())
  90. cancel()
  91. <-serveDone
  92. }
  93. func TestTimeout(t *testing.T) {
  94. const (
  95. testTimeout = time.Millisecond * 50
  96. )
  97. mg := NewManager(&nopLogger, nil, nil)
  98. mg.timeout = testTimeout
  99. ctx := context.Background()
  100. sessionID := uuid.New()
  101. // session manager is not running, so event loop is not running and therefore calling the APIs should timeout
  102. session, err := mg.RegisterSession(ctx, sessionID, nil)
  103. require.ErrorIs(t, err, context.DeadlineExceeded)
  104. require.Nil(t, session)
  105. err = mg.UnregisterSession(ctx, sessionID, "session gone", true)
  106. require.ErrorIs(t, err, context.DeadlineExceeded)
  107. }
  108. func TestUnregisterSessionCloseSession(t *testing.T) {
  109. sessionID := uuid.New()
  110. payload := []byte(t.Name())
  111. sender := newMockTransportSender(sessionID, payload)
  112. mg := NewManager(&nopLogger, sender.muxSession, nil)
  113. ctx, cancel := context.WithCancel(context.Background())
  114. managerDone := make(chan struct{})
  115. go func() {
  116. err := mg.Serve(ctx)
  117. require.Error(t, err)
  118. close(managerDone)
  119. }()
  120. cfdConn, originConn := net.Pipe()
  121. session, err := mg.RegisterSession(ctx, sessionID, cfdConn)
  122. require.NoError(t, err)
  123. require.NotNil(t, session)
  124. unregisteredChan := make(chan struct{})
  125. go func() {
  126. _, err := originConn.Write(payload)
  127. require.NoError(t, err)
  128. err = mg.UnregisterSession(ctx, sessionID, "eyeball closed session", true)
  129. require.NoError(t, err)
  130. close(unregisteredChan)
  131. }()
  132. closedByRemote, err := session.Serve(ctx, time.Minute)
  133. require.True(t, closedByRemote)
  134. require.Error(t, err)
  135. <-unregisteredChan
  136. cancel()
  137. <-managerDone
  138. }
  139. func TestManagerCtxDoneCloseSessions(t *testing.T) {
  140. sessionID := uuid.New()
  141. payload := []byte(t.Name())
  142. sender := newMockTransportSender(sessionID, payload)
  143. mg := NewManager(&nopLogger, sender.muxSession, nil)
  144. ctx, cancel := context.WithCancel(context.Background())
  145. var wg sync.WaitGroup
  146. wg.Add(1)
  147. go func() {
  148. defer wg.Done()
  149. err := mg.Serve(ctx)
  150. require.Error(t, err)
  151. }()
  152. cfdConn, originConn := net.Pipe()
  153. session, err := mg.RegisterSession(ctx, sessionID, cfdConn)
  154. require.NoError(t, err)
  155. require.NotNil(t, session)
  156. wg.Add(1)
  157. go func() {
  158. defer wg.Done()
  159. _, err := originConn.Write(payload)
  160. require.NoError(t, err)
  161. cancel()
  162. }()
  163. closedByRemote, err := session.Serve(ctx, time.Minute)
  164. require.False(t, closedByRemote)
  165. require.Error(t, err)
  166. wg.Wait()
  167. }
  168. type mockOrigin struct {
  169. expectMsgCount int
  170. expectedMsg []byte
  171. expectedResp []byte
  172. conn io.ReadWriteCloser
  173. }
  174. func (mo *mockOrigin) serve() error {
  175. expectedMsgLen := len(mo.expectedMsg)
  176. readBuffer := make([]byte, expectedMsgLen+1)
  177. for i := 0; i < mo.expectMsgCount; i++ {
  178. n, err := mo.conn.Read(readBuffer)
  179. if err != nil {
  180. return err
  181. }
  182. if n != expectedMsgLen {
  183. return fmt.Errorf("Expect to read %d bytes, read %d", expectedMsgLen, n)
  184. }
  185. if !bytes.Equal(readBuffer[:n], mo.expectedMsg) {
  186. return fmt.Errorf("Expect %v, read %v", mo.expectedMsg, readBuffer[:n])
  187. }
  188. _, err = mo.conn.Write(mo.expectedResp)
  189. if err != nil {
  190. return err
  191. }
  192. }
  193. return nil
  194. }
  195. func testPayload(sessionID uuid.UUID) []byte {
  196. return []byte(fmt.Sprintf("Message from %s", sessionID))
  197. }
  198. func testResponse(msg []byte) []byte {
  199. return []byte(fmt.Sprintf("Response to %v", msg))
  200. }
  201. type mockQUICTransport struct {
  202. sessions map[uuid.UUID]chan []byte
  203. }
  204. func (me *mockQUICTransport) MuxSession(session *packet.Session) error {
  205. s := me.sessions[session.ID]
  206. s <- session.Payload
  207. return nil
  208. }
  209. type mockEyeballSession struct {
  210. id uuid.UUID
  211. expectedMsgCount int
  212. expectedMsg []byte
  213. expectedResponse []byte
  214. respReceiver <-chan []byte
  215. }
  216. func (me *mockEyeballSession) serve(ctx context.Context, requestChan chan *packet.Session) error {
  217. for i := 0; i < me.expectedMsgCount; i++ {
  218. requestChan <- &packet.Session{
  219. ID: me.id,
  220. Payload: me.expectedMsg,
  221. }
  222. resp := <-me.respReceiver
  223. if !bytes.Equal(resp, me.expectedResponse) {
  224. return fmt.Errorf("Expect %v, read %v", me.expectedResponse, resp)
  225. }
  226. fmt.Println("Resp", resp)
  227. }
  228. return nil
  229. }