cloudflared_client.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package quic
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net"
  7. "time"
  8. "zombiezen.com/go/capnproto2/rpc"
  9. "github.com/google/uuid"
  10. "github.com/cloudflare/cloudflared/tunnelrpc"
  11. "github.com/cloudflare/cloudflared/tunnelrpc/metrics"
  12. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  13. )
  14. // CloudflaredClient calls capnp rpc methods of SessionManager and ConfigurationManager.
  15. type CloudflaredClient struct {
  16. client pogs.CloudflaredServer_PogsClient
  17. transport rpc.Transport
  18. requestTimeout time.Duration
  19. }
  20. func NewCloudflaredClient(ctx context.Context, stream io.ReadWriteCloser, requestTimeout time.Duration) (*CloudflaredClient, error) {
  21. n, err := stream.Write(rpcStreamProtocolSignature[:])
  22. if err != nil {
  23. return nil, err
  24. }
  25. if n != len(rpcStreamProtocolSignature) {
  26. return nil, fmt.Errorf("expect to write %d bytes for RPC stream protocol signature, wrote %d", len(rpcStreamProtocolSignature), n)
  27. }
  28. transport := tunnelrpc.SafeTransport(stream)
  29. conn := tunnelrpc.NewClientConn(transport)
  30. client := pogs.NewCloudflaredServer_PogsClient(conn.Bootstrap(ctx), conn)
  31. return &CloudflaredClient{
  32. client: client,
  33. transport: transport,
  34. requestTimeout: requestTimeout,
  35. }, nil
  36. }
  37. func (c *CloudflaredClient) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeIdleAfterHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) {
  38. ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
  39. defer cancel()
  40. defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc()
  41. timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationRegisterUdpSession)
  42. defer timer.ObserveDuration()
  43. resp, err := c.client.RegisterUdpSession(ctx, sessionID, dstIP, dstPort, closeIdleAfterHint, traceContext)
  44. if err != nil {
  45. metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationRegisterUdpSession).Inc()
  46. }
  47. return resp, err
  48. }
  49. func (c *CloudflaredClient) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
  50. ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
  51. defer cancel()
  52. defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc()
  53. timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUnregisterUdpSession)
  54. defer timer.ObserveDuration()
  55. err := c.client.UnregisterUdpSession(ctx, sessionID, message)
  56. if err != nil {
  57. metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUnregisterUdpSession).Inc()
  58. }
  59. return err
  60. }
  61. func (c *CloudflaredClient) UpdateConfiguration(ctx context.Context, version int32, config []byte) (*pogs.UpdateConfigurationResponse, error) {
  62. ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
  63. defer cancel()
  64. defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc()
  65. timer := metrics.NewClientOperationLatencyObserver(metrics.Cloudflared, metrics.OperationUpdateConfiguration)
  66. defer timer.ObserveDuration()
  67. resp, err := c.client.UpdateConfiguration(ctx, version, config)
  68. if err != nil {
  69. metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Cloudflared, metrics.OperationUpdateConfiguration).Inc()
  70. }
  71. return resp, err
  72. }
  73. func (c *CloudflaredClient) Close() {
  74. _ = c.client.Close()
  75. _ = c.transport.Close()
  76. }