123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- """
- Copyright (c) Contributors to the Open 3D Engine Project.
- For complete copyright and license terms please see the LICENSE at the root of this distribution.
- SPDX-License-Identifier: Apache-2.0 OR MIT
- """
- from constructs import Construct
- from aws_cdk import (
- CfnOutput,
- Duration,
- Fn,
- RemovalPolicy,
- aws_kinesisfirehose as kinesisfirehose,
- aws_iam as iam,
- aws_lambda as lambda_,
- aws_logs as logs
- )
- import os
- from . import aws_metrics_constants
- from .aws_utils import resource_name_sanitizer
class BatchProcessing:
    """
    Create the AWS resources including the events processing Lambda and
    the Kinesis Data Firehose delivery stream for batch processing.

    Every resource is defined in the scope of the ``stack`` passed to the
    constructor. The constructor creates, in order: the Lambda execution role,
    the events processing Lambda, the Firehose log group/stream, the Firehose
    role and finally the delivery stream itself.
    """

    def __init__(self,
                 stack: Construct,
                 application_name: str,
                 input_stream_arn: str,
                 analytics_bucket_arn: str,
                 events_database_name: str,
                 events_table_name: str) -> None:
        """
        Store the configuration and define all batch processing resources.

        :param stack: Scope the resources are added to. Its ``stack_name``,
            ``account`` and ``region`` attributes are read.
        :param application_name: Prefix of the CloudFormation export name.
        :param input_stream_arn: ARN of the Kinesis stream the delivery stream reads from.
        :param analytics_bucket_arn: ARN of the S3 bucket the delivery stream writes to.
        :param events_database_name: Database name used for the schema configuration
            and the Glue permissions of the delivery stream role.
        :param events_table_name: Table name used for the schema configuration.
        """
        self._stack = stack
        self._application_name = application_name
        self._input_stream_arn = input_stream_arn
        self._analytics_bucket_arn = analytics_bucket_arn
        self._events_database_name = events_database_name
        self._events_table_name = events_table_name

        # Order matters: the delivery stream and its role reference the Lambda ARN.
        self._create_events_processing_lambda()
        self._create_events_firehose_delivery_stream()

    def _create_events_processing_lambda(self) -> None:
        """
        Generate the events processing lambda to filter the invalid metrics events.

        Also creates the Lambda execution role and exports the function name
        as a stack output.
        """
        events_processing_lambda_name = resource_name_sanitizer.sanitize_resource_name(
            f'{self._stack.stack_name}-EventsProcessingLambda', 'lambda_function')

        # The role must exist before the function since it is passed in as ``role``.
        self._create_events_processing_lambda_role(events_processing_lambda_name)

        self._events_processing_lambda = lambda_.Function(
            self._stack,
            id='EventsProcessingLambda',
            function_name=events_processing_lambda_name,
            log_retention=logs.RetentionDays.ONE_MONTH,
            memory_size=aws_metrics_constants.LAMBDA_MEMORY_SIZE_IN_MB,
            runtime=lambda_.Runtime.PYTHON_3_9,
            timeout=Duration.minutes(aws_metrics_constants.LAMBDA_TIMEOUT_IN_MINUTES),
            handler='events_processing.lambda_handler',
            # The Lambda source is bundled from a directory next to this module.
            code=lambda_.Code.from_asset(
                os.path.join(os.path.dirname(__file__), 'lambdas', 'events_processing_lambda')),
            role=self._events_processing_lambda_role
        )

        CfnOutput(
            self._stack,
            id='EventProcessingLambdaName',
            description='Lambda function for processing metrics events data.',
            export_name=f"{self._application_name}:EventProcessingLambda",
            value=self._events_processing_lambda.function_name)

    def _create_events_processing_lambda_role(self, function_name: str) -> None:
        """
        Generate the IAM role for the events processing Lambda.

        :param function_name: Name of the Lambda function; used to scope the
            CloudWatch Logs permissions to log groups whose name starts with
            ``/aws/lambda/<function_name>``.
        """
        events_processing_lambda_policy_document = iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    actions=[
                        'logs:CreateLogGroup',
                        'logs:CreateLogStream',
                        'logs:PutDestination',
                        'logs:PutLogEvents'
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        Fn.sub(
                            'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:'
                            '/aws/lambda/${FunctionName}*',
                            variables={
                                'FunctionName': function_name
                            }
                        )
                    ]
                )
            ]
        )

        self._events_processing_lambda_role = iam.Role(
            self._stack,
            id='EventsProcessingLambdaRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-EventsProcessingLambdaRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='lambda.amazonaws.com'
            ),
            inline_policies={
                'EventsProcessingLambdaPolicy': events_processing_lambda_policy_document
            }
        )

    def _create_events_firehose_delivery_stream(self) -> None:
        """
        Generate the Kinesis Data Firehose delivery stream to convert input data and deliver them to the data lake.

        The stream reads from the input Kinesis stream, runs each batch through the
        events processing Lambda, converts the JSON records to Parquet using the
        configured schema table and writes the result to the analytics bucket.
        """
        # The log group/stream and the role are referenced by the stream definition below.
        self._create_firehose_s3_delivery_log_option()
        self._create_data_firehose_role()

        # Local alias to keep the nested property constructors readable.
        cfn_stream = kinesisfirehose.CfnDeliveryStream

        processing_configuration = cfn_stream.ProcessingConfigurationProperty(
            enabled=True,
            processors=[
                cfn_stream.ProcessorProperty(
                    type='Lambda',
                    parameters=[
                        cfn_stream.ProcessorParameterProperty(
                            parameter_name='LambdaArn',
                            parameter_value=self._events_processing_lambda.function_arn
                        ),
                        cfn_stream.ProcessorParameterProperty(
                            parameter_name='BufferIntervalInSeconds',
                            parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_INTERVAL_IN_SECONDS
                        ),
                        cfn_stream.ProcessorParameterProperty(
                            parameter_name='BufferSizeInMBs',
                            parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_SIZE_IN_MBS
                        ),
                        cfn_stream.ProcessorParameterProperty(
                            parameter_name='NumberOfRetries',
                            parameter_value=aws_metrics_constants.PROCESSOR_BUFFER_NUM_OF_RETRIES
                        )
                    ]
                )
            ]
        )

        data_format_conversion_configuration = cfn_stream.DataFormatConversionConfigurationProperty(
            enabled=True,
            input_format_configuration=cfn_stream.InputFormatConfigurationProperty(
                deserializer=cfn_stream.DeserializerProperty(
                    open_x_json_ser_de=cfn_stream.OpenXJsonSerDeProperty(
                        case_insensitive=True,
                        convert_dots_in_json_keys_to_underscores=False
                    )
                )
            ),
            output_format_configuration=cfn_stream.OutputFormatConfigurationProperty(
                serializer=cfn_stream.SerializerProperty(
                    parquet_ser_de=cfn_stream.ParquetSerDeProperty(
                        compression=aws_metrics_constants.PARQUET_SER_DE_COMPRESSION
                    )
                )
            ),
            schema_configuration=cfn_stream.SchemaConfigurationProperty(
                catalog_id=self._stack.account,
                role_arn=self._firehose_delivery_stream_role.role_arn,
                database_name=self._events_database_name,
                table_name=self._events_table_name,
                region=self._stack.region,
                version_id='LATEST'
            )
        )

        self._events_firehose_delivery_stream = cfn_stream(
            self._stack,
            id='EventsFirehoseDeliveryStream',
            delivery_stream_type='KinesisStreamAsSource',
            delivery_stream_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-EventsFirehoseDeliveryStream', 'firehose_delivery_stream'),
            kinesis_stream_source_configuration=cfn_stream.KinesisStreamSourceConfigurationProperty(
                kinesis_stream_arn=self._input_stream_arn,
                role_arn=self._firehose_delivery_stream_role.role_arn
            ),
            extended_s3_destination_configuration=cfn_stream.ExtendedS3DestinationConfigurationProperty(
                bucket_arn=self._analytics_bucket_arn,
                buffering_hints=cfn_stream.BufferingHintsProperty(
                    interval_in_seconds=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_INTERVAL_IN_SECONDS,
                    size_in_m_bs=aws_metrics_constants.DELIVERY_STREAM_BUFFER_HINTS_SIZE_IN_MBS
                ),
                prefix=aws_metrics_constants.S3_DESTINATION_PREFIX,
                error_output_prefix=aws_metrics_constants.S3_DESTINATION_ERROR_OUTPUT_PREFIX,
                role_arn=self._firehose_delivery_stream_role.role_arn,
                compression_format=aws_metrics_constants.S3_COMPRESSION_FORMAT,
                cloud_watch_logging_options=cfn_stream.CloudWatchLoggingOptionsProperty(
                    enabled=True,
                    log_group_name=self._firehose_delivery_stream_log_group.log_group_name,
                    log_stream_name=self._firehose_s3_delivery_log_stream.log_stream_name
                ),
                processing_configuration=processing_configuration,
                data_format_conversion_configuration=data_format_conversion_configuration
            )
        )

    def _create_firehose_s3_delivery_log_option(self) -> None:
        """
        Generate the CloudWatch log group and log stream that Kinesis Data Firehose
        uses for the delivery stream.

        Both resources use ``RemovalPolicy.DESTROY``; the log group keeps its
        events for one month.
        """
        self._firehose_delivery_stream_log_group = logs.LogGroup(
            self._stack,
            id='FirehoseLogGroup',
            log_group_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-FirehoseLogGroup', 'cloudwatch_log_group'),
            removal_policy=RemovalPolicy.DESTROY,
            retention=logs.RetentionDays.ONE_MONTH
        )

        self._firehose_s3_delivery_log_stream = logs.LogStream(
            self._stack,
            id='FirehoseS3DeliveryLogStream',
            log_group=self._firehose_delivery_stream_log_group,
            log_stream_name=f'{self._stack.stack_name}-FirehoseS3DeliveryLogStream',
            removal_policy=RemovalPolicy.DESTROY
        )

    def _create_data_firehose_role(self) -> None:
        """
        Generate the IAM role for the Kinesis Data Firehose delivery stream.

        Requires the events processing Lambda and the Firehose log group to be
        created already, since their ARNs are used as policy resources.
        """
        # Read/write access to the analytics bucket and the objects inside it.
        data_lake_policy_statement = iam.PolicyStatement(
            actions=[
                's3:AbortMultipartUpload',
                's3:GetBucketLocation',
                's3:GetObject',
                's3:ListBucket',
                's3:ListBucketMultipartUploads',
                's3:PutObject'
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._analytics_bucket_arn,
                f'{self._analytics_bucket_arn}/*'
            ]
        )

        # Invocation of the events processing Lambda.
        events_processing_lambda_policy_statement = iam.PolicyStatement(
            actions=[
                'lambda:InvokeFunction',
                'lambda:GetFunctionConfiguration',
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._events_processing_lambda.function_arn
            ]
        )

        # Read access to the input Kinesis stream.
        input_stream_policy_statement = iam.PolicyStatement(
            actions=[
                'kinesis:DescribeStream',
                'kinesis:GetShardIterator',
                'kinesis:GetRecords',
                'kinesis:ListShards'
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._input_stream_arn
            ]
        )

        # Writing log events to the Firehose log group.
        log_policy_statement = iam.PolicyStatement(
            actions=[
                'logs:PutLogEvents',
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                self._firehose_delivery_stream_log_group.log_group_arn
            ]
        )

        # Read access to the Glue catalog, the events database and its tables.
        data_catalog_policy_statement = iam.PolicyStatement(
            actions=[
                'glue:GetTable',
                'glue:GetTableVersion',
                'glue:GetTableVersions'
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                Fn.sub(
                    'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog'
                ),
                Fn.sub(
                    body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:table/${EventsDatabase}/*',
                    variables={
                        'EventsDatabase': self._events_database_name
                    }
                ),
                Fn.sub(
                    body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:database/${EventsDatabase}',
                    variables={
                        'EventsDatabase': self._events_database_name
                    }
                )
            ]
        )

        firehose_delivery_policy = iam.PolicyDocument(
            statements=[
                data_lake_policy_statement,
                events_processing_lambda_policy_statement,
                input_stream_policy_statement,
                log_policy_statement,
                data_catalog_policy_statement
            ]
        )

        self._firehose_delivery_stream_role = iam.Role(
            self._stack,
            id='GameEventsFirehoseRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-GameEventsFirehoseRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='firehose.amazonaws.com'
            ),
            inline_policies={
                'FirehoseDelivery': firehose_delivery_policy
            }
        )

    @property
    def events_processing_lambda_name(self) -> str:
        """Name of the events processing Lambda function."""
        return self._events_processing_lambda.function_name

    @property
    def events_processing_lambda_arn(self) -> str:
        """ARN of the events processing Lambda function."""
        return self._events_processing_lambda.function_arn

    @property
    def events_processing_lambda_role_arn(self) -> str:
        """ARN of the IAM role assumed by the events processing Lambda."""
        return self._events_processing_lambda_role.role_arn

    @property
    def delivery_stream_name(self) -> str:
        """``Ref`` of the delivery stream resource, exposed as the delivery stream name."""
        return self._events_firehose_delivery_stream.ref

    @property
    def delivery_stream_role_arn(self) -> str:
        """ARN of the IAM role assumed by the Firehose delivery stream."""
        return self._firehose_delivery_stream_role.role_arn

    @property
    def delivery_stream_log_group_name(self) -> str:
        """Name of the CloudWatch log group used by the delivery stream."""
        return self._firehose_delivery_stream_log_group.log_group_name
|