rpc.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package connection
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. rpc "zombiezen.com/go/capnproto2/rpc"
  7. "github.com/cloudflare/cloudflared/h2mux"
  8. "github.com/cloudflare/cloudflared/logger"
  9. "github.com/cloudflare/cloudflared/tunnelrpc"
  10. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  11. )
  12. // NewRPCClient creates and returns a new RPC client, which will communicate
  13. // using a stream on the given muxer
  14. func NewRPCClient(
  15. ctx context.Context,
  16. muxer *h2mux.Muxer,
  17. logger logger.Service,
  18. openStreamTimeout time.Duration,
  19. ) (client tunnelpogs.TunnelServer_PogsClient, err error) {
  20. openStreamCtx, openStreamCancel := context.WithTimeout(ctx, openStreamTimeout)
  21. defer openStreamCancel()
  22. stream, err := muxer.OpenRPCStream(openStreamCtx)
  23. if err != nil {
  24. return
  25. }
  26. if !isRPCStreamResponse(stream.Headers) {
  27. stream.Close()
  28. err = fmt.Errorf("rpc: bad response headers: %v", stream.Headers)
  29. return
  30. }
  31. conn := rpc.NewConn(
  32. tunnelrpc.NewTransportLogger(logger, rpc.StreamTransport(stream)),
  33. tunnelrpc.ConnLog(logger),
  34. )
  35. client = tunnelpogs.TunnelServer_PogsClient{Client: conn.Bootstrap(ctx), Conn: conn}
  36. return client, nil
  37. }
  38. func isRPCStreamResponse(headers []h2mux.Header) bool {
  39. return len(headers) == 1 &&
  40. headers[0].Name == ":status" &&
  41. headers[0].Value == "200"
  42. }