real_time_data_processing.py 12 KB


  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. from constructs import Construct
  7. from aws_cdk import (
  8. CfnOutput,
  9. Duration,
  10. Fn,
  11. aws_iam as iam,
  12. aws_kinesisanalytics as analytics,
  13. aws_lambda as lambda_,
  14. aws_logs as logs
  15. )
  16. import os
  17. from . import aws_metrics_constants
  18. from .aws_utils import resource_name_sanitizer
  19. class RealTimeDataProcessing:
  20. """
  21. Create the AWS resources used for real time data processing
  22. """
  23. def __init__(self, stack: Construct, input_stream_arn: str, application_name: str) -> None:
  24. self._stack = stack
  25. self._input_stream_arn = input_stream_arn
  26. self._application_name = application_name
  27. self._create_analytics_processing_lambda()
  28. self._create_analytics_application()
  29. def _create_analytics_application(self) -> None:
  30. """
  31. Generate the Kinesis data analytics application to process the real-time data.
  32. The sample application filters input events and counts the total number of login within one minute.
  33. :return: The created Kinesis data analytics application.
  34. """
  35. self._analytics_application_role = self._create_analytics_application_role()
  36. self._analytics_application = analytics.CfnApplication(
  37. self._stack,
  38. 'AnalyticsApplication',
  39. application_name=resource_name_sanitizer.sanitize_resource_name(
  40. f'{self._stack.stack_name}-AnalyticsApplication', 'kinesis_application'),
  41. inputs=[
  42. analytics.CfnApplication.InputProperty(
  43. input_schema=analytics.CfnApplication.InputSchemaProperty(
  44. record_columns=[
  45. analytics.CfnApplication.RecordColumnProperty(
  46. name='event_id',
  47. sql_type='VARCHAR(32)',
  48. mapping='$.event_id'
  49. ),
  50. analytics.CfnApplication.RecordColumnProperty(
  51. name='event_type',
  52. sql_type='VARCHAR(32)',
  53. mapping='$.event_type'
  54. ),
  55. analytics.CfnApplication.RecordColumnProperty(
  56. name='event_name',
  57. sql_type='VARCHAR(32)',
  58. mapping='$.event_name'
  59. ),
  60. analytics.CfnApplication.RecordColumnProperty(
  61. name='event_version',
  62. sql_type='VARCHAR(32)',
  63. mapping='$.event_version'
  64. ),
  65. analytics.CfnApplication.RecordColumnProperty(
  66. name='event_timestamp',
  67. sql_type='VARCHAR(32)',
  68. mapping='$.event_timestamp'
  69. ),
  70. analytics.CfnApplication.RecordColumnProperty(
  71. name='application_id',
  72. sql_type='VARCHAR(32)',
  73. mapping='$.application_id'
  74. )
  75. ],
  76. record_format=analytics.CfnApplication.RecordFormatProperty(
  77. record_format_type='JSON'
  78. )
  79. ),
  80. name_prefix='AnalyticsApp',
  81. kinesis_streams_input=analytics.CfnApplication.KinesisStreamsInputProperty(
  82. resource_arn=self._input_stream_arn,
  83. role_arn=self._analytics_application_role.role_arn
  84. )
  85. )
  86. ],
  87. application_description='',
  88. application_code=aws_metrics_constants.KINESIS_APPLICATION_CODE
  89. )
  90. self._application_output = analytics.CfnApplicationOutput(
  91. self._stack,
  92. 'AnalyticsApplicationOutput',
  93. application_name=self._analytics_application.ref,
  94. output=analytics.CfnApplicationOutput.OutputProperty(
  95. destination_schema=analytics.CfnApplicationOutput.DestinationSchemaProperty(
  96. record_format_type='JSON'
  97. ),
  98. lambda_output=analytics.CfnApplicationOutput.LambdaOutputProperty(
  99. resource_arn=self._analytics_processing_lambda.function_arn,
  100. role_arn=self._analytics_application_role.role_arn
  101. ),
  102. name='DESTINATION_STREAM'
  103. ),
  104. )
  105. CfnOutput(
  106. self._stack,
  107. id='AnalyticsApplicationName',
  108. description='Kinesis Data Analytics application to process the real-time metrics data',
  109. export_name=f"{self._application_name}:AnalyticsApplication",
  110. value=self._analytics_application.application_name)
  111. def _create_analytics_application_role(self) -> iam.Role:
  112. """
  113. Generate the IAM role for the Kinesis analytics application to read events from the input stream
  114. and send the processed data to the analytics processing lambda.
  115. :return: The created IAM role.
  116. """
  117. kinesis_access_policy_document = iam.PolicyDocument(
  118. statements=[
  119. iam.PolicyStatement(
  120. actions=[
  121. 'kinesis:DescribeStream',
  122. 'kinesis:GetShardIterator',
  123. 'kinesis:GetRecords',
  124. 'kinesis:ListShards'
  125. ],
  126. effect=iam.Effect.ALLOW,
  127. sid='ReadKinesisStream',
  128. resources=[
  129. self._input_stream_arn
  130. ]
  131. )
  132. ]
  133. )
  134. lambda_access_policy_document = iam.PolicyDocument(
  135. statements=[
  136. iam.PolicyStatement(
  137. actions=[
  138. 'lambda:InvokeFunction',
  139. 'lambda:GetFunctionConfiguration',
  140. ],
  141. effect=iam.Effect.ALLOW,
  142. sid='AnalyticsProcessingInvokePermissions',
  143. resources=[
  144. self._analytics_processing_lambda.function_arn
  145. ]
  146. )
  147. ]
  148. )
  149. kinesis_analytics_role = iam.Role(
  150. self._stack,
  151. id='AnalyticsApplicationRole',
  152. role_name=resource_name_sanitizer.sanitize_resource_name(
  153. f'{self._stack.stack_name}-AnalyticsApplicationRole', 'iam_role'),
  154. assumed_by=iam.ServicePrincipal(
  155. service='kinesisanalytics.amazonaws.com'
  156. ),
  157. inline_policies={
  158. 'KinesisAccess': kinesis_access_policy_document,
  159. 'LambdaAccess': lambda_access_policy_document
  160. }
  161. )
  162. return kinesis_analytics_role
  163. def _create_analytics_processing_lambda(self) -> None:
  164. """
  165. Generate the analytics processing lambda to send processed data to CloudWatch for visualization.
  166. """
  167. analytics_processing_function_name = resource_name_sanitizer.sanitize_resource_name(
  168. f'{self._stack.stack_name}-AnalyticsProcessingLambda', 'lambda_function')
  169. self._analytics_processing_lambda_role = self._create_analytics_processing_lambda_role(
  170. analytics_processing_function_name
  171. )
  172. self._analytics_processing_lambda = lambda_.Function(
  173. self._stack,
  174. id='AnalyticsProcessingLambda',
  175. function_name=analytics_processing_function_name,
  176. log_retention=logs.RetentionDays.ONE_MONTH,
  177. memory_size=aws_metrics_constants.LAMBDA_MEMORY_SIZE_IN_MB,
  178. runtime=lambda_.Runtime.PYTHON_3_9,
  179. timeout=Duration.minutes(aws_metrics_constants.LAMBDA_TIMEOUT_IN_MINUTES),
  180. handler='analytics_processing.lambda_handler',
  181. code=lambda_.Code.from_asset(
  182. os.path.join(os.path.dirname(__file__), 'lambdas', 'analytics_processing_lambda')),
  183. role=self._analytics_processing_lambda_role
  184. )
  185. CfnOutput(
  186. self._stack,
  187. id='AnalyticsProcessingLambdaName',
  188. description='Lambda function for sending processed data to CloudWatch.',
  189. export_name=f"{self._application_name}:AnalyticsProcessingLambda",
  190. value=self._analytics_processing_lambda.function_name)
  191. def _create_analytics_processing_lambda_role(self, function_name: str) -> iam.Role:
  192. """
  193. Generate the IAM role for the analytics processing lambda to send metrics to CloudWatch.
  194. @param function_name Name of the Lambda function.
  195. @return The created IAM role.
  196. """
  197. analytics_processing_policy_document = iam.PolicyDocument(
  198. statements=[
  199. # The following policy limits the user to publishing metrics only in the namespace named AWSMetrics.
  200. # Check the following document for more details:
  201. # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/iam-cw-condition-keys-namespace.html
  202. iam.PolicyStatement(
  203. actions=[
  204. 'cloudwatch:PutMetricData',
  205. ],
  206. effect=iam.Effect.ALLOW,
  207. resources=[
  208. '*'
  209. ],
  210. conditions={
  211. "StringEquals": {
  212. "cloudwatch:namespace": "AWSMetrics"
  213. }
  214. }
  215. ),
  216. iam.PolicyStatement(
  217. actions=[
  218. 'logs:CreateLogGroup',
  219. 'logs:CreateLogStream',
  220. 'logs:PutDestination',
  221. 'logs:PutLogEvents'
  222. ],
  223. effect=iam.Effect.ALLOW,
  224. resources=[
  225. Fn.sub(
  226. 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:'
  227. '/aws/lambda/${FunctionName}*',
  228. variables={
  229. 'FunctionName': function_name
  230. }
  231. )
  232. ]
  233. )
  234. ]
  235. )
  236. analytics_processing_lambda_role = iam.Role(
  237. self._stack,
  238. id='AnalyticsLambdaRole',
  239. role_name=resource_name_sanitizer.sanitize_resource_name(
  240. f'{self._stack.stack_name}-AnalyticsLambdaRole', 'iam_role'),
  241. assumed_by=iam.ServicePrincipal(
  242. service='lambda.amazonaws.com'
  243. ),
  244. inline_policies={
  245. 'AnalyticsProcessingPolicy': analytics_processing_policy_document
  246. }
  247. )
  248. return analytics_processing_lambda_role
  249. @property
  250. def analytics_processing_lambda_name(self) -> lambda_.Function.function_name:
  251. return self._analytics_processing_lambda.function_name
  252. @property
  253. def analytics_processing_lambda_arn(self) -> lambda_.Function.function_arn:
  254. return self._analytics_processing_lambda.function_arn
  255. @property
  256. def analytics_application_lambda_role_arn(self) -> iam.Role.role_arn:
  257. return self._analytics_processing_lambda_role.role_arn
  258. @property
  259. def analytics_application_name(self) -> analytics.CfnApplication.application_name:
  260. return self._analytics_application.application_name
  261. @property
  262. def analytics_application_role_arn(self) -> iam.Role.role_arn:
  263. return self._analytics_application_role.role_arn