123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- package dbconnect
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net"
- "net/http"
- "net/url"
- "time"
- "github.com/cloudflare/cloudflared/hello"
- "github.com/cloudflare/cloudflared/logger"
- "github.com/cloudflare/cloudflared/validation"
- "github.com/gorilla/mux"
- "github.com/pkg/errors"
- )
- // Proxy is an HTTP server that proxies requests to a Client.
- type Proxy struct {
- client Client
- accessValidator *validation.Access
- logger logger.Service
- }
- // NewInsecureProxy creates a Proxy that talks to a Client at an origin.
- //
- // In insecure mode, the Proxy will allow all Command requests.
- func NewInsecureProxy(ctx context.Context, origin string) (*Proxy, error) {
- originURL, err := url.Parse(origin)
- if err != nil {
- return nil, errors.Wrap(err, "must provide a valid database url")
- }
- client, err := NewClient(ctx, originURL)
- if err != nil {
- return nil, err
- }
- err = client.Ping(ctx)
- if err != nil {
- return nil, errors.Wrap(err, "could not connect to the database")
- }
- logger, err := logger.New()
- if err != nil {
- return nil, errors.Wrap(err, "error setting up logger")
- }
- return &Proxy{client, nil, logger}, nil
- }
- // NewSecureProxy creates a Proxy that talks to a Client at an origin.
- //
- // In secure mode, the Proxy will reject any Command requests that are
- // not authenticated by Cloudflare Access with a valid JWT.
- func NewSecureProxy(ctx context.Context, origin, authDomain, applicationAUD string) (*Proxy, error) {
- proxy, err := NewInsecureProxy(ctx, origin)
- if err != nil {
- return nil, err
- }
- validator, err := validation.NewAccessValidator(ctx, authDomain, authDomain, applicationAUD)
- if err != nil {
- return nil, err
- }
- proxy.accessValidator = validator
- return proxy, err
- }
- // IsInsecure gets whether the Proxy will accept a Command from any source.
- func (proxy *Proxy) IsInsecure() bool {
- return proxy.accessValidator == nil
- }
- // IsAllowed checks whether a http.Request is allowed to receive data.
- //
- // By default, requests must pass through Cloudflare Access for authentication.
- // If the proxy is explcitly set to insecure mode, all requests will be allowed.
- func (proxy *Proxy) IsAllowed(r *http.Request, verbose ...bool) bool {
- if proxy.IsInsecure() {
- return true
- }
- // Access and Tunnel should prevent bad JWTs from even reaching the origin,
- // but validate tokens anyway as an abundance of caution.
- err := proxy.accessValidator.ValidateRequest(r.Context(), r)
- if err == nil {
- return true
- }
- // Warn administrators that invalid JWTs are being rejected. This is indicative
- // of either a misconfiguration of the CLI or a massive failure of upstream systems.
- if len(verbose) > 0 {
- cfRay := proxy.getRayHeader(r)
- proxy.logger.Infof("dbproxy: Failed JWT authentication: cf-ray: %s %s", cfRay, err)
- }
- return false
- }
- // Start the Proxy at a given address and notify the listener channel when the server is online.
- func (proxy *Proxy) Start(ctx context.Context, addr string, listenerC chan<- net.Listener) error {
- // STOR-611: use a seperate listener and consider web socket support.
- httpListener, err := hello.CreateTLSListener(addr)
- if err != nil {
- return errors.Wrapf(err, "could not create listener at %s", addr)
- }
- errC := make(chan error)
- defer close(errC)
- // Starts the HTTP server and begins to serve requests.
- go func() {
- errC <- proxy.httpListen(ctx, httpListener)
- }()
- // Continually ping the server until it comes online or 10 attempts fail.
- go func() {
- var err error
- for i := 0; i < 10; i++ {
- _, err = http.Get("http://" + httpListener.Addr().String())
- // Once no error was detected, notify the listener channel and return.
- if err == nil {
- listenerC <- httpListener
- return
- }
- // Backoff between requests to ping the server.
- <-time.After(1 * time.Second)
- }
- errC <- errors.Wrap(err, "took too long for the http server to start")
- }()
- return <-errC
- }
- // httpListen starts the httpServer and blocks until the context closes.
- func (proxy *Proxy) httpListen(ctx context.Context, listener net.Listener) error {
- httpServer := &http.Server{
- Addr: listener.Addr().String(),
- Handler: proxy.httpRouter(),
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 60 * time.Second,
- IdleTimeout: 60 * time.Second,
- }
- go func() {
- <-ctx.Done()
- httpServer.Close()
- listener.Close()
- }()
- return httpServer.Serve(listener)
- }
- // httpRouter creates a mux.Router for the Proxy.
- func (proxy *Proxy) httpRouter() *mux.Router {
- router := mux.NewRouter()
- router.HandleFunc("/ping", proxy.httpPing()).Methods("GET", "HEAD")
- router.HandleFunc("/submit", proxy.httpSubmit()).Methods("POST")
- return router
- }
- // httpPing tests the connection to the database.
- //
- // By default, this endpoint is unauthenticated to allow for health checks.
- // To enable authentication, Cloudflare Access must be enabled on this route.
- func (proxy *Proxy) httpPing() http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- ctx := r.Context()
- err := proxy.client.Ping(ctx)
- if err == nil {
- proxy.httpRespond(w, r, http.StatusOK, "")
- } else {
- proxy.httpRespondErr(w, r, http.StatusInternalServerError, err)
- }
- }
- }
- // httpSubmit sends a command to the database and returns its response.
- //
- // By default, this endpoint will reject requests that do not pass through Cloudflare Access.
- // To disable authentication, the --insecure flag must be specified in the command line.
- func (proxy *Proxy) httpSubmit() http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- if !proxy.IsAllowed(r, true) {
- proxy.httpRespondErr(w, r, http.StatusForbidden, fmt.Errorf(""))
- return
- }
- var cmd Command
- err := json.NewDecoder(r.Body).Decode(&cmd)
- if err != nil {
- proxy.httpRespondErr(w, r, http.StatusBadRequest, err)
- return
- }
- ctx := r.Context()
- data, err := proxy.client.Submit(ctx, &cmd)
- if err != nil {
- proxy.httpRespondErr(w, r, http.StatusUnprocessableEntity, err)
- return
- }
- w.Header().Set("Content-type", "application/json")
- err = json.NewEncoder(w).Encode(data)
- if err != nil {
- proxy.httpRespondErr(w, r, http.StatusInternalServerError, err)
- }
- }
- }
- // httpRespond writes a status code and string response to the response writer.
- func (proxy *Proxy) httpRespond(w http.ResponseWriter, r *http.Request, status int, message string) {
- w.WriteHeader(status)
- // Only expose the message detail of the reponse if the request is not HEAD
- // and the user is authenticated. For example, this prevents an unauthenticated
- // failed health check from accidentally leaking sensitive information about the Client.
- if r.Method != http.MethodHead && proxy.IsAllowed(r) {
- if message == "" {
- message = http.StatusText(status)
- }
- fmt.Fprint(w, message)
- }
- }
- // httpRespondErr is similar to httpRespond, except it formats errors to be more friendly.
- func (proxy *Proxy) httpRespondErr(w http.ResponseWriter, r *http.Request, defaultStatus int, err error) {
- status, err := httpError(defaultStatus, err)
- proxy.httpRespond(w, r, status, err.Error())
- if len(err.Error()) > 0 {
- cfRay := proxy.getRayHeader(r)
- proxy.logger.Infof("dbproxy: Database proxy error: cf-ray: %s %s", cfRay, err)
- }
- }
- // getRayHeader returns the request's Cf-ray header.
- func (proxy *Proxy) getRayHeader(r *http.Request) string {
- return r.Header.Get("Cf-ray")
- }
// httpError maps common errors to a status code and a friendly error.
//
//   - nil                      -> 501 with a placeholder error
//   - io.EOF                   -> 400 with "request body cannot be empty"
//   - context.DeadlineExceeded -> 408, error unchanged
//   - context.Canceled         -> 444, error unchanged
//   - any net.Error            -> 408, error unchanged
//   - anything else            -> defaultStatus, error unchanged
//
// Sentinel errors are matched by direct comparison, so wrapped errors fall
// through to defaultStatus.
func httpError(defaultStatus int, err error) (int, error) {
	switch err {
	case nil:
		return http.StatusNotImplemented, fmt.Errorf("error expected but found none")
	case io.EOF:
		return http.StatusBadRequest, fmt.Errorf("request body cannot be empty")
	case context.DeadlineExceeded:
		return http.StatusRequestTimeout, err
	case context.Canceled:
		// Does not exist in Golang, but would be: http.StatusClientClosedWithoutResponse
		return 444, err
	}

	if _, isNetErr := err.(net.Error); isNetErr {
		return http.StatusRequestTimeout, err
	}

	return defaultStatus, err
}
|