data_aggregator.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. from argparse import ArgumentParser
  7. import json
  8. from pathlib import Path
  9. import time
  10. import subprocess
  11. import os
  12. from ly_test_tools.mars.filebeat_client import FilebeatClient
  13. class BenchmarkPathException(Exception):
  14. """Custom Exception class for invalid benchmark file paths."""
  15. pass
  16. class RunningStatistics(object):
  17. def __init__(self):
  18. '''
  19. Initializes a helper class for calculating running statstics.
  20. '''
  21. self.count = 0
  22. self.total = 0
  23. self.max = 0
  24. self.min = float('inf')
  25. def update(self, value):
  26. '''
  27. Updates the statistics with a new value.
  28. :param value: The new value to update the statistics with.
  29. '''
  30. self.total += value
  31. self.count += 1
  32. self.max = max(value, self.max)
  33. self.min = min(value, self.min)
  34. def getAvg(self):
  35. '''
  36. Returns the average of the running values.
  37. '''
  38. return self.total / self.count
  39. def getMax(self):
  40. '''
  41. Returns the maximum of the running values.
  42. '''
  43. return self.max
  44. def getMin(self):
  45. '''
  46. Returns the minimum of the running values.
  47. '''
  48. return self.min
  49. def getCount(self):
  50. return self.count
  51. class BenchmarkDataAggregator(object):
  52. def __init__(self, workspace, logger, test_suite):
  53. '''
  54. Initializes an aggregator for benchmark data.
  55. :param workspace: Workspace of the test suite the benchmark was run in
  56. :param logger: Logger used by the test suite the benchmark was run in
  57. :param test_suite: Name of the test suite the benchmark was run in
  58. '''
  59. self.build_dir = workspace.paths.build_directory()
  60. self.results_dir = Path(workspace.paths.project(), 'user/Scripts/PerformanceBenchmarks')
  61. self.test_suite = test_suite if os.environ.get('BUILD_NUMBER') else 'local'
  62. self.filebeat_client = FilebeatClient(logger)
  63. def _update_pass(self, gpu_pass_stats, entry):
  64. '''
  65. Modifies gpu_pass_stats dict keyed by pass name with the time recorded in a pass timestamp entry.
  66. :param gpu_pass_stats: dict aggregating statistics from each pass (key: pass name, value: dict with stats)
  67. :param entry: dict representing the timestamp entry of a pass
  68. :return: Time (in nanoseconds) recorded by this pass
  69. '''
  70. name = entry['passName']
  71. time_ns = entry['timestampResultInNanoseconds']
  72. pass_entry = gpu_pass_stats.get(name, RunningStatistics())
  73. pass_entry.update(time_ns)
  74. gpu_pass_stats[name] = pass_entry
  75. return time_ns
  76. def _process_benchmark(self, benchmark_dir, benchmark_metadata):
  77. '''
  78. Aggregates data from results from a single benchmark contained in a subdirectory of self.results_dir.
  79. :param benchmark_dir: Path of directory containing the benchmark results
  80. :param benchmark_metadata: Dict with benchmark metadata mutated with additional info from metadata file
  81. :return: Tuple with two indexes:
  82. [0]: RunningStatistics for GPU frame times
  83. [1]: Dict aggregating statistics from GPU pass times (key: pass name, value: RunningStatistics)
  84. '''
  85. # Parse benchmark metadata
  86. metadata_file = benchmark_dir / 'benchmark_metadata.json'
  87. if metadata_file.exists():
  88. data = json.loads(metadata_file.read_text())
  89. benchmark_metadata.update(data['ClassData'])
  90. else:
  91. raise BenchmarkPathException(f'Metadata file could not be found at {metadata_file}')
  92. # data structures aggregating statistics from timestamp logs
  93. gpu_frame_stats = RunningStatistics()
  94. cpu_frame_stats = RunningStatistics()
  95. gpu_pass_stats = {} # key: pass name, value: RunningStatistics
  96. # this allows us to add additional data if necessary, e.g. frame_test_timestamps.json
  97. is_timestamp_file = lambda file: file.name.startswith('frame') and file.name.endswith('_timestamps.json')
  98. is_frame_time_file = lambda file: file.name.startswith('cpu_frame') and file.name.endswith('_time.json')
  99. # parse benchmark files
  100. for file in benchmark_dir.iterdir():
  101. if file.is_dir():
  102. continue
  103. if is_timestamp_file(file):
  104. data = json.loads(file.read_text())
  105. entries = data['ClassData']['timestampEntries']
  106. frame_time = sum(self._update_pass(gpu_pass_stats, entry) for entry in entries)
  107. gpu_frame_stats.update(frame_time)
  108. if is_frame_time_file(file):
  109. data = json.loads(file.read_text())
  110. frame_time = data['ClassData']['frameTime']
  111. cpu_frame_stats.update(frame_time)
  112. if gpu_frame_stats.getCount() < 1 and cpu_frame_stats.getCount() < 1:
  113. raise BenchmarkPathException(f'No benchmark logs were found in {benchmark_dir}')
  114. return gpu_frame_stats, gpu_pass_stats, cpu_frame_stats
  115. def _generate_payloads(self, benchmark_metadata, gpu_frame_stats, gpu_pass_stats, cpu_frame_stats):
  116. '''
  117. Generates payloads to send to Filebeat based on aggregated stats and metadata.
  118. :param benchmark_metadata: Dict of benchmark metadata
  119. :param gpu_frame_stats: RunningStatistics for GPU frame data
  120. :param gpu_pass_stats: Dict of aggregated pass RunningStatistics
  121. :param cpu_frame_stats: RunningStatistics for CPU frame data
  122. :return payloads: List of tuples, each with two indexes:
  123. [0]: Elasticsearch index suffix associated with the payload
  124. [1]: Payload dict to deliver to Filebeat
  125. '''
  126. ns_to_ms = lambda ns: ns / 1e6
  127. payloads = []
  128. # add benchmark metadata to payload
  129. if (gpu_frame_stats.getCount() > 0):
  130. # calculate statistics based on aggregated frame data
  131. gpu_frame_payload = {
  132. 'frameTime': {
  133. 'avg': ns_to_ms(gpu_frame_stats.getAvg()),
  134. 'max': ns_to_ms(gpu_frame_stats.getMax()),
  135. 'min': ns_to_ms(gpu_frame_stats.getMin())
  136. }
  137. }
  138. gpu_frame_payload.update(benchmark_metadata)
  139. payloads.append(('gpu.frame_data', gpu_frame_payload))
  140. # calculate statistics for each pass
  141. for name, stat in gpu_pass_stats.items():
  142. gpu_pass_payload = {
  143. 'passName': name,
  144. 'passTime': {
  145. 'avg': ns_to_ms(stat.getAvg()),
  146. 'max': ns_to_ms(stat.getMax())
  147. }
  148. }
  149. # add benchmark metadata to payload
  150. gpu_pass_payload.update(benchmark_metadata)
  151. payloads.append(('gpu.pass_data', gpu_pass_payload))
  152. if (cpu_frame_stats.getCount() > 0):
  153. # calculate statistics based on aggregated frame data
  154. cpu_frame_payload = {
  155. 'frameTime': {
  156. 'avg': cpu_frame_stats.getAvg(),
  157. 'max': cpu_frame_stats.getMax(),
  158. 'min': cpu_frame_stats.getMin()
  159. }
  160. }
  161. cpu_frame_payload.update(benchmark_metadata)
  162. payloads.append(('cpu.frame_data', cpu_frame_payload))
  163. return payloads
  164. def upload_metrics(self, rhi):
  165. '''
  166. Uploads metrics aggregated from all the benchmarks run in a test suite to filebeat.
  167. :param rhi: The RHI the benchmarks were run on
  168. '''
  169. start_timestamp = time.time()
  170. git_commit_data = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD'], cwd=self.build_dir)
  171. git_commit_hash = git_commit_data.decode('ascii').strip()
  172. build_date = time.strftime('%m/%d/%y', time.localtime(start_timestamp)) # use gmtime if GMT is preferred
  173. for benchmark_dir in self.results_dir.iterdir():
  174. if not benchmark_dir.is_dir():
  175. continue
  176. benchmark_metadata = {
  177. 'gitCommitAndBuildDate': f'{git_commit_hash} {build_date}',
  178. 'RHI': rhi
  179. }
  180. gpu_frame_stats, gpu_pass_stats, cpu_frame_stats = self._process_benchmark(benchmark_dir, benchmark_metadata)
  181. payloads = self._generate_payloads(benchmark_metadata, gpu_frame_stats, gpu_pass_stats, cpu_frame_stats)
  182. for index_suffix, payload in payloads:
  183. self.filebeat_client.send_event(
  184. payload,
  185. f'ly_atom.performance_metrics.{self.test_suite}.{index_suffix}',
  186. start_timestamp
  187. )