orchestrator_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. package orchestration
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/http/httptest"
  10. "sync"
  11. "testing"
  12. "time"
  13. "github.com/gobwas/ws/wsutil"
  14. "github.com/google/uuid"
  15. gows "github.com/gorilla/websocket"
  16. "github.com/rs/zerolog"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. "github.com/cloudflare/cloudflared/cmd/cloudflared/flags"
  20. "github.com/cloudflare/cloudflared/config"
  21. "github.com/cloudflare/cloudflared/connection"
  22. "github.com/cloudflare/cloudflared/ingress"
  23. "github.com/cloudflare/cloudflared/management"
  24. "github.com/cloudflare/cloudflared/tracing"
  25. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  26. )
  27. var (
  28. testLogger = zerolog.Nop()
  29. testTags = []pogs.Tag{
  30. {
  31. Name: "package",
  32. Value: "orchestration",
  33. },
  34. {
  35. Name: "purpose",
  36. Value: "test",
  37. },
  38. }
  39. )
  40. // TestUpdateConfiguration tests that
  41. // - configurations can be deserialized
  42. // - proxy can be updated
  43. // - last applied version and error are returned
  44. // - configurations can be deserialized
  45. // - receiving an old version is noop
  46. func TestUpdateConfiguration(t *testing.T) {
  47. initConfig := &Config{
  48. Ingress: &ingress.Ingress{},
  49. }
  50. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{ingress.NewManagementRule(management.New("management.argotunnel.com", false, "1.1.1.1:80", uuid.Nil, "", &testLogger, nil))}, &testLogger)
  51. require.NoError(t, err)
  52. initOriginProxy, err := orchestrator.GetOriginProxy()
  53. require.NoError(t, err)
  54. require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
  55. configJSONV2 := []byte(`
  56. {
  57. "unknown_field": "not_deserialized",
  58. "originRequest": {
  59. "connectTimeout": 90,
  60. "noHappyEyeballs": true
  61. },
  62. "ingress": [
  63. {
  64. "hostname": "jira.tunnel.org",
  65. "path": "^\/login",
  66. "service": "http://192.16.19.1:443",
  67. "originRequest": {
  68. "noTLSVerify": true,
  69. "connectTimeout": 10
  70. }
  71. },
  72. {
  73. "hostname": "jira.tunnel.org",
  74. "service": "http://172.32.20.6:80",
  75. "originRequest": {
  76. "noTLSVerify": true,
  77. "connectTimeout": 30
  78. }
  79. },
  80. {
  81. "service": "http_status:404"
  82. }
  83. ],
  84. "warp-routing": {
  85. "connectTimeout": 10
  86. }
  87. }
  88. `)
  89. updateWithValidation(t, orchestrator, 2, configJSONV2)
  90. configV2 := orchestrator.config
  91. // Validate internal ingress rules
  92. require.Equal(t, "management.argotunnel.com", configV2.Ingress.InternalRules[0].Hostname)
  93. require.True(t, configV2.Ingress.InternalRules[0].Matches("management.argotunnel.com", "/ping"))
  94. require.Equal(t, "management", configV2.Ingress.InternalRules[0].Service.String())
  95. // Validate ingress rule 0
  96. require.Equal(t, "jira.tunnel.org", configV2.Ingress.Rules[0].Hostname)
  97. require.True(t, configV2.Ingress.Rules[0].Matches("jira.tunnel.org", "/login"))
  98. require.True(t, configV2.Ingress.Rules[0].Matches("jira.tunnel.org", "/login/2fa"))
  99. require.False(t, configV2.Ingress.Rules[0].Matches("jira.tunnel.org", "/users"))
  100. require.Equal(t, "http://192.16.19.1:443", configV2.Ingress.Rules[0].Service.String())
  101. require.Len(t, configV2.Ingress.Rules, 3)
  102. // originRequest of this ingress rule overrides global default
  103. require.Equal(t, config.CustomDuration{Duration: time.Second * 10}, configV2.Ingress.Rules[0].Config.ConnectTimeout)
  104. require.True(t, configV2.Ingress.Rules[0].Config.NoTLSVerify)
  105. // Inherited from global default
  106. require.True(t, configV2.Ingress.Rules[0].Config.NoHappyEyeballs)
  107. // Validate ingress rule 1
  108. require.Equal(t, "jira.tunnel.org", configV2.Ingress.Rules[1].Hostname)
  109. require.True(t, configV2.Ingress.Rules[1].Matches("jira.tunnel.org", "/users"))
  110. require.Equal(t, "http://172.32.20.6:80", configV2.Ingress.Rules[1].Service.String())
  111. // originRequest of this ingress rule overrides global default
  112. require.Equal(t, config.CustomDuration{Duration: time.Second * 30}, configV2.Ingress.Rules[1].Config.ConnectTimeout)
  113. require.True(t, configV2.Ingress.Rules[1].Config.NoTLSVerify)
  114. // Inherited from global default
  115. require.True(t, configV2.Ingress.Rules[1].Config.NoHappyEyeballs)
  116. // Validate ingress rule 2, it's the catch-all rule
  117. require.True(t, configV2.Ingress.Rules[2].Matches("blogs.tunnel.io", "/2022/02/10"))
  118. // Inherited from global default
  119. require.Equal(t, config.CustomDuration{Duration: time.Second * 90}, configV2.Ingress.Rules[2].Config.ConnectTimeout)
  120. require.False(t, configV2.Ingress.Rules[2].Config.NoTLSVerify)
  121. require.True(t, configV2.Ingress.Rules[2].Config.NoHappyEyeballs)
  122. require.Equal(t, 10*time.Second, configV2.WarpRouting.ConnectTimeout.Duration)
  123. originProxyV2, err := orchestrator.GetOriginProxy()
  124. require.NoError(t, err)
  125. require.Implements(t, (*connection.OriginProxy)(nil), originProxyV2)
  126. require.NotEqual(t, originProxyV2, initOriginProxy)
  127. // Should not downgrade to an older version
  128. resp := orchestrator.UpdateConfig(1, nil)
  129. require.NoError(t, resp.Err)
  130. require.Equal(t, int32(2), resp.LastAppliedVersion)
  131. invalidJSON := []byte(`
  132. {
  133. "originRequest":
  134. }
  135. `)
  136. resp = orchestrator.UpdateConfig(3, invalidJSON)
  137. require.Error(t, resp.Err)
  138. require.Equal(t, int32(2), resp.LastAppliedVersion)
  139. originProxyV3, err := orchestrator.GetOriginProxy()
  140. require.NoError(t, err)
  141. require.Equal(t, originProxyV2, originProxyV3)
  142. configJSONV10 := []byte(`
  143. {
  144. "ingress": [
  145. {
  146. "service": "hello-world"
  147. }
  148. ],
  149. "warp-routing": {
  150. }
  151. }
  152. `)
  153. updateWithValidation(t, orchestrator, 10, configJSONV10)
  154. configV10 := orchestrator.config
  155. require.Len(t, configV10.Ingress.Rules, 1)
  156. require.True(t, configV10.Ingress.Rules[0].Matches("blogs.tunnel.io", "/2022/02/10"))
  157. require.Equal(t, ingress.HelloWorldService, configV10.Ingress.Rules[0].Service.String())
  158. originProxyV10, err := orchestrator.GetOriginProxy()
  159. require.NoError(t, err)
  160. require.Implements(t, (*connection.OriginProxy)(nil), originProxyV10)
  161. require.NotEqual(t, originProxyV10, originProxyV2)
  162. }
  163. // Validates that a new version 0 will be applied if the configuration is loaded locally.
  164. // This will happen when a locally managed tunnel is migrated to remote configuration and receives its first configuration.
  165. func TestUpdateConfiguration_FromMigration(t *testing.T) {
  166. initConfig := &Config{
  167. Ingress: &ingress.Ingress{},
  168. }
  169. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{}, &testLogger)
  170. require.NoError(t, err)
  171. initOriginProxy, err := orchestrator.GetOriginProxy()
  172. require.NoError(t, err)
  173. require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
  174. configJSONV2 := []byte(`
  175. {
  176. "ingress": [
  177. {
  178. "service": "http_status:404"
  179. }
  180. ],
  181. "warp-routing": {
  182. }
  183. }
  184. `)
  185. updateWithValidation(t, orchestrator, 0, configJSONV2)
  186. require.Len(t, orchestrator.config.Ingress.Rules, 1)
  187. }
  188. // Validates that the default ingress rule will be set if there is no rule provided from the remote.
  189. func TestUpdateConfiguration_WithoutIngressRule(t *testing.T) {
  190. initConfig := &Config{
  191. Ingress: &ingress.Ingress{},
  192. }
  193. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{}, &testLogger)
  194. require.NoError(t, err)
  195. initOriginProxy, err := orchestrator.GetOriginProxy()
  196. require.NoError(t, err)
  197. require.Implements(t, (*connection.OriginProxy)(nil), initOriginProxy)
  198. // We need to create an empty RemoteConfigJSON because that will get unmarshalled to a RemoteConfig
  199. emptyConfig := &ingress.RemoteConfigJSON{}
  200. configBytes, err := json.Marshal(emptyConfig)
  201. if err != nil {
  202. require.FailNow(t, "The RemoteConfigJSON shouldn't fail while being marshalled")
  203. }
  204. updateWithValidation(t, orchestrator, 0, configBytes)
  205. require.Len(t, orchestrator.config.Ingress.Rules, 1)
  206. }
  207. // TestConcurrentUpdateAndRead makes sure orchestrator can receive updates and return origin proxy concurrently
  208. func TestConcurrentUpdateAndRead(t *testing.T) {
  209. const (
  210. concurrentRequests = 200
  211. hostname = "public.tunnels.org"
  212. expectedHost = "internal.tunnels.svc.cluster.local"
  213. tcpBody = "testProxyTCP"
  214. )
  215. httpOrigin := httptest.NewServer(&validateHostHandler{
  216. expectedHost: expectedHost,
  217. body: t.Name(),
  218. })
  219. defer httpOrigin.Close()
  220. tcpOrigin, err := net.Listen("tcp", "127.0.0.1:0")
  221. require.NoError(t, err)
  222. defer tcpOrigin.Close()
  223. var (
  224. configJSONV1 = []byte(fmt.Sprintf(`
  225. {
  226. "originRequest": {
  227. "connectTimeout": 90,
  228. "noHappyEyeballs": true
  229. },
  230. "ingress": [
  231. {
  232. "hostname": "%s",
  233. "service": "%s",
  234. "originRequest": {
  235. "httpHostHeader": "%s",
  236. "connectTimeout": 10
  237. }
  238. },
  239. {
  240. "service": "http_status:404"
  241. }
  242. ],
  243. "warp-routing": {
  244. }
  245. }
  246. `, hostname, httpOrigin.URL, expectedHost))
  247. configJSONV2 = []byte(`
  248. {
  249. "ingress": [
  250. {
  251. "service": "http_status:204"
  252. }
  253. ],
  254. "warp-routing": {
  255. }
  256. }
  257. `)
  258. configJSONV3 = []byte(`
  259. {
  260. "ingress": [
  261. {
  262. "service": "http_status:418"
  263. }
  264. ],
  265. "warp-routing": {
  266. }
  267. }
  268. `)
  269. // appliedV2 makes sure v3 is applied after v2
  270. appliedV2 = make(chan struct{})
  271. initConfig = &Config{
  272. Ingress: &ingress.Ingress{},
  273. }
  274. )
  275. ctx, cancel := context.WithCancel(context.Background())
  276. defer cancel()
  277. orchestrator, err := NewOrchestrator(ctx, initConfig, testTags, []ingress.Rule{}, &testLogger)
  278. require.NoError(t, err)
  279. updateWithValidation(t, orchestrator, 1, configJSONV1)
  280. var wg sync.WaitGroup
  281. // tcpOrigin will be closed when the test exits. Only the handler routines are included in the wait group
  282. go func() {
  283. serveTCPOrigin(t, tcpOrigin, &wg)
  284. }()
  285. for i := 0; i < concurrentRequests; i++ {
  286. originProxy, err := orchestrator.GetOriginProxy()
  287. require.NoError(t, err)
  288. wg.Add(1)
  289. go func(i int, originProxy connection.OriginProxy) {
  290. defer wg.Done()
  291. resp, err := proxyHTTP(originProxy, hostname)
  292. assert.NoError(t, err, "proxyHTTP %d failed %v", i, err)
  293. defer resp.Body.Close()
  294. var warpRoutingDisabled bool
  295. // The response can be from initOrigin, http_status:204 or http_status:418
  296. switch resp.StatusCode {
  297. // v1 proxy, warp enabled
  298. case 200:
  299. body, err := io.ReadAll(resp.Body)
  300. assert.NoError(t, err)
  301. assert.Equal(t, t.Name(), string(body))
  302. warpRoutingDisabled = false
  303. // v2 proxy, warp disabled
  304. case 204:
  305. assert.Greater(t, i, concurrentRequests/4)
  306. warpRoutingDisabled = true
  307. // v3 proxy, warp enabled
  308. case 418:
  309. assert.Greater(t, i, concurrentRequests/2)
  310. warpRoutingDisabled = false
  311. }
  312. // Once we have originProxy, it won't be changed by configuration updates.
  313. // We can infer the version by the ProxyHTTP response code
  314. pr, pw := io.Pipe()
  315. w := newRespReadWriteFlusher()
  316. // Write TCP message and make sure it's echo back. This has to be done in a go routune since ProxyTCP doesn't
  317. // return until the stream is closed.
  318. if !warpRoutingDisabled {
  319. wg.Add(1)
  320. go func() {
  321. defer wg.Done()
  322. defer pw.Close()
  323. tcpEyeball(t, pw, tcpBody, w)
  324. }()
  325. }
  326. err = proxyTCP(ctx, originProxy, tcpOrigin.Addr().String(), w, pr)
  327. if warpRoutingDisabled {
  328. assert.Error(t, err, "expect proxyTCP %d to return error", i)
  329. } else {
  330. assert.NoError(t, err, "proxyTCP %d failed %v", i, err)
  331. }
  332. }(i, originProxy)
  333. if i == concurrentRequests/4 {
  334. wg.Add(1)
  335. go func() {
  336. defer wg.Done()
  337. updateWithValidation(t, orchestrator, 2, configJSONV2)
  338. close(appliedV2)
  339. }()
  340. }
  341. if i == concurrentRequests/2 {
  342. wg.Add(1)
  343. go func() {
  344. defer wg.Done()
  345. // Makes sure v2 is applied before v3
  346. <-appliedV2
  347. updateWithValidation(t, orchestrator, 3, configJSONV3)
  348. }()
  349. }
  350. }
  351. wg.Wait()
  352. }
  353. // TestOverrideWarpRoutingConfigWithLocalValues tests that if a value is defined in the Config.ConfigurationFlags,
  354. // it will override the value that comes from the remote result.
  355. func TestOverrideWarpRoutingConfigWithLocalValues(t *testing.T) {
  356. ctx, cancel := context.WithCancel(context.Background())
  357. defer cancel()
  358. assertMaxActiveFlows := func(orchestrator *Orchestrator, expectedValue uint64) {
  359. configJson, err := orchestrator.GetConfigJSON()
  360. require.NoError(t, err)
  361. var result map[string]interface{}
  362. err = json.Unmarshal(configJson, &result)
  363. require.NoError(t, err)
  364. warpRouting := result["warp-routing"].(map[string]interface{})
  365. require.EqualValues(t, expectedValue, warpRouting["maxActiveFlows"])
  366. }
  367. remoteValue := uint64(100)
  368. remoteIngress := ingress.Ingress{}
  369. remoteWarpConfig := ingress.WarpRoutingConfig{
  370. MaxActiveFlows: remoteValue,
  371. }
  372. remoteConfig := &Config{
  373. Ingress: &remoteIngress,
  374. WarpRouting: remoteWarpConfig,
  375. ConfigurationFlags: map[string]string{},
  376. }
  377. orchestrator, err := NewOrchestrator(ctx, remoteConfig, testTags, []ingress.Rule{}, &testLogger)
  378. require.NoError(t, err)
  379. assertMaxActiveFlows(orchestrator, remoteValue)
  380. // Add a local override for the maxActiveFlows
  381. localValue := uint64(500)
  382. remoteConfig.ConfigurationFlags[flags.MaxActiveFlows] = fmt.Sprintf("%d", localValue)
  383. // Force a configuration refresh
  384. err = orchestrator.updateIngress(remoteIngress, remoteWarpConfig)
  385. require.NoError(t, err)
  386. // Check the value being used is the local one
  387. assertMaxActiveFlows(orchestrator, localValue)
  388. // Remove local override for the maxActiveFlows
  389. delete(remoteConfig.ConfigurationFlags, flags.MaxActiveFlows)
  390. // Force a configuration refresh
  391. err = orchestrator.updateIngress(remoteIngress, remoteWarpConfig)
  392. require.NoError(t, err)
  393. // Check the value being used is now the remote again
  394. assertMaxActiveFlows(orchestrator, remoteValue)
  395. }
  396. func proxyHTTP(originProxy connection.OriginProxy, hostname string) (*http.Response, error) {
  397. req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", hostname), nil)
  398. if err != nil {
  399. return nil, err
  400. }
  401. w := httptest.NewRecorder()
  402. log := zerolog.Nop()
  403. respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeHTTP, &log)
  404. if err != nil {
  405. return nil, err
  406. }
  407. err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, 0, &log), false)
  408. if err != nil {
  409. return nil, err
  410. }
  411. return w.Result(), nil
  412. }
  413. // nolint: testifylint // this is used inside go routines so it can't use `require.`
  414. func tcpEyeball(t *testing.T, reqWriter io.WriteCloser, body string, respReadWriter *respReadWriteFlusher) {
  415. writeN, err := reqWriter.Write([]byte(body))
  416. assert.NoError(t, err)
  417. readBuffer := make([]byte, writeN)
  418. n, err := respReadWriter.Read(readBuffer)
  419. assert.NoError(t, err)
  420. assert.Equal(t, body, string(readBuffer[:n]))
  421. assert.Equal(t, writeN, n)
  422. }
  423. func proxyTCP(ctx context.Context, originProxy connection.OriginProxy, originAddr string, w http.ResponseWriter, reqBody io.ReadCloser) error {
  424. req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s", originAddr), reqBody)
  425. if err != nil {
  426. return err
  427. }
  428. log := zerolog.Nop()
  429. respWriter, err := connection.NewHTTP2RespWriter(req, w, connection.TypeTCP, &log)
  430. if err != nil {
  431. return err
  432. }
  433. tcpReq := &connection.TCPRequest{
  434. Dest: originAddr,
  435. CFRay: "123",
  436. LBProbe: false,
  437. }
  438. rws := connection.NewHTTPResponseReadWriterAcker(respWriter, w.(http.Flusher), req)
  439. return originProxy.ProxyTCP(ctx, rws, tcpReq)
  440. }
  441. func serveTCPOrigin(t *testing.T, tcpOrigin net.Listener, wg *sync.WaitGroup) {
  442. for {
  443. conn, err := tcpOrigin.Accept()
  444. if err != nil {
  445. return
  446. }
  447. wg.Add(1)
  448. go func() {
  449. defer wg.Done()
  450. defer conn.Close()
  451. echoTCP(t, conn)
  452. }()
  453. }
  454. }
  455. // nolint: testifylint // this is used inside go routines so it can't use `require.`
  456. func echoTCP(t *testing.T, conn net.Conn) {
  457. readBuf := make([]byte, 1000)
  458. readN, err := conn.Read(readBuf)
  459. assert.NoError(t, err)
  460. writeN, err := conn.Write(readBuf[:readN])
  461. assert.NoError(t, err)
  462. assert.Equal(t, readN, writeN)
  463. }
  464. type validateHostHandler struct {
  465. expectedHost string
  466. body string
  467. }
  468. func (vhh *validateHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  469. if r.Host != vhh.expectedHost {
  470. w.WriteHeader(http.StatusBadRequest)
  471. return
  472. }
  473. w.WriteHeader(http.StatusOK)
  474. _, _ = w.Write([]byte(vhh.body))
  475. }
  476. // nolint: testifylint // this is used inside go routines so it can't use `require.`
  477. func updateWithValidation(t *testing.T, orchestrator *Orchestrator, version int32, config []byte) {
  478. resp := orchestrator.UpdateConfig(version, config)
  479. assert.NoError(t, resp.Err)
  480. assert.Equal(t, version, resp.LastAppliedVersion)
  481. }
  482. // TestClosePreviousProxies makes sure proxies started in the previous configuration version are shutdown
  483. func TestClosePreviousProxies(t *testing.T) {
  484. var (
  485. hostname = "hello.tunnel1.org"
  486. configWithHelloWorld = []byte(fmt.Sprintf(`
  487. {
  488. "ingress": [
  489. {
  490. "hostname": "%s",
  491. "service": "hello-world"
  492. },
  493. {
  494. "service": "http_status:404"
  495. }
  496. ],
  497. "warp-routing": {
  498. }
  499. }
  500. `, hostname))
  501. configTeapot = []byte(`
  502. {
  503. "ingress": [
  504. {
  505. "service": "http_status:418"
  506. }
  507. ],
  508. "warp-routing": {
  509. }
  510. }
  511. `)
  512. initConfig = &Config{
  513. Ingress: &ingress.Ingress{},
  514. }
  515. )
  516. ctx, cancel := context.WithCancel(context.Background())
  517. orchestrator, err := NewOrchestrator(ctx, initConfig, testTags, []ingress.Rule{}, &testLogger)
  518. require.NoError(t, err)
  519. updateWithValidation(t, orchestrator, 1, configWithHelloWorld)
  520. originProxyV1, err := orchestrator.GetOriginProxy()
  521. require.NoError(t, err)
  522. // nolint: bodyclose
  523. resp, err := proxyHTTP(originProxyV1, hostname)
  524. require.NoError(t, err)
  525. require.Equal(t, http.StatusOK, resp.StatusCode)
  526. updateWithValidation(t, orchestrator, 2, configTeapot)
  527. originProxyV2, err := orchestrator.GetOriginProxy()
  528. require.NoError(t, err)
  529. // nolint: bodyclose
  530. resp, err = proxyHTTP(originProxyV2, hostname)
  531. require.NoError(t, err)
  532. require.Equal(t, http.StatusTeapot, resp.StatusCode)
  533. // The hello-world server in config v1 should have been stopped. We wait a bit since it's closed asynchronously.
  534. time.Sleep(time.Millisecond * 10)
  535. // nolint: bodyclose
  536. resp, err = proxyHTTP(originProxyV1, hostname)
  537. require.Error(t, err)
  538. require.Nil(t, resp)
  539. // Apply the config with hello world server again, orchestrator should spin up another hello world server
  540. updateWithValidation(t, orchestrator, 3, configWithHelloWorld)
  541. originProxyV3, err := orchestrator.GetOriginProxy()
  542. require.NoError(t, err)
  543. require.NotEqual(t, originProxyV1, originProxyV3)
  544. // nolint: bodyclose
  545. resp, err = proxyHTTP(originProxyV3, hostname)
  546. require.NoError(t, err)
  547. require.Equal(t, http.StatusOK, resp.StatusCode)
  548. // cancel the context should terminate the last proxy
  549. cancel()
  550. // Wait for proxies to shutdown
  551. time.Sleep(time.Millisecond * 10)
  552. // nolint: bodyclose
  553. resp, err = proxyHTTP(originProxyV3, hostname)
  554. require.Error(t, err)
  555. require.Nil(t, resp)
  556. }
  557. // TestPersistentConnection makes sure updating the ingress doesn't intefere with existing connections
  558. func TestPersistentConnection(t *testing.T) {
  559. const (
  560. hostname = "http://ws.tunnel.org"
  561. )
  562. msg := t.Name()
  563. initConfig := &Config{
  564. Ingress: &ingress.Ingress{},
  565. }
  566. orchestrator, err := NewOrchestrator(context.Background(), initConfig, testTags, []ingress.Rule{}, &testLogger)
  567. require.NoError(t, err)
  568. wsOrigin := httptest.NewServer(http.HandlerFunc(wsEcho))
  569. defer wsOrigin.Close()
  570. tcpOrigin, err := net.Listen("tcp", "127.0.0.1:0")
  571. require.NoError(t, err)
  572. defer tcpOrigin.Close()
  573. configWithWSAndWarp := []byte(fmt.Sprintf(`
  574. {
  575. "ingress": [
  576. {
  577. "service": "%s"
  578. }
  579. ],
  580. "warp-routing": {
  581. }
  582. }
  583. `, wsOrigin.URL))
  584. updateWithValidation(t, orchestrator, 1, configWithWSAndWarp)
  585. originProxy, err := orchestrator.GetOriginProxy()
  586. require.NoError(t, err)
  587. wsReqReader, wsReqWriter := io.Pipe()
  588. wsRespReadWriter := newRespReadWriteFlusher()
  589. tcpReqReader, tcpReqWriter := io.Pipe()
  590. tcpRespReadWriter := newRespReadWriteFlusher()
  591. ctx, cancel := context.WithCancel(context.Background())
  592. defer cancel()
  593. var wg sync.WaitGroup
  594. wg.Add(3)
  595. // Start TCP origin
  596. go func() {
  597. defer wg.Done()
  598. conn, err := tcpOrigin.Accept()
  599. assert.NoError(t, err)
  600. defer conn.Close()
  601. // Expect 3 TCP messages
  602. for i := 0; i < 3; i++ {
  603. echoTCP(t, conn)
  604. }
  605. }()
  606. // Simulate cloudflared receiving a TCP connection
  607. go func() {
  608. defer wg.Done()
  609. assert.NoError(t, proxyTCP(ctx, originProxy, tcpOrigin.Addr().String(), tcpRespReadWriter, tcpReqReader))
  610. }()
  611. // Simulate cloudflared receiving a WS connection
  612. go func() {
  613. defer wg.Done()
  614. req, err := http.NewRequest(http.MethodGet, hostname, wsReqReader)
  615. assert.NoError(t, err)
  616. // ProxyHTTP will add Connection, Upgrade and Sec-Websocket-Version headers
  617. req.Header.Add("Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
  618. log := zerolog.Nop()
  619. respWriter, err := connection.NewHTTP2RespWriter(req, wsRespReadWriter, connection.TypeWebsocket, &log)
  620. assert.NoError(t, err)
  621. err = originProxy.ProxyHTTP(respWriter, tracing.NewTracedHTTPRequest(req, 0, &log), true)
  622. assert.NoError(t, err)
  623. }()
  624. // Simulate eyeball WS and TCP connections
  625. validateWsEcho(t, msg, wsReqWriter, wsRespReadWriter)
  626. tcpEyeball(t, tcpReqWriter, msg, tcpRespReadWriter)
  627. configNoWSAndWarp := []byte(`
  628. {
  629. "ingress": [
  630. {
  631. "service": "http_status:404"
  632. }
  633. ],
  634. "warp-routing": {
  635. }
  636. }
  637. `)
  638. updateWithValidation(t, orchestrator, 2, configNoWSAndWarp)
  639. // Make sure connection is still up
  640. validateWsEcho(t, msg, wsReqWriter, wsRespReadWriter)
  641. tcpEyeball(t, tcpReqWriter, msg, tcpRespReadWriter)
  642. updateWithValidation(t, orchestrator, 3, configWithWSAndWarp)
  643. // Make sure connection is still up
  644. validateWsEcho(t, msg, wsReqWriter, wsRespReadWriter)
  645. tcpEyeball(t, tcpReqWriter, msg, tcpRespReadWriter)
  646. wsReqWriter.Close()
  647. tcpReqWriter.Close()
  648. wg.Wait()
  649. }
  650. func TestSerializeLocalConfig(t *testing.T) {
  651. c := &newLocalConfig{
  652. RemoteConfig: ingress.RemoteConfig{
  653. Ingress: ingress.Ingress{},
  654. },
  655. ConfigurationFlags: map[string]string{"a": "b"},
  656. }
  657. result, _ := json.Marshal(c)
  658. fmt.Println(string(result))
  659. }
  660. func wsEcho(w http.ResponseWriter, r *http.Request) {
  661. upgrader := gows.Upgrader{}
  662. conn, err := upgrader.Upgrade(w, r, nil)
  663. if err != nil {
  664. return
  665. }
  666. defer conn.Close()
  667. for {
  668. mt, message, err := conn.ReadMessage()
  669. if err != nil {
  670. fmt.Println("read message err", err)
  671. break
  672. }
  673. err = conn.WriteMessage(mt, message)
  674. if err != nil {
  675. fmt.Println("write message err", err)
  676. break
  677. }
  678. }
  679. }
  680. func validateWsEcho(t *testing.T, msg string, reqWriter io.Writer, respReadWriter io.ReadWriter) {
  681. err := wsutil.WriteClientText(reqWriter, []byte(msg))
  682. require.NoError(t, err)
  683. receivedMsg, err := wsutil.ReadServerText(respReadWriter)
  684. require.NoError(t, err)
  685. require.Equal(t, msg, string(receivedMsg))
  686. }
  687. type respReadWriteFlusher struct {
  688. io.Reader
  689. w io.Writer
  690. headers http.Header
  691. statusCode int
  692. setStatusOnce sync.Once
  693. hasStatus chan struct{}
  694. }
  695. func newRespReadWriteFlusher() *respReadWriteFlusher {
  696. pr, pw := io.Pipe()
  697. return &respReadWriteFlusher{
  698. Reader: pr,
  699. w: pw,
  700. headers: make(http.Header),
  701. hasStatus: make(chan struct{}),
  702. }
  703. }
  704. func (rrw *respReadWriteFlusher) Write(buf []byte) (int, error) {
  705. rrw.WriteHeader(http.StatusOK)
  706. return rrw.w.Write(buf)
  707. }
  708. func (rrw *respReadWriteFlusher) Flush() {}
  709. func (rrw *respReadWriteFlusher) Header() http.Header {
  710. return rrw.headers
  711. }
  712. func (rrw *respReadWriteFlusher) WriteHeader(statusCode int) {
  713. rrw.setStatusOnce.Do(func() {
  714. rrw.statusCode = statusCode
  715. close(rrw.hasStatus)
  716. })
  717. }