utils.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package tunnelrpc
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. "github.com/pkg/errors"
  7. capnp "zombiezen.com/go/capnproto2"
  8. "zombiezen.com/go/capnproto2/rpc"
  9. )
  10. const (
  11. // These default values are here so that we give some time for the underlying connection/stream
  12. // to recover in the face of what we believe to be temporarily errors.
  13. // We don't want to be too aggressive, as the end result of giving a final error (non-temporary)
  14. // will result in the connection to be dropped.
  15. // In turn, the other side will probably reconnect, which will put again more pressure in the overall system.
  16. // So, the best solution is to give it some conservative time to recover.
  17. defaultSleepBetweenTemporaryError = 500 * time.Millisecond
  18. defaultMaxRetries = 3
  19. )
  20. type readWriterSafeTemporaryErrorCloser struct {
  21. io.ReadWriteCloser
  22. retries int
  23. sleepBetweenRetries time.Duration
  24. maxRetries int
  25. }
  26. func (r *readWriterSafeTemporaryErrorCloser) Read(p []byte) (n int, err error) {
  27. n, err = r.ReadWriteCloser.Read(p)
  28. // if there was a failure reading from the read closer, and the error is temporary, try again in some seconds
  29. // otherwise, just fail without a temporary error.
  30. if n == 0 && err != nil && isTemporaryError(err) {
  31. if r.retries >= r.maxRetries {
  32. return 0, errors.Wrap(err, "failed read from capnproto ReaderWriter after multiple temporary errors")
  33. } else {
  34. r.retries += 1
  35. // sleep for some time to prevent quick read loops that cause exhaustion of CPU resources
  36. time.Sleep(r.sleepBetweenRetries)
  37. }
  38. }
  39. if err == nil {
  40. r.retries = 0
  41. }
  42. return n, err
  43. }
  44. func SafeTransport(rw io.ReadWriteCloser) rpc.Transport {
  45. return rpc.StreamTransport(&readWriterSafeTemporaryErrorCloser{
  46. ReadWriteCloser: rw,
  47. maxRetries: defaultMaxRetries,
  48. sleepBetweenRetries: defaultSleepBetweenTemporaryError,
  49. })
  50. }
  51. // isTemporaryError reports whether e has a Temporary() method that
  52. // returns true.
  53. func isTemporaryError(e error) bool {
  54. type temp interface {
  55. Temporary() bool
  56. }
  57. t, ok := e.(temp)
  58. return ok && t.Temporary()
  59. }
  60. // NoopCapnpLogger provides a logger to discard all capnp rpc internal logging messages as
  61. // they are by default provided to stdout if no logger interface is provided. These logging
  62. // messages in cloudflared have typically not provided a high amount of pratical value
  63. // as the messages are extremely verbose and don't provide a good insight into the message
  64. // contents or rpc method names.
  65. type noopCapnpLogger struct{}
  66. func (noopCapnpLogger) Infof(ctx context.Context, format string, args ...interface{}) {}
  67. func (noopCapnpLogger) Errorf(ctx context.Context, format string, args ...interface{}) {}
  68. func NewClientConn(transport rpc.Transport) *rpc.Conn {
  69. return rpc.NewConn(transport, rpc.ConnLog(noopCapnpLogger{}))
  70. }
  71. func NewServerConn(transport rpc.Transport, client capnp.Client) *rpc.Conn {
  72. return rpc.NewConn(transport, rpc.MainInterface(client), rpc.ConnLog(noopCapnpLogger{}))
  73. }