tunnel.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717
  1. package supervisor
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "net"
  7. "net/netip"
  8. "runtime/debug"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/getsentry/sentry-go"
  13. "github.com/pkg/errors"
  14. "github.com/quic-go/quic-go"
  15. "github.com/rs/zerolog"
  16. "golang.org/x/sync/errgroup"
  17. "github.com/cloudflare/cloudflared/connection"
  18. "github.com/cloudflare/cloudflared/edgediscovery"
  19. "github.com/cloudflare/cloudflared/edgediscovery/allregions"
  20. "github.com/cloudflare/cloudflared/features"
  21. "github.com/cloudflare/cloudflared/fips"
  22. "github.com/cloudflare/cloudflared/ingress"
  23. "github.com/cloudflare/cloudflared/management"
  24. "github.com/cloudflare/cloudflared/orchestration"
  25. quicpogs "github.com/cloudflare/cloudflared/quic"
  26. v3 "github.com/cloudflare/cloudflared/quic/v3"
  27. "github.com/cloudflare/cloudflared/retry"
  28. "github.com/cloudflare/cloudflared/signal"
  29. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  30. "github.com/cloudflare/cloudflared/tunnelstate"
  31. )
  32. const (
  33. dialTimeout = 15 * time.Second
  34. )
// TunnelConfig bundles the settings used to establish and serve tunnel
// connections to the edge. Only the fields whose use is visible in this file
// are documented individually below.
type TunnelConfig struct {
	// GracePeriod is passed to the control stream and to QUIC tunnel connections.
	GracePeriod time.Duration

	// ReplaceExisting is forwarded to the edge in the connection options.
	ReplaceExisting bool

	OSArch        string
	ClientID      string
	CloseConnOnce *sync.Once // Used to close connectedSignal no more than once
	EdgeAddrs     []string
	Region        string
	EdgeIPVersion allregions.ConfigIPVersion
	EdgeBindAddr  net.IP
	HAConnections int
	IsAutoupdated bool
	LBPool        string
	Tags          []pogs.Tag

	// Log is the base logger from which the per-connection loggers are derived.
	Log          *zerolog.Logger
	LogTransport *zerolog.Logger

	// Observer is notified when a connection is about to reconnect or has disconnected.
	Observer *connection.Observer

	ReportedVersion    string
	Retries            uint
	MaxEdgeAddrRetries uint8
	RunFromTerminal    bool
	NeedPQ             bool

	// NamedTunnel describes the tunnel being served; its Client is sent in the
	// connection options and the whole value is given to the control stream.
	NamedTunnel *connection.TunnelProperties

	// ProtocolSelector reports the current transport protocol and its fallback.
	ProtocolSelector connection.ProtocolSelector

	// EdgeTLSConfigs holds the TLS configuration to use for each transport protocol.
	EdgeTLSConfigs map[connection.Protocol]*tls.Config

	// ICMPRouterServer is handed to the QUIC datagram session handlers.
	ICMPRouterServer ingress.ICMPRouterServer

	// RPCTimeout and WriteStreamTimeout are passed through to the control stream
	// and to the QUIC tunnel and datagram connections.
	RPCTimeout         time.Duration
	WriteStreamTimeout time.Duration

	// QUIC transport settings copied into quic.Config when dialing the edge.
	DisableQUICPathMTUDiscovery         bool
	QUICConnectionLevelFlowControlLimit uint64
	QUICStreamLevelFlowControlLimit     uint64

	// FeatureSelector supplies the post-quantum mode and the datagram version.
	FeatureSelector *features.FeatureSelector
}
  68. func (c *TunnelConfig) connectionOptions(originLocalAddr string, numPreviousAttempts uint8) *pogs.ConnectionOptions {
  69. // attempt to parse out origin IP, but don't fail since it's informational field
  70. host, _, _ := net.SplitHostPort(originLocalAddr)
  71. originIP := net.ParseIP(host)
  72. return &pogs.ConnectionOptions{
  73. Client: c.NamedTunnel.Client,
  74. OriginLocalIP: originIP,
  75. ReplaceExisting: c.ReplaceExisting,
  76. CompressionQuality: 0,
  77. NumPreviousAttempts: numPreviousAttempts,
  78. }
  79. }
  80. func StartTunnelDaemon(
  81. ctx context.Context,
  82. config *TunnelConfig,
  83. orchestrator *orchestration.Orchestrator,
  84. connectedSignal *signal.Signal,
  85. reconnectCh chan ReconnectSignal,
  86. graceShutdownC <-chan struct{},
  87. ) error {
  88. s, err := NewSupervisor(config, orchestrator, reconnectCh, graceShutdownC)
  89. if err != nil {
  90. return err
  91. }
  92. return s.Run(ctx, connectedSignal)
  93. }
  94. type ConnectivityError struct {
  95. reachedMaxRetries bool
  96. }
  97. func NewConnectivityError(hasReachedMaxRetries bool) *ConnectivityError {
  98. return &ConnectivityError{
  99. reachedMaxRetries: hasReachedMaxRetries,
  100. }
  101. }
  102. func (e *ConnectivityError) Error() string {
  103. return fmt.Sprintf("connectivity error - reached max retries: %t", e.HasReachedMaxRetries())
  104. }
  105. func (e *ConnectivityError) HasReachedMaxRetries() bool {
  106. return e.reachedMaxRetries
  107. }
// EdgeAddrHandler provides a mechanism to switch between behaviors in Serve
// for handling the errors that occur when attempting to make edge connections.
type EdgeAddrHandler interface {
	// ShouldGetNewAddress inspects the edge connection error for the given
	// connection index and reports whether the edge address should be replaced
	// with a new one. It also returns a non-nil connectivityError when the
	// error should be treated as a connectivity error rather than a general
	// application error.
	ShouldGetNewAddress(connIndex uint8, err error) (needsNewAddress bool, connectivityError error)
}
  117. func NewIPAddrFallback(maxRetries uint8) *ipAddrFallback {
  118. return &ipAddrFallback{
  119. retriesByConnIndex: make(map[uint8]uint8),
  120. maxRetries: maxRetries,
  121. }
  122. }
// ipAddrFallback is an EdgeAddrHandler that falls back to a new address for
// more kinds of edge connection errors: duplicate connection registration and
// QUIC idle timeouts request a new address, while edge dial errors request a
// new address and are additionally reported as connectivity errors.
type ipAddrFallback struct {
	// m is held for the whole of ShouldGetNewAddress and so guards retriesByConnIndex.
	m sync.Mutex
	// retriesByConnIndex counts the dial errors seen per connection index since
	// the counter was last reset.
	retriesByConnIndex map[uint8]uint8
	// maxRetries is the count at which a dial error is reported as having
	// reached the maximum number of retries.
	maxRetries uint8
}
  131. func (f *ipAddrFallback) ShouldGetNewAddress(connIndex uint8, err error) (needsNewAddress bool, connectivityError error) {
  132. f.m.Lock()
  133. defer f.m.Unlock()
  134. switch err.(type) {
  135. case nil: // maintain current IP address
  136. // Try the next address if it was a quic.IdleTimeoutError
  137. // DupConnRegisterTunnelError needs to also receive a new ip address
  138. case connection.DupConnRegisterTunnelError,
  139. *quic.IdleTimeoutError:
  140. return true, nil
  141. // Network problems should be retried with new address immediately and report
  142. // as connectivity error
  143. case edgediscovery.DialError, *connection.EdgeQuicDialError:
  144. if f.retriesByConnIndex[connIndex] >= f.maxRetries {
  145. f.retriesByConnIndex[connIndex] = 0
  146. return true, NewConnectivityError(true)
  147. }
  148. f.retriesByConnIndex[connIndex]++
  149. return true, NewConnectivityError(false)
  150. default: // maintain current IP address
  151. }
  152. return false, nil
  153. }
// EdgeTunnelServer serves individual tunnel connections to the edge over QUIC
// or HTTP/2.
type EdgeTunnelServer struct {
	// config holds the tunnel settings used for every connection.
	config *TunnelConfig

	// orchestrator is handed to the HTTP/2 and QUIC connections.
	orchestrator *orchestration.Orchestrator

	// sessionManager and datagramMetrics are used when datagram v3 is selected.
	sessionManager  v3.SessionManager
	datagramMetrics v3.Metrics

	// edgeAddrHandler decides whether a failed connection needs a new edge address.
	edgeAddrHandler EdgeAddrHandler

	// edgeAddrs supplies the edge address assigned to each connection index.
	edgeAddrs *edgediscovery.Edge

	// edgeBindAddr is passed to the edge dialers.
	edgeBindAddr net.IP

	// reconnectCh delivers reconnect signals that break a served connection.
	reconnectCh chan ReconnectSignal

	// gracefulShutdownC is received from to detect a graceful shutdown.
	gracefulShutdownC <-chan struct{}

	// tracker reports whether any connection has connected with a given protocol.
	tracker *tunnelstate.ConnTracker

	// connAwareLogger is the template for the per-connection loggers.
	connAwareLogger *ConnAwareLogger
}
// TunnelServer serves a single tunnel connection; EdgeTunnelServer is the
// implementation in this file.
type TunnelServer interface {
	// Serve runs the connection with index connIndex using the protocol held in
	// protocolFallback, notifying connectedSignal once it is connected.
	Serve(ctx context.Context, connIndex uint8, protocolFallback *protocolFallback, connectedSignal *signal.Signal) error
}
  170. func (e *EdgeTunnelServer) Serve(ctx context.Context, connIndex uint8, protocolFallback *protocolFallback, connectedSignal *signal.Signal) error {
  171. haConnections.Inc()
  172. defer haConnections.Dec()
  173. connectedFuse := newBooleanFuse()
  174. go func() {
  175. if connectedFuse.Await() {
  176. connectedSignal.Notify()
  177. }
  178. }()
  179. // Ensure the above goroutine will terminate if we return without connecting
  180. defer connectedFuse.Fuse(false)
  181. // Fetch IP address to associated connection index
  182. addr, err := e.edgeAddrs.GetAddr(int(connIndex))
  183. switch err.(type) {
  184. case nil: // no error
  185. case edgediscovery.ErrNoAddressesLeft:
  186. return err
  187. default:
  188. return err
  189. }
  190. logger := e.config.Log.With().
  191. Int(management.EventTypeKey, int(management.Cloudflared)).
  192. IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
  193. Uint8(connection.LogFieldConnIndex, connIndex).
  194. Logger()
  195. connLog := e.connAwareLogger.ReplaceLogger(&logger)
  196. // Each connection to keep its own copy of protocol, because individual connections might fallback
  197. // to another protocol when a particular metal doesn't support new protocol
  198. // Each connection can also have it's own IP version because individual connections might fallback
  199. // to another IP version.
  200. err, shouldFallbackProtocol := e.serveTunnel(
  201. ctx,
  202. connLog,
  203. addr,
  204. connIndex,
  205. connectedFuse,
  206. protocolFallback,
  207. protocolFallback.protocol,
  208. )
  209. // Check if the connection error was from an IP issue with the host or
  210. // establishing a connection to the edge and if so, rotate the IP address.
  211. shouldRotateEdgeIP, cErr := e.edgeAddrHandler.ShouldGetNewAddress(connIndex, err)
  212. if shouldRotateEdgeIP {
  213. // rotate IP, but forcing internal state to assign a new IP to connection index.
  214. if _, err := e.edgeAddrs.GetDifferentAddr(int(connIndex), true); err != nil {
  215. return err
  216. }
  217. // In addition, if it is a connectivity error, and we have exhausted the configurable maximum edge IPs to rotate,
  218. // then just fallback protocol on next iteration run.
  219. connectivityErr, ok := cErr.(*ConnectivityError)
  220. if ok {
  221. shouldFallbackProtocol = connectivityErr.HasReachedMaxRetries()
  222. }
  223. }
  224. // set connection has re-connecting and log the next retrying backoff
  225. duration, ok := protocolFallback.GetMaxBackoffDuration(ctx)
  226. if !ok {
  227. return err
  228. }
  229. e.config.Observer.SendReconnect(connIndex)
  230. connLog.Logger().Info().Msgf("Retrying connection in up to %s", duration)
  231. select {
  232. case <-ctx.Done():
  233. return ctx.Err()
  234. case <-e.gracefulShutdownC:
  235. return nil
  236. case <-protocolFallback.BackoffTimer():
  237. // should we fallback protocol? If not, just return. Otherwise, set new protocol for next method call.
  238. if !shouldFallbackProtocol {
  239. return err
  240. }
  241. // If a single connection has connected with the current protocol, we know we know we don't have to fallback
  242. // to a different protocol.
  243. if e.tracker.HasConnectedWith(e.config.ProtocolSelector.Current()) {
  244. return err
  245. }
  246. if !selectNextProtocol(
  247. connLog.Logger(),
  248. protocolFallback,
  249. e.config.ProtocolSelector,
  250. err,
  251. ) {
  252. return err
  253. }
  254. }
  255. return err
  256. }
  257. // protocolFallback is a wrapper around backoffHandler that will try fallback option when backoff reaches
  258. // max retries
  259. type protocolFallback struct {
  260. retry.BackoffHandler
  261. protocol connection.Protocol
  262. inFallback bool
  263. }
  264. func (pf *protocolFallback) reset() {
  265. pf.ResetNow()
  266. pf.inFallback = false
  267. }
  268. func (pf *protocolFallback) fallback(fallback connection.Protocol) {
  269. pf.ResetNow()
  270. pf.protocol = fallback
  271. pf.inFallback = true
  272. }
  273. // selectNextProtocol picks connection protocol for the next retry iteration,
  274. // returns true if it was able to pick the protocol, false if we are out of options and should stop retrying
  275. func selectNextProtocol(
  276. connLog *zerolog.Logger,
  277. protocolBackoff *protocolFallback,
  278. selector connection.ProtocolSelector,
  279. cause error,
  280. ) bool {
  281. isQuicBroken := isQuicBroken(cause)
  282. _, hasFallback := selector.Fallback()
  283. if protocolBackoff.ReachedMaxRetries() || (hasFallback && isQuicBroken) {
  284. if isQuicBroken {
  285. connLog.Warn().Msg("If this log occurs persistently, and cloudflared is unable to connect to " +
  286. "Cloudflare Network with `quic` protocol, then most likely your machine/network is getting its egress " +
  287. "UDP to port 7844 (or others) blocked or dropped. Make sure to allow egress connectivity as per " +
  288. "https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/configuration/ports-and-ips/\n" +
  289. "If you are using private routing to this Tunnel, then ICMP, UDP (and Private DNS Resolution) will not work " +
  290. "unless your cloudflared can connect with Cloudflare Network with `quic`.")
  291. }
  292. fallback, hasFallback := selector.Fallback()
  293. if !hasFallback {
  294. return false
  295. }
  296. // Already using fallback protocol, no point to retry
  297. if protocolBackoff.protocol == fallback {
  298. return false
  299. }
  300. connLog.Info().Msgf("Switching to fallback protocol %s", fallback)
  301. protocolBackoff.fallback(fallback)
  302. } else if !protocolBackoff.inFallback {
  303. current := selector.Current()
  304. if protocolBackoff.protocol != current {
  305. protocolBackoff.protocol = current
  306. connLog.Info().Msgf("Changing protocol to %s", current)
  307. }
  308. }
  309. return true
  310. }
  311. func isQuicBroken(cause error) bool {
  312. var idleTimeoutError *quic.IdleTimeoutError
  313. if errors.As(cause, &idleTimeoutError) {
  314. return true
  315. }
  316. var transportError *quic.TransportError
  317. if errors.As(cause, &transportError) && strings.Contains(cause.Error(), "operation not permitted") {
  318. return true
  319. }
  320. return false
  321. }
  322. // ServeTunnel runs a single tunnel connection, returns nil on graceful shutdown,
  323. // on error returns a flag indicating if error can be retried
  324. func (e *EdgeTunnelServer) serveTunnel(
  325. ctx context.Context,
  326. connLog *ConnAwareLogger,
  327. addr *allregions.EdgeAddr,
  328. connIndex uint8,
  329. fuse *booleanFuse,
  330. backoff *protocolFallback,
  331. protocol connection.Protocol,
  332. ) (err error, recoverable bool) {
  333. // Treat panics as recoverable errors
  334. defer func() {
  335. if r := recover(); r != nil {
  336. var ok bool
  337. err, ok = r.(error)
  338. if !ok {
  339. err = fmt.Errorf("ServeTunnel: %v", r)
  340. }
  341. err = errors.Wrapf(err, "stack trace: %s", string(debug.Stack()))
  342. recoverable = true
  343. }
  344. }()
  345. defer e.config.Observer.SendDisconnect(connIndex)
  346. err, recoverable = e.serveConnection(
  347. ctx,
  348. connLog,
  349. addr,
  350. connIndex,
  351. fuse,
  352. backoff,
  353. protocol,
  354. )
  355. if err != nil {
  356. switch err := err.(type) {
  357. case connection.DupConnRegisterTunnelError:
  358. connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection.")
  359. // don't retry this connection anymore, let supervisor pick a new address
  360. return err, false
  361. case connection.ServerRegisterTunnelError:
  362. connLog.ConnAwareLogger().Err(err).Msg("Register tunnel error from server side")
  363. // Don't send registration error return from server to Sentry. They are
  364. // logged on server side
  365. return err.Cause, !err.Permanent
  366. case *connection.EdgeQuicDialError:
  367. return err, false
  368. case ReconnectSignal:
  369. connLog.Logger().Info().
  370. IPAddr(connection.LogFieldIPAddress, addr.UDP.IP).
  371. Uint8(connection.LogFieldConnIndex, connIndex).
  372. Msgf("Restarting connection due to reconnect signal in %s", err.Delay)
  373. err.DelayBeforeReconnect()
  374. return err, true
  375. default:
  376. if err == context.Canceled {
  377. connLog.Logger().Debug().Err(err).Msgf("Serve tunnel error")
  378. return err, false
  379. }
  380. connLog.ConnAwareLogger().Err(err).Msgf("Serve tunnel error")
  381. _, permanent := err.(unrecoverableError)
  382. return err, !permanent
  383. }
  384. }
  385. return nil, false
  386. }
  387. func (e *EdgeTunnelServer) serveConnection(
  388. ctx context.Context,
  389. connLog *ConnAwareLogger,
  390. addr *allregions.EdgeAddr,
  391. connIndex uint8,
  392. fuse *booleanFuse,
  393. backoff *protocolFallback,
  394. protocol connection.Protocol,
  395. ) (err error, recoverable bool) {
  396. connectedFuse := &connectedFuse{
  397. fuse: fuse,
  398. backoff: backoff,
  399. }
  400. controlStream := connection.NewControlStream(
  401. e.config.Observer,
  402. connectedFuse,
  403. e.config.NamedTunnel,
  404. connIndex,
  405. addr.UDP.IP,
  406. nil,
  407. e.config.RPCTimeout,
  408. e.gracefulShutdownC,
  409. e.config.GracePeriod,
  410. protocol,
  411. )
  412. switch protocol {
  413. case connection.QUIC:
  414. // nolint: gosec
  415. connOptions := e.config.connectionOptions(addr.UDP.String(), uint8(backoff.Retries()))
  416. return e.serveQUIC(ctx,
  417. addr.UDP.AddrPort(),
  418. connLog,
  419. connOptions,
  420. controlStream,
  421. connIndex)
  422. case connection.HTTP2:
  423. edgeConn, err := edgediscovery.DialEdge(ctx, dialTimeout, e.config.EdgeTLSConfigs[protocol], addr.TCP, e.edgeBindAddr)
  424. if err != nil {
  425. connLog.ConnAwareLogger().Err(err).Msg("Unable to establish connection with Cloudflare edge")
  426. return err, true
  427. }
  428. // nolint: gosec
  429. connOptions := e.config.connectionOptions(edgeConn.LocalAddr().String(), uint8(backoff.Retries()))
  430. if err := e.serveHTTP2(
  431. ctx,
  432. connLog,
  433. edgeConn,
  434. connOptions,
  435. controlStream,
  436. connIndex,
  437. ); err != nil {
  438. return err, false
  439. }
  440. default:
  441. return fmt.Errorf("invalid protocol selected: %s", protocol), false
  442. }
  443. return
  444. }
  445. type unrecoverableError struct {
  446. err error
  447. }
  448. func (r unrecoverableError) Error() string {
  449. return r.err.Error()
  450. }
  451. func (e *EdgeTunnelServer) serveHTTP2(
  452. ctx context.Context,
  453. connLog *ConnAwareLogger,
  454. tlsServerConn net.Conn,
  455. connOptions *pogs.ConnectionOptions,
  456. controlStreamHandler connection.ControlStreamHandler,
  457. connIndex uint8,
  458. ) error {
  459. pqMode := e.config.FeatureSelector.PostQuantumMode()
  460. if pqMode == features.PostQuantumStrict {
  461. return unrecoverableError{errors.New("HTTP/2 transport does not support post-quantum")}
  462. }
  463. connLog.Logger().Debug().Msgf("Connecting via http2")
  464. h2conn := connection.NewHTTP2Connection(
  465. tlsServerConn,
  466. e.orchestrator,
  467. connOptions,
  468. e.config.Observer,
  469. connIndex,
  470. controlStreamHandler,
  471. e.config.Log,
  472. )
  473. errGroup, serveCtx := errgroup.WithContext(ctx)
  474. errGroup.Go(func() error {
  475. return h2conn.Serve(serveCtx)
  476. })
  477. errGroup.Go(func() error {
  478. err := listenReconnect(serveCtx, e.reconnectCh, e.gracefulShutdownC)
  479. if err != nil {
  480. // forcefully break the connection (this is only used for testing)
  481. // errgroup will return context canceled for the h2conn.Serve
  482. connLog.Logger().Debug().Msg("Forcefully breaking http2 connection")
  483. }
  484. return err
  485. })
  486. return errGroup.Wait()
  487. }
  488. func (e *EdgeTunnelServer) serveQUIC(
  489. ctx context.Context,
  490. edgeAddr netip.AddrPort,
  491. connLogger *ConnAwareLogger,
  492. connOptions *pogs.ConnectionOptions,
  493. controlStreamHandler connection.ControlStreamHandler,
  494. connIndex uint8,
  495. ) (err error, recoverable bool) {
  496. tlsConfig := e.config.EdgeTLSConfigs[connection.QUIC]
  497. pqMode := e.config.FeatureSelector.PostQuantumMode()
  498. curvePref, err := curvePreference(pqMode, fips.IsFipsEnabled(), tlsConfig.CurvePreferences)
  499. if err != nil {
  500. return err, true
  501. }
  502. connLogger.Logger().Info().Msgf("Using %v as curve preferences", curvePref)
  503. tlsConfig.CurvePreferences = curvePref
  504. // quic-go 0.44 increases the initial packet size to 1280 by default. That breaks anyone running tunnel through WARP
  505. // because WARP MTU is 1280.
  506. var initialPacketSize uint16 = 1252
  507. if edgeAddr.Addr().Is4() {
  508. initialPacketSize = 1232
  509. }
  510. quicConfig := &quic.Config{
  511. HandshakeIdleTimeout: quicpogs.HandshakeIdleTimeout,
  512. MaxIdleTimeout: quicpogs.MaxIdleTimeout,
  513. KeepAlivePeriod: quicpogs.MaxIdlePingPeriod,
  514. MaxIncomingStreams: quicpogs.MaxIncomingStreams,
  515. MaxIncomingUniStreams: quicpogs.MaxIncomingStreams,
  516. EnableDatagrams: true,
  517. Tracer: quicpogs.NewClientTracer(connLogger.Logger(), connIndex),
  518. DisablePathMTUDiscovery: e.config.DisableQUICPathMTUDiscovery,
  519. MaxConnectionReceiveWindow: e.config.QUICConnectionLevelFlowControlLimit,
  520. MaxStreamReceiveWindow: e.config.QUICStreamLevelFlowControlLimit,
  521. InitialPacketSize: initialPacketSize,
  522. }
  523. // Dial the QUIC connection to the edge
  524. conn, err := connection.DialQuic(
  525. ctx,
  526. quicConfig,
  527. tlsConfig,
  528. edgeAddr,
  529. e.edgeBindAddr,
  530. connIndex,
  531. connLogger.Logger(),
  532. )
  533. if err != nil {
  534. connLogger.ConnAwareLogger().Err(err).Msgf("Failed to dial a quic connection")
  535. e.reportErrorToSentry(err)
  536. return err, true
  537. }
  538. var datagramSessionManager connection.DatagramSessionHandler
  539. if e.config.FeatureSelector.DatagramVersion() == features.DatagramV3 {
  540. datagramSessionManager = connection.NewDatagramV3Connection(
  541. ctx,
  542. conn,
  543. e.sessionManager,
  544. e.config.ICMPRouterServer,
  545. connIndex,
  546. e.datagramMetrics,
  547. connLogger.Logger(),
  548. )
  549. } else {
  550. datagramSessionManager = connection.NewDatagramV2Connection(
  551. ctx,
  552. conn,
  553. e.config.ICMPRouterServer,
  554. connIndex,
  555. e.config.RPCTimeout,
  556. e.config.WriteStreamTimeout,
  557. e.orchestrator.GetFlowLimiter(),
  558. connLogger.Logger(),
  559. )
  560. }
  561. // Wrap the [quic.Connection] as a TunnelConnection
  562. tunnelConn, err := connection.NewTunnelConnection(
  563. ctx,
  564. conn,
  565. connIndex,
  566. e.orchestrator,
  567. datagramSessionManager,
  568. controlStreamHandler,
  569. connOptions,
  570. e.config.RPCTimeout,
  571. e.config.WriteStreamTimeout,
  572. e.config.GracePeriod,
  573. connLogger.Logger(),
  574. )
  575. if err != nil {
  576. connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new tunnel connection")
  577. return err, true
  578. }
  579. // Serve the TunnelConnection
  580. errGroup, serveCtx := errgroup.WithContext(ctx)
  581. errGroup.Go(func() error {
  582. err := tunnelConn.Serve(serveCtx)
  583. if err != nil {
  584. connLogger.ConnAwareLogger().Err(err).Msg("Failed to serve tunnel connection")
  585. }
  586. return err
  587. })
  588. errGroup.Go(func() error {
  589. err := listenReconnect(serveCtx, e.reconnectCh, e.gracefulShutdownC)
  590. if err != nil {
  591. // forcefully break the connection (this is only used for testing)
  592. // errgroup will return context canceled for the tunnelConn.Serve
  593. connLogger.Logger().Debug().Msg("Forcefully breaking tunnel connection")
  594. }
  595. return err
  596. })
  597. return errGroup.Wait(), false
  598. }
  599. // The reportErrorToSentry is an helper function that handles
  600. // verifies if an error should be reported to Sentry.
  601. func (e *EdgeTunnelServer) reportErrorToSentry(err error) {
  602. dialErr, ok := err.(*connection.EdgeQuicDialError)
  603. if ok {
  604. // The TransportError provides an Unwrap function however
  605. // the err MAY not always be set
  606. transportErr, ok := dialErr.Cause.(*quic.TransportError)
  607. if ok &&
  608. transportErr.ErrorCode.IsCryptoError() &&
  609. fips.IsFipsEnabled() &&
  610. e.config.FeatureSelector.PostQuantumMode() == features.PostQuantumStrict {
  611. // Only report to Sentry when using FIPS, PQ,
  612. // and the error is a Crypto error reported by
  613. // an EdgeQuicDialError
  614. sentry.CaptureException(err)
  615. }
  616. }
  617. }
  618. func listenReconnect(ctx context.Context, reconnectCh <-chan ReconnectSignal, gracefulShutdownCh <-chan struct{}) error {
  619. select {
  620. case reconnect := <-reconnectCh:
  621. return reconnect
  622. case <-gracefulShutdownCh:
  623. return nil
  624. case <-ctx.Done():
  625. return nil
  626. }
  627. }
  628. type connectedFuse struct {
  629. fuse *booleanFuse
  630. backoff *protocolFallback
  631. }
  632. func (cf *connectedFuse) Connected() {
  633. cf.fuse.Fuse(true)
  634. cf.backoff.reset()
  635. }
  636. func (cf *connectedFuse) IsConnected() bool {
  637. return cf.fuse.Value()
  638. }