batch_processing.py

  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. from constructs import Construct
  7. from aws_cdk import (
  8. CfnOutput,
  9. Duration,
  10. Fn,
  11. RemovalPolicy,
  12. aws_kinesisfirehose as kinesisfirehose,
  13. aws_iam as iam,
  14. aws_lambda as lambda_,
  15. aws_logs as logs
  16. )
  17. import os
  18. from . import aws_metrics_constants
  19. from .aws_utils import resource_name_sanitizer


class BatchProcessing:
    """
    Create the AWS resources including the events processing Lambda and
    the Kinesis Data Firehose delivery stream for batch processing.
    """

    def __init__(self,
                 stack: Construct,
                 application_name: str,
                 input_stream_arn: str,
                 analytics_bucket_arn: str,
                 events_database_name: str,
                 events_table_name: str) -> None:
        self._stack = stack
        self._application_name = application_name
        self._input_stream_arn = input_stream_arn
        self._analytics_bucket_arn = analytics_bucket_arn
        self._events_database_name = events_database_name
        self._events_table_name = events_table_name

        self._create_events_processing_lambda()
        self._create_events_firehose_delivery_stream()

    def _create_events_processing_lambda(self) -> None:
        """
        Generate the events processing Lambda that filters out invalid metrics events.
        """
        events_processing_lambda_name = resource_name_sanitizer.sanitize_resource_name(
            f'{self._stack.stack_name}-EventsProcessingLambda', 'lambda_function')

        self._create_events_processing_lambda_role(events_processing_lambda_name)

        self._events_processing_lambda = lambda_.Function(
            self._stack,
            id='EventsProcessingLambda',
            function_name=events_processing_lambda_name,
            log_retention=logs.RetentionDays.ONE_MONTH,
            memory_size=aws_metrics_constants.LAMBDA_MEMORY_SIZE_IN_MB,
            runtime=lambda_.Runtime.PYTHON_3_9,
            timeout=Duration.minutes(aws_metrics_constants.LAMBDA_TIMEOUT_IN_MINUTES),
            handler='events_processing.lambda_handler',
            code=lambda_.Code.from_asset(
                os.path.join(os.path.dirname(__file__), 'lambdas', 'events_processing_lambda')),
            role=self._events_processing_lambda_role
        )
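
        # Export the function name so that other stacks and tooling can resolve
        # it through CloudFormation's Fn::ImportValue.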
        CfnOutput(
            self._stack,
            id='EventProcessingLambdaName',
            description='Lambda function for processing metrics events data.',
            export_name=f"{self._application_name}:EventProcessingLambda",
            value=self._events_processing_lambda.function_name)

    def _create_events_processing_lambda_role(self, function_name: str) -> None:
        """
        Generate the IAM role for the events processing Lambda.
        """
        events_processing_lambda_policy_document = iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    actions=[
                        'logs:CreateLogGroup',
                        'logs:CreateLogStream',
                        'logs:PutDestination',
                        'logs:PutLogEvents'
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        Fn.sub(
                            'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:'
                            '/aws/lambda/${FunctionName}*',
                            variables={
                                'FunctionName': function_name
                            }
                        )
                    ]
                )
            ]
        )

        self._events_processing_lambda_role = iam.Role(
            self._stack,
            id='EventsProcessingLambdaRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-EventsProcessingLambdaRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='lambda.amazonaws.com'
            ),
            inline_policies={
                'EventsProcessingLambdaPolicy': events_processing_lambda_policy_document
            }
        )
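
    # Batch-processing data flow: the Kinesis input stream feeds the Firehose
    # delivery stream, which invokes the events processing Lambda to filter out
    # invalid records, converts the remaining JSON events to Parquet using the
    # Glue table schema, and delivers the results to the analytics S3 bucket.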
    def _create_events_firehose_delivery_stream(self) -> None:
        """
        Generate the Kinesis Data Firehose delivery stream to convert the input data and deliver it to the data lake.
        """
        self._create_firehose_s3_delivery_log_option()
        self._create_data_firehose_role()

        self._events_firehose_delivery_stream = kinesisfirehose.CfnDeliveryStream(
            self._stack,
            id='EventsFirehoseDeliveryStream',
            delivery_stream_type='KinesisStreamAsSource',
            delivery_stream_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-EventsFirehoseDeliveryStream', 'firehose_delivery_stream'),
            kinesis_stream_source_configuration=kinesisfirehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
                kinesis_stream_arn=self._input_stream_arn,
                role_arn=self._firehose_delivery_stream_role.role_arn
            ),
            extended_s3_destination_configuration=kinesisfirehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
                bucket_arn=self._analytics_bucket_arn,
                buffering_hints=kinesisfirehose.CfnDeliveryStream.BufferingHintsProperty(
                    interval_in_seconds=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_INTERVAL_IN_SECONDS,
                    size_in_m_bs=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_SIZE_IN_MBS
                ),
                prefix=aws_metrics_constants.S3_DESTINATION_PREFIX,
                error_output_prefix=aws_metrics_constants.S3_DESTINATION_ERROR_OUTPUT_PREFIX,
                role_arn=self._firehose_delivery_stream_role.role_arn,
                compression_format=aws_metrics_constants.S3_COMPRESSION_FORMAT,
                cloud_watch_logging_options=kinesisfirehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
                    enabled=True,
                    log_group_name=self._firehose_delivery_stream_log_group.log_group_name,
                    log_stream_name=self._firehose_s3_delivery_log_stream.log_stream_name
                ),
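                # Batch records and hand them to the events processing Lambda;
                # invocations that still fail after the configured number of
                # retries end up under the error output prefix in S3.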
                processing_configuration=kinesisfirehose.CfnDeliveryStream.ProcessingConfigurationProperty(
                    enabled=True,
                    processors=[
                        kinesisfirehose.CfnDeliveryStream.ProcessorProperty(
                            type='Lambda',
                            parameters=[
                                kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                    parameter_name='LambdaArn',
                                    parameter_value=self._events_processing_lambda.function_arn
                                ),
                                kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                    parameter_name='BufferIntervalInSeconds',
                                    parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_INTERVAL_IN_SECONDS
                                ),
                                kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                    parameter_name='BufferSizeInMBs',
                                    parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_SIZE_IN_MBS
                                ),
                                kinesisfirehose.CfnDeliveryStream.ProcessorParameterProperty(
                                    parameter_name='NumberOfRetries',
                                    parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_NUM_OF_RETRIES
                                )
                            ]
                        )
                    ]
                ),
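                # Convert the filtered JSON records into Parquet before delivery
                # so that data lake consumers such as Athena read columnar data;
                # the column layout comes from the Glue table configured below.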
                data_format_conversion_configuration=kinesisfirehose.CfnDeliveryStream.DataFormatConversionConfigurationProperty(
                    enabled=True,
                    input_format_configuration=kinesisfirehose.CfnDeliveryStream.InputFormatConfigurationProperty(
                        deserializer=kinesisfirehose.CfnDeliveryStream.DeserializerProperty(
                            open_x_json_ser_de=kinesisfirehose.CfnDeliveryStream.OpenXJsonSerDeProperty(
                                case_insensitive=True,
                                convert_dots_in_json_keys_to_underscores=False
                            )
                        )
                    ),
                    output_format_configuration=kinesisfirehose.CfnDeliveryStream.OutputFormatConfigurationProperty(
                        serializer=kinesisfirehose.CfnDeliveryStream.SerializerProperty(
                            parquet_ser_de=kinesisfirehose.CfnDeliveryStream.ParquetSerDeProperty(
                                compression=aws_metrics_constants.PARQUET_SER_DE_COMPRESSION
                            )
                        )
                    ),
                    schema_configuration=kinesisfirehose.CfnDeliveryStream.SchemaConfigurationProperty(
                        catalog_id=self._stack.account,
                        role_arn=self._firehose_delivery_stream_role.role_arn,
                        database_name=self._events_database_name,
                        table_name=self._events_table_name,
                        region=self._stack.region,
                        version_id='LATEST'
                    )
                )
            )
        )

    def _create_firehose_s3_delivery_log_option(self) -> None:
        """
        Generate the CloudWatch log group and log stream that Kinesis Data Firehose
        uses for the delivery stream.
        """
        self._firehose_delivery_stream_log_group = logs.LogGroup(
            self._stack,
            id='FirehoseLogGroup',
            log_group_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-FirehoseLogGroup', 'cloudwatch_log_group'),
            removal_policy=RemovalPolicy.DESTROY,
            retention=logs.RetentionDays.ONE_MONTH
        )

        self._firehose_s3_delivery_log_stream = logs.LogStream(
            self._stack,
            id='FirehoseS3DeliveryLogStream',
            log_group=self._firehose_delivery_stream_log_group,
            log_stream_name=f'{self._stack.stack_name}-FirehoseS3DeliveryLogStream',
            removal_policy=RemovalPolicy.DESTROY
        )

    def _create_data_firehose_role(self) -> None:
        """
        Generate the IAM role for the Kinesis Data Firehose delivery stream.
        """
        policy_statements = []

        data_lake_policy_statement = iam.PolicyStatement(
            actions=[
                's3:AbortMultipartUpload',
                's3:GetBucketLocation',
                's3:GetObject',
                's3:ListBucket',
                's3:ListBucketMultipartUploads',
                's3:PutObject'
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._analytics_bucket_arn,
                f'{self._analytics_bucket_arn}/*'
            ]
        )
        policy_statements.append(data_lake_policy_statement)

        events_processing_lambda_policy_statement = iam.PolicyStatement(
            actions=[
                'lambda:InvokeFunction',
                'lambda:GetFunctionConfiguration',
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._events_processing_lambda.function_arn
            ]
        )
        policy_statements.append(events_processing_lambda_policy_statement)

        input_stream_policy_statement = iam.PolicyStatement(
            actions=[
                'kinesis:DescribeStream',
                'kinesis:GetShardIterator',
                'kinesis:GetRecords',
                'kinesis:ListShards'
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._input_stream_arn
            ]
        )
        policy_statements.append(input_stream_policy_statement)

        log_policy_statement = iam.PolicyStatement(
            actions=[
                'logs:PutLogEvents',
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._firehose_delivery_stream_log_group.log_group_arn
            ]
        )
        policy_statements.append(log_policy_statement)

        data_catalog_policy_statement = iam.PolicyStatement(
            actions=[
                'glue:GetTable',
                'glue:GetTableVersion',
                'glue:GetTableVersions'
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                Fn.sub(
                    'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog'
                ),
                Fn.sub(
                    body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:table/${EventsDatabase}/*',
                    variables={
                        'EventsDatabase': self._events_database_name
                    }
                ),
                Fn.sub(
                    body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:database/${EventsDatabase}',
                    variables={
                        'EventsDatabase': self._events_database_name
                    }
                )
            ]
        )
        policy_statements.append(data_catalog_policy_statement)

        firehose_delivery_policy = iam.PolicyDocument(
            statements=policy_statements
        )

        self._firehose_delivery_stream_role = iam.Role(
            self._stack,
            id='GameEventsFirehoseRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-GameEventsFirehoseRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='firehose.amazonaws.com'
            ),
            inline_policies={
                'FirehoseDelivery': firehose_delivery_policy
            }
        )

    @property
    def events_processing_lambda_name(self) -> str:
        return self._events_processing_lambda.function_name

    @property
    def events_processing_lambda_arn(self) -> str:
        return self._events_processing_lambda.function_arn

    @property
    def events_processing_lambda_role_arn(self) -> str:
        return self._events_processing_lambda_role.role_arn

    @property
    def delivery_stream_name(self) -> str:
        return self._events_firehose_delivery_stream.ref

    @property
    def delivery_stream_role_arn(self) -> str:
        return self._firehose_delivery_stream_role.role_arn

    @property
    def delivery_stream_log_group_name(self) -> str:
        return self._firehose_delivery_stream_log_group.log_group_name
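
# Usage sketch (hypothetical, for illustration only): wiring BatchProcessing into
# a CDK stack. The stream, bucket, and Glue identifiers below are assumptions and
# are not part of this module.
#
#     batch_processing = BatchProcessing(
#         stack=stack,
#         application_name='AWSMetrics',
#         input_stream_arn=input_stream.stream_arn,
#         analytics_bucket_arn=analytics_bucket.bucket_arn,
#         events_database_name='events_db',
#         events_table_name='events_table')
#
#     # Downstream code can then reference the created resources, e.g.:
#     delivery_stream_name = batch_processing.delivery_stream_name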