cloudflared_server.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package quic
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/cloudflare/cloudflared/tunnelrpc"
  8. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  9. )
  10. // HandleRequestFunc wraps the proxied request from the upstream and also provides methods on the stream to
  11. // handle the response back.
  12. type HandleRequestFunc = func(ctx context.Context, stream *RequestServerStream) error
  13. // CloudflaredServer provides a handler interface for a client to provide methods to handle the different types of
  14. // requests that can be communicated by the stream.
  15. type CloudflaredServer struct {
  16. handleRequest HandleRequestFunc
  17. sessionManager pogs.SessionManager
  18. configManager pogs.ConfigurationManager
  19. responseTimeout time.Duration
  20. }
  21. func NewCloudflaredServer(handleRequest HandleRequestFunc, sessionManager pogs.SessionManager, configManager pogs.ConfigurationManager, responseTimeout time.Duration) *CloudflaredServer {
  22. return &CloudflaredServer{
  23. handleRequest: handleRequest,
  24. sessionManager: sessionManager,
  25. configManager: configManager,
  26. responseTimeout: responseTimeout,
  27. }
  28. }
  29. // Serve executes the defined handlers in ServerStream on the provided stream if it is a proper RPC stream with the
  30. // correct preamble protocol signature.
  31. func (s *CloudflaredServer) Serve(ctx context.Context, stream io.ReadWriteCloser) error {
  32. signature, err := determineProtocol(stream)
  33. if err != nil {
  34. return err
  35. }
  36. switch signature {
  37. case dataStreamProtocolSignature:
  38. return s.handleRequest(ctx, &RequestServerStream{stream})
  39. case rpcStreamProtocolSignature:
  40. return s.handleRPC(ctx, stream)
  41. default:
  42. return fmt.Errorf("unknown protocol %v", signature)
  43. }
  44. }
  45. func (s *CloudflaredServer) handleRPC(ctx context.Context, stream io.ReadWriteCloser) error {
  46. ctx, cancel := context.WithTimeout(ctx, s.responseTimeout)
  47. defer cancel()
  48. transport := tunnelrpc.SafeTransport(stream)
  49. defer transport.Close()
  50. main := pogs.CloudflaredServer_ServerToClient(s.sessionManager, s.configManager)
  51. rpcConn := tunnelrpc.NewServerConn(transport, main.Client)
  52. defer rpcConn.Close()
  53. // We ignore the errors here because if cloudflared fails to handle a request, we will just move on.
  54. select {
  55. case <-rpcConn.Done():
  56. case <-ctx.Done():
  57. }
  58. return nil
  59. }