metrics.go 17 KB


  1. package origin
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/cloudflare/cloudflared/h2mux"
  6. "github.com/prometheus/client_golang/prometheus"
  7. )
  8. const (
  9. metricsNamespace = "cloudflared"
  10. tunnelSubsystem = "tunnel"
  11. muxerSubsystem = "muxer"
  12. )
  13. type muxerMetrics struct {
  14. rtt *prometheus.GaugeVec
  15. rttMin *prometheus.GaugeVec
  16. rttMax *prometheus.GaugeVec
  17. receiveWindowAve *prometheus.GaugeVec
  18. sendWindowAve *prometheus.GaugeVec
  19. receiveWindowMin *prometheus.GaugeVec
  20. receiveWindowMax *prometheus.GaugeVec
  21. sendWindowMin *prometheus.GaugeVec
  22. sendWindowMax *prometheus.GaugeVec
  23. inBoundRateCurr *prometheus.GaugeVec
  24. inBoundRateMin *prometheus.GaugeVec
  25. inBoundRateMax *prometheus.GaugeVec
  26. outBoundRateCurr *prometheus.GaugeVec
  27. outBoundRateMin *prometheus.GaugeVec
  28. outBoundRateMax *prometheus.GaugeVec
  29. compBytesBefore *prometheus.GaugeVec
  30. compBytesAfter *prometheus.GaugeVec
  31. compRateAve *prometheus.GaugeVec
  32. }
  33. type TunnelMetrics struct {
  34. haConnections prometheus.Gauge
  35. activeStreams prometheus.Gauge
  36. totalRequests prometheus.Counter
  37. requestsPerTunnel *prometheus.CounterVec
  38. // concurrentRequestsLock is a mutex for concurrentRequests and maxConcurrentRequests
  39. concurrentRequestsLock sync.Mutex
  40. concurrentRequestsPerTunnel *prometheus.GaugeVec
  41. // concurrentRequests records count of concurrent requests for each tunnel
  42. concurrentRequests map[string]uint64
  43. maxConcurrentRequestsPerTunnel *prometheus.GaugeVec
  44. // concurrentRequests records max count of concurrent requests for each tunnel
  45. maxConcurrentRequests map[string]uint64
  46. timerRetries prometheus.Gauge
  47. responseByCode *prometheus.CounterVec
  48. responseCodePerTunnel *prometheus.CounterVec
  49. serverLocations *prometheus.GaugeVec
  50. // locationLock is a mutex for oldServerLocations
  51. locationLock sync.Mutex
  52. // oldServerLocations stores the last server the tunnel was connected to
  53. oldServerLocations map[string]string
  54. regSuccess *prometheus.CounterVec
  55. regFail *prometheus.CounterVec
  56. rpcFail *prometheus.CounterVec
  57. muxerMetrics *muxerMetrics
  58. tunnelsHA tunnelsForHA
  59. userHostnamesCounts *prometheus.CounterVec
  60. }
  61. func newMuxerMetrics() *muxerMetrics {
  62. rtt := prometheus.NewGaugeVec(
  63. prometheus.GaugeOpts{
  64. Namespace: metricsNamespace,
  65. Subsystem: muxerSubsystem,
  66. Name: "rtt",
  67. Help: "Round-trip time in millisecond",
  68. },
  69. []string{"connection_id"},
  70. )
  71. prometheus.MustRegister(rtt)
  72. rttMin := prometheus.NewGaugeVec(
  73. prometheus.GaugeOpts{
  74. Namespace: metricsNamespace,
  75. Subsystem: muxerSubsystem,
  76. Name: "rtt_min",
  77. Help: "Shortest round-trip time in millisecond",
  78. },
  79. []string{"connection_id"},
  80. )
  81. prometheus.MustRegister(rttMin)
  82. rttMax := prometheus.NewGaugeVec(
  83. prometheus.GaugeOpts{
  84. Namespace: metricsNamespace,
  85. Subsystem: muxerSubsystem,
  86. Name: "rtt_max",
  87. Help: "Longest round-trip time in millisecond",
  88. },
  89. []string{"connection_id"},
  90. )
  91. prometheus.MustRegister(rttMax)
  92. receiveWindowAve := prometheus.NewGaugeVec(
  93. prometheus.GaugeOpts{
  94. Namespace: metricsNamespace,
  95. Subsystem: muxerSubsystem,
  96. Name: "receive_window_ave",
  97. Help: "Average receive window size in bytes",
  98. },
  99. []string{"connection_id"},
  100. )
  101. prometheus.MustRegister(receiveWindowAve)
  102. sendWindowAve := prometheus.NewGaugeVec(
  103. prometheus.GaugeOpts{
  104. Namespace: metricsNamespace,
  105. Subsystem: muxerSubsystem,
  106. Name: "send_window_ave",
  107. Help: "Average send window size in bytes",
  108. },
  109. []string{"connection_id"},
  110. )
  111. prometheus.MustRegister(sendWindowAve)
  112. receiveWindowMin := prometheus.NewGaugeVec(
  113. prometheus.GaugeOpts{
  114. Namespace: metricsNamespace,
  115. Subsystem: muxerSubsystem,
  116. Name: "receive_window_min",
  117. Help: "Smallest receive window size in bytes",
  118. },
  119. []string{"connection_id"},
  120. )
  121. prometheus.MustRegister(receiveWindowMin)
  122. receiveWindowMax := prometheus.NewGaugeVec(
  123. prometheus.GaugeOpts{
  124. Namespace: metricsNamespace,
  125. Subsystem: muxerSubsystem,
  126. Name: "receive_window_max",
  127. Help: "Largest receive window size in bytes",
  128. },
  129. []string{"connection_id"},
  130. )
  131. prometheus.MustRegister(receiveWindowMax)
  132. sendWindowMin := prometheus.NewGaugeVec(
  133. prometheus.GaugeOpts{
  134. Namespace: metricsNamespace,
  135. Subsystem: muxerSubsystem,
  136. Name: "send_window_min",
  137. Help: "Smallest send window size in bytes",
  138. },
  139. []string{"connection_id"},
  140. )
  141. prometheus.MustRegister(sendWindowMin)
  142. sendWindowMax := prometheus.NewGaugeVec(
  143. prometheus.GaugeOpts{
  144. Namespace: metricsNamespace,
  145. Subsystem: muxerSubsystem,
  146. Name: "send_window_max",
  147. Help: "Largest send window size in bytes",
  148. },
  149. []string{"connection_id"},
  150. )
  151. prometheus.MustRegister(sendWindowMax)
  152. inBoundRateCurr := prometheus.NewGaugeVec(
  153. prometheus.GaugeOpts{
  154. Namespace: metricsNamespace,
  155. Subsystem: muxerSubsystem,
  156. Name: "inbound_bytes_per_sec_curr",
  157. Help: "Current inbounding bytes per second, 0 if there is no incoming connection",
  158. },
  159. []string{"connection_id"},
  160. )
  161. prometheus.MustRegister(inBoundRateCurr)
  162. inBoundRateMin := prometheus.NewGaugeVec(
  163. prometheus.GaugeOpts{
  164. Namespace: metricsNamespace,
  165. Subsystem: muxerSubsystem,
  166. Name: "inbound_bytes_per_sec_min",
  167. Help: "Minimum non-zero inbounding bytes per second",
  168. },
  169. []string{"connection_id"},
  170. )
  171. prometheus.MustRegister(inBoundRateMin)
  172. inBoundRateMax := prometheus.NewGaugeVec(
  173. prometheus.GaugeOpts{
  174. Namespace: metricsNamespace,
  175. Subsystem: muxerSubsystem,
  176. Name: "inbound_bytes_per_sec_max",
  177. Help: "Maximum inbounding bytes per second",
  178. },
  179. []string{"connection_id"},
  180. )
  181. prometheus.MustRegister(inBoundRateMax)
  182. outBoundRateCurr := prometheus.NewGaugeVec(
  183. prometheus.GaugeOpts{
  184. Namespace: metricsNamespace,
  185. Subsystem: muxerSubsystem,
  186. Name: "outbound_bytes_per_sec_curr",
  187. Help: "Current outbounding bytes per second, 0 if there is no outgoing traffic",
  188. },
  189. []string{"connection_id"},
  190. )
  191. prometheus.MustRegister(outBoundRateCurr)
  192. outBoundRateMin := prometheus.NewGaugeVec(
  193. prometheus.GaugeOpts{
  194. Namespace: metricsNamespace,
  195. Subsystem: muxerSubsystem,
  196. Name: "outbound_bytes_per_sec_min",
  197. Help: "Minimum non-zero outbounding bytes per second",
  198. },
  199. []string{"connection_id"},
  200. )
  201. prometheus.MustRegister(outBoundRateMin)
  202. outBoundRateMax := prometheus.NewGaugeVec(
  203. prometheus.GaugeOpts{
  204. Namespace: metricsNamespace,
  205. Subsystem: muxerSubsystem,
  206. Name: "outbound_bytes_per_sec_max",
  207. Help: "Maximum outbounding bytes per second",
  208. },
  209. []string{"connection_id"},
  210. )
  211. prometheus.MustRegister(outBoundRateMax)
  212. compBytesBefore := prometheus.NewGaugeVec(
  213. prometheus.GaugeOpts{
  214. Namespace: metricsNamespace,
  215. Subsystem: muxerSubsystem,
  216. Name: "comp_bytes_before",
  217. Help: "Bytes sent via cross-stream compression, pre compression",
  218. },
  219. []string{"connection_id"},
  220. )
  221. prometheus.MustRegister(compBytesBefore)
  222. compBytesAfter := prometheus.NewGaugeVec(
  223. prometheus.GaugeOpts{
  224. Namespace: metricsNamespace,
  225. Subsystem: muxerSubsystem,
  226. Name: "comp_bytes_after",
  227. Help: "Bytes sent via cross-stream compression, post compression",
  228. },
  229. []string{"connection_id"},
  230. )
  231. prometheus.MustRegister(compBytesAfter)
  232. compRateAve := prometheus.NewGaugeVec(
  233. prometheus.GaugeOpts{
  234. Namespace: metricsNamespace,
  235. Subsystem: muxerSubsystem,
  236. Name: "comp_rate_ave",
  237. Help: "Average outbound cross-stream compression ratio",
  238. },
  239. []string{"connection_id"},
  240. )
  241. prometheus.MustRegister(compRateAve)
  242. return &muxerMetrics{
  243. rtt: rtt,
  244. rttMin: rttMin,
  245. rttMax: rttMax,
  246. receiveWindowAve: receiveWindowAve,
  247. sendWindowAve: sendWindowAve,
  248. receiveWindowMin: receiveWindowMin,
  249. receiveWindowMax: receiveWindowMax,
  250. sendWindowMin: sendWindowMin,
  251. sendWindowMax: sendWindowMax,
  252. inBoundRateCurr: inBoundRateCurr,
  253. inBoundRateMin: inBoundRateMin,
  254. inBoundRateMax: inBoundRateMax,
  255. outBoundRateCurr: outBoundRateCurr,
  256. outBoundRateMin: outBoundRateMin,
  257. outBoundRateMax: outBoundRateMax,
  258. compBytesBefore: compBytesBefore,
  259. compBytesAfter: compBytesAfter,
  260. compRateAve: compRateAve,
  261. }
  262. }
  263. func (m *muxerMetrics) update(connectionID string, metrics *h2mux.MuxerMetrics) {
  264. m.rtt.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTT))
  265. m.rttMin.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTTMin))
  266. m.rttMax.WithLabelValues(connectionID).Set(convertRTTMilliSec(metrics.RTTMax))
  267. m.receiveWindowAve.WithLabelValues(connectionID).Set(metrics.ReceiveWindowAve)
  268. m.sendWindowAve.WithLabelValues(connectionID).Set(metrics.SendWindowAve)
  269. m.receiveWindowMin.WithLabelValues(connectionID).Set(float64(metrics.ReceiveWindowMin))
  270. m.receiveWindowMax.WithLabelValues(connectionID).Set(float64(metrics.ReceiveWindowMax))
  271. m.sendWindowMin.WithLabelValues(connectionID).Set(float64(metrics.SendWindowMin))
  272. m.sendWindowMax.WithLabelValues(connectionID).Set(float64(metrics.SendWindowMax))
  273. m.inBoundRateCurr.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateCurr))
  274. m.inBoundRateMin.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateMin))
  275. m.inBoundRateMax.WithLabelValues(connectionID).Set(float64(metrics.InBoundRateMax))
  276. m.outBoundRateCurr.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateCurr))
  277. m.outBoundRateMin.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateMin))
  278. m.outBoundRateMax.WithLabelValues(connectionID).Set(float64(metrics.OutBoundRateMax))
  279. m.compBytesBefore.WithLabelValues(connectionID).Set(float64(metrics.CompBytesBefore.Value()))
  280. m.compBytesAfter.WithLabelValues(connectionID).Set(float64(metrics.CompBytesAfter.Value()))
  281. m.compRateAve.WithLabelValues(connectionID).Set(float64(metrics.CompRateAve()))
  282. }
  283. func convertRTTMilliSec(t time.Duration) float64 {
  284. return float64(t / time.Millisecond)
  285. }
  286. // Metrics that can be collected without asking the edge
  287. func NewTunnelMetrics() *TunnelMetrics {
  288. haConnections := prometheus.NewGauge(
  289. prometheus.GaugeOpts{
  290. Namespace: metricsNamespace,
  291. Subsystem: tunnelSubsystem,
  292. Name: "ha_connections",
  293. Help: "Number of active ha connections",
  294. })
  295. prometheus.MustRegister(haConnections)
  296. activeStreams := h2mux.NewActiveStreamsMetrics(metricsNamespace, tunnelSubsystem)
  297. totalRequests := prometheus.NewCounter(
  298. prometheus.CounterOpts{
  299. Namespace: metricsNamespace,
  300. Subsystem: tunnelSubsystem,
  301. Name: "total_requests",
  302. Help: "Amount of requests proxied through all the tunnels",
  303. })
  304. prometheus.MustRegister(totalRequests)
  305. requestsPerTunnel := prometheus.NewCounterVec(
  306. prometheus.CounterOpts{
  307. Namespace: metricsNamespace,
  308. Subsystem: tunnelSubsystem,
  309. Name: "requests_per_tunnel",
  310. Help: "Amount of requests proxied through each tunnel",
  311. },
  312. []string{"connection_id"},
  313. )
  314. prometheus.MustRegister(requestsPerTunnel)
  315. concurrentRequestsPerTunnel := prometheus.NewGaugeVec(
  316. prometheus.GaugeOpts{
  317. Namespace: metricsNamespace,
  318. Subsystem: tunnelSubsystem,
  319. Name: "concurrent_requests_per_tunnel",
  320. Help: "Concurrent requests proxied through each tunnel",
  321. },
  322. []string{"connection_id"},
  323. )
  324. prometheus.MustRegister(concurrentRequestsPerTunnel)
  325. maxConcurrentRequestsPerTunnel := prometheus.NewGaugeVec(
  326. prometheus.GaugeOpts{
  327. Namespace: metricsNamespace,
  328. Subsystem: tunnelSubsystem,
  329. Name: "max_concurrent_requests_per_tunnel",
  330. Help: "Largest number of concurrent requests proxied through each tunnel so far",
  331. },
  332. []string{"connection_id"},
  333. )
  334. prometheus.MustRegister(maxConcurrentRequestsPerTunnel)
  335. timerRetries := prometheus.NewGauge(
  336. prometheus.GaugeOpts{
  337. Namespace: metricsNamespace,
  338. Subsystem: tunnelSubsystem,
  339. Name: "timer_retries",
  340. Help: "Unacknowledged heart beats count",
  341. })
  342. prometheus.MustRegister(timerRetries)
  343. responseByCode := prometheus.NewCounterVec(
  344. prometheus.CounterOpts{
  345. Namespace: metricsNamespace,
  346. Subsystem: tunnelSubsystem,
  347. Name: "response_by_code",
  348. Help: "Count of responses by HTTP status code",
  349. },
  350. []string{"status_code"},
  351. )
  352. prometheus.MustRegister(responseByCode)
  353. responseCodePerTunnel := prometheus.NewCounterVec(
  354. prometheus.CounterOpts{
  355. Namespace: metricsNamespace,
  356. Subsystem: tunnelSubsystem,
  357. Name: "response_code_per_tunnel",
  358. Help: "Count of responses by HTTP status code fore each tunnel",
  359. },
  360. []string{"connection_id", "status_code"},
  361. )
  362. prometheus.MustRegister(responseCodePerTunnel)
  363. serverLocations := prometheus.NewGaugeVec(
  364. prometheus.GaugeOpts{
  365. Namespace: metricsNamespace,
  366. Subsystem: tunnelSubsystem,
  367. Name: "server_locations",
  368. Help: "Where each tunnel is connected to. 1 means current location, 0 means previous locations.",
  369. },
  370. []string{"connection_id", "location"},
  371. )
  372. prometheus.MustRegister(serverLocations)
  373. rpcFail := prometheus.NewCounterVec(
  374. prometheus.CounterOpts{
  375. Namespace: metricsNamespace,
  376. Subsystem: tunnelSubsystem,
  377. Name: "tunnel_rpc_fail",
  378. Help: "Count of RPC connection errors by type",
  379. },
  380. []string{"error", "rpcName"},
  381. )
  382. prometheus.MustRegister(rpcFail)
  383. registerFail := prometheus.NewCounterVec(
  384. prometheus.CounterOpts{
  385. Namespace: metricsNamespace,
  386. Subsystem: tunnelSubsystem,
  387. Name: "tunnel_register_fail",
  388. Help: "Count of tunnel registration errors by type",
  389. },
  390. []string{"error", "rpcName"},
  391. )
  392. prometheus.MustRegister(registerFail)
  393. userHostnamesCounts := prometheus.NewCounterVec(
  394. prometheus.CounterOpts{
  395. Namespace: metricsNamespace,
  396. Subsystem: tunnelSubsystem,
  397. Name: "user_hostnames_counts",
  398. Help: "Which user hostnames cloudflared is serving",
  399. },
  400. []string{"userHostname"},
  401. )
  402. prometheus.MustRegister(userHostnamesCounts)
  403. registerSuccess := prometheus.NewCounterVec(
  404. prometheus.CounterOpts{
  405. Namespace: metricsNamespace,
  406. Subsystem: tunnelSubsystem,
  407. Name: "tunnel_register_success",
  408. Help: "Count of successful tunnel registrations",
  409. },
  410. []string{"rpcName"},
  411. )
  412. prometheus.MustRegister(registerSuccess)
  413. return &TunnelMetrics{
  414. haConnections: haConnections,
  415. activeStreams: activeStreams,
  416. totalRequests: totalRequests,
  417. requestsPerTunnel: requestsPerTunnel,
  418. concurrentRequestsPerTunnel: concurrentRequestsPerTunnel,
  419. concurrentRequests: make(map[string]uint64),
  420. maxConcurrentRequestsPerTunnel: maxConcurrentRequestsPerTunnel,
  421. maxConcurrentRequests: make(map[string]uint64),
  422. timerRetries: timerRetries,
  423. responseByCode: responseByCode,
  424. responseCodePerTunnel: responseCodePerTunnel,
  425. serverLocations: serverLocations,
  426. oldServerLocations: make(map[string]string),
  427. muxerMetrics: newMuxerMetrics(),
  428. tunnelsHA: NewTunnelsForHA(),
  429. regSuccess: registerSuccess,
  430. regFail: registerFail,
  431. rpcFail: rpcFail,
  432. userHostnamesCounts: userHostnamesCounts,
  433. }
  434. }
  435. func (t *TunnelMetrics) incrementHaConnections() {
  436. t.haConnections.Inc()
  437. }
  438. func (t *TunnelMetrics) decrementHaConnections() {
  439. t.haConnections.Dec()
  440. }
  441. func (t *TunnelMetrics) updateMuxerMetrics(connectionID string, metrics *h2mux.MuxerMetrics) {
  442. t.muxerMetrics.update(connectionID, metrics)
  443. }
  444. func (t *TunnelMetrics) incrementRequests(connectionID string) {
  445. t.concurrentRequestsLock.Lock()
  446. var concurrentRequests uint64
  447. var ok bool
  448. if concurrentRequests, ok = t.concurrentRequests[connectionID]; ok {
  449. t.concurrentRequests[connectionID]++
  450. concurrentRequests++
  451. } else {
  452. t.concurrentRequests[connectionID] = 1
  453. concurrentRequests = 1
  454. }
  455. if maxConcurrentRequests, ok := t.maxConcurrentRequests[connectionID]; (ok && maxConcurrentRequests < concurrentRequests) || !ok {
  456. t.maxConcurrentRequests[connectionID] = concurrentRequests
  457. t.maxConcurrentRequestsPerTunnel.WithLabelValues(connectionID).Set(float64(concurrentRequests))
  458. }
  459. t.concurrentRequestsLock.Unlock()
  460. t.totalRequests.Inc()
  461. t.requestsPerTunnel.WithLabelValues(connectionID).Inc()
  462. t.concurrentRequestsPerTunnel.WithLabelValues(connectionID).Inc()
  463. }
  464. func (t *TunnelMetrics) decrementConcurrentRequests(connectionID string) {
  465. t.concurrentRequestsLock.Lock()
  466. if _, ok := t.concurrentRequests[connectionID]; ok {
  467. t.concurrentRequests[connectionID]--
  468. }
  469. t.concurrentRequestsLock.Unlock()
  470. t.concurrentRequestsPerTunnel.WithLabelValues(connectionID).Dec()
  471. }
  472. func (t *TunnelMetrics) incrementResponses(connectionID, code string) {
  473. t.responseByCode.WithLabelValues(code).Inc()
  474. t.responseCodePerTunnel.WithLabelValues(connectionID, code).Inc()
  475. }
  476. func (t *TunnelMetrics) registerServerLocation(connectionID, loc string) {
  477. t.locationLock.Lock()
  478. defer t.locationLock.Unlock()
  479. if oldLoc, ok := t.oldServerLocations[connectionID]; ok && oldLoc == loc {
  480. return
  481. } else if ok {
  482. t.serverLocations.WithLabelValues(connectionID, oldLoc).Dec()
  483. }
  484. t.serverLocations.WithLabelValues(connectionID, loc).Inc()
  485. t.oldServerLocations[connectionID] = loc
  486. }