util.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import logging
  2. import os
  3. import platform
  4. import subprocess
  5. from contextlib import contextmanager
  6. from time import sleep
  7. import sys
  8. import pytest
  9. import requests
  10. import yaml
  11. import json
  12. from retrying import retry
  13. from constants import METRICS_PORT, MAX_RETRIES, BACKOFF_SECS
  14. def configure_logger():
  15. logger = logging.getLogger(__name__)
  16. logger.setLevel(logging.DEBUG)
  17. handler = logging.StreamHandler(sys.stdout)
  18. logger.addHandler(handler)
  19. return logger
  20. LOGGER = configure_logger()
  21. def select_platform(plat):
  22. return pytest.mark.skipif(
  23. platform.system() != plat, reason=f"Only runs on {plat}")
  24. def fips_enabled():
  25. env_fips = os.getenv("COMPONENT_TESTS_FIPS")
  26. return env_fips is not None and env_fips != "0"
  27. nofips = pytest.mark.skipif(
  28. fips_enabled(), reason=f"Only runs without FIPS (COMPONENT_TESTS_FIPS=0)")
  29. def write_config(directory, config):
  30. config_path = directory / "config.yml"
  31. with open(config_path, 'w') as outfile:
  32. yaml.dump(config, outfile)
  33. return config_path
  34. def start_cloudflared(directory, config, cfd_args=["run"], cfd_pre_args=["tunnel"], new_process=False,
  35. allow_input=False, capture_output=True, root=False, skip_config_flag=False, expect_success=True):
  36. config_path = None
  37. if not skip_config_flag:
  38. config_path = write_config(directory, config.full_config)
  39. cmd = cloudflared_cmd(config, config_path, cfd_args, cfd_pre_args, root)
  40. if new_process:
  41. return run_cloudflared_background(cmd, allow_input, capture_output)
  42. # By setting check=True, it will raise an exception if the process exits with non-zero exit code
  43. return subprocess.run(cmd, check=expect_success, capture_output=capture_output)
  44. def cloudflared_cmd(config, config_path, cfd_args, cfd_pre_args, root):
  45. cmd = []
  46. if root:
  47. cmd += ["sudo"]
  48. cmd += [config.cloudflared_binary]
  49. cmd += cfd_pre_args
  50. if config_path is not None:
  51. cmd += ["--config", str(config_path)]
  52. cmd += cfd_args
  53. LOGGER.info(f"Run cmd {cmd} with config {config}")
  54. return cmd
  55. @contextmanager
  56. def run_cloudflared_background(cmd, allow_input, capture_output):
  57. output = subprocess.PIPE if capture_output else subprocess.DEVNULL
  58. stdin = subprocess.PIPE if allow_input else None
  59. cfd = None
  60. try:
  61. cfd = subprocess.Popen(cmd, stdin=stdin, stdout=output, stderr=output)
  62. yield cfd
  63. finally:
  64. if cfd:
  65. cfd.terminate()
  66. if capture_output:
  67. LOGGER.info(f"cloudflared log: {cfd.stderr.read()}")
  68. def get_quicktunnel_url():
  69. quicktunnel_url = f'http://localhost:{METRICS_PORT}/quicktunnel'
  70. with requests.Session() as s:
  71. resp = send_request(s, quicktunnel_url, True)
  72. hostname = resp.json()["hostname"]
  73. assert hostname, \
  74. f"Quicktunnel endpoint returned {hostname} but we expected a url"
  75. return f"https://{hostname}"
  76. def wait_tunnel_ready(tunnel_url=None, require_min_connections=1, cfd_logs=None):
  77. try:
  78. inner_wait_tunnel_ready(tunnel_url, require_min_connections)
  79. except Exception as e:
  80. if cfd_logs is not None:
  81. _log_cloudflared_logs(cfd_logs)
  82. raise e
  83. @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=BACKOFF_SECS * 1000)
  84. def inner_wait_tunnel_ready(tunnel_url=None, require_min_connections=1):
  85. metrics_url = f'http://localhost:{METRICS_PORT}/ready'
  86. with requests.Session() as s:
  87. resp = send_request(s, metrics_url, True)
  88. ready_connections = resp.json()["readyConnections"]
  89. assert ready_connections >= require_min_connections, \
  90. f"Ready endpoint returned {resp.json()} but we expect at least {require_min_connections} connections"
  91. if tunnel_url is not None:
  92. send_request(s, tunnel_url, True)
  93. def _log_cloudflared_logs(cfd_logs):
  94. log_file = cfd_logs
  95. if os.path.isdir(cfd_logs):
  96. files = os.listdir(cfd_logs)
  97. if len(files) == 0:
  98. return
  99. log_file = os.path.join(cfd_logs, files[0])
  100. with open(log_file, "r") as f:
  101. LOGGER.warning("Cloudflared Tunnel was not ready:")
  102. for line in f.readlines():
  103. LOGGER.warning(line)
  104. @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=BACKOFF_SECS * 1000)
  105. def check_tunnel_not_connected():
  106. url = f'http://localhost:{METRICS_PORT}/ready'
  107. try:
  108. resp = requests.get(url, timeout=BACKOFF_SECS)
  109. assert resp.status_code == 503, f"Expect {url} returns 503, got {resp.status_code}"
  110. assert resp.json()[
  111. "readyConnections"] == 0, "Expected all connections to be terminated (pending reconnect)"
  112. # cloudflared might already terminate
  113. except requests.exceptions.ConnectionError as e:
  114. LOGGER.warning(f"Failed to connect to {url}, error: {e}")
  115. def get_tunnel_connector_id():
  116. url = f'http://localhost:{METRICS_PORT}/ready'
  117. try:
  118. resp = requests.get(url, timeout=1)
  119. return resp.json()["connectorId"]
  120. # cloudflared might already terminated
  121. except requests.exceptions.ConnectionError as e:
  122. LOGGER.warning(f"Failed to connect to {url}, error: {e}")
  123. # In some cases we don't need to check response status, such as when sending batch requests to generate logs
  124. def send_requests(url, count, require_ok=True):
  125. errors = 0
  126. with requests.Session() as s:
  127. for _ in range(count):
  128. resp = send_request(s, url, require_ok)
  129. if resp is None:
  130. errors += 1
  131. sleep(0.01)
  132. if errors > 0:
  133. LOGGER.warning(
  134. f"{errors} out of {count} requests to {url} return non-200 status")
  135. @retry(stop_max_attempt_number=MAX_RETRIES, wait_fixed=BACKOFF_SECS * 1000)
  136. def send_request(session, url, require_ok):
  137. resp = session.get(url, timeout=BACKOFF_SECS)
  138. if require_ok:
  139. assert resp.status_code == 200, f"{url} returned {resp}"
  140. return resp if resp.status_code == 200 else None