proxy.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package dbconnect
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "time"
  11. "github.com/cloudflare/cloudflared/hello"
  12. "github.com/cloudflare/cloudflared/logger"
  13. "github.com/cloudflare/cloudflared/validation"
  14. "github.com/gorilla/mux"
  15. "github.com/pkg/errors"
  16. )
  17. // Proxy is an HTTP server that proxies requests to a Client.
  18. type Proxy struct {
  19. client Client
  20. accessValidator *validation.Access
  21. logger logger.Service
  22. }
  23. // NewInsecureProxy creates a Proxy that talks to a Client at an origin.
  24. //
  25. // In insecure mode, the Proxy will allow all Command requests.
  26. func NewInsecureProxy(ctx context.Context, origin string) (*Proxy, error) {
  27. originURL, err := url.Parse(origin)
  28. if err != nil {
  29. return nil, errors.Wrap(err, "must provide a valid database url")
  30. }
  31. client, err := NewClient(ctx, originURL)
  32. if err != nil {
  33. return nil, err
  34. }
  35. err = client.Ping(ctx)
  36. if err != nil {
  37. return nil, errors.Wrap(err, "could not connect to the database")
  38. }
  39. logger, err := logger.New()
  40. if err != nil {
  41. return nil, errors.Wrap(err, "error setting up logger")
  42. }
  43. return &Proxy{client, nil, logger}, nil
  44. }
  45. // NewSecureProxy creates a Proxy that talks to a Client at an origin.
  46. //
  47. // In secure mode, the Proxy will reject any Command requests that are
  48. // not authenticated by Cloudflare Access with a valid JWT.
  49. func NewSecureProxy(ctx context.Context, origin, authDomain, applicationAUD string) (*Proxy, error) {
  50. proxy, err := NewInsecureProxy(ctx, origin)
  51. if err != nil {
  52. return nil, err
  53. }
  54. validator, err := validation.NewAccessValidator(ctx, authDomain, authDomain, applicationAUD)
  55. if err != nil {
  56. return nil, err
  57. }
  58. proxy.accessValidator = validator
  59. return proxy, err
  60. }
  61. // IsInsecure gets whether the Proxy will accept a Command from any source.
  62. func (proxy *Proxy) IsInsecure() bool {
  63. return proxy.accessValidator == nil
  64. }
  65. // IsAllowed checks whether a http.Request is allowed to receive data.
  66. //
  67. // By default, requests must pass through Cloudflare Access for authentication.
  68. // If the proxy is explcitly set to insecure mode, all requests will be allowed.
  69. func (proxy *Proxy) IsAllowed(r *http.Request, verbose ...bool) bool {
  70. if proxy.IsInsecure() {
  71. return true
  72. }
  73. // Access and Tunnel should prevent bad JWTs from even reaching the origin,
  74. // but validate tokens anyway as an abundance of caution.
  75. err := proxy.accessValidator.ValidateRequest(r.Context(), r)
  76. if err == nil {
  77. return true
  78. }
  79. // Warn administrators that invalid JWTs are being rejected. This is indicative
  80. // of either a misconfiguration of the CLI or a massive failure of upstream systems.
  81. if len(verbose) > 0 {
  82. cfRay := proxy.getRayHeader(r)
  83. proxy.logger.Infof("dbproxy: Failed JWT authentication: cf-ray: %s %s", cfRay, err)
  84. }
  85. return false
  86. }
  87. // Start the Proxy at a given address and notify the listener channel when the server is online.
  88. func (proxy *Proxy) Start(ctx context.Context, addr string, listenerC chan<- net.Listener) error {
  89. // STOR-611: use a seperate listener and consider web socket support.
  90. httpListener, err := hello.CreateTLSListener(addr)
  91. if err != nil {
  92. return errors.Wrapf(err, "could not create listener at %s", addr)
  93. }
  94. errC := make(chan error)
  95. defer close(errC)
  96. // Starts the HTTP server and begins to serve requests.
  97. go func() {
  98. errC <- proxy.httpListen(ctx, httpListener)
  99. }()
  100. // Continually ping the server until it comes online or 10 attempts fail.
  101. go func() {
  102. var err error
  103. for i := 0; i < 10; i++ {
  104. _, err = http.Get("http://" + httpListener.Addr().String())
  105. // Once no error was detected, notify the listener channel and return.
  106. if err == nil {
  107. listenerC <- httpListener
  108. return
  109. }
  110. // Backoff between requests to ping the server.
  111. <-time.After(1 * time.Second)
  112. }
  113. errC <- errors.Wrap(err, "took too long for the http server to start")
  114. }()
  115. return <-errC
  116. }
  117. // httpListen starts the httpServer and blocks until the context closes.
  118. func (proxy *Proxy) httpListen(ctx context.Context, listener net.Listener) error {
  119. httpServer := &http.Server{
  120. Addr: listener.Addr().String(),
  121. Handler: proxy.httpRouter(),
  122. ReadTimeout: 10 * time.Second,
  123. WriteTimeout: 60 * time.Second,
  124. IdleTimeout: 60 * time.Second,
  125. }
  126. go func() {
  127. <-ctx.Done()
  128. httpServer.Close()
  129. listener.Close()
  130. }()
  131. return httpServer.Serve(listener)
  132. }
  133. // httpRouter creates a mux.Router for the Proxy.
  134. func (proxy *Proxy) httpRouter() *mux.Router {
  135. router := mux.NewRouter()
  136. router.HandleFunc("/ping", proxy.httpPing()).Methods("GET", "HEAD")
  137. router.HandleFunc("/submit", proxy.httpSubmit()).Methods("POST")
  138. return router
  139. }
  140. // httpPing tests the connection to the database.
  141. //
  142. // By default, this endpoint is unauthenticated to allow for health checks.
  143. // To enable authentication, Cloudflare Access must be enabled on this route.
  144. func (proxy *Proxy) httpPing() http.HandlerFunc {
  145. return func(w http.ResponseWriter, r *http.Request) {
  146. ctx := r.Context()
  147. err := proxy.client.Ping(ctx)
  148. if err == nil {
  149. proxy.httpRespond(w, r, http.StatusOK, "")
  150. } else {
  151. proxy.httpRespondErr(w, r, http.StatusInternalServerError, err)
  152. }
  153. }
  154. }
  155. // httpSubmit sends a command to the database and returns its response.
  156. //
  157. // By default, this endpoint will reject requests that do not pass through Cloudflare Access.
  158. // To disable authentication, the --insecure flag must be specified in the command line.
  159. func (proxy *Proxy) httpSubmit() http.HandlerFunc {
  160. return func(w http.ResponseWriter, r *http.Request) {
  161. if !proxy.IsAllowed(r, true) {
  162. proxy.httpRespondErr(w, r, http.StatusForbidden, fmt.Errorf(""))
  163. return
  164. }
  165. var cmd Command
  166. err := json.NewDecoder(r.Body).Decode(&cmd)
  167. if err != nil {
  168. proxy.httpRespondErr(w, r, http.StatusBadRequest, err)
  169. return
  170. }
  171. ctx := r.Context()
  172. data, err := proxy.client.Submit(ctx, &cmd)
  173. if err != nil {
  174. proxy.httpRespondErr(w, r, http.StatusUnprocessableEntity, err)
  175. return
  176. }
  177. w.Header().Set("Content-type", "application/json")
  178. err = json.NewEncoder(w).Encode(data)
  179. if err != nil {
  180. proxy.httpRespondErr(w, r, http.StatusInternalServerError, err)
  181. }
  182. }
  183. }
  184. // httpRespond writes a status code and string response to the response writer.
  185. func (proxy *Proxy) httpRespond(w http.ResponseWriter, r *http.Request, status int, message string) {
  186. w.WriteHeader(status)
  187. // Only expose the message detail of the reponse if the request is not HEAD
  188. // and the user is authenticated. For example, this prevents an unauthenticated
  189. // failed health check from accidentally leaking sensitive information about the Client.
  190. if r.Method != http.MethodHead && proxy.IsAllowed(r) {
  191. if message == "" {
  192. message = http.StatusText(status)
  193. }
  194. fmt.Fprint(w, message)
  195. }
  196. }
  197. // httpRespondErr is similar to httpRespond, except it formats errors to be more friendly.
  198. func (proxy *Proxy) httpRespondErr(w http.ResponseWriter, r *http.Request, defaultStatus int, err error) {
  199. status, err := httpError(defaultStatus, err)
  200. proxy.httpRespond(w, r, status, err.Error())
  201. if len(err.Error()) > 0 {
  202. cfRay := proxy.getRayHeader(r)
  203. proxy.logger.Infof("dbproxy: Database proxy error: cf-ray: %s %s", cfRay, err)
  204. }
  205. }
  206. // getRayHeader returns the request's Cf-ray header.
  207. func (proxy *Proxy) getRayHeader(r *http.Request) string {
  208. return r.Header.Get("Cf-ray")
  209. }
  210. // httpError extracts common errors and returns an status code and friendly error.
  211. func httpError(defaultStatus int, err error) (int, error) {
  212. if err == nil {
  213. return http.StatusNotImplemented, fmt.Errorf("error expected but found none")
  214. }
  215. if err == io.EOF {
  216. return http.StatusBadRequest, fmt.Errorf("request body cannot be empty")
  217. }
  218. if err == context.DeadlineExceeded {
  219. return http.StatusRequestTimeout, err
  220. }
  221. _, ok := err.(net.Error)
  222. if ok {
  223. return http.StatusRequestTimeout, err
  224. }
  225. if err == context.Canceled {
  226. // Does not exist in Golang, but would be: http.StatusClientClosedWithoutResponse
  227. return 444, err
  228. }
  229. return defaultStatus, err
  230. }