# Copyright (c) Contributors to the Open 3D Engine Project.
# For complete copyright and license terms please see the LICENSE at the root of this distribution.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT
#

import argparse
import datetime
import os
import subprocess
import sys
import tempfile
import threading
import time
import _thread
from contextlib import contextmanager

import boto3
import psutil
from botocore.config import Config
from botocore.utils import IMDSFetcher

DEFAULT_REGION = 'us-west-2'
DEFAULT_DISK_SIZE = 300  # GB
DEFAULT_DISK_TYPE = 'gp2'
DEFAULT_TIMEOUT = 300  # seconds
MAX_EBS_MOUNTING_ATTEMPT = 3
LOW_EBS_DISK_SPACE_LIMIT = 10240  # MB of free space below which the volume is recreated
MAX_EBS_DISK_SIZE = DEFAULT_DISK_SIZE * 2  # GB

if os.name == 'nt':
    MOUNT_PATH = 'D:\\'
else:
    MOUNT_PATH = '/data'

if os.name == 'nt':
    import ctypes
    import win32api
    import collections
    import locale

    locale.setlocale(locale.LC_ALL, '')  # set locale to default to get thousands separators
    PULARGE_INTEGER = ctypes.POINTER(ctypes.c_ulonglong)  # Pointer to large unsigned integer
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
    kernel32.GetDiskFreeSpaceExW.argtypes = (ctypes.c_wchar_p,) + (PULARGE_INTEGER,) * 3

    class UsageTuple(collections.namedtuple('UsageTuple', 'total, used, free')):
        def __str__(self):
            # Add thousands separator to numbers displayed
            return self.__class__.__name__ + '(total={:n}, used={:n}, free={:n})'.format(*self)

    def is_dir_symlink(path):
        FILE_ATTRIBUTE_REPARSE_POINT = 0x0400
        return os.path.isdir(path) and (
            ctypes.windll.kernel32.GetFileAttributesW(str(path)) & FILE_ATTRIBUTE_REPARSE_POINT)

    def get_free_space_mb(path):
        if sys.version_info < (3,):  # Python 2?
            saved_conversion_mode = ctypes.set_conversion_mode('mbcs', 'strict')
        else:
            try:
                path = os.fsdecode(path)  # allows str or bytes (or os.PathLike in Python 3.6+)
            except AttributeError:  # fsdecode() not added until Python 3.2
                pass

        # Define variables to receive results when passed as "by reference" arguments
        _, total, free = ctypes.c_ulonglong(), ctypes.c_ulonglong(), ctypes.c_ulonglong()

        success = kernel32.GetDiskFreeSpaceExW(
            path, ctypes.byref(_), ctypes.byref(total), ctypes.byref(free))
        if not success:
            error_code = ctypes.get_last_error()

        if sys.version_info < (3,):  # Python 2?
            ctypes.set_conversion_mode(*saved_conversion_mode)  # restore conversion mode

        if not success:
            windows_error_message = ctypes.FormatError(error_code)
            raise ctypes.WinError(error_code, '{} {!r}'.format(windows_error_message, path))

        used = total.value - free.value
        return free.value / 1024 / 1024  # for now
else:
    def get_free_space_mb(dirname):
        st = os.statvfs(dirname)
        return st.f_bavail * st.f_frsize / 1024 / 1024

def error(message):
    print(message)
    sys.exit(1)


@contextmanager
def timeout(duration, timeout_message):
    """Raise TimeoutError if the wrapped block does not finish within `duration` seconds."""
    timer = threading.Timer(duration, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        print(timeout_message)
        raise TimeoutError
    finally:
        # If the action ends within the specified time, the timer is cancelled
        timer.cancel()


def print_drives():
    if os.name == 'nt':
        drives = win32api.GetLogicalDriveStrings()
        drives = drives.split('\000')[:-1]
        print(drives)

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--action', dest="action", help="Action (mount|unmount|delete)")
    parser.add_argument('-snapshot-hint', '--snapshot-hint', dest="snapshot_hint", help="Build snapshot to attempt to use")
    parser.add_argument('-repository_name', '--repository_name', dest="repository_name", help="Repository name")
    parser.add_argument('-project', '--project', dest="project", help="Project")
    parser.add_argument('-pipe', '--pipeline', dest="pipeline", help="Pipeline")
    parser.add_argument('-b', '--branch', dest="branch", help="Branch")
    parser.add_argument('-plat', '--platform', dest="platform", help="Platform")
    parser.add_argument('-c', '--build_type', dest="build_type", help="Build type")
    parser.add_argument('-ds', '--disk_size', dest="disk_size", type=int,
                        help=f"Disk size in Gigabytes (defaults to {DEFAULT_DISK_SIZE})", default=DEFAULT_DISK_SIZE)
    parser.add_argument('-dt', '--disk_type', dest="disk_type", help=f"Disk type (defaults to {DEFAULT_DISK_TYPE})",
                        default=DEFAULT_DISK_TYPE)
    args = parser.parse_args()

    # Input validation
    if args.action is None:
        error('No action specified')
    args.action = args.action.lower()
    if args.action != 'unmount':
        if args.repository_name is None:
            error('No repository specified')
        if args.project is None:
            error('No project specified')
        if args.pipeline is None:
            error('No pipeline specified')
        if args.branch is None:
            error('No branch specified')
        if args.platform is None:
            error('No platform specified')
        if args.build_type is None:
            error('No build_type specified')
    return args
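

# Example invocation, for reference only. The option values below are hypothetical
# placeholders, and <this_script> stands in for whatever filename this script is deployed as:
#   python <this_script>.py --action mount --repository_name o3de --project o3de \
#       --pipeline default --branch development --platform Windows --build_type profile
#   python <this_script>.py --action unmount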


def get_mount_name(repository_name, project, pipeline, branch, platform, build_type):
    mount_name = f"{repository_name}_{project}_{pipeline}_{branch}_{platform}_{build_type}"
    mount_name = mount_name.replace('/', '_').replace('\\', '_')
    return mount_name


def get_pipeline_and_branch(pipeline, branch):
    pipeline_and_branch = f"{pipeline}_{branch}"
    pipeline_and_branch = pipeline_and_branch.replace('/', '_').replace('\\', '_')
    return pipeline_and_branch
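

# For reference: with the hypothetical arguments shown above, get_mount_name() returns
# 'o3de_o3de_default_development_Windows_profile'. Any '/' or '\' in the branch name
# (e.g. 'stabilization/2305') is replaced with '_' so the value is usable as a tag name.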


def get_region_name():
    session = boto3.session.Session()
    region = session.region_name
    if region is None:
        region = DEFAULT_REGION
    return region


def get_ec2_client(region):
    client_config = Config(
        region_name=region,
        retries={
            'mode': 'standard'
        }
    )
    client = boto3.client('ec2', config=client_config)
    return client


def get_ec2_resource(region):
    resource_config = Config(
        region_name=region,
        retries={
            'mode': 'standard'
        }
    )
    resource = boto3.resource('ec2', config=resource_config)
    return resource


def get_ec2_instance_id():
    try:
        token = IMDSFetcher()._fetch_metadata_token()
        instance_id = IMDSFetcher()._get_request("/latest/meta-data/instance-id", None, token).text
        return instance_id
    except Exception as e:
        print(e)
        error('No EC2 metadata! Check if you are running this script on an EC2 instance.')


def get_availability_zone():
    try:
        token = IMDSFetcher()._fetch_metadata_token()
        availability_zone = IMDSFetcher()._get_request("/latest/meta-data/placement/availability-zone", None, token).text
        return availability_zone
    except Exception as e:
        print(e)
        error('No EC2 metadata! Check if you are running this script on an EC2 instance.')


def kill_processes(workspace='/data'):
    """
    Kills all processes that have open file paths associated with the workspace.
    Uses psutil for cross-platform compatibility.
    """
    print('Checking for any stuck processes...')
    for proc in psutil.process_iter():
        try:
            if workspace in str(proc.open_files()):
                print(f"{proc.name()} has open files in {proc.open_files()}. Terminating")
                proc.kill()
                time.sleep(1)  # Just to make sure a parent process has time to close
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
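

# kill_processes() is invoked before unmounting (see unmount_volume_from_device below) with the
# workspace root, e.g. kill_processes(MOUNT_PATH) on Linux or kill_processes(MOUNT_PATH + 'workspace')
# on Windows, so that no lingering build process keeps a file handle open on the volume.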


def delete_volume(ec2_client, volume_id):
    ec2_client.delete_volume(VolumeId=volume_id)
    print(f'Volume {volume_id} deleted')


def find_snapshot_id(ec2_client, snapshot_hint, repository_name, project, pipeline, platform, build_type, disk_size):
    mount_name = get_mount_name(repository_name, project, pipeline, snapshot_hint, platform, build_type)
    response = ec2_client.describe_snapshots(Filters=[{
        'Name': 'tag:Name', 'Values': [mount_name]
    }])
    snapshot_id = None
    if 'Snapshots' in response and len(response['Snapshots']) > 0:
        snapshot_start_time_max = None  # find the latest snapshot
        for snapshot in response['Snapshots']:
            if snapshot['State'] == 'completed' and snapshot['VolumeSize'] == disk_size:
                snapshot_start_time = snapshot['StartTime']
                if not snapshot_start_time_max or snapshot_start_time > snapshot_start_time_max:
                    snapshot_start_time_max = snapshot_start_time
                    snapshot_id = snapshot['SnapshotId']
    return snapshot_id


def offline_drive(disk_number=1):
    """Use diskpart to offline a Windows drive."""
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(f"""
select disk {disk_number}
offline disk
""".encode('utf-8'))
    # The script file must be closed (flushed) before diskpart can read it
    subprocess.run(['diskpart', '/s', f.name])
    os.unlink(f.name)


def create_volume(ec2_client, availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    pipeline_and_branch = get_pipeline_and_branch(pipeline, branch)
    parameters = dict(
        AvailabilityZone=availability_zone,
        VolumeType=disk_type,
        Encrypted=True,
        TagSpecifications=[{
            'ResourceType': 'volume',
            'Tags': [
                {'Key': 'Name', 'Value': mount_name},
                {'Key': 'RepositoryName', 'Value': repository_name},
                {'Key': 'Project', 'Value': project},
                {'Key': 'Pipeline', 'Value': pipeline},
                {'Key': 'BranchName', 'Value': branch},
                {'Key': 'Platform', 'Value': platform},
                {'Key': 'BuildType', 'Value': build_type},
                # Used so the snapshotting job can easily identify which volumes to snapshot
                {'Key': 'PipelineAndBranch', 'Value': pipeline_and_branch},
            ]
        }]
    )
    # The actual EBS default calculation for IOPS is a floating point number;
    # the closest approximation is 4x the disk size, which we use for simplicity.
    if 'io1' in disk_type.lower():
        parameters['Iops'] = (4 * disk_size)
    snapshot_id = find_snapshot_id(ec2_client, snapshot_hint, repository_name, project, pipeline, platform, build_type, disk_size)
    if snapshot_id:
        parameters['SnapshotId'] = snapshot_id
        created = False
    else:
        # If no snapshot id, we need to specify the size
        parameters['Size'] = disk_size
        created = True
    response = ec2_client.create_volume(**parameters)
    volume_id = response['VolumeId']
    # give some time for the creation call to complete
    time.sleep(1)
    response = ec2_client.describe_volumes(VolumeIds=[volume_id, ])
    with timeout(DEFAULT_TIMEOUT, 'ERROR: Timeout reached trying to create EBS.'):
        while response['Volumes'][0]['State'] != 'available':
            time.sleep(1)
            response = ec2_client.describe_volumes(VolumeIds=[volume_id, ])
    print(f"Volume {volume_id} created\n\tSnapshot: {snapshot_id}\n\tRepository {repository_name}\n\t"
          f"Project {project}\n\tPipeline {pipeline}\n\tBranch {branch}\n\tPlatform: {platform}\n\tBuild type: {build_type}")
    return volume_id, created
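

# Note on the io1 IOPS heuristic above: with the default 300 GB disk the request would be
# 4 * 300 = 1200 provisioned IOPS. This is a simplification chosen by this script, not an
# EBS-mandated formula; adjust it if a different IOPS profile is needed.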


def mount_volume_to_device(created):
    print('Mounting volume...')
    if os.name == 'nt':
        # Verify drive is in an offline state.
        # Some Windows configs will automatically set new drives as online, causing the diskpart setup script to fail.
        offline_drive()
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write("""
select disk 1
online disk
attribute disk clear readonly
""".encode('utf-8'))  # assume disk # for now
            if created:
                print('Creating filesystem on new volume')
                f.write("""
create partition primary
select partition 1
format quick fs=ntfs
assign
active
""".encode('utf-8'))
        subprocess.call(['diskpart', '/s', f.name])
        time.sleep(5)
        print_drives()
        os.unlink(f.name)
        time.sleep(1)
    else:
        device_name = '/dev/xvdf'
        nvme_device_name = '/dev/nvme1n1'
        if os.path.exists(nvme_device_name):
            device_name = nvme_device_name
        subprocess.call(['file', '-s', device_name])
        if created:
            subprocess.call(['mkfs', '-t', 'ext4', device_name])
        subprocess.call(['mount', device_name, MOUNT_PATH])
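

# Note: on Nitro-based EC2 instances, EBS volumes are exposed as NVMe block devices
# (/dev/nvme*n*) rather than /dev/xvdf, which is why mount_volume_to_device() probes
# /dev/nvme1n1 first and falls back to /dev/xvdf on older instance types.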


def attach_volume_to_ec2_instance(volume, volume_id, instance_id, timeout_duration=DEFAULT_TIMEOUT):
    print(f'Attaching volume {volume_id} to instance {instance_id}')
    volume.attach_to_instance(Device='xvdf',
                              InstanceId=instance_id,
                              VolumeId=volume_id)
    # give a little bit of time for the aws call to process
    time.sleep(2)
    # reload the volume just in case
    volume.load()
    with timeout(timeout_duration, 'ERROR: Timeout reached trying to mount EBS.'):
        while len(volume.attachments) and volume.attachments[0]['State'] != 'attached':
            time.sleep(1)
            volume.load()
    volume.create_tags(
        Tags=[
            {
                'Key': 'AttachDate',
                'Value': str(datetime.datetime.today().date())
            },
        ]
    )
    print(f'Volume {volume_id} has been attached to instance {instance_id}')


def unmount_volume_from_device():
    print('Unmounting EBS volume from device...')
    if os.name == 'nt':
        kill_processes(MOUNT_PATH + 'workspace')
        offline_drive()
    else:
        kill_processes(MOUNT_PATH)
        subprocess.call(['umount', '-fl', MOUNT_PATH])


def detach_volume_from_ec2_instance(volume, ec2_instance_id, force, timeout_duration=DEFAULT_TIMEOUT):
    print(f'Detaching volume {volume.volume_id} from instance {ec2_instance_id}')
    volume.detach_from_instance(Device='xvdf',
                                Force=force,
                                InstanceId=ec2_instance_id,
                                VolumeId=volume.volume_id)
    try:
        with timeout(timeout_duration, 'ERROR: Timeout reached trying to unmount EBS.'):
            while len(volume.attachments) and volume.attachments[0]['State'] != 'detached':
                time.sleep(1)
                volume.load()
    except TimeoutError:
        print('Force detaching EBS.')
        volume.detach_from_instance(Device='xvdf', Force=True, InstanceId=ec2_instance_id, VolumeId=volume.volume_id)
    print(f'Volume {volume.volume_id} has been detached from instance {ec2_instance_id}')
    volume.load()
    if len(volume.attachments):
        print('Volume still has attachments')
        for attachment in volume.attachments:
            print(f"Volume {attachment['VolumeId']} {attachment['State']} to instance {attachment['InstanceId']}")


def mount_ebs(snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    region = get_region_name()
    ec2_client = get_ec2_client(region)
    ec2_instance_id = get_ec2_instance_id()
    ec2_availability_zone = get_availability_zone()
    ec2_resource = get_ec2_resource(region)
    ec2_instance = ec2_resource.Instance(ec2_instance_id)

    for volume in ec2_instance.volumes.all():
        for attachment in volume.attachments:
            print(f"attachment device: {attachment['Device']}")
            if 'xvdf' in attachment['Device'] and attachment['State'] != 'detached':
                print('A device is already attached to xvdf. This likely means a previous build failed to detach its '
                      'build volume. This volume is considered orphaned and will be detached from this instance.')
                unmount_volume_from_device()
                # Force unmounts should not be used here, as that would cause the EBS block device driver to fail the remount
                detach_volume_from_ec2_instance(volume, ec2_instance_id, False)

    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    response = ec2_client.describe_volumes(Filters=[{
        'Name': 'tag:Name', 'Values': [mount_name]
    }])
    created = False
    if 'Volumes' in response and not len(response['Volumes']):
        print(f'Volume for {mount_name} doesn\'t exist, creating it...')
        # volume doesn't exist, create it
        volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)
    else:
        volume = response['Volumes'][0]
        volume_id = volume['VolumeId']
        print(f"Current volume {volume_id} is a {volume['Size']} GB {volume['VolumeType']}")
        if volume['Size'] != disk_size or volume['VolumeType'] != disk_type:
            print(f'Requested disk attributes do not match the existing volume, deleting {volume_id} and replacing the volume')
            delete_volume(ec2_client, volume_id)
            volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)
        if len(volume['Attachments']):
            # this is bad, we shouldn't be attached; we should have detached at the end of a build
            attachment = volume['Attachments'][0]
            print(f'Volume already has attachment {attachment}, detaching...')
            detach_volume_from_ec2_instance(ec2_resource.Volume(volume_id), attachment['InstanceId'], True)

    volume = ec2_resource.Volume(volume_id)
    print_drives()
    attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id)
    mount_volume_to_device(created)
    print_drives()

    free_space_mb = get_free_space_mb(MOUNT_PATH)
    print(f'Free disk space {free_space_mb}MB')
    if free_space_mb < LOW_EBS_DISK_SPACE_LIMIT:
        print(f'Volume is running below the EBS free disk space threshold of {LOW_EBS_DISK_SPACE_LIMIT}MB. Recreating volume and running a clean build.')
        unmount_volume_from_device()
        detach_volume_from_ec2_instance(volume, ec2_instance_id, False)
        delete_volume(ec2_client, volume_id)
        new_disk_size = int(volume.size * 1.25)
        if new_disk_size > MAX_EBS_DISK_SIZE:
            print(f'Error: EBS disk size has reached the maximum allowed disk size of {MAX_EBS_DISK_SIZE}GB, please contact ly-infra@ and ly-build@ to investigate.')
            sys.exit(1)
        print('Recreating the EBS with disk size {}'.format(new_disk_size))
        volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, new_disk_size, disk_type)
        volume = ec2_resource.Volume(volume_id)
        attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id)
        mount_volume_to_device(created)


def unmount_ebs():
    region = get_region_name()
    ec2_instance_id = get_ec2_instance_id()
    ec2_resource = get_ec2_resource(region)
    ec2_instance = ec2_resource.Instance(ec2_instance_id)
    if os.path.isfile('envinject.properties'):
        os.remove('envinject.properties')
    volume = None
    for attached_volume in ec2_instance.volumes.all():
        for attachment in attached_volume.attachments:
            print(f"attachment device: {attachment['Device']}")
            if attachment['Device'] == 'xvdf':
                volume = attached_volume
    if not volume:
        # volume is not mounted
        print('Volume is not mounted')
    else:
        unmount_volume_from_device()
        detach_volume_from_ec2_instance(volume, ec2_instance_id, False)


def delete_ebs(repository_name, project, pipeline, branch, platform, build_type):
    unmount_ebs()
    region = get_region_name()
    ec2_client = get_ec2_client(region)
    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    response = ec2_client.describe_volumes(Filters=[
        {'Name': 'tag:Name', 'Values': [mount_name]}
    ])
    if 'Volumes' in response and len(response['Volumes']):
        volume = response['Volumes'][0]
        volume_id = volume['VolumeId']
        delete_volume(ec2_client, volume_id)


def main(action, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    if action == 'mount':
        mount_ebs(snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)
    elif action == 'unmount':
        unmount_ebs()
    elif action == 'delete':
        delete_ebs(repository_name, project, pipeline, branch, platform, build_type)


if __name__ == "__main__":
    args = parse_args()
    ret = main(args.action, args.snapshot_hint, args.repository_name, args.project, args.pipeline, args.branch, args.platform, args.build_type, args.disk_size, args.disk_type)
    sys.exit(ret)