  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. from aws_cdk import (
  7. core,
  8. aws_apigateway as apigateway,
  9. aws_iam as iam,
  10. aws_kinesis as kinesis
  11. )
  12. import json
  13. from . import aws_metrics_constants
  14. from .aws_utils import resource_name_sanitizer
  15. class DataIngestion:
  16. """
  17. Create the service API via APIGateway and Kinesis data stream to ingest metrics events.
  18. """
  19. def __init__(self, stack: core.Construct, application_name: str) -> None:
  20. self._stack = stack
  21. # create the input Kinesis stream
  22. self._input_stream = kinesis.Stream(
  23. self._stack,
  24. id='InputStream',
  25. stream_name=resource_name_sanitizer.sanitize_resource_name(
  26. f'{self._stack.stack_name}-InputStream', 'kinesis_stream'),
  27. shard_count=1
  28. )
  29. apigateway_role = self._create_apigateway_role()
  30. # create the REST API resource
  31. self._rest_api = apigateway.SpecRestApi(
  32. self._stack,
  33. 'RestApi',
  34. rest_api_name=f'{self._stack.stack_name}-RestApi',
  35. endpoint_export_name=f'{application_name}:RestApiEndpoint',
  36. api_definition=apigateway.ApiDefinition.from_asset('api_spec.json'),
  37. deploy_options=apigateway.StageOptions(
  38. method_options={
  39. "/*/*": apigateway.MethodDeploymentOptions(
  40. metrics_enabled=True,
  41. logging_level=apigateway.MethodLoggingLevel.INFO
  42. )
  43. },
  44. stage_name=aws_metrics_constants.APIGATEWAY_STAGE
  45. )
  46. )
  47. # read api_spec.json and replace the template variables
  48. with open("api_spec.json", "r") as api_spec:
  49. content =
  50. content = content.replace("${ApiGatewayRoleArn}", apigateway_role.role_arn)
  51. content = content.replace("${InputStreamName}", self._input_stream.stream_name)
  52. api_definition = json.loads(content)
  53. # use escape hatches to override the API definitions with the actual resource information
  54. #
  55. cfn_rest_api = self._rest_api.node.default_child
  56. cfn_rest_api.add_property_override("Body", api_definition)
  57. cfn_rest_api.add_property_deletion_override("BodyS3Location")
  58. cfn_rest_api.add_property_override("FailOnWarnings", True)
  59. core.CfnOutput(
  60. self._stack,
  61. id='RESTApiId',
  62. description='Service API Id for the analytics pipeline',
  63. export_name=f"{application_name}:RestApiId",
  64. value=self._rest_api.rest_api_id)
  65. core.CfnOutput(
  66. self._stack,
  67. id='RESTApiStage',
  68. description='Stage for the REST API deployment',
  69. export_name=f"{application_name}:DeploymentStage",
  70. value=self._rest_api.deployment_stage.stage_name)
  71. def _create_apigateway_role(self) -> iam.Role:
  72. """
  73. Generate the IAM role for the REST API to integration with Kinesis.
  74. :return: The created IAM role.
  75. """
  76. api_gateway_put_kinesis_policy_document = iam.PolicyDocument(
  77. statements=[
  78. iam.PolicyStatement(
  79. actions=[
  80. "kinesis:PutRecord",
  81. "kinesis:PutRecords"
  82. ],
  83. effect=iam.Effect.ALLOW,
  84. resources=[
  85. core.Fn.sub(
  86. body="arn:${AWS::Partition}:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${EventsStream}",
  87. variables={
  88. "EventsStream": self._input_stream.stream_name
  89. }
  90. )
  91. ]
  92. )
  93. ]
  94. )
  95. apigateway_role = iam.Role(
  96. self._stack,
  97. id=f'{self._stack.stack_name}-ApiGatewayRole',
  98. assumed_by=iam.ServicePrincipal(
  99. service=""
  100. ),
  101. inline_policies={
  102. "ApiGatewayPutKinesisPolicy": api_gateway_put_kinesis_policy_document
  103. }
  104. )
  105. return apigateway_role
  106. @property
  107. def input_stream_arn(self) -> kinesis.Stream.stream_arn:
  108. return self._input_stream.stream_arn
  109. @property
  110. def input_stream_name(self) -> kinesis.Stream.stream_name:
  111. return self._input_stream.stream_name
  112. @property
  113. def rest_api_id(self) -> apigateway.RestApi.rest_api_id:
  114. return self._rest_api.rest_api_id
  115. @property
  116. def deployment_stage(self) -> str:
  117. return aws_metrics_constants.APIGATEWAY_STAGE
  118. @property
  119. def execute_api_arn(self) -> apigateway.RestApi.arn_for_execute_api:
  120. return self._rest_api.arn_for_execute_api(stage=aws_metrics_constants.APIGATEWAY_STAGE)