orchestrator_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765
  1. package orchestration
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/http/httptest"
  10. "sync"
  11. "testing"
  12. "time"
  13. "github.com/gobwas/ws/wsutil"
  14. "github.com/google/uuid"
  15. gows "github.com/gorilla/websocket"
  16. "github.com/rs/zerolog"
  17. "github.com/stretchr/testify/require"
  18. "github.com/cloudflare/cloudflared/config"
  19. "github.com/cloudflare/cloudflared/connection"
  20. "github.com/cloudflare/cloudflared/ingress"
  21. "github.com/cloudflare/cloudflared/management"
  22. "github.com/cloudflare/cloudflared/tracing"
  23. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  24. )
  25. var (
  26. testLogger = zerolog.Nop()
  27. testTags = []pogs.Tag{
  28. {
  29. Name: "package",
  30. Value: "orchestration",
  31. },
  32. {
  33. Name: "purpose",
  34. Value: "test",
  35. },
  36. }
  37. )
  38. // TestUpdateConfiguration tests that
  39. // - configurations can be deserialized
  40. // - proxy can be updated
  41. // - last applied version and error are returned
  42. // - configurations can be deserialized
  43. // - receiving an old version is noop
  44. func TestUpdateConfiguration(t *testing.T) {
  45. initConfig := &Config{
  46. Ingress: &ingress.Ingress{},
  47. }
  48. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{ingress.NewManagementRule(management.New("management.argotunnel.com", false, "1.1.1.1:80", uuid.Nil, "", &testLogger, nil))}, &testLogger)
  49. require.NoError(t, err)
  50. initOriginProxy, err := orchestrator.GetOriginProxy()
  51. require.NoError(t, err)
  52. require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
  53. configJSONV2 := []byte(`
  54. {
  55. "unknown_field": "not_deserialized",
  56. "originRequest": {
  57. "connectTimeout": 90,
  58. "noHappyEyeballs": true
  59. },
  60. "ingress": [
  61. {
  62. "hostname": "jira.tunnel.org",
  63. "path": "^\/login",
  64. "service": "http://192.16.19.1:443",
  65. "originRequest": {
  66. "noTLSVerify": true,
  67. "connectTimeout": 10
  68. }
  69. },
  70. {
  71. "hostname": "jira.tunnel.org",
  72. "service": "http://172.32.20.6:80",
  73. "originRequest": {
  74. "noTLSVerify": true,
  75. "connectTimeout": 30
  76. }
  77. },
  78. {
  79. "service": "http_status:404"
  80. }
  81. ],
  82. "warp-routing": {
  83. "connectTimeout": 10
  84. }
  85. }
  86. `)
  87. updateWithValidation(t, orchestrator, 2, configJSONV2)
  88. configV2 := orchestrator.config
  89. // Validate internal ingress rules
  90. require.Equal(t, "management.argotunnel.com", configV2.Ingress.InternalRules[0].Hostname)
  91. require.True(t, configV2.Ingress.InternalRules[0].Matches("management.argotunnel.com", "/ping"))
  92. require.Equal(t, "management", configV2.Ingress.InternalRules[0].Service.String())
  93. // Validate ingress rule 0
  94. require.Equal(t, "jira.tunnel.org", configV2.Ingress.Rules[0].Hostname)
  95. require.True(t, configV2.Ingress.Rules[0].Matches("jira.tunnel.org", "/login"))
  96. require.True(t, configV2.Ingress.Rules[0].Matches("jira.tunnel.org", "/login/2fa"))
  97. require.False(t, configV2.Ingress.Rules[0].Matches("jira.tunnel.org", "/users"))
  98. require.Equal(t, "http://192.16.19.1:443", configV2.Ingress.Rules[0].Service.String())
  99. require.Len(t, configV2.Ingress.Rules, 3)
  100. // originRequest of this ingress rule overrides global default
  101. require.Equal(t, config.CustomDuration{Duration: time.Second * 10}, configV2.Ingress.Rules[0].Config.ConnectTimeout)
  102. require.Equal(t, true, configV2.Ingress.Rules[0].Config.NoTLSVerify)
  103. // Inherited from global default
  104. require.Equal(t, true, configV2.Ingress.Rules[0].Config.NoHappyEyeballs)
  105. // Validate ingress rule 1
  106. require.Equal(t, "jira.tunnel.org", configV2.Ingress.Rules[1].Hostname)
  107. require.True(t, configV2.Ingress.Rules[1].Matches("jira.tunnel.org", "/users"))
  108. require.Equal(t, "http://172.32.20.6:80", configV2.Ingress.Rules[1].Service.String())
  109. // originRequest of this ingress rule overrides global default
  110. require.Equal(t, config.CustomDuration{Duration: time.Second * 30}, configV2.Ingress.Rules[1].Config.ConnectTimeout)
  111. require.Equal(t, true, configV2.Ingress.Rules[1].Config.NoTLSVerify)
  112. // Inherited from global default
  113. require.Equal(t, true, configV2.Ingress.Rules[1].Config.NoHappyEyeballs)
  114. // Validate ingress rule 2, it's the catch-all rule
  115. require.True(t, configV2.Ingress.Rules[2].Matches("blogs.tunnel.io", "/2022/02/10"))
  116. // Inherited from global default
  117. require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout)
  118. require.Equal(t, false, configV2.Ingress.Rules[2].Config.NoTLSVerify)
  119. require.Equal(t, true, configV2.Ingress.Rules[2].Config.NoHappyEyeballs)
  120. require.Equal(t, configV2.WarpRouting.ConnectTimeout.Duration, 10*time.Second)
  121. originProxyV2, err := orchestrator.GetOriginProxy()
  122. require.NoError(t, err)
  123. require.Implements(t, (*connection.OriginProxy)(nil), originProxyV2)
  124. require.NotEqual(t, originProxyV2, initOriginProxy)
  125. // Should not downgrade to an older version
  126. resp := orchestrator.UpdateConfig(1, nil)
  127. require.NoError(t, resp.Err)
  128. require.Equal(t, int32(2), resp.LastAppliedVersion)
  129. invalidJSON := []byte(`
  130. {
  131. "originRequest":
  132. }
  133. `)
  134. resp = orchestrator.UpdateConfig(3, invalidJSON)
  135. require.Error(t, resp.Err)
  136. require.Equal(t, int32(2), resp.LastAppliedVersion)
  137. originProxyV3, err := orchestrator.GetOriginProxy()
  138. require.NoError(t, err)
  139. require.Equal(t, originProxyV2, originProxyV3)
  140. configJSONV10 := []byte(`
  141. {
  142. "ingress": [
  143. {
  144. "service": "hello-world"
  145. }
  146. ],
  147. "warp-routing": {
  148. }
  149. }
  150. `)
  151. updateWithValidation(t, orchestrator, 10, configJSONV10)
  152. configV10 := orchestrator.config
  153. require.Len(t, configV10.Ingress.Rules, 1)
  154. require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10"))
  155. require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String())
  156. originProxyV10, err := orchestrator.GetOriginProxy()
  157. require.NoError(t, err)
  158. require.Implements(t, (*connection.OriginProxy)(nil), originProxyV10)
  159. require.NotEqual(t, originProxyV10, originProxyV2)
  160. }
  161. // Validates that a new version 0 will be applied if the configuration is loaded locally.
  162. // This will happen when a locally managed tunnel is migrated to remote configuration and receives its first configuration.
  163. func TestUpdateConfiguration_FromMigration(t *testing.T) {
  164. initConfig := &Config{
  165. Ingress: &ingress.Ingress{},
  166. }
  167. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{}, &testLogger)
  168. require.NoError(t, err)
  169. initOriginProxy, err := orchestrator.GetOriginProxy()
  170. require.NoError(t, err)
  171. require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
  172. configJSONV2 := []byte(`
  173. {
  174. "ingress": [
  175. {
  176. "service": "http_status:404"
  177. }
  178. ],
  179. "warp-routing": {
  180. }
  181. }
  182. `)
  183. updateWithValidation(t, orchestrator, 0, configJSONV2)
  184. require.Len(t, orchestrator.config.Ingress.Rules, 1)
  185. }
  186. // Validates that the default ingress rule will be set if there is no rule provided from the remote.
  187. func TestUpdateConfiguration_WithoutIngressRule(t *testing.T) {
  188. initConfig := &Config{
  189. Ingress: &ingress.Ingress{},
  190. }
  191. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{}, &testLogger)
  192. require.NoError(t, err)
  193. initOriginProxy, err := orchestrator.GetOriginProxy()
  194. require.NoError(t, err)
  195. require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
  196. // We need to create an empty RemoteConfigJSON because that will get unmarshalled to a RemoteConfig
  197. emptyConfig := &ingress.RemoteConfigJSON{}
  198. configBytes, err := json.Marshal(emptyConfig)
  199. if err != nil {
  200. require.FailNow(t, "The RemoteConfigJSON shouldn't fail while being marshalled")
  201. }
  202. updateWithValidation(t, orchestrator, 0, configBytes)
  203. require.Len(t, orchestrator.config.Ingress.Rules, 1)
  204. }
  205. // TestConcurrentUpdateAndRead makes sure orchestrator can receive updates and return origin proxy concurrently
  206. func TestConcurrentUpdateAndRead(t *testing.T) {
  207. const (
  208. concurrentRequests = 200
  209. hostname = "public.tunnels.org"
  210. expectedHost = "internal.tunnels.svc.cluster.local"
  211. tcpBody = "testProxyTCP"
  212. )
  213. httpOrigin := httptest.NewServer(&validateHostHandler{
  214. expectedHost: expectedHost,
  215. body: t.Name(),
  216. })
  217. defer httpOrigin.Close()
  218. tcpOrigin, err := net.Listen("tcp", "127.0.0.1:0")
  219. require.NoError(t, err)
  220. defer tcpOrigin.Close()
  221. var (
  222. configJSONV1 = []byte(fmt.Sprintf(`
  223. {
  224. "originRequest": {
  225. "connectTimeout": 90,
  226. "noHappyEyeballs": true
  227. },
  228. "ingress": [
  229. {
  230. "hostname": "%s",
  231. "service": "%s",
  232. "originRequest": {
  233. "httpHostHeader": "%s",
  234. "connectTimeout": 10
  235. }
  236. },
  237. {
  238. "service": "http_status:404"
  239. }
  240. ],
  241. "warp-routing": {
  242. }
  243. }
  244. `, hostname, httpOrigin.URL, expectedHost))
  245. configJSONV2 = []byte(`
  246. {
  247. "ingress": [
  248. {
  249. "service": "http_status:204"
  250. }
  251. ],
  252. "warp-routing": {
  253. }
  254. }
  255. `)
  256. configJSONV3 = []byte(`
  257. {
  258. "ingress": [
  259. {
  260. "service": "http_status:418"
  261. }
  262. ],
  263. "warp-routing": {
  264. }
  265. }
  266. `)
  267. // appliedV2 makes sure v3 is applied after v2
  268. appliedV2 = make(chan struct{})
  269. initConfig = &Config{
  270. Ingress: &ingress.Ingress{},
  271. }
  272. )
  273. ctx, cancel := context.WithCancel(context.Background())
  274. defer cancel()
  275. orchestrator, err := NewOrchestrator(ctx, initConfig, testTags, []ingress.Rule{}, &testLogger)
  276. require.NoError(t, err)
  277. updateWithValidation(t, orchestrator, 1, configJSONV1)
  278. var wg sync.WaitGroup
  279. // tcpOrigin will be closed when the test exits. Only the handler routines are included in the wait group
  280. go func() {
  281. serveTCPOrigin(t, tcpOrigin, &wg)
  282. }()
  283. for i := 0; i < concurrentRequests; i++ {
  284. originProxy, err := orchestrator.GetOriginProxy()
  285. require.NoError(t, err)
  286. wg.Add(1)
  287. go func(i int, originProxy connection.OriginProxy) {
  288. defer wg.Done()
  289. resp, err := proxyHTTP(originProxy, hostname)
  290. require.NoError(t, err, "proxyHTTP %d failed %v", i, err)
  291. defer resp.Body.Close()
  292. var warpRoutingDisabled bool
  293. // The response can be from initOrigin, http_status:204 or http_status:418
  294. switch resp.StatusCode {
  295. // v1 proxy, warp enabled
  296. case 200:
  297. body, err := io.ReadAll(resp.Body)
  298. require.NoError(t, err)
  299. require.Equal(t, t.Name(), string(body))
  300. warpRoutingDisabled = false
  301. // v2 proxy, warp disabled
  302. case 204:
  303. require.Greater(t, i, concurrentRequests/4)
  304. warpRoutingDisabled = true
  305. // v3 proxy, warp enabled
  306. case 418:
  307. require.Greater(t, i, concurrentRequests/2)
  308. warpRoutingDisabled = false
  309. }
  310. // Once we have originProxy, it won't be changed by configuration updates.
  311. // We can infer the version by the ProxyHTTP response code
  312. pr, pw := io.Pipe()
  313. w := newRespReadWriteFlusher()
  314. // Write TCP message and make sure it's echo back. This has to be done in a go routune since ProxyTCP doesn't
  315. // return until the stream is closed.
  316. if !warpRoutingDisabled {
  317. wg.Add(1)
  318. go func() {
  319. defer wg.Done()
  320. defer pw.Close()
  321. tcpEyeball(t, pw, tcpBody, w)
  322. }()
  323. }
  324. err = proxyTCP(ctx, originProxy, tcpOrigin.Addr().String(), w, pr)
  325. if warpRoutingDisabled {
  326. require.Error(t, err, "expect proxyTCP %d to return error", i)
  327. } else {
  328. require.NoError(t, err, "proxyTCP %d failed %v", i, err)
  329. }
  330. }(i, originProxy)
  331. if i == concurrentRequests/4 {
  332. wg.Add(1)
  333. go func() {
  334. defer wg.Done()
  335. updateWithValidation(t, orchestrator, 2, configJSONV2)
  336. close(appliedV2)
  337. }()
  338. }
  339. if i == concurrentRequests/2 {
  340. wg.Add(1)
  341. go func() {
  342. defer wg.Done()
  343. // Makes sure v2 is applied before v3
  344. <-appliedV2
  345. updateWithValidation(t, orchestrator, 3, configJSONV3)
  346. }()
  347. }
  348. }
  349. wg.Wait()
  350. }
  351. func proxyHTTP(originProxy connection.OriginProxy, hostname string) (*http.Response, error) {
  352. req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", hostname), nil)
  353. if err != nil {
  354. return nil, err
  355. }
  356. w := httptest.NewRecorder()
  357. log := zerolog.Nop()
  358. respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeHTTP, &log)
  359. if err != nil {
  360. return nil, err
  361. }
  362. err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, 0, &log), false)
  363. if err != nil {
  364. return nil, err
  365. }
  366. return w.Result(), nil
  367. }
  368. func tcpEyeball(t *testing.T, reqWriter io.WriteCloser, body string, respReadWriter *respReadWriteFlusher) {
  369. writeN, err := reqWriter.Write([]byte(body))
  370. require.NoError(t, err)
  371. readBuffer := make([]byte, writeN)
  372. n, err := respReadWriter.Read(readBuffer)
  373. require.NoError(t, err)
  374. require.Equal(t, body, string(readBuffer[:n]))
  375. require.Equal(t, writeN, n)
  376. }
  377. func proxyTCP(ctx context.Context, originProxy connection.OriginProxy, originAddr string, w http.ResponseWriter, reqBody io.ReadCloser) error {
  378. req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", originAddr), reqBody)
  379. if err != nil {
  380. return err
  381. }
  382. log := zerolog.Nop()
  383. respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeTCP, &log)
  384. if err != nil {
  385. return err
  386. }
  387. tcpReq := &connection.TCPRequest{
  388. Dest: originAddr,
  389. CFRay: "123",
  390. LBProbe: false,
  391. }
  392. rws := connection.NewHTTPResponseReadWriterAcker(respWriter, w.(http.Flusher), req)
  393. return originProxy.ProxyTCP(ctx, rws, tcpReq)
  394. }
  395. func serveTCPOrigin(t *testing.T, tcpOrigin net.Listener, wg *sync.WaitGroup) {
  396. for {
  397. conn, err := tcpOrigin.Accept()
  398. if err != nil {
  399. return
  400. }
  401. wg.Add(1)
  402. go func() {
  403. defer wg.Done()
  404. defer conn.Close()
  405. echoTCP(t, conn)
  406. }()
  407. }
  408. }
  409. func echoTCP(t *testing.T, conn net.Conn) {
  410. readBuf := make([]byte, 1000)
  411. readN, err := conn.Read(readBuf)
  412. require.NoError(t, err)
  413. writeN, err := conn.Write(readBuf[:readN])
  414. require.NoError(t, err)
  415. require.Equal(t, readN, writeN)
  416. }
  417. type validateHostHandler struct {
  418. expectedHost string
  419. body string
  420. }
  421. func (vhh *validateHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  422. if r.Host != vhh.expectedHost {
  423. w.WriteHeader(http.StatusBadRequest)
  424. return
  425. }
  426. w.WriteHeader(http.StatusOK)
  427. w.Write([]byte(vhh.body))
  428. }
  429. func updateWithValidation(t *testing.T, orchestrator *Orchestrator, version int32, config []byte) {
  430. resp := orchestrator.UpdateConfig(version, config)
  431. require.NoError(t, resp.Err)
  432. require.Equal(t, version, resp.LastAppliedVersion)
  433. }
  434. // TestClosePreviousProxies makes sure proxies started in the pervious configuration version are shutdown
  435. func TestClosePreviousProxies(t *testing.T) {
  436. var (
  437. hostname = "hello.tunnel1.org"
  438. configWithHelloWorld = []byte(fmt.Sprintf(`
  439. {
  440. "ingress": [
  441. {
  442. "hostname": "%s",
  443. "service": "hello-world"
  444. },
  445. {
  446. "service": "http_status:404"
  447. }
  448. ],
  449. "warp-routing": {
  450. }
  451. }
  452. `, hostname))
  453. configTeapot = []byte(`
  454. {
  455. "ingress": [
  456. {
  457. "service": "http_status:418"
  458. }
  459. ],
  460. "warp-routing": {
  461. }
  462. }
  463. `)
  464. initConfig = &Config{
  465. Ingress: &ingress.Ingress{},
  466. }
  467. )
  468. ctx, cancel := context.WithCancel(context.Background())
  469. orchestrator, err := NewOrchestrator(ctx, initConfig, testTags, []ingress.Rule{}, &testLogger)
  470. require.NoError(t, err)
  471. updateWithValidation(t, orchestrator, 1, configWithHelloWorld)
  472. originProxyV1, err := orchestrator.GetOriginProxy()
  473. require.NoError(t, err)
  474. resp, err := proxyHTTP(originProxyV1, hostname)
  475. require.NoError(t, err)
  476. require.Equal(t, http.StatusOK, resp.StatusCode)
  477. updateWithValidation(t, orchestrator, 2, configTeapot)
  478. originProxyV2, err := orchestrator.GetOriginProxy()
  479. require.NoError(t, err)
  480. resp, err = proxyHTTP(originProxyV2, hostname)
  481. require.NoError(t, err)
  482. require.Equal(t, http.StatusTeapot, resp.StatusCode)
  483. // The hello-world server in config v1 should have been stopped. We wait a bit since it's closed asynchronously.
  484. time.Sleep(time.Millisecond * 10)
  485. resp, err = proxyHTTP(originProxyV1, hostname)
  486. require.Error(t, err)
  487. require.Nil(t, resp)
  488. // Apply the config with hello world server again, orchestrator should spin up another hello world server
  489. updateWithValidation(t, orchestrator, 3, configWithHelloWorld)
  490. originProxyV3, err := orchestrator.GetOriginProxy()
  491. require.NoError(t, err)
  492. require.NotEqual(t, originProxyV1, originProxyV3)
  493. resp, err = proxyHTTP(originProxyV3, hostname)
  494. require.NoError(t, err)
  495. require.Equal(t, http.StatusOK, resp.StatusCode)
  496. // cancel the context should terminate the last proxy
  497. cancel()
  498. // Wait for proxies to shutdown
  499. time.Sleep(time.Millisecond * 10)
  500. resp, err = proxyHTTP(originProxyV3, hostname)
  501. require.Error(t, err)
  502. require.Nil(t, resp)
  503. }
  504. // TestPersistentConnection makes sure updating the ingress doesn't intefere with existing connections
  505. func TestPersistentConnection(t *testing.T) {
  506. const (
  507. hostname = "http://ws.tunnel.org"
  508. )
  509. msg := t.Name()
  510. initConfig := &Config{
  511. Ingress: &ingress.Ingress{},
  512. }
  513. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{}, &testLogger)
  514. require.NoError(t, err)
  515. wsOrigin := httptest.NewServer(http.HandlerFunc(wsEcho))
  516. defer wsOrigin.Close()
  517. tcpOrigin, err := net.Listen("tcp", "127.0.0.1:0")
  518. require.NoError(t, err)
  519. defer tcpOrigin.Close()
  520. configWithWSAndWarp := []byte(fmt.Sprintf(`
  521. {
  522. "ingress": [
  523. {
  524. "service": "%s"
  525. }
  526. ],
  527. "warp-routing": {
  528. }
  529. }
  530. `, wsOrigin.URL))
  531. updateWithValidation(t, orchestrator, 1, configWithWSAndWarp)
  532. originProxy, err := orchestrator.GetOriginProxy()
  533. require.NoError(t, err)
  534. wsReqReader, wsReqWriter := io.Pipe()
  535. wsRespReadWriter := newRespReadWriteFlusher()
  536. tcpReqReader, tcpReqWriter := io.Pipe()
  537. tcpRespReadWriter := newRespReadWriteFlusher()
  538. ctx, cancel := context.WithCancel(context.Background())
  539. defer cancel()
  540. var wg sync.WaitGroup
  541. wg.Add(3)
  542. // Start TCP origin
  543. go func() {
  544. defer wg.Done()
  545. conn, err := tcpOrigin.Accept()
  546. require.NoError(t, err)
  547. defer conn.Close()
  548. // Expect 3 TCP messages
  549. for i := 0; i < 3; i++ {
  550. echoTCP(t, conn)
  551. }
  552. }()
  553. // Simulate cloudflared recieving a TCP connection
  554. go func() {
  555. defer wg.Done()
  556. require.NoError(t, proxyTCP(ctx, originProxy, tcpOrigin.Addr().String(), tcpRespReadWriter, tcpReqReader))
  557. }()
  558. // Simulate cloudflared recieving a WS connection
  559. go func() {
  560. defer wg.Done()
  561. req, err := http.NewRequest(http.MethodGet, hostname, wsReqReader)
  562. require.NoError(t, err)
  563. // ProxyHTTP will add Connection, Upgrade and Sec-Websocket-Version headers
  564. req.Header.Add("Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
  565. log := zerolog.Nop()
  566. respWriter, err := connection.NewHTTP2RespWriter(req, wsRespReadWriter, connection.TypeWebsocket, &log)
  567. require.NoError(t, err)
  568. err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, 0, &log), true)
  569. require.NoError(t, err)
  570. }()
  571. // Simulate eyeball WS and TCP connections
  572. validateWsEcho(t, msg, wsReqWriter, wsRespReadWriter)
  573. tcpEyeball(t, tcpReqWriter, msg, tcpRespReadWriter)
  574. configNoWSAndWarp := []byte(`
  575. {
  576. "ingress": [
  577. {
  578. "service": "http_status:404"
  579. }
  580. ],
  581. "warp-routing": {
  582. }
  583. }
  584. `)
  585. updateWithValidation(t, orchestrator, 2, configNoWSAndWarp)
  586. // Make sure connection is still up
  587. validateWsEcho(t, msg, wsReqWriter, wsRespReadWriter)
  588. tcpEyeball(t, tcpReqWriter, msg, tcpRespReadWriter)
  589. updateWithValidation(t, orchestrator, 3, configWithWSAndWarp)
  590. // Make sure connection is still up
  591. validateWsEcho(t, msg, wsReqWriter, wsRespReadWriter)
  592. tcpEyeball(t, tcpReqWriter, msg, tcpRespReadWriter)
  593. wsReqWriter.Close()
  594. tcpReqWriter.Close()
  595. wg.Wait()
  596. }
  597. func TestSerializeLocalConfig(t *testing.T) {
  598. c := &newLocalConfig{
  599. RemoteConfig: ingress.RemoteConfig{
  600. Ingress: ingress.Ingress{},
  601. },
  602. ConfigurationFlags: map[string]string{"a": "b"},
  603. }
  604. result, _ := json.Marshal(c)
  605. fmt.Println(string(result))
  606. }
  607. func wsEcho(w http.ResponseWriter, r *http.Request) {
  608. upgrader := gows.Upgrader{}
  609. conn, err := upgrader.Upgrade(w, r, nil)
  610. if err != nil {
  611. return
  612. }
  613. defer conn.Close()
  614. for {
  615. mt, message, err := conn.ReadMessage()
  616. if err != nil {
  617. fmt.Println("read message err", err)
  618. break
  619. }
  620. err = conn.WriteMessage(mt, message)
  621. if err != nil {
  622. fmt.Println("write message err", err)
  623. break
  624. }
  625. }
  626. }
  627. func validateWsEcho(t *testing.T, msg string, reqWriter io.Writer, respReadWriter io.ReadWriter) {
  628. err := wsutil.WriteClientText(reqWriter, []byte(msg))
  629. require.NoError(t, err)
  630. receivedMsg, err := wsutil.ReadServerText(respReadWriter)
  631. require.NoError(t, err)
  632. require.Equal(t, msg, string(receivedMsg))
  633. }
  634. type respReadWriteFlusher struct {
  635. io.Reader
  636. w io.Writer
  637. headers http.Header
  638. statusCode int
  639. setStatusOnce sync.Once
  640. hasStatus chan struct{}
  641. }
  642. func newRespReadWriteFlusher() *respReadWriteFlusher {
  643. pr, pw := io.Pipe()
  644. return &respReadWriteFlusher{
  645. Reader: pr,
  646. w: pw,
  647. headers: make(http.Header),
  648. hasStatus: make(chan struct{}),
  649. }
  650. }
  651. func (rrw *respReadWriteFlusher) Write(buf []byte) (int, error) {
  652. rrw.WriteHeader(http.StatusOK)
  653. return rrw.w.Write(buf)
  654. }
  655. func (rrw *respReadWriteFlusher) Flush() {}
  656. func (rrw *respReadWriteFlusher) Header() http.Header {
  657. return rrw.headers
  658. }
  659. func (rrw *respReadWriteFlusher) WriteHeader(statusCode int) {
  660. rrw.setStatusOnce.Do(func() {
  661. rrw.statusCode = statusCode
  662. close(rrw.hasStatus)
  663. })
  664. }