registration_client.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package tunnelrpc
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "time"
  7. "github.com/google/uuid"
  8. "zombiezen.com/go/capnproto2/rpc"
  9. "github.com/cloudflare/cloudflared/tunnelrpc/metrics"
  10. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  11. )
  12. type RegistrationClient interface {
  13. RegisterConnection(
  14. ctx context.Context,
  15. auth pogs.TunnelAuth,
  16. tunnelID uuid.UUID,
  17. options *pogs.ConnectionOptions,
  18. connIndex uint8,
  19. edgeAddress net.IP,
  20. ) (*pogs.ConnectionDetails, error)
  21. SendLocalConfiguration(ctx context.Context, config []byte) error
  22. GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error
  23. Close()
  24. }
  25. type registrationClient struct {
  26. client pogs.RegistrationServer_PogsClient
  27. transport rpc.Transport
  28. requestTimeout time.Duration
  29. }
  30. func NewRegistrationClient(ctx context.Context, stream io.ReadWriteCloser, requestTimeout time.Duration) RegistrationClient {
  31. transport := SafeTransport(stream)
  32. conn := NewClientConn(transport)
  33. client := pogs.NewRegistrationServer_PogsClient(conn.Bootstrap(ctx), conn)
  34. return &registrationClient{
  35. client: client,
  36. transport: transport,
  37. requestTimeout: requestTimeout,
  38. }
  39. }
  40. func (r *registrationClient) RegisterConnection(
  41. ctx context.Context,
  42. auth pogs.TunnelAuth,
  43. tunnelID uuid.UUID,
  44. options *pogs.ConnectionOptions,
  45. connIndex uint8,
  46. edgeAddress net.IP,
  47. ) (*pogs.ConnectionDetails, error) {
  48. ctx, cancel := context.WithTimeout(ctx, r.requestTimeout)
  49. defer cancel()
  50. defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc()
  51. timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationRegisterConnection)
  52. defer timer.ObserveDuration()
  53. conn, err := r.client.RegisterConnection(ctx, auth, tunnelID, connIndex, options)
  54. if err != nil {
  55. metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationRegisterConnection).Inc()
  56. }
  57. return conn, err
  58. }
  59. func (r *registrationClient) SendLocalConfiguration(ctx context.Context, config []byte) error {
  60. ctx, cancel := context.WithTimeout(ctx, r.requestTimeout)
  61. defer cancel()
  62. defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc()
  63. timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUpdateLocalConfiguration)
  64. defer timer.ObserveDuration()
  65. err := r.client.SendLocalConfiguration(ctx, config)
  66. if err != nil {
  67. metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUpdateLocalConfiguration).Inc()
  68. }
  69. return err
  70. }
  71. func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error {
  72. ctx, cancel := context.WithTimeout(ctx, gracePeriod)
  73. defer cancel()
  74. defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc()
  75. timer := metrics.NewClientOperationLatencyObserver(metrics.Registration, metrics.OperationUnregisterConnection)
  76. defer timer.ObserveDuration()
  77. err := r.client.UnregisterConnection(ctx)
  78. if err != nil {
  79. metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc()
  80. return err
  81. }
  82. return nil
  83. }
  84. func (r *registrationClient) Close() {
  85. // Closing the client will also close the connection
  86. _ = r.client.Close()
  87. // Closing the transport also closes the stream
  88. _ = r.transport.Close()
  89. }