real_time_data_processing.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. from constructs import Construct
  7. from aws_cdk import (
  8. CfnOutput,
  9. Duration,
  10. Fn,
  11. aws_iam as iam,
  12. aws_kinesisanalytics as analytics,
  13. aws_lambda as lambda_,
  14. aws_logs as logs
  15. )
  16. import os
  17. from . import aws_metrics_constants
  18. from .aws_utils import resource_name_sanitizer
  19. class RealTimeDataProcessing:
  20. """
  21. Create the AWS resources used for real time data processing
  22. """
  23. def __init__(self, stack: Construct, input_stream_arn: str, application_name: str) -> None:
  24. self._stack = stack
  25. self._input_stream_arn = input_stream_arn
  26. self._application_name = application_name
  27. self._create_analytics_processing_lambda()
  28. self._create_analytics_application()
  29. def _create_analytics_application(self) -> None:
  30. """
  31. Generate the Kinesis data analytics application to process the real-time data.
  32. The sample application filters input events and counts the total number of login within one minute.
  33. :return: The created Kinesis data analytics application.
  34. """
  35. self._analytics_application_role = self._create_analytics_application_role()
  36. self._analytics_application = analytics.CfnApplication(
  37. self._stack,
  38. 'AnalyticsApplication',
  39. application_name=resource_name_sanitizer.sanitize_resource_name(
  40. f'{self._stack.stack_name}-AnalyticsApplication', 'kinesis_application'),
  41. inputs=[
  42. analytics.CfnApplication.InputProperty(
  43. input_schema=analytics.CfnApplication.InputSchemaProperty(
  44. record_columns=[
  45. analytics.CfnApplication.RecordColumnProperty(
  46. name='event_id',
  47. sql_type='VARCHAR(32)',
  48. mapping='$.event_id'
  49. ),
  50. analytics.CfnApplication.RecordColumnProperty(
  51. name='event_type',
  52. sql_type='VARCHAR(32)',
  53. mapping='$.event_type'
  54. ),
  55. analytics.CfnApplication.RecordColumnProperty(
  56. name='event_name',
  57. sql_type='VARCHAR(32)',
  58. mapping='$.event_name'
  59. ),
  60. analytics.CfnApplication.RecordColumnProperty(
  61. name='event_version',
  62. sql_type='VARCHAR(32)',
  63. mapping='$.event_version'
  64. ),
  65. analytics.CfnApplication.RecordColumnProperty(
  66. name='event_timestamp',
  67. sql_type='VARCHAR(32)',
  68. mapping='$.event_timestamp'
  69. ),
  70. analytics.CfnApplication.RecordColumnProperty(
  71. name='application_id',
  72. sql_type='VARCHAR(32)',
  73. mapping='$.application_id'
  74. )
  75. ],
  76. record_format=analytics.CfnApplication.RecordFormatProperty(
  77. record_format_type='JSON'
  78. )
  79. ),
  80. name_prefix='AnalyticsApp',
  81. kinesis_streams_input=analytics.CfnApplication.KinesisStreamsInputProperty(
  82. resource_arn=self._input_stream_arn,
  83. role_arn=self._analytics_application_role.role_arn
  84. )
  85. )
  86. ],
  87. application_description='',
  88. application_code=aws_metrics_constants.KINESIS_APPLICATION_CODE
  89. )
  90. self._application_output = analytics.CfnApplicationOutput(
  91. self._stack,
  92. 'AnalyticsApplicationOutput',
  93. application_name=self._analytics_application.ref,
  94. output=analytics.CfnApplicationOutput.OutputProperty(
  95. destination_schema=analytics.CfnApplicationOutput.DestinationSchemaProperty(
  96. record_format_type='JSON'
  97. ),
  98. lambda_output=analytics.CfnApplicationOutput.LambdaOutputProperty(
  99. resource_arn=self._analytics_processing_lambda.function_arn,
  100. role_arn=self._analytics_application_role.role_arn
  101. ),
  102. name='DESTINATION_STREAM'
  103. ),
  104. )
  105. CfnOutput(
  106. self._stack,
  107. id='AnalyticsApplicationName',
  108. description='Kinesis Data Analytics application to process the real-time metrics data',
  109. export_name=f"{self._application_name}:AnalyticsApplication",
  110. value=self._analytics_application.application_name)
  111. def _create_analytics_application_role(self) -> iam.Role:
  112. """
  113. Generate the IAM role for the Kinesis analytics application to read events from the input stream
  114. and send the processed data to the analytics processing lambda.
  115. :return: The created IAM role.
  116. """
  117. kinesis_access_policy_document = iam.PolicyDocument(
  118. statements=[
  119. iam.PolicyStatement(
  120. actions=[
  121. 'kinesis:DescribeStream',
  122. 'kinesis:GetShardIterator',
  123. 'kinesis:GetRecords',
  124. 'kinesis:ListShards'
  125. ],
  126. effect=iam.Effect.ALLOW,
  127. sid='ReadKinesisStream',
  128. resources=[
  129. self._input_stream_arn
  130. ]
  131. )
  132. ]
  133. )
  134. lambda_access_policy_document = iam.PolicyDocument(
  135. statements=[
  136. iam.PolicyStatement(
  137. actions=[
  138. 'lambda:InvokeFunction',
  139. 'lambda:GetFunctionConfiguration',
  140. ],
  141. effect=iam.Effect.ALLOW,
  142. sid='AnalyticsProcessingInvokePermissions',
  143. resources=[
  144. self._analytics_processing_lambda.function_arn
  145. ]
  146. )
  147. ]
  148. )
  149. kinesis_analytics_role = iam.Role(
  150. self._stack,
  151. id='AnalyticsApplicationRole',
  152. role_name=resource_name_sanitizer.sanitize_resource_name(
  153. f'{self._stack.stack_name}-AnalyticsApplicationRole', 'iam_role'),
  154. assumed_by=iam.ServicePrincipal(
  155. service='kinesisanalytics.amazonaws.com'
  156. ),
  157. inline_policies={
  158. 'KinesisAccess': kinesis_access_policy_document,
  159. 'LambdaAccess': lambda_access_policy_document
  160. }
  161. )
  162. return kinesis_analytics_role
  163. def _create_analytics_processing_lambda(self) -> None:
  164. """
  165. Generate the analytics processing lambda to send processed data to CloudWatch for visualization.
  166. """
  167. analytics_processing_function_name = resource_name_sanitizer.sanitize_resource_name(
  168. f'{self._stack.stack_name}-AnalyticsProcessingLambda', 'lambda_function')
  169. self._analytics_processing_lambda_role = self._create_analytics_processing_lambda_role(
  170. analytics_processing_function_name
  171. )
  172. self._analytics_processing_lambda = lambda_.Function(
  173. self._stack,
  174. id='AnalyticsProcessingLambda',
  175. function_name=analytics_processing_function_name,
  176. log_retention=logs.RetentionDays.ONE_MONTH,
  177. memory_size=aws_metrics_constants.LAMBDA_MEMORY_SIZE_IN_MB,
  178. runtime=lambda_.Runtime.PYTHON_3_9,
  179. timeout=Duration.minutes(aws_metrics_constants.LAMBDA_TIMEOUT_IN_MINUTES),
  180. handler='analytics_processing.lambda_handler',
  181. code=lambda_.Code.from_asset(
  182. os.path.join(os.path.dirname(__file__), 'lambdas', 'analytics_processing_lambda')),
  183. role=self._analytics_processing_lambda_role
  184. )
  185. CfnOutput(
  186. self._stack,
  187. id='AnalyticsProcessingLambdaName',
  188. description='Lambda function for sending processed data to CloudWatch.',
  189. export_name=f"{self._application_name}:AnalyticsProcessingLambda",
  190. value=self._analytics_processing_lambda.function_name)
  191. def _create_analytics_processing_lambda_role(self, function_name: str) -> iam.Role:
  192. """
  193. Generate the IAM role for the analytics processing lambda to send metrics to CloudWatch.
  194. @param function_name Name of the Lambda function.
  195. @return The created IAM role.
  196. """
  197. analytics_processing_policy_document = iam.PolicyDocument(
  198. statements=[
  199. # The following policy limits the user to publishing metrics only in the namespace named AWSMetrics.
  200. # Check the following document for more details:
  201. # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/iam-cw-condition-keys-namespace.html
  202. iam.PolicyStatement(
  203. actions=[
  204. 'cloudwatch:PutMetricData',
  205. ],
  206. effect=iam.Effect.ALLOW,
  207. resources=[
  208. '*'
  209. ],
  210. conditions={
  211. "StringEquals": {
  212. "cloudwatch:namespace": "AWSMetrics"
  213. }
  214. }
  215. ),
  216. iam.PolicyStatement(
  217. actions=[
  218. 'logs:CreateLogGroup',
  219. 'logs:CreateLogStream',
  220. 'logs:PutDestination',
  221. 'logs:PutLogEvents'
  222. ],
  223. effect=iam.Effect.ALLOW,
  224. resources=[
  225. Fn.sub(
  226. 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:'
  227. '/aws/lambda/${FunctionName}*',
  228. variables={
  229. 'FunctionName': function_name
  230. }
  231. )
  232. ]
  233. )
  234. ]
  235. )
  236. analytics_processing_lambda_role = iam.Role(
  237. self._stack,
  238. id='AnalyticsLambdaRole',
  239. role_name=resource_name_sanitizer.sanitize_resource_name(
  240. f'{self._stack.stack_name}-AnalyticsLambdaRole', 'iam_role'),
  241. assumed_by=iam.ServicePrincipal(
  242. service='lambda.amazonaws.com'
  243. ),
  244. inline_policies={
  245. 'AnalyticsProcessingPolicy': analytics_processing_policy_document
  246. }
  247. )
  248. return analytics_processing_lambda_role
  249. @property
  250. def analytics_processing_lambda_name(self) -> lambda_.Function.function_name:
  251. return self._analytics_processing_lambda.function_name
  252. @property
  253. def analytics_processing_lambda_arn(self) -> lambda_.Function.function_arn:
  254. return self._analytics_processing_lambda.function_arn
  255. @property
  256. def analytics_application_lambda_role_arn(self) -> iam.Role.role_arn:
  257. return self._analytics_processing_lambda_role.role_arn
  258. @property
  259. def analytics_application_name(self) -> analytics.CfnApplication.application_name:
  260. return self._analytics_application.application_name
  261. @property
  262. def analytics_application_role_arn(self) -> iam.Role.role_arn:
  263. return self._analytics_application_role.role_arn