request_client_stream.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. capnp "zombiezen.com/go/capnproto2"
  6. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  7. )
  8. // RequestClientStream is a stream to provide requests to the server. This operation is typically driven by the edge service.
  9. type RequestClientStream struct {
  10. io.ReadWriteCloser
  11. }
  12. // WriteConnectRequestData writes requestMeta to a stream.
  13. func (rcs *RequestClientStream) WriteConnectRequestData(dest string, connectionType pogs.ConnectionType, metadata ...pogs.Metadata) error {
  14. connectRequest := &pogs.ConnectRequest{
  15. Dest: dest,
  16. Type: connectionType,
  17. Metadata: metadata,
  18. }
  19. msg, err := connectRequest.ToPogs()
  20. if err != nil {
  21. return err
  22. }
  23. if err := writeDataStreamPreamble(rcs); err != nil {
  24. return err
  25. }
  26. return capnp.NewEncoder(rcs).Encode(msg)
  27. }
  28. // ReadConnectResponseData reads the response from the rpc stream to a ConnectResponse.
  29. func (rcs *RequestClientStream) ReadConnectResponseData() (*pogs.ConnectResponse, error) {
  30. signature, err := determineProtocol(rcs)
  31. if err != nil {
  32. return nil, err
  33. }
  34. if signature != dataStreamProtocolSignature {
  35. return nil, fmt.Errorf("wrong protocol signature %v", signature)
  36. }
  37. // This is a NO-OP for now. We could cause a branching if we wanted to use multiple versions.
  38. if _, err := readVersion(rcs); err != nil {
  39. return nil, err
  40. }
  41. msg, err := capnp.NewDecoder(rcs).Decode()
  42. if err != nil {
  43. return nil, err
  44. }
  45. r := &pogs.ConnectResponse{}
  46. if err := r.FromPogs(msg); err != nil {
  47. return nil, err
  48. }
  49. return r, nil
  50. }