data_ingestion.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. from aws_cdk import (
  7. core,
  8. aws_apigateway as apigateway,
  9. aws_iam as iam,
  10. aws_kinesis as kinesis
  11. )
  12. import json
  13. from . import aws_metrics_constants
  14. from .aws_utils import resource_name_sanitizer
  15. class DataIngestion:
  16. """
  17. Create the service API via APIGateway and Kinesis data stream to ingest metrics events.
  18. """
  19. def __init__(self, stack: core.Construct, application_name: str) -> None:
  20. self._stack = stack
  21. # create the input Kinesis stream
  22. self._input_stream = kinesis.Stream(
  23. self._stack,
  24. id='InputStream',
  25. stream_name=resource_name_sanitizer.sanitize_resource_name(
  26. f'{self._stack.stack_name}-InputStream', 'kinesis_stream'),
  27. shard_count=1
  28. )
  29. apigateway_role = self._create_apigateway_role()
  30. # create the REST API resource
  31. self._rest_api = apigateway.SpecRestApi(
  32. self._stack,
  33. 'RestApi',
  34. rest_api_name=f'{self._stack.stack_name}-RestApi',
  35. endpoint_export_name=f'{application_name}:RestApiEndpoint',
  36. api_definition=apigateway.ApiDefinition.from_asset('api_spec.json'),
  37. deploy_options=apigateway.StageOptions(
  38. method_options={
  39. "/*/*": apigateway.MethodDeploymentOptions(
  40. metrics_enabled=True,
  41. logging_level=apigateway.MethodLoggingLevel.INFO
  42. )
  43. },
  44. stage_name=aws_metrics_constants.APIGATEWAY_STAGE
  45. )
  46. )
  47. # read api_spec.json and replace the template variables
  48. with open("api_spec.json", "r") as api_spec:
  49. content = api_spec.read()
  50. content = content.replace("${ApiGatewayRoleArn}", apigateway_role.role_arn)
  51. content = content.replace("${InputStreamName}", self._input_stream.stream_name)
  52. api_definition = json.loads(content)
  53. # use escape hatches to override the API definitions with the actual resource information
  54. # https://docs.aws.amazon.com/cdk/latest/guide/cfn_layer.html
  55. cfn_rest_api = self._rest_api.node.default_child
  56. cfn_rest_api.add_property_override("Body", api_definition)
  57. cfn_rest_api.add_property_deletion_override("BodyS3Location")
  58. cfn_rest_api.add_property_override("FailOnWarnings", True)
  59. core.CfnOutput(
  60. self._stack,
  61. id='RESTApiId',
  62. description='Service API Id for the analytics pipeline',
  63. export_name=f"{application_name}:RestApiId",
  64. value=self._rest_api.rest_api_id)
  65. core.CfnOutput(
  66. self._stack,
  67. id='RESTApiStage',
  68. description='Stage for the REST API deployment',
  69. export_name=f"{application_name}:DeploymentStage",
  70. value=self._rest_api.deployment_stage.stage_name)
  71. def _create_apigateway_role(self) -> iam.Role:
  72. """
  73. Generate the IAM role for the REST API to integration with Kinesis.
  74. :return: The created IAM role.
  75. """
  76. api_gateway_put_kinesis_policy_document = iam.PolicyDocument(
  77. statements=[
  78. iam.PolicyStatement(
  79. actions=[
  80. "kinesis:PutRecord",
  81. "kinesis:PutRecords"
  82. ],
  83. effect=iam.Effect.ALLOW,
  84. resources=[
  85. core.Fn.sub(
  86. body="arn:${AWS::Partition}:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${EventsStream}",
  87. variables={
  88. "EventsStream": self._input_stream.stream_name
  89. }
  90. )
  91. ]
  92. )
  93. ]
  94. )
  95. apigateway_role = iam.Role(
  96. self._stack,
  97. id=f'{self._stack.stack_name}-ApiGatewayRole',
  98. assumed_by=iam.ServicePrincipal(
  99. service="apigateway.amazonaws.com"
  100. ),
  101. inline_policies={
  102. "ApiGatewayPutKinesisPolicy": api_gateway_put_kinesis_policy_document
  103. }
  104. )
  105. return apigateway_role
  106. @property
  107. def input_stream_arn(self) -> kinesis.Stream.stream_arn:
  108. return self._input_stream.stream_arn
  109. @property
  110. def input_stream_name(self) -> kinesis.Stream.stream_name:
  111. return self._input_stream.stream_name
  112. @property
  113. def rest_api_id(self) -> apigateway.RestApi.rest_api_id:
  114. return self._rest_api.rest_api_id
  115. @property
  116. def deployment_stage(self) -> str:
  117. return aws_metrics_constants.APIGATEWAY_STAGE
  118. @property
  119. def execute_api_arn(self) -> apigateway.RestApi.arn_for_execute_api:
  120. return self._rest_api.arn_for_execute_api(stage=aws_metrics_constants.APIGATEWAY_STAGE)