test_tail.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. #!/usr/bin/env python
  2. import asyncio
  3. import json
  4. import pytest
  5. import requests
  6. import websockets
  7. from websockets.client import connect, WebSocketClientProtocol
  8. from conftest import CfdModes
  9. from constants import MAX_RETRIES, BACKOFF_SECS
  10. from retrying import retry
  11. from cli import CloudflaredCli
  12. from util import LOGGER, start_cloudflared, write_config, wait_tunnel_ready
  13. class TestTail:
  14. @pytest.mark.asyncio
  15. async def test_start_stop_streaming(self, tmp_path, component_tests_config):
  16. """
  17. Validates that a websocket connection to management.argotunnel.com/logs can be opened
  18. with the access token and start and stop streaming on-demand.
  19. """
  20. print("test_start_stop_streaming")
  21. config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
  22. LOGGER.debug(config)
  23. config_path = write_config(tmp_path, config.full_config)
  24. with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
  25. wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
  26. cfd_cli = CloudflaredCli(config, config_path, LOGGER)
  27. url = cfd_cli.get_management_wsurl("logs", config, config_path)
  28. async with connect(url, open_timeout=5, close_timeout=3) as websocket:
  29. await websocket.send('{"type": "start_streaming"}')
  30. await websocket.send('{"type": "stop_streaming"}')
  31. await websocket.send('{"type": "start_streaming"}')
  32. await websocket.send('{"type": "stop_streaming"}')
  33. @pytest.mark.asyncio
  34. async def test_streaming_logs(self, tmp_path, component_tests_config):
  35. """
  36. Validates that a streaming logs connection will stream logs
  37. """
  38. print("test_streaming_logs")
  39. config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
  40. LOGGER.debug(config)
  41. config_path = write_config(tmp_path, config.full_config)
  42. with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
  43. wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
  44. cfd_cli = CloudflaredCli(config, config_path, LOGGER)
  45. url = cfd_cli.get_management_wsurl("logs", config, config_path)
  46. async with connect(url, open_timeout=5, close_timeout=5) as websocket:
  47. # send start_streaming
  48. await websocket.send(json.dumps({
  49. "type": "start_streaming",
  50. "filters": {
  51. "events": ["http"]
  52. }
  53. }))
  54. # send some http requests to the tunnel to trigger some logs
  55. await generate_and_validate_http_events(websocket, config.get_url(), 10)
  56. # send stop_streaming
  57. await websocket.send('{"type": "stop_streaming"}')
  58. @pytest.mark.asyncio
  59. async def test_streaming_logs_filters(self, tmp_path, component_tests_config):
  60. """
  61. Validates that a streaming logs connection will stream logs
  62. but not http when filters applied.
  63. """
  64. print("test_streaming_logs_filters")
  65. config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
  66. LOGGER.debug(config)
  67. config_path = write_config(tmp_path, config.full_config)
  68. with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
  69. wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
  70. cfd_cli = CloudflaredCli(config, config_path, LOGGER)
  71. url = cfd_cli.get_management_wsurl("logs", config, config_path)
  72. async with connect(url, open_timeout=5, close_timeout=5) as websocket:
  73. # send start_streaming with tcp logs only
  74. await websocket.send(json.dumps({
  75. "type": "start_streaming",
  76. "filters": {
  77. "events": ["tcp"],
  78. "level": "debug"
  79. }
  80. }))
  81. # don't expect any http logs
  82. await generate_and_validate_no_log_event(websocket, config.get_url())
  83. # send stop_streaming
  84. await websocket.send('{"type": "stop_streaming"}')
  85. @pytest.mark.asyncio
  86. async def test_streaming_logs_sampling(self, tmp_path, component_tests_config):
  87. """
  88. Validates that a streaming logs connection will stream logs with sampling.
  89. """
  90. print("test_streaming_logs_sampling")
  91. config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
  92. LOGGER.debug(config)
  93. config_path = write_config(tmp_path, config.full_config)
  94. with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
  95. wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
  96. cfd_cli = CloudflaredCli(config, config_path, LOGGER)
  97. url = cfd_cli.get_management_wsurl("logs", config, config_path)
  98. async with connect(url, open_timeout=5, close_timeout=5) as websocket:
  99. # send start_streaming with info logs only
  100. await websocket.send(json.dumps({
  101. "type": "start_streaming",
  102. "filters": {
  103. "sampling": 0.5,
  104. "events": ["http"]
  105. }
  106. }))
  107. # don't expect any http logs
  108. count = await generate_and_validate_http_events(websocket, config.get_url(), 10)
  109. assert count < (10 * 2) # There are typically always two log lines for http requests (request and response)
  110. # send stop_streaming
  111. await websocket.send('{"type": "stop_streaming"}')
  112. @pytest.mark.asyncio
  113. async def test_streaming_logs_actor_override(self, tmp_path, component_tests_config):
  114. """
  115. Validates that a streaming logs session can be overriden by the same actor
  116. """
  117. print("test_streaming_logs_actor_override")
  118. config = component_tests_config(cfd_mode=CfdModes.NAMED, run_proxy_dns=False, provide_ingress=False)
  119. LOGGER.debug(config)
  120. config_path = write_config(tmp_path, config.full_config)
  121. with start_cloudflared(tmp_path, config, cfd_args=["run", "--hello-world"], new_process=True):
  122. wait_tunnel_ready(tunnel_url=config.get_url(), require_min_connections=1)
  123. cfd_cli = CloudflaredCli(config, config_path, LOGGER)
  124. url = cfd_cli.get_management_wsurl("logs", config, config_path)
  125. task = asyncio.ensure_future(start_streaming_to_be_remotely_closed(url))
  126. override_task = asyncio.ensure_future(start_streaming_override(url))
  127. await asyncio.wait([task, override_task])
  128. assert task.exception() == None, task.exception()
  129. assert override_task.exception() == None, override_task.exception()
  130. async def start_streaming_to_be_remotely_closed(url):
  131. async with connect(url, open_timeout=5, close_timeout=5) as websocket:
  132. try:
  133. await websocket.send(json.dumps({"type": "start_streaming"}))
  134. await asyncio.sleep(10)
  135. assert websocket.closed, "expected this request to be forcibly closed by the override"
  136. except websockets.ConnectionClosed:
  137. # we expect the request to be closed
  138. pass
  139. async def start_streaming_override(url):
  140. # wait for the first connection to be established
  141. await asyncio.sleep(1)
  142. async with connect(url, open_timeout=5, close_timeout=5) as websocket:
  143. await websocket.send(json.dumps({"type": "start_streaming"}))
  144. await asyncio.sleep(1)
  145. await websocket.send(json.dumps({"type": "stop_streaming"}))
  146. await asyncio.sleep(1)
  147. # Every http request has two log lines sent
  148. async def generate_and_validate_http_events(websocket: WebSocketClientProtocol, url: str, count_send: int):
  149. for i in range(count_send):
  150. send_request(url)
  151. # There are typically always two log lines for http requests (request and response)
  152. count = 0
  153. while True:
  154. try:
  155. req_line = await asyncio.wait_for(websocket.recv(), 2)
  156. log_line = json.loads(req_line)
  157. assert log_line["type"] == "logs"
  158. assert log_line["logs"][0]["event"] == "http"
  159. count += 1
  160. except asyncio.TimeoutError:
  161. # ignore timeout from waiting for recv
  162. break
  163. return count
  164. # Every http request has two log lines sent
  165. async def generate_and_validate_no_log_event(websocket: WebSocketClientProtocol, url: str):
  166. send_request(url)
  167. try:
  168. # wait for 5 seconds and make sure we hit the timeout and not recv any events
  169. req_line = await asyncio.wait_for(websocket.recv(), 5)
  170. assert req_line == None, "expected no logs for the specified filters"
  171. except asyncio.TimeoutError:
  172. pass
  173. @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=BACKOFF_SECS * 1000)
  174. def send_request(url, headers={}):
  175. with requests.Session() as s:
  176. resp = s.get(url, timeout=BACKOFF_SECS, headers=headers)
  177. assert resp.status_code == 200, f"{url} returned {resp}"
  178. return resp.status_code == 200