123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- """
- Copyright (c) Contributors to the Open 3D Engine Project.
- For complete copyright and license terms please see the LICENSE at the root of this distribution.
- SPDX-License-Identifier: Apache-2.0 OR MIT
- """
- from constructs import Construct
- from aws_cdk import (
- CfnOutput,
- Duration,
- Fn,
- aws_iam as iam,
- aws_kinesisanalytics as analytics,
- aws_lambda as lambda_,
- aws_logs as logs
- )
- import os
- from . import aws_metrics_constants
- from .aws_utils import resource_name_sanitizer
class RealTimeDataProcessing:
    """
    Create the AWS resources used for real time data processing.

    The resources are a Kinesis Data Analytics application that reads events from the input
    Kinesis stream, a Lambda function that receives the output of that application, and the
    IAM roles assumed by both of them.
    """

    # Top-level JSON fields of an input event that are exposed to the analytics application.
    # Each field becomes one SQL column with the same name, mapped from the JSON path '$.<name>'.
    _INPUT_EVENT_COLUMNS = (
        'event_id',
        'event_type',
        'event_name',
        'event_version',
        'event_timestamp',
        'application_id',
    )
    # SQL type shared by every input column.
    _INPUT_COLUMN_SQL_TYPE = 'VARCHAR(32)'

    def __init__(self, stack: Construct, input_stream_arn: str, application_name: str) -> None:
        """
        Add the analytics processing Lambda and the Kinesis Data Analytics application to the stack.

        :param stack: Scope that every resource and stack output is created in.
        :param input_stream_arn: ARN of the Kinesis stream the analytics application reads from.
        :param application_name: Prefix used for the export names of the stack outputs.
        """
        self._stack = stack
        self._input_stream_arn = input_stream_arn
        self._application_name = application_name

        # Order matters: the analytics application role and the application output both
        # reference the ARN of the processing Lambda, so the Lambda has to be created first.
        self._create_analytics_processing_lambda()
        self._create_analytics_application()

    def _create_analytics_application(self) -> None:
        """
        Generate the Kinesis data analytics application to process the real-time data.
        The sample application filters input events and counts the total number of logins within one minute.

        The application, its IAM role and its Lambda output are stored on the instance, and the
        application name is published as a stack output.
        """
        self._analytics_application_role = self._create_analytics_application_role()

        self._analytics_application = analytics.CfnApplication(
            self._stack,
            'AnalyticsApplication',
            application_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-AnalyticsApplication', 'kinesis_application'),
            inputs=[
                analytics.CfnApplication.InputProperty(
                    input_schema=analytics.CfnApplication.InputSchemaProperty(
                        record_columns=[
                            analytics.CfnApplication.RecordColumnProperty(
                                name=column_name,
                                sql_type=self._INPUT_COLUMN_SQL_TYPE,
                                mapping=f'$.{column_name}'
                            )
                            for column_name in self._INPUT_EVENT_COLUMNS
                        ],
                        record_format=analytics.CfnApplication.RecordFormatProperty(
                            record_format_type='JSON'
                        )
                    ),
                    name_prefix='AnalyticsApp',
                    kinesis_streams_input=analytics.CfnApplication.KinesisStreamsInputProperty(
                        resource_arn=self._input_stream_arn,
                        role_arn=self._analytics_application_role.role_arn
                    )
                )
            ],
            application_description='',
            application_code=aws_metrics_constants.KINESIS_APPLICATION_CODE
        )

        # Deliver the records of the DESTINATION_STREAM in-application stream to the processing Lambda.
        self._application_output = analytics.CfnApplicationOutput(
            self._stack,
            'AnalyticsApplicationOutput',
            application_name=self._analytics_application.ref,
            output=analytics.CfnApplicationOutput.OutputProperty(
                destination_schema=analytics.CfnApplicationOutput.DestinationSchemaProperty(
                    record_format_type='JSON'
                ),
                lambda_output=analytics.CfnApplicationOutput.LambdaOutputProperty(
                    resource_arn=self._analytics_processing_lambda.function_arn,
                    role_arn=self._analytics_application_role.role_arn
                ),
                name='DESTINATION_STREAM'
            ),
        )

        CfnOutput(
            self._stack,
            id='AnalyticsApplicationName',
            description='Kinesis Data Analytics application to process the real-time metrics data',
            export_name=f"{self._application_name}:AnalyticsApplication",
            value=self._analytics_application.application_name)

    def _create_analytics_application_role(self) -> iam.Role:
        """
        Generate the IAM role for the Kinesis analytics application to read events from the input stream
        and send the processed data to the analytics processing lambda.

        The analytics processing lambda must already exist because its ARN scopes the invoke permissions.

        :return: The created IAM role.
        """
        kinesis_access_policy_document = iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    actions=[
                        'kinesis:DescribeStream',
                        'kinesis:GetShardIterator',
                        'kinesis:GetRecords',
                        'kinesis:ListShards'
                    ],
                    effect=iam.Effect.ALLOW,
                    sid='ReadKinesisStream',
                    resources=[
                        self._input_stream_arn
                    ]
                )
            ]
        )

        lambda_access_policy_document = iam.PolicyDocument(
            statements=[
                iam.PolicyStatement(
                    actions=[
                        'lambda:InvokeFunction',
                        'lambda:GetFunctionConfiguration',
                    ],
                    effect=iam.Effect.ALLOW,
                    sid='AnalyticsProcessingInvokePermissions',
                    resources=[
                        self._analytics_processing_lambda.function_arn
                    ]
                )
            ]
        )

        kinesis_analytics_role = iam.Role(
            self._stack,
            id='AnalyticsApplicationRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-AnalyticsApplicationRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='kinesisanalytics.amazonaws.com'
            ),
            inline_policies={
                'KinesisAccess': kinesis_access_policy_document,
                'LambdaAccess': lambda_access_policy_document
            }
        )

        return kinesis_analytics_role

    def _create_analytics_processing_lambda(self) -> None:
        """
        Generate the analytics processing lambda to send processed data to CloudWatch for visualization.

        The function and its IAM role are stored on the instance, and the function name is
        published as a stack output.
        """
        # The name is computed up front because the role's log permissions are scoped to it.
        analytics_processing_function_name = resource_name_sanitizer.sanitize_resource_name(
            f'{self._stack.stack_name}-AnalyticsProcessingLambda', 'lambda_function')

        self._analytics_processing_lambda_role = self._create_analytics_processing_lambda_role(
            analytics_processing_function_name
        )

        self._analytics_processing_lambda = lambda_.Function(
            self._stack,
            id='AnalyticsProcessingLambda',
            function_name=analytics_processing_function_name,
            log_retention=logs.RetentionDays.ONE_MONTH,
            memory_size=aws_metrics_constants.LAMBDA_MEMORY_SIZE_IN_MB,
            runtime=lambda_.Runtime.PYTHON_3_9,
            timeout=Duration.minutes(aws_metrics_constants.LAMBDA_TIMEOUT_IN_MINUTES),
            handler='analytics_processing.lambda_handler',
            code=lambda_.Code.from_asset(
                os.path.join(os.path.dirname(__file__), 'lambdas', 'analytics_processing_lambda')),
            role=self._analytics_processing_lambda_role
        )

        CfnOutput(
            self._stack,
            id='AnalyticsProcessingLambdaName',
            description='Lambda function for sending processed data to CloudWatch.',
            export_name=f"{self._application_name}:AnalyticsProcessingLambda",
            value=self._analytics_processing_lambda.function_name)

    def _create_analytics_processing_lambda_role(self, function_name: str) -> iam.Role:
        """
        Generate the IAM role for the analytics processing lambda to send metrics to CloudWatch.

        :param function_name: Name of the Lambda function. It scopes the log permissions to the
            log groups whose name starts with /aws/lambda/<function_name>.
        :return: The created IAM role.
        """
        analytics_processing_policy_document = iam.PolicyDocument(
            statements=[
                # The following policy limits the user to publishing metrics only in the namespace named AWSMetrics.
                # Check the following document for more details:
                # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/iam-cw-condition-keys-namespace.html
                iam.PolicyStatement(
                    actions=[
                        'cloudwatch:PutMetricData',
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        '*'
                    ],
                    conditions={
                        "StringEquals": {
                            "cloudwatch:namespace": "AWSMetrics"
                        }
                    }
                ),
                iam.PolicyStatement(
                    actions=[
                        'logs:CreateLogGroup',
                        'logs:CreateLogStream',
                        'logs:PutDestination',
                        'logs:PutLogEvents'
                    ],
                    effect=iam.Effect.ALLOW,
                    resources=[
                        Fn.sub(
                            'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:'
                            '/aws/lambda/${FunctionName}*',
                            variables={
                                'FunctionName': function_name
                            }
                        )
                    ]
                )
            ]
        )

        analytics_processing_lambda_role = iam.Role(
            self._stack,
            id='AnalyticsLambdaRole',
            role_name=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-AnalyticsLambdaRole', 'iam_role'),
            assumed_by=iam.ServicePrincipal(
                service='lambda.amazonaws.com'
            ),
            inline_policies={
                'AnalyticsProcessingPolicy': analytics_processing_policy_document
            }
        )

        return analytics_processing_lambda_role

    @property
    def analytics_processing_lambda_name(self) -> str:
        """Name of the analytics processing Lambda function."""
        return self._analytics_processing_lambda.function_name

    @property
    def analytics_processing_lambda_arn(self) -> str:
        """ARN of the analytics processing Lambda function."""
        return self._analytics_processing_lambda.function_arn

    @property
    def analytics_application_lambda_role_arn(self) -> str:
        """ARN of the IAM role assumed by the analytics processing Lambda function."""
        return self._analytics_processing_lambda_role.role_arn

    @property
    def analytics_application_name(self) -> str:
        """Name given to the Kinesis Data Analytics application."""
        return self._analytics_application.application_name

    @property
    def analytics_application_role_arn(self) -> str:
        """ARN of the IAM role assumed by the Kinesis Data Analytics application."""
        return self._analytics_application_role.role_arn
|