ebs_snapshot.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. #
  2. # Copyright (c) Contributors to the Open 3D Engine Project.
  3. # For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. #
  5. # SPDX-License-Identifier: Apache-2.0 OR MIT
  6. #
  7. #
  8. import argparse
  9. import boto3
  10. import logging
  11. import sys
  12. from botocore.config import Config
  13. log = logging.getLogger(__name__)
  14. log.setLevel(logging.INFO)
  15. log.addHandler(logging.StreamHandler())
  16. DEFAULT_REGION = 'us-west-2'
  17. DEFAULT_SNAPSHOT_RETAIN = 2
  18. DEFAULT_SNAPSHOT_DESCRIPTION = 'Created for Build Artifact Snapshots'
  19. DEFAULT_DRYRUN = True
  20. def _kv_to_dict(kv_string):
  21. """
  22. Simple splitting of a key value string to dictionary in "Name: <Key>, Values: [<value>]" form
  23. :param kv_string: String in the form of "key:value"
  24. :return Dictionary of values
  25. """
  26. dict = {}
  27. if ":" not in kv_string:
  28. log.error(f'Keyvalue parameter not in the form of "key:value"')
  29. raise ValueError
  30. kv = kv_string.split(':')
  31. dict['Name'] = f'tag:{kv[0]}'
  32. dict['Values'] = [kv[1]]
  33. return dict
  34. def _format_tags(tag_keyvalue):
  35. """
  36. Format tags in list form
  37. :param tag_keyvalue: String of comma separated key value pairs
  38. :return List of dictionary values
  39. """
  40. tag_filter = []
  41. for keyvalue in tag_keyvalue:
  42. tag_filter.append(_kv_to_dict(keyvalue))
  43. return tag_filter
  44. def get_ec2_resource():
  45. """
  46. Get the AWS EC2 resource object, with appropriate region
  47. :return The EC2 resource object
  48. """
  49. session = boto3.session.Session()
  50. region = session.region_name
  51. if region is None:
  52. region = DEFAULT_REGION
  53. resource_config = Config(
  54. region_name=region,
  55. retries={
  56. 'mode': 'standard'
  57. }
  58. )
  59. resource = boto3.resource('ec2', config=resource_config)
  60. return resource
  61. def create_snapshot(ec2_resource, tag_keyvalue, snap_description, snap_dryrun=DEFAULT_DRYRUN):
  62. """
  63. Find and snapshot all EBS volumes that have a matching tag value. Injects all volume tags into the snapshot,
  64. including the name and adds a description
  65. :param ec2_resource: The EC2 resource object
  66. :param tag_keyvalue: List of Strings with tag keyvalues in the form of "key:value"
  67. :param snap_description: String with the snapshot description to write
  68. :param snap_dryrun: Boolean to dryrun the action. Set to true by default (always dryrun)
  69. :return: Number of EBS volumes that are snapshotted successfully, number of EBS volumes that failed to be snapshotted
  70. """
  71. success = 0
  72. failure = 0
  73. tags = _format_tags(tag_keyvalue)
  74. for tag in tags:
  75. response = ec2_resource.volumes.filter(Filters=[tag])
  76. log.info(f'Snapshotting EBS volumes with tags that match {tag}...')
  77. for volume in response:
  78. try:
  79. log.info(f'Snapshotting volume {volume.volume_id}')
  80. volume.create_snapshot(Description=snap_description, TagSpecifications=[{'ResourceType': 'snapshot', 'Tags': volume.tags}], DryRun=snap_dryrun)
  81. success += 1
  82. except Exception as e:
  83. log.error(f'Failed to snapshot volume {volume.volume_id}.')
  84. log.error(e)
  85. failure += 1
  86. return success, failure
  87. def delete_snapshot(ec2_resource, tag_keyvalue, snap_description, snap_retention, snap_dryrun=DEFAULT_DRYRUN):
  88. """
  89. Find all EBS snapshots that have a matching tag value AND description. If the number of snapshots exceeds a retention amount,
  90. delete the oldest snapshot until retention is achived.
  91. :param ec2_resource: The EC2 resource object
  92. :param tag_keyvalue: List of Strings with tag keyvalues in the form of "key:value"
  93. :param snap_description: String with the snapshot description to search
  94. :param snap_retention: Integer with the number of snapshots to retain
  95. :param snap_dryrun: Boolean to dryrun the action. Set to true by default (always dryrun)
  96. :return: Number of EBS snapshots deleted successfully, number of EBS snapshots that failed to be deleted
  97. """
  98. success = 0
  99. failure = 0
  100. description_filter = {"Name": "description", "Values": [snap_description]}
  101. tags = _format_tags(tag_keyvalue)
  102. for tag in tags:
  103. response = list(ec2_resource.snapshots.filter(Filters=[tag,description_filter]))
  104. log.info(f'Getting snapshots with tags that match {tag}...')
  105. num_snaps = len(response)
  106. log.info(f'Tag {tag} has {num_snaps} snapshots')
  107. if num_snaps > snap_retention:
  108. log.info(f'Deleting oldest snapshots to keep retention of {snap_retention}')
  109. snap_list = sorted(response, key=lambda k: k.start_time) # Get a sorted list of snapshots by start time in descending order
  110. diff_snap = num_snaps - snap_retention
  111. for n in range(diff_snap):
  112. try:
  113. log.info(f'Deleting snapshot {snap_list[n].snapshot_id}')
  114. snap_list[n].delete(DryRun=snap_dryrun)
  115. success += 1
  116. except Exception as e:
  117. log.error(f'Failed to delete snapshot {snap_list[n].snapshot_id}.')
  118. log.error(e)
  119. failure += 1
  120. return success, failure
  121. def list_snapshot(ec2_resource, tag_keyvalue, snap_description):
  122. """
  123. Find all EBS snapshots that have a matching tag value AND description. Prints snap id, description, tags, and start time.
  124. :param ec2_resource: The EC2 resource object
  125. :param tag_keyvalue: List of Strings with tag keyvalues in the form of "key:value"
  126. :param snap_description: String with the snapshot description to search
  127. :return: None
  128. """
  129. description_filter = {"Name": "description", "Values": [snap_description]}
  130. tags = _format_tags(tag_keyvalue)
  131. for tag in tags:
  132. response = ec2_resource.snapshots.filter(Filters=[tag,description_filter])
  133. log.info(f'Getting snapshots with tags that match {tag}...')
  134. num_snaps = len(list(response))
  135. log.info(f'Tag {tag} has {num_snaps} snapshots')
  136. snap_list = sorted(response, key=lambda k: k.start_time)
  137. for n in range(num_snaps):
  138. print(f'Snap ID: {snap_list[n].snapshot_id} \n Description: {snap_list[n].description} \n Tags: {snap_list[n].tags} \n Start Time: {snap_list[n].start_time}')
  139. return None
  140. def parse_args():
  141. parser = argparse.ArgumentParser(description='Script to manage EBS snapshots for build artifacts')
  142. parser.add_argument('--action', '-a', type=str, help='(create|delete|list) Creates, deletes, or lists EBS snapshots based on tag. Requires --tags argument')
  143. parser.add_argument('--tags', '-t', type=str, required=True, help='Comma separated key value tags to search for in the form of "key:value", for example, "PipelineAndBranch:default_development","PipelineAndBranch:default_development"')
  144. parser.add_argument('--description', '-d', default=DEFAULT_SNAPSHOT_DESCRIPTION, help=f'Snapshot description to write or search for. Defaults to "{DEFAULT_SNAPSHOT_DESCRIPTION}"')
  145. parser.add_argument('--retention', '-r', nargs="?", const=DEFAULT_SNAPSHOT_RETAIN, type=int, help=f'Integer with the number of snapshots to retain. Defaults to {DEFAULT_SNAPSHOT_RETAIN}')
  146. parser.add_argument('--execute', '-e', action='store_false', help=f'Execute the snapshot commands. This needs to be set, otherwise it will always dryrun')
  147. return parser.parse_args()
  148. def main():
  149. args = parse_args()
  150. tag_list = args.tags.split(",")
  151. ec2_resource = get_ec2_resource()
  152. if 'create' in args.action:
  153. ret = create_snapshot(ec2_resource, tag_list, args.description, args.execute)
  154. log.info(f'{ret[0]} snapshots created, {ret[1]} snapshots failed')
  155. elif 'delete' in args.action:
  156. ret = delete_snapshot(ec2_resource, tag_list, args.description, args.retention, args.execute)
  157. log.info(f'{ret[0]} snapshots deleted, {ret[1]} snapshot deletions failed')
  158. elif 'list' in args.action:
  159. ret = list_snapshot(ec2_resource, tag_list, args.description)
  160. if __name__ == "__main__":
  161. sys.exit(main())