# data_lake_integration.py
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. from aws_cdk import (
  7. core,
  8. aws_iam as iam,
  9. aws_s3 as s3,
  10. aws_glue as glue
  11. )
  12. from . import aws_metrics_constants
  13. from .aws_utils import resource_name_sanitizer
class DataLakeIntegration:
    """
    Create the AWS resources including the S3 bucket, Glue database, table and crawler for data lake integration
    """
    def __init__(self, stack: core.Construct, application_name: str,
                 server_access_logs_bucket: str = None) -> None:
        """
        Create every data-lake resource on the provided stack.

        :param stack: CDK construct (stack) that owns all resources created here.
        :param application_name: Prefix used for the CloudFormation export names.
        :param server_access_logs_bucket: Optional name of an existing bucket that
            receives S3 server access logs; access logging is skipped when None.
        """
        self._stack = stack
        self._application_name = application_name
        self._server_access_logs_bucket = server_access_logs_bucket
        # Order matters: the database references the bucket, and the table and
        # crawler reference both the bucket and the database.
        self._create_analytics_bucket()
        self._create_events_database()
        self._create_events_table()
        self._create_events_crawler()
    def _create_analytics_bucket(self) -> None:
        """
        Create a private bucket that should only be accessed by the resources defined in the CDK application.
        The bucket uses server-side encryption with a CMK managed by S3:
        https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingKMSEncryption.html
        """
        # Enable server access logging if the server access logs bucket is provided following S3 best practices.
        # See https://docs.aws.amazon.com/AmazonS3/latest/dev/security-best-practices.html
        server_access_logs_bucket = s3.Bucket.from_bucket_name(
            self._stack,
            f'{self._stack.stack_name}-ImportedAccessLogsBucket',
            self._server_access_logs_bucket,
        ) if self._server_access_logs_bucket else None

        # Bucket name cannot contain uppercase characters.
        # Do not specify the bucket name here since bucket name is required to be unique globally. If we set
        # a specific name here, only one customer can deploy the bucket successfully.
        self._analytics_bucket = s3.Bucket(
            self._stack,
            id=resource_name_sanitizer.sanitize_resource_name(
                f'{self._stack.stack_name}-AnalyticsBucket'.lower(), 's3_bucket'),
            encryption=s3.BucketEncryption.S3_MANAGED,
            # Block every form of public access; only stack resources may read it.
            block_public_access=s3.BlockPublicAccess(
                block_public_acls=True,
                block_public_policy=True,
                ignore_public_acls=True,
                restrict_public_buckets=True
            ),
            server_access_logs_bucket=server_access_logs_bucket,
            server_access_logs_prefix=f'{self._stack.stack_name}-AccessLogs' if server_access_logs_bucket else None
        )

        # For Amazon S3 buckets, you must delete all objects in the bucket for deletion to succeed.
        cfn_bucket = self._analytics_bucket.node.find_child('Resource')
        cfn_bucket.apply_removal_policy(core.RemovalPolicy.DESTROY)

        # Export the generated bucket name so other stacks/tools can import it.
        core.CfnOutput(
            self._stack,
            id='AnalyticsBucketName',
            description='Name of the S3 bucket for storing metrics event data',
            export_name=f"{self._application_name}:AnalyticsBucket",
            value=self._analytics_bucket.bucket_name)
  66. def _create_events_database(self) -> None:
  67. """
  68. Create the Glue database for metrics events.
  69. """
  70. # Database name cannot contain uppercase characters
  71. self._events_database = glue.CfnDatabase(
  72. self._stack,
  73. id='EventsDatabase',
  74. catalog_id=self._stack.account,
  75. database_input=glue.CfnDatabase.DatabaseInputProperty(
  76. description=f'Metrics events database for stack {self._stack.stack_name}',
  77. location_uri=f's3://{self._analytics_bucket.bucket_name}',
  78. name=f'{self._stack.stack_name}-EventsDatabase'.lower()
  79. )
  80. )
  81. core.CfnOutput(
  82. self._stack,
  83. id='EventDatabaseName',
  84. description='Glue database for metrics events.',
  85. export_name=f"{self._application_name}:EventsDatabase",
  86. value=self._events_database.ref)
  87. def _create_events_table(self) -> None:
  88. """
  89. Create the Glue table for metrics events. This table is used by the Kinesis Data Firehose
  90. to convert data from the JSON format to the Parquet format before writing it to Amazon S3.
  91. """
  92. self._events_table = glue.CfnTable(
  93. self._stack,
  94. id=f'EventsTable',
  95. catalog_id=self._stack.account,
  96. database_name=self._events_database.ref,
  97. table_input=glue.CfnTable.TableInputProperty(
  98. description=f'Stores metrics event data from the analytics pipeline for stack {self._stack.stack_name}',
  99. name=aws_metrics_constants.GLUE_TABLE_NAME,
  100. table_type='EXTERNAL_TABLE',
  101. partition_keys=[
  102. glue.CfnTable.ColumnProperty(
  103. name='year',
  104. type='string'
  105. ),
  106. glue.CfnTable.ColumnProperty(
  107. name='month',
  108. type='string'
  109. ),
  110. glue.CfnTable.ColumnProperty(
  111. name='day',
  112. type='string'
  113. ),
  114. ],
  115. parameters={
  116. 'classification': 'parquet',
  117. 'compressionType': 'none',
  118. 'typeOfData': 'file'
  119. },
  120. storage_descriptor=glue.CfnTable.StorageDescriptorProperty(
  121. input_format=aws_metrics_constants.GLUE_TABLE_INPUT_FORMAT,
  122. output_format=aws_metrics_constants.GLUE_TABLE_OUTPUT_FORMAT,
  123. serde_info=glue.CfnTable.SerdeInfoProperty(
  124. serialization_library=aws_metrics_constants.GLUE_TABLE_SERIALIZATION_LIBRARY,
  125. parameters={
  126. 'serialization.format':
  127. aws_metrics_constants.GLUE_TABLE_SERIALIZATION_LIBRARY_SERIALIZATION_FORMAT
  128. }
  129. ),
  130. stored_as_sub_directories=False,
  131. location=f's3://{self._analytics_bucket.bucket_name}/{aws_metrics_constants.GLUE_TABLE_NAME}/',
  132. columns=[
  133. glue.CfnTable.ColumnProperty(
  134. name='event_id',
  135. type='string'
  136. ),
  137. glue.CfnTable.ColumnProperty(
  138. name='event_type',
  139. type='string'
  140. ),
  141. glue.CfnTable.ColumnProperty(
  142. name='event_name',
  143. type='string'
  144. ),
  145. glue.CfnTable.ColumnProperty(
  146. name='event_timestamp',
  147. type='string'
  148. ),
  149. glue.CfnTable.ColumnProperty(
  150. name='event_version',
  151. type='string'
  152. ),
  153. glue.CfnTable.ColumnProperty(
  154. name='event_source',
  155. type='string'
  156. ),
  157. glue.CfnTable.ColumnProperty(
  158. name='application_id',
  159. type='string'
  160. ),
  161. glue.CfnTable.ColumnProperty(
  162. name='event_data',
  163. type='string'
  164. )
  165. ]
  166. )
  167. )
  168. )
  169. def _create_events_crawler(self) -> None:
  170. """
  171. Create the Glue crawler to populate the AWS Glue Data Catalog with tables.
  172. """
  173. self._create_events_crawler_role()
  174. self._events_crawler = glue.CfnCrawler(
  175. self._stack,
  176. id='EventsCrawler',
  177. name=f'{self._stack.stack_name}-EventsCrawler',
  178. role=self._events_crawler_role.role_arn,
  179. database_name=self._events_database.ref,
  180. targets=glue.CfnCrawler.TargetsProperty(
  181. s3_targets=[
  182. glue.CfnCrawler.S3TargetProperty(
  183. path=f's3://{self._analytics_bucket.bucket_name}/{aws_metrics_constants.GLUE_TABLE_NAME}/'
  184. )
  185. ]
  186. ),
  187. schema_change_policy=glue.CfnCrawler.SchemaChangePolicyProperty(
  188. update_behavior='UPDATE_IN_DATABASE',
  189. delete_behavior='LOG',
  190. ),
  191. configuration=aws_metrics_constants.CRAWLER_CONFIGURATION
  192. )
  193. core.CfnOutput(
  194. self._stack,
  195. id='EventsCrawlerName',
  196. description='Glue Crawler to populate the AWS Glue Data Catalog with metrics events tables',
  197. export_name=f"{self._application_name}:EventsCrawler",
  198. value=self._events_crawler.name)
  199. def _create_events_crawler_role(self) -> None:
  200. """
  201. Create the IAM role for the Glue crawler.
  202. """
  203. policy_statements = list()
  204. s3_policy_statement = iam.PolicyStatement(
  205. actions=[
  206. 's3:ListBucket',
  207. 's3:GetObject',
  208. 's3:PutObject',
  209. 's3:DeleteObject'
  210. ],
  211. effect=iam.Effect.ALLOW,
  212. resources=[
  213. self._analytics_bucket.bucket_arn,
  214. f'{self._analytics_bucket.bucket_arn}/*'
  215. ]
  216. )
  217. policy_statements.append(s3_policy_statement)
  218. glue_table_policy_statement = iam.PolicyStatement(
  219. actions=[
  220. 'glue:BatchGetPartition',
  221. 'glue:GetPartition',
  222. 'glue:GetPartitions',
  223. 'glue:BatchCreatePartition',
  224. 'glue:CreatePartition',
  225. 'glue:CreateTable',
  226. 'glue:GetTable',
  227. 'glue:GetTables',
  228. 'glue:GetTableVersion',
  229. 'glue:GetTableVersions',
  230. 'glue:UpdatePartition',
  231. 'glue:UpdateTable'
  232. ],
  233. effect=iam.Effect.ALLOW,
  234. resources=[
  235. core.Fn.sub(
  236. 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog'
  237. ),
  238. core.Fn.sub(
  239. body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:table/${EventsDatabase}/*',
  240. variables={
  241. 'EventsDatabase': self._events_database.ref
  242. }
  243. ),
  244. core.Fn.sub(
  245. body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:database/${EventsDatabase}',
  246. variables={
  247. 'EventsDatabase': self._events_database.ref
  248. }
  249. )
  250. ]
  251. )
  252. policy_statements.append(glue_table_policy_statement)
  253. glue_database_policy_statement = iam.PolicyStatement(
  254. actions=[
  255. 'glue:GetDatabase',
  256. 'glue:GetDatabases',
  257. 'glue:UpdateDatabase'
  258. ],
  259. effect=iam.Effect.ALLOW,
  260. resources=[
  261. core.Fn.sub(
  262. 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog'
  263. ),
  264. core.Fn.sub(
  265. body='arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:database/${EventsDatabase}',
  266. variables={
  267. 'EventsDatabase': self._events_database.ref
  268. }
  269. )
  270. ]
  271. )
  272. policy_statements.append(glue_database_policy_statement)
  273. log_policy_statement = iam.PolicyStatement(
  274. actions=[
  275. 'logs:CreateLogGroup',
  276. 'logs:CreateLogStream',
  277. 'logs:PutLogEvents'
  278. ],
  279. effect=iam.Effect.ALLOW,
  280. resources=[
  281. core.Fn.sub(
  282. 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/crawlers:*'
  283. )
  284. ]
  285. )
  286. policy_statements.append(log_policy_statement)
  287. events_crawler_policy_document = iam.PolicyDocument(
  288. statements=policy_statements
  289. )
  290. self._events_crawler_role = iam.Role(
  291. self._stack,
  292. id='EventsCrawlerRole',
  293. role_name=resource_name_sanitizer.sanitize_resource_name(
  294. f'{self._stack.stack_name}-EventsCrawlerRole', 'iam_role'),
  295. assumed_by=iam.ServicePrincipal(
  296. service='glue.amazonaws.com'
  297. ),
  298. inline_policies={
  299. 'GameAnalyticsPipelineGlueCrawlerPolicy': events_crawler_policy_document
  300. }
  301. )
  302. @property
  303. def analytics_bucket_arn(self) -> s3.Bucket.bucket_arn:
  304. return self._analytics_bucket.bucket_arn
  305. @property
  306. def analytics_bucket_name(self) -> s3.Bucket.bucket_name:
  307. return self._analytics_bucket.bucket_name
    @property
    def events_database_name(self) -> str:
        """Return the Glue events database name (the CfnDatabase ref)."""
        return self._events_database.ref
    @property
    def events_table_name(self) -> str:
        """Return the Glue events table name (the CfnTable ref)."""
        return self._events_table.ref
    @property
    def events_crawler_name(self) -> str:
        """Return the name given to the Glue events crawler."""
        return self._events_crawler.name
  317. @property
  318. def events_crawler_role_arn(self) -> iam.Role.role_arn:
  319. return self._events_crawler_role.role_arn