MetricsQueue.cpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include <MetricsQueue.h>
  9. #include <Framework/JsonWriter.h>
  10. #include <AzCore/JSON/prettywriter.h>
  11. #include <AzCore/JSON/stringbuffer.h>
  12. #include <AzCore/IO/FileIO.h>
  13. #include <AzCore/IO/Path/Path.h>
  14. #include <AzCore/Serialization/Json/JsonSerialization.h>
  15. #include <AzCore/Serialization/Json/JsonUtils.h>
  16. #include <AzCore/std/sort.h>
  17. #include <sstream>
  18. namespace AWSMetrics
  19. {
  20. const MetricsEvent& MetricsQueue::operator[](int index) const
  21. {
  22. AZ_Assert(index >= 0 && index < m_metrics.size(), "Index for the metrics queue is out of range.");
  23. return m_metrics[index];
  24. }
  25. void MetricsQueue::AddMetrics(const MetricsEvent& metrics)
  26. {
  27. m_sizeSerializedToJson += metrics.GetSizeInBytes();
  28. m_metrics.emplace_back(AZStd::move(metrics));
  29. }
  30. void MetricsQueue::AppendMetrics(MetricsQueue& metricsQueue)
  31. {
  32. if (metricsQueue.GetNumMetrics() == 0)
  33. {
  34. return;
  35. }
  36. m_sizeSerializedToJson += metricsQueue.GetSizeInBytes();
  37. if (m_metrics.size() == 0)
  38. {
  39. m_metrics = AZStd::move(metricsQueue.m_metrics);
  40. }
  41. else
  42. {
  43. AZStd::move(metricsQueue.m_metrics.begin(), metricsQueue.m_metrics.end(), std::back_inserter(m_metrics));
  44. }
  45. }
  46. void MetricsQueue::PushMetricsToFront(MetricsQueue& metricsQueue)
  47. {
  48. if (metricsQueue.GetNumMetrics() == 0)
  49. {
  50. return;
  51. }
  52. m_sizeSerializedToJson += metricsQueue.GetSizeInBytes();
  53. if (m_metrics.size() == 0)
  54. {
  55. m_metrics = AZStd::move(metricsQueue.m_metrics);
  56. }
  57. else
  58. {
  59. AZStd::move(metricsQueue.m_metrics.begin(), metricsQueue.m_metrics.end(), std::front_inserter(m_metrics));
  60. }
  61. }
  62. int MetricsQueue::FilterMetricsByPriority(size_t maxSizeInBytes)
  63. {
  64. if (GetSizeInBytes() < maxSizeInBytes)
  65. {
  66. return 0;
  67. }
  68. int numCurrentMetricsEvents = GetNumMetrics();
  69. AZStd::vector<AZStd::pair<const MetricsEvent*, int>> sortedMetrics;
  70. for (int index = 0; index < numCurrentMetricsEvents; ++index)
  71. {
  72. sortedMetrics.emplace_back(AZStd::pair<const MetricsEvent*, int>(&m_metrics[index], index));
  73. }
  74. // Sort the existing metrics event by event priority.
  75. // We need to reverse the relative order of the metrics events with the same priority to
  76. // avoid newer events being discarded when the max size capacity is reached.
  77. AZStd::stable_sort(sortedMetrics.begin(), sortedMetrics.end(),
  78. [](const AZStd::pair<const MetricsEvent*, int>& left, const AZStd::pair<const MetricsEvent*, int>& right)
  79. {
  80. if (left.first->GetEventPriority() == right.first->GetEventPriority())
  81. {
  82. return left.second > right.second;
  83. }
  84. else
  85. {
  86. return left.first->GetEventPriority() < right.first->GetEventPriority();
  87. }
  88. }
  89. );
  90. AZStd::deque<MetricsEvent> result;
  91. m_sizeSerializedToJson = 0;
  92. for (const AZStd::pair<const MetricsEvent*, int>& pair : sortedMetrics)
  93. {
  94. if (m_sizeSerializedToJson < maxSizeInBytes)
  95. {
  96. m_sizeSerializedToJson += pair.first->GetSizeInBytes();
  97. result.emplace_back(AZStd::move(*(pair.first)));
  98. }
  99. else
  100. {
  101. break;
  102. }
  103. }
  104. m_metrics.clear();
  105. m_metrics = AZStd::move(result);
  106. return numCurrentMetricsEvents - GetNumMetrics();
  107. }
  108. void MetricsQueue::ClearMetrics()
  109. {
  110. m_sizeSerializedToJson = 0;
  111. m_metrics.clear();
  112. }
  113. int MetricsQueue::GetNumMetrics() const
  114. {
  115. return static_cast<int>(m_metrics.size());
  116. }
  117. size_t MetricsQueue::GetSizeInBytes() const
  118. {
  119. return m_sizeSerializedToJson;
  120. }
  121. AZStd::string MetricsQueue::SerializeToJson()
  122. {
  123. std::stringstream stringStream;
  124. AWSCore::JsonOutputStream jsonStream{stringStream};
  125. AWSCore::JsonWriter writer{jsonStream};
  126. SerializeToJson(writer);
  127. return stringStream.str().c_str();
  128. }
  129. bool MetricsQueue::SerializeToJson(AWSCore::JsonWriter& writer) const
  130. {
  131. bool ok = true;
  132. ok = ok && writer.StartArray();
  133. for (auto& metrics : m_metrics)
  134. {
  135. ok = ok && metrics.SerializeToJson(writer);
  136. }
  137. ok = ok && writer.EndArray();
  138. return ok;
  139. }
  140. void MetricsQueue::PopBufferedEventsByServiceLimits(MetricsQueue& bufferedEvents, int maxPayloadSizeInBytes, int maxBatchedRecordsCount)
  141. {
  142. int curNum = 0;
  143. int curSizeInBytes = 0;
  144. while (m_metrics.size() > 0)
  145. {
  146. MetricsEvent& curEvent = m_metrics.front();
  147. curNum += 1;
  148. curSizeInBytes += static_cast<int>(curEvent.GetSizeInBytes());
  149. if (curNum <= maxBatchedRecordsCount && curSizeInBytes <= maxPayloadSizeInBytes)
  150. {
  151. m_sizeSerializedToJson -= curEvent.GetSizeInBytes();
  152. bufferedEvents.AddMetrics(curEvent);
  153. m_metrics.pop_front();
  154. }
  155. else
  156. {
  157. break;
  158. }
  159. }
  160. }
  161. bool MetricsQueue::ReadFromJson(const AZStd::string& filePath)
  162. {
  163. auto result = AZ::JsonSerializationUtils::ReadJsonFile(filePath);
  164. if (!result.IsSuccess() ||!ReadFromJsonDocument(result.GetValue()))
  165. {
  166. AZ_Error("AWSMetrics", false, "Failed to read metrics file %s", filePath.c_str());
  167. return false;
  168. }
  169. return true;
  170. }
  171. bool MetricsQueue::ReadFromJsonDocument(rapidjson::Document& doc)
  172. {
  173. if (!doc.IsArray())
  174. {
  175. return false;
  176. }
  177. for (rapidjson::SizeType metricsIndex = 0; metricsIndex < doc.Size(); metricsIndex++)
  178. {
  179. MetricsEvent metrics;
  180. if (!metrics.ReadFromJson(doc[metricsIndex]))
  181. {
  182. return false;
  183. }
  184. // Read through each element in the array and add it as a new metrics
  185. AddMetrics(metrics);
  186. }
  187. return true;
  188. }
  189. }