origin_connection_test.go 8.3 KB


  1. package ingress
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/http/httptest"
  10. "net/url"
  11. "testing"
  12. "time"
  13. "github.com/gobwas/ws/wsutil"
  14. gorillaWS "github.com/gorilla/websocket"
  15. "github.com/stretchr/testify/assert"
  16. "github.com/stretchr/testify/require"
  17. "golang.org/x/net/proxy"
  18. "golang.org/x/sync/errgroup"
  19. "github.com/cloudflare/cloudflared/socks"
  20. "github.com/cloudflare/cloudflared/stream"
  21. "github.com/cloudflare/cloudflared/websocket"
  22. )
  23. const (
  24. testStreamTimeout = time.Second * 3
  25. echoHeaderName = "Test-Cloudflared-Echo"
  26. )
  27. var (
  28. testMessage = []byte("TestStreamOriginConnection")
  29. testResponse = []byte(fmt.Sprintf("echo-%s", testMessage))
  30. )
  31. func TestStreamTCPConnection(t *testing.T) {
  32. cfdConn, originConn := net.Pipe()
  33. tcpConn := tcpConnection{
  34. Conn: cfdConn,
  35. writeTimeout: 30 * time.Second,
  36. }
  37. eyeballConn, edgeConn := net.Pipe()
  38. ctx, cancel := context.WithTimeout(context.Background(), testStreamTimeout)
  39. defer cancel()
  40. errGroup, ctx := errgroup.WithContext(ctx)
  41. errGroup.Go(func() error {
  42. _, err := eyeballConn.Write(testMessage)
  43. require.NoError(t, err)
  44. readBuffer := make([]byte, len(testResponse))
  45. _, err = eyeballConn.Read(readBuffer)
  46. require.NoError(t, err)
  47. require.Equal(t, testResponse, readBuffer)
  48. return nil
  49. })
  50. errGroup.Go(func() error {
  51. echoTCPOrigin(t, originConn)
  52. originConn.Close()
  53. return nil
  54. })
  55. tcpConn.Stream(ctx, edgeConn, TestLogger)
  56. require.NoError(t, errGroup.Wait())
  57. }
  58. func TestDefaultStreamWSOverTCPConnection(t *testing.T) {
  59. cfdConn, originConn := net.Pipe()
  60. tcpOverWSConn := tcpOverWSConnection{
  61. conn: cfdConn,
  62. streamHandler: DefaultStreamHandler,
  63. }
  64. eyeballConn, edgeConn := net.Pipe()
  65. ctx, cancel := context.WithTimeout(context.Background(), testStreamTimeout)
  66. defer cancel()
  67. errGroup, ctx := errgroup.WithContext(ctx)
  68. errGroup.Go(func() error {
  69. echoWSEyeball(t, eyeballConn)
  70. return nil
  71. })
  72. errGroup.Go(func() error {
  73. echoTCPOrigin(t, originConn)
  74. originConn.Close()
  75. return nil
  76. })
  77. tcpOverWSConn.Stream(ctx, edgeConn, TestLogger)
  78. require.NoError(t, errGroup.Wait())
  79. }
  80. // TestSocksStreamWSOverTCPConnection simulates proxying in socks mode.
  81. // Eyeball side runs cloudflared access tcp with --url flag to start a websocket forwarder which
  82. // wraps SOCKS5 traffic in websocket
  83. // Origin side runs a tcpOverWSConnection with socks.StreamHandler
  84. func TestSocksStreamWSOverTCPConnection(t *testing.T) {
  85. var (
  86. sendMessage = t.Name()
  87. echoHeaderIncomingValue = fmt.Sprintf("header-%s", sendMessage)
  88. echoMessage = fmt.Sprintf("echo-%s", sendMessage)
  89. echoHeaderReturnValue = fmt.Sprintf("echo-%s", echoHeaderIncomingValue)
  90. )
  91. statusCodes := []int{
  92. http.StatusOK,
  93. http.StatusTemporaryRedirect,
  94. http.StatusBadRequest,
  95. http.StatusInternalServerError,
  96. }
  97. for _, status := range statusCodes {
  98. handler := func(w http.ResponseWriter, r *http.Request) {
  99. body, err := io.ReadAll(r.Body)
  100. require.NoError(t, err)
  101. require.Equal(t, []byte(sendMessage), body)
  102. require.Equal(t, echoHeaderIncomingValue, r.Header.Get(echoHeaderName))
  103. w.Header().Set(echoHeaderName, echoHeaderReturnValue)
  104. w.WriteHeader(status)
  105. w.Write([]byte(echoMessage))
  106. }
  107. origin := httptest.NewServer(http.HandlerFunc(handler))
  108. defer origin.Close()
  109. originURL, err := url.Parse(origin.URL)
  110. require.NoError(t, err)
  111. originConn, err := net.Dial("tcp", originURL.Host)
  112. require.NoError(t, err)
  113. tcpOverWSConn := tcpOverWSConnection{
  114. conn: originConn,
  115. streamHandler: socks.StreamHandler,
  116. }
  117. wsForwarderOutConn, edgeConn := net.Pipe()
  118. ctx, cancel := context.WithTimeout(context.Background(), testStreamTimeout)
  119. defer cancel()
  120. errGroup, ctx := errgroup.WithContext(ctx)
  121. errGroup.Go(func() error {
  122. tcpOverWSConn.Stream(ctx, edgeConn, TestLogger)
  123. return nil
  124. })
  125. wsForwarderListener, err := net.Listen("tcp", "127.0.0.1:0")
  126. require.NoError(t, err)
  127. errGroup.Go(func() error {
  128. wsForwarderInConn, err := wsForwarderListener.Accept()
  129. require.NoError(t, err)
  130. defer wsForwarderInConn.Close()
  131. stream.Pipe(wsForwarderInConn, &wsEyeball{wsForwarderOutConn}, TestLogger)
  132. return nil
  133. })
  134. eyeballDialer, err := proxy.SOCKS5("tcp", wsForwarderListener.Addr().String(), nil, proxy.Direct)
  135. require.NoError(t, err)
  136. transport := &http.Transport{
  137. Dial: eyeballDialer.Dial,
  138. }
  139. // Request URL doesn't matter because the transport is using eyeballDialer to connectq
  140. req, err := http.NewRequestWithContext(ctx, "GET", "http://test-socks-stream.com", bytes.NewBuffer([]byte(sendMessage)))
  141. assert.NoError(t, err)
  142. req.Header.Set(echoHeaderName, echoHeaderIncomingValue)
  143. resp, err := transport.RoundTrip(req)
  144. assert.NoError(t, err)
  145. assert.Equal(t, status, resp.StatusCode)
  146. require.Equal(t, echoHeaderReturnValue, resp.Header.Get(echoHeaderName))
  147. body, err := io.ReadAll(resp.Body)
  148. require.NoError(t, err)
  149. require.Equal(t, []byte(echoMessage), body)
  150. wsForwarderOutConn.Close()
  151. edgeConn.Close()
  152. tcpOverWSConn.Close()
  153. require.NoError(t, errGroup.Wait())
  154. }
  155. }
  156. func TestWsConnReturnsBeforeStreamReturns(t *testing.T) {
  157. handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  158. eyeballConn := &readWriter{
  159. w: w,
  160. r: r.Body,
  161. }
  162. cfdConn, originConn := net.Pipe()
  163. tcpOverWSConn := tcpOverWSConnection{
  164. conn: cfdConn,
  165. streamHandler: DefaultStreamHandler,
  166. }
  167. go func() {
  168. time.Sleep(time.Millisecond * 10)
  169. // Simulate losing connection to origin
  170. originConn.Close()
  171. }()
  172. ctx := context.WithValue(r.Context(), websocket.PingPeriodContextKey, time.Microsecond)
  173. tcpOverWSConn.Stream(ctx, eyeballConn, TestLogger)
  174. })
  175. server := httptest.NewServer(handler)
  176. defer server.Close()
  177. client := server.Client()
  178. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  179. defer cancel()
  180. errGroup, ctx := errgroup.WithContext(ctx)
  181. for i := 0; i < 50; i++ {
  182. eyeballConn, edgeConn := net.Pipe()
  183. req, err := http.NewRequestWithContext(ctx, http.MethodConnect, server.URL, edgeConn)
  184. assert.NoError(t, err)
  185. resp, err := client.Transport.RoundTrip(req)
  186. assert.NoError(t, err)
  187. assert.Equal(t, resp.StatusCode, http.StatusOK)
  188. errGroup.Go(func() error {
  189. for {
  190. if err := wsutil.WriteClientBinary(eyeballConn, testMessage); err != nil {
  191. return nil
  192. }
  193. }
  194. })
  195. }
  196. assert.NoError(t, errGroup.Wait())
  197. }
  198. type wsEyeball struct {
  199. conn net.Conn
  200. }
  201. func (wse *wsEyeball) Read(p []byte) (int, error) {
  202. data, err := wsutil.ReadServerBinary(wse.conn)
  203. if err != nil {
  204. return 0, err
  205. }
  206. return copy(p, data), nil
  207. }
  208. func (wse *wsEyeball) Write(p []byte) (int, error) {
  209. err := wsutil.WriteClientBinary(wse.conn, p)
  210. return len(p), err
  211. }
  212. func echoWSEyeball(t *testing.T, conn net.Conn) {
  213. defer func() {
  214. assert.NoError(t, conn.Close())
  215. }()
  216. if !assert.NoError(t, wsutil.WriteClientBinary(conn, testMessage)) {
  217. return
  218. }
  219. readMsg, err := wsutil.ReadServerBinary(conn)
  220. if !assert.NoError(t, err) {
  221. return
  222. }
  223. assert.Equal(t, testResponse, readMsg)
  224. }
  225. func echoWSOrigin(t *testing.T, expectMessages bool) *httptest.Server {
  226. var upgrader = gorillaWS.Upgrader{
  227. ReadBufferSize: 10,
  228. WriteBufferSize: 10,
  229. }
  230. ws := func(w http.ResponseWriter, r *http.Request) {
  231. header := make(http.Header)
  232. for k, vs := range r.Header {
  233. if k == "Test-Cloudflared-Echo" {
  234. header[k] = vs
  235. }
  236. }
  237. conn, err := upgrader.Upgrade(w, r, header)
  238. require.NoError(t, err)
  239. defer conn.Close()
  240. sawMessage := false
  241. for {
  242. messageType, p, err := conn.ReadMessage()
  243. if err != nil {
  244. if expectMessages && !sawMessage {
  245. t.Errorf("unexpected error: %v", err)
  246. }
  247. return
  248. }
  249. assert.Equal(t, testMessage, p)
  250. sawMessage = true
  251. if err := conn.WriteMessage(messageType, testResponse); err != nil {
  252. return
  253. }
  254. }
  255. }
  256. // NewTLSServer starts the server in another thread
  257. return httptest.NewTLSServer(http.HandlerFunc(ws))
  258. }
  259. func echoTCPOrigin(t *testing.T, conn net.Conn) {
  260. readBuffer := make([]byte, len(testMessage))
  261. _, err := conn.Read(readBuffer)
  262. assert.NoError(t, err)
  263. assert.Equal(t, testMessage, readBuffer)
  264. _, err = conn.Write(testResponse)
  265. assert.NoError(t, err)
  266. }
  267. type readWriter struct {
  268. w io.Writer
  269. r io.Reader
  270. }
  271. func (r *readWriter) Read(p []byte) (n int, err error) {
  272. return r.r.Read(p)
  273. }
  274. func (r *readWriter) Write(p []byte) (n int, err error) {
  275. return r.w.Write(p)
  276. }