123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package tunnelrpc
- import (
- "context"
- "io"
- "time"
- "github.com/pkg/errors"
- capnp "zombiezen.com/go/capnproto2"
- "zombiezen.com/go/capnproto2/rpc"
- )
- const (
- // These default values are here so that we give some time for the underlying connection/stream
- // to recover in the face of what we believe to be temporarily errors.
- // We don't want to be too aggressive, as the end result of giving a final error (non-temporary)
- // will result in the connection to be dropped.
- // In turn, the other side will probably reconnect, which will put again more pressure in the overall system.
- // So, the best solution is to give it some conservative time to recover.
- defaultSleepBetweenTemporaryError = 500 * time.Millisecond
- defaultMaxRetries = 3
- )
- type readWriterSafeTemporaryErrorCloser struct {
- io.ReadWriteCloser
- retries int
- sleepBetweenRetries time.Duration
- maxRetries int
- }
- func (r *readWriterSafeTemporaryErrorCloser) Read(p []byte) (n int, err error) {
- n, err = r.ReadWriteCloser.Read(p)
- // if there was a failure reading from the read closer, and the error is temporary, try again in some seconds
- // otherwise, just fail without a temporary error.
- if n == 0 && err != nil && isTemporaryError(err) {
- if r.retries >= r.maxRetries {
- return 0, errors.Wrap(err, "failed read from capnproto ReaderWriter after multiple temporary errors")
- } else {
- r.retries += 1
- // sleep for some time to prevent quick read loops that cause exhaustion of CPU resources
- time.Sleep(r.sleepBetweenRetries)
- }
- }
- if err == nil {
- r.retries = 0
- }
- return n, err
- }
- func SafeTransport(rw io.ReadWriteCloser) rpc.Transport {
- return rpc.StreamTransport(&readWriterSafeTemporaryErrorCloser{
- ReadWriteCloser: rw,
- maxRetries: defaultMaxRetries,
- sleepBetweenRetries: defaultSleepBetweenTemporaryError,
- })
- }
- // isTemporaryError reports whether e has a Temporary() method that
- // returns true.
- func isTemporaryError(e error) bool {
- type temp interface {
- Temporary() bool
- }
- t, ok := e.(temp)
- return ok && t.Temporary()
- }
- // NoopCapnpLogger provides a logger to discard all capnp rpc internal logging messages as
- // they are by default provided to stdout if no logger interface is provided. These logging
- // messages in cloudflared have typically not provided a high amount of pratical value
- // as the messages are extremely verbose and don't provide a good insight into the message
- // contents or rpc method names.
- type noopCapnpLogger struct{}
- func (noopCapnpLogger) Infof(ctx context.Context, format string, args ...interface{}) {}
- func (noopCapnpLogger) Errorf(ctx context.Context, format string, args ...interface{}) {}
- func NewClientConn(transport rpc.Transport) *rpc.Conn {
- return rpc.NewConn(transport, rpc.ConnLog(noopCapnpLogger{}))
- }
- func NewServerConn(transport rpc.Transport, client capnp.Client) *rpc.Conn {
- return rpc.NewConn(transport, rpc.MainInterface(client), rpc.ConnLog(noopCapnpLogger{}))
- }
|