service.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package management
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "net/http/pprof"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/go-chi/chi/v5"
  12. "github.com/go-chi/cors"
  13. "github.com/google/uuid"
  14. "github.com/prometheus/client_golang/prometheus/promhttp"
  15. "github.com/rs/zerolog"
  16. "nhooyr.io/websocket"
  17. )
  18. const (
  19. // In the current state, an invalid command was provided by the client
  20. StatusInvalidCommand websocket.StatusCode = 4001
  21. reasonInvalidCommand = "expected start streaming as first event"
  22. // There are a limited number of available streaming log sessions that cloudflared will service, exceeding this
  23. // value will return this error to incoming requests.
  24. StatusSessionLimitExceeded websocket.StatusCode = 4002
  25. reasonSessionLimitExceeded = "limit exceeded for streaming sessions"
  26. // There is a limited idle time while not actively serving a session for a request before dropping the connection.
  27. StatusIdleLimitExceeded websocket.StatusCode = 4003
  28. reasonIdleLimitExceeded = "session was idle for too long"
  29. )
  30. var (
  31. // CORS middleware required to allow dash to access management.argotunnel.com requests
  32. corsHandler = cors.Handler(cors.Options{
  33. // Allows for any subdomain of cloudflare.com
  34. AllowedOrigins: []string{"https://*.cloudflare.com"},
  35. // Required to present cookies or other authentication across origin boundries
  36. AllowCredentials: true,
  37. MaxAge: 300, // Maximum value not ignored by any of major browsers
  38. })
  39. )
  40. type ManagementService struct {
  41. // The management tunnel hostname
  42. Hostname string
  43. // Host details related configurations
  44. serviceIP string
  45. clientID uuid.UUID
  46. label string
  47. // Additional Handlers
  48. metricsHandler http.Handler
  49. log *zerolog.Logger
  50. router chi.Router
  51. // streamingMut is a lock to prevent concurrent requests to start streaming. Utilizing the atomic.Bool is not
  52. // sufficient to complete this operation since many other checks during an incoming new request are needed
  53. // to validate this before setting streaming to true.
  54. streamingMut sync.Mutex
  55. logger LoggerListener
  56. }
  57. func New(managementHostname string,
  58. enableDiagServices bool,
  59. serviceIP string,
  60. clientID uuid.UUID,
  61. label string,
  62. log *zerolog.Logger,
  63. logger LoggerListener,
  64. ) *ManagementService {
  65. s := &ManagementService{
  66. Hostname: managementHostname,
  67. log: log,
  68. logger: logger,
  69. serviceIP: serviceIP,
  70. clientID: clientID,
  71. label: label,
  72. metricsHandler: promhttp.Handler(),
  73. }
  74. r := chi.NewRouter()
  75. r.Use(ValidateAccessTokenQueryMiddleware)
  76. // Default management services
  77. r.With(corsHandler).Get("/ping", ping)
  78. r.With(corsHandler).Head("/ping", ping)
  79. r.Get("/logs", s.logs)
  80. r.With(corsHandler).Get("/host_details", s.getHostDetails)
  81. // Diagnostic management services
  82. if enableDiagServices {
  83. // Prometheus endpoint
  84. r.With(corsHandler).Get("/metrics", s.metricsHandler.ServeHTTP)
  85. // Supports only heap and goroutine
  86. r.With(corsHandler).Get("/debug/pprof/{profile:heap|goroutine}", pprof.Index)
  87. }
  88. s.router = r
  89. return s
  90. }
  91. func (m *ManagementService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  92. m.router.ServeHTTP(w, r)
  93. }
  // ping is the management ping handler: it replies with a 200 status and writes no body.
  func ping(w http.ResponseWriter, _ *http.Request) {
  	w.WriteHeader(http.StatusOK)
  }
  // getHostDetailsResponse is the JSON body returned by the /host_details endpoint.
  type getHostDetailsResponse struct {
  	// ClientID is always present in the encoded output, under the "connector_id" key.
  	ClientID string `json:"connector_id"`
  	// IP is left out of the encoded output when empty.
  	IP string `json:"ip,omitempty"`
  	// HostName is left out of the encoded output when empty.
  	HostName string `json:"hostname,omitempty"`
  }
  104. func (m *ManagementService) getHostDetails(w http.ResponseWriter, r *http.Request) {
  105. var getHostDetailsResponse = getHostDetailsResponse{
  106. ClientID: m.clientID.String(),
  107. }
  108. if ip, err := getPrivateIP(m.serviceIP); err == nil {
  109. getHostDetailsResponse.IP = ip
  110. }
  111. getHostDetailsResponse.HostName = m.getLabel()
  112. w.Header().Set("Content-Type", "application/json")
  113. w.WriteHeader(200)
  114. json.NewEncoder(w).Encode(getHostDetailsResponse)
  115. }
  116. func (m *ManagementService) getLabel() string {
  117. if m.label != "" {
  118. return fmt.Sprintf("custom:%s", m.label)
  119. }
  120. // If no label is provided we return the system hostname. This is not
  121. // a fqdn hostname.
  122. hostname, err := os.Hostname()
  123. if err != nil {
  124. return "unknown"
  125. }
  126. return hostname
  127. }
  // getPrivateIP returns the preferred private IP of this machine, determined by opening a TCP
  // connection to addr (1 second timeout) and reporting the host part of the connection's local
  // address. An error is returned when the dial fails or the local address cannot be split.
  func getPrivateIP(addr string) (string, error) {
  	const dialTimeout = 1 * time.Second

  	tcpConn, dialErr := net.DialTimeout("tcp", addr, dialTimeout)
  	if dialErr != nil {
  		return "", dialErr
  	}
  	defer tcpConn.Close()

  	localIP, _, splitErr := net.SplitHostPort(tcpConn.LocalAddr().String())
  	if splitErr != nil {
  		return "", splitErr
  	}
  	return localIP, nil
  }
  139. // readEvents will loop through all incoming websocket messages from a client and marshal them into the
  140. // proper Event structure and pass through to the events channel. Any invalid messages sent will automatically
  141. // terminate the connection.
  142. func (m *ManagementService) readEvents(c *websocket.Conn, ctx context.Context, events chan<- *ClientEvent) {
  143. for {
  144. event, err := ReadClientEvent(c, ctx)
  145. select {
  146. case <-ctx.Done():
  147. return
  148. default:
  149. if err != nil {
  150. // If the client (or the server) already closed the connection, don't attempt to close it again
  151. if !IsClosed(err, m.log) {
  152. m.log.Err(err).Send()
  153. m.log.Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
  154. }
  155. // Any errors when reading the messages from the client will close the connection
  156. return
  157. }
  158. events <- event
  159. }
  160. }
  161. }
  162. // streamLogs will begin the process of reading from the Session listener and write the log events to the client.
  163. func (m *ManagementService) streamLogs(c *websocket.Conn, ctx context.Context, session *session) {
  164. for session.Active() {
  165. select {
  166. case <-ctx.Done():
  167. session.Stop()
  168. return
  169. case event := <-session.listener:
  170. err := WriteEvent(c, ctx, &EventLog{
  171. ServerEvent: ServerEvent{Type: Logs},
  172. Logs: []*Log{event},
  173. })
  174. if err != nil {
  175. // If the client (or the server) already closed the connection, don't attempt to close it again
  176. if !IsClosed(err, m.log) {
  177. m.log.Err(err).Send()
  178. m.log.Err(c.Close(websocket.StatusInternalError, err.Error())).Send()
  179. }
  180. // Any errors when writing the messages to the client will stop streaming and close the connection
  181. session.Stop()
  182. return
  183. }
  184. default:
  185. // No messages to send
  186. }
  187. }
  188. }
  189. // canStartStream will check the conditions of the request and return if the session can begin streaming.
  190. func (m *ManagementService) canStartStream(session *session) bool {
  191. m.streamingMut.Lock()
  192. defer m.streamingMut.Unlock()
  193. // Limits to one actor for streaming logs
  194. if m.logger.ActiveSessions() > 0 {
  195. // Allow the same user to preempt their existing session to disconnect their old session and start streaming
  196. // with this new session instead.
  197. if existingSession := m.logger.ActiveSession(session.actor); existingSession != nil {
  198. m.log.Info().
  199. Msgf("Another management session request for the same actor was requested; the other session will be disconnected to handle the new request.")
  200. existingSession.Stop()
  201. m.logger.Remove(existingSession)
  202. existingSession.cancel()
  203. } else {
  204. m.log.Warn().
  205. Msgf("Another management session request was attempted but one session already being served; there is a limit of streaming log sessions to reduce overall performance impact.")
  206. return false
  207. }
  208. }
  209. return true
  210. }
  211. // parseFilters will check the ClientEvent for start_streaming and assign filters if provided to the session
  212. func (m *ManagementService) parseFilters(c *websocket.Conn, event *ClientEvent, session *session) bool {
  213. // Expect the first incoming request
  214. startEvent, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
  215. if !ok {
  216. return false
  217. }
  218. session.Filters(startEvent.Filters)
  219. return true
  220. }
  221. // Management Streaming Logs accept handler
  222. func (m *ManagementService) logs(w http.ResponseWriter, r *http.Request) {
  223. c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
  224. OriginPatterns: []string{
  225. "*.cloudflare.com",
  226. },
  227. })
  228. if err != nil {
  229. m.log.Debug().Msgf("management handshake: %s", err.Error())
  230. return
  231. }
  232. // Make sure the connection is closed if other go routines fail to close the connection after completing.
  233. defer c.Close(websocket.StatusInternalError, "")
  234. ctx, cancel := context.WithCancel(r.Context())
  235. defer cancel()
  236. events := make(chan *ClientEvent)
  237. go m.readEvents(c, ctx, events)
  238. // Send a heartbeat ping to hold the connection open even if not streaming.
  239. ping := time.NewTicker(15 * time.Second)
  240. defer ping.Stop()
  241. // Close the connection if no operation has occurred after the idle timeout. The timeout is halted
  242. // when streaming logs is active.
  243. idleTimeout := 5 * time.Minute
  244. idle := time.NewTimer(idleTimeout)
  245. defer idle.Stop()
  246. // Fetch the claims from the request context to acquire the actor
  247. claims, ok := ctx.Value(accessClaimsCtxKey).(*managementTokenClaims)
  248. if !ok || claims == nil {
  249. // Typically should never happen as it is provided in the context from the middleware
  250. m.log.Err(c.Close(websocket.StatusInternalError, "missing access_token")).Send()
  251. return
  252. }
  253. session := newSession(logWindow, claims.Actor, cancel)
  254. defer m.logger.Remove(session)
  255. for {
  256. select {
  257. case <-ctx.Done():
  258. m.log.Debug().Msgf("management logs: context cancelled")
  259. c.Close(websocket.StatusNormalClosure, "context closed")
  260. return
  261. case event := <-events:
  262. switch event.Type {
  263. case StartStreaming:
  264. idle.Stop()
  265. // Expect the first incoming request
  266. startEvent, ok := IntoClientEvent[EventStartStreaming](event, StartStreaming)
  267. if !ok {
  268. m.log.Warn().Msgf("expected start_streaming as first recieved event")
  269. m.log.Err(c.Close(StatusInvalidCommand, reasonInvalidCommand)).Send()
  270. return
  271. }
  272. // Make sure the session can start
  273. if !m.canStartStream(session) {
  274. m.log.Err(c.Close(StatusSessionLimitExceeded, reasonSessionLimitExceeded)).Send()
  275. return
  276. }
  277. session.Filters(startEvent.Filters)
  278. m.logger.Listen(session)
  279. m.log.Debug().Msgf("Streaming logs")
  280. go m.streamLogs(c, ctx, session)
  281. continue
  282. case StopStreaming:
  283. idle.Reset(idleTimeout)
  284. // Stop the current session for the current actor who requested it
  285. session.Stop()
  286. m.logger.Remove(session)
  287. case UnknownClientEventType:
  288. fallthrough
  289. default:
  290. // Drop unknown events and close connection
  291. m.log.Debug().Msgf("unexpected management message received: %s", event.Type)
  292. // If the client (or the server) already closed the connection, don't attempt to close it again
  293. if !IsClosed(err, m.log) {
  294. m.log.Err(err).Err(c.Close(websocket.StatusUnsupportedData, err.Error())).Send()
  295. }
  296. return
  297. }
  298. case <-ping.C:
  299. go c.Ping(ctx)
  300. case <-idle.C:
  301. c.Close(StatusIdleLimitExceeded, reasonIdleLimitExceeded)
  302. return
  303. }
  304. }
  305. }