123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package connection
- import (
- "context"
- "io"
- "net"
- "time"
- "github.com/pkg/errors"
- "github.com/cloudflare/cloudflared/management"
- "github.com/cloudflare/cloudflared/tunnelrpc"
- tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
- )
- // registerClient derives a named tunnel rpc client that can then be used to register and unregister connections.
- type registerClientFunc func(context.Context, io.ReadWriteCloser, time.Duration) tunnelrpc.RegistrationClient
- type controlStream struct {
- observer *Observer
- connectedFuse ConnectedFuse
- tunnelProperties *TunnelProperties
- connIndex uint8
- edgeAddress net.IP
- protocol Protocol
- registerClientFunc registerClientFunc
- registerTimeout time.Duration
- gracefulShutdownC <-chan struct{}
- gracePeriod time.Duration
- stoppedGracefully bool
- }
- // ControlStreamHandler registers connections with origintunneld and initiates graceful shutdown.
- type ControlStreamHandler interface {
- // ServeControlStream handles the control plane of the transport in the current goroutine calling this
- ServeControlStream(ctx context.Context, rw io.ReadWriteCloser, connOptions *tunnelpogs.ConnectionOptions, tunnelConfigGetter TunnelConfigJSONGetter) error
- // IsStopped tells whether the method above has finished
- IsStopped() bool
- }
- type TunnelConfigJSONGetter interface {
- GetConfigJSON() ([]byte, error)
- }
- // NewControlStream returns a new instance of ControlStreamHandler
- func NewControlStream(
- observer *Observer,
- connectedFuse ConnectedFuse,
- tunnelProperties *TunnelProperties,
- connIndex uint8,
- edgeAddress net.IP,
- registerClientFunc registerClientFunc,
- registerTimeout time.Duration,
- gracefulShutdownC <-chan struct{},
- gracePeriod time.Duration,
- protocol Protocol,
- ) ControlStreamHandler {
- if registerClientFunc == nil {
- registerClientFunc = tunnelrpc.NewRegistrationClient
- }
- return &controlStream{
- observer: observer,
- connectedFuse: connectedFuse,
- tunnelProperties: tunnelProperties,
- registerClientFunc: registerClientFunc,
- registerTimeout: registerTimeout,
- connIndex: connIndex,
- edgeAddress: edgeAddress,
- gracefulShutdownC: gracefulShutdownC,
- gracePeriod: gracePeriod,
- protocol: protocol,
- }
- }
- func (c *controlStream) ServeControlStream(
- ctx context.Context,
- rw io.ReadWriteCloser,
- connOptions *tunnelpogs.ConnectionOptions,
- tunnelConfigGetter TunnelConfigJSONGetter,
- ) error {
- registrationClient := c.registerClientFunc(ctx, rw, c.registerTimeout)
- registrationDetails, err := registrationClient.RegisterConnection(
- ctx,
- c.tunnelProperties.Credentials.Auth(),
- c.tunnelProperties.Credentials.TunnelID,
- connOptions,
- c.connIndex,
- c.edgeAddress)
- if err != nil {
- defer registrationClient.Close()
- if err.Error() == DuplicateConnectionError {
- c.observer.metrics.regFail.WithLabelValues("dup_edge_conn", "registerConnection").Inc()
- return errDuplicationConnection
- }
- c.observer.metrics.regFail.WithLabelValues("server_error", "registerConnection").Inc()
- return serverRegistrationErrorFromRPC(err)
- }
- c.observer.metrics.regSuccess.WithLabelValues("registerConnection").Inc()
- c.observer.logConnected(registrationDetails.UUID, c.connIndex, registrationDetails.Location, c.edgeAddress, c.protocol)
- c.observer.sendConnectedEvent(c.connIndex, c.protocol, registrationDetails.Location)
- c.connectedFuse.Connected()
- // if conn index is 0 and tunnel is not remotely managed, then send local ingress rules configuration
- if c.connIndex == 0 && !registrationDetails.TunnelIsRemotelyManaged {
- if tunnelConfig, err := tunnelConfigGetter.GetConfigJSON(); err == nil {
- if err := registrationClient.SendLocalConfiguration(ctx, tunnelConfig); err != nil {
- c.observer.metrics.localConfigMetrics.pushesErrors.Inc()
- c.observer.log.Err(err).Msg("unable to send local configuration")
- }
- c.observer.metrics.localConfigMetrics.pushes.Inc()
- } else {
- c.observer.log.Err(err).Msg("failed to obtain current configuration")
- }
- }
- return c.waitForUnregister(ctx, registrationClient)
- }
- func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) error {
- // wait for connection termination or start of graceful shutdown
- defer registrationClient.Close()
- var shutdownError error
- select {
- case <-ctx.Done():
- shutdownError = ctx.Err()
- break
- case <-c.gracefulShutdownC:
- c.stoppedGracefully = true
- }
- c.observer.sendUnregisteringEvent(c.connIndex)
- err := registrationClient.GracefulShutdown(ctx, c.gracePeriod)
- if err != nil {
- return errors.Wrap(err, "Error shutting down control stream")
- }
- c.observer.log.Info().
- Int(management.EventTypeKey, int(management.Cloudflared)).
- Uint8(LogFieldConnIndex, c.connIndex).
- IPAddr(LogFieldIPAddress, c.edgeAddress).
- Msg("Unregistered tunnel connection")
- return shutdownError
- }
- func (c *controlStream) IsStopped() bool {
- return c.stoppedGracefully
- }
|