incremental_build_util.py

# Copyright (c) Contributors to the Open 3D Engine Project.
# For complete copyright and license terms please see the LICENSE at the root of this distribution.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT
#
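"""
Utility for managing the EBS volumes that back incremental builds on EC2 build nodes.

Supports three actions (mount | unmount | delete). Volumes are tagged with the
repository, project, pipeline, branch, platform and build type, created from the
most recent matching snapshot when one exists, attached to the instance as xvdf,
and mounted at D:\\ on Windows or /data on Linux.
"""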
import argparse
import datetime
import os
import subprocess
import sys
import tempfile
import threading
import time
import _thread
from contextlib import contextmanager

import boto3
import psutil
from botocore.config import Config
from botocore.utils import IMDSFetcher

DEFAULT_REGION = 'us-west-2'
DEFAULT_DISK_SIZE = 300
DEFAULT_DISK_TYPE = 'gp2'
DEFAULT_TIMEOUT = 300
MAX_EBS_MOUNTING_ATTEMPT = 3
LOW_EBS_DISK_SPACE_LIMIT = 10240
MAX_EBS_DISK_SIZE = DEFAULT_DISK_SIZE * 2
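# Note on units (inferred from how these values are used below): DEFAULT_DISK_SIZE and
# MAX_EBS_DISK_SIZE are gigabytes (EBS volume sizes), DEFAULT_TIMEOUT is seconds, and
# LOW_EBS_DISK_SPACE_LIMIT is megabytes (compared against get_free_space_mb()).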

if os.name == 'nt':
    MOUNT_PATH = 'D:\\'
else:
    MOUNT_PATH = '/data'

if os.name == 'nt':
    import ctypes
    import win32api
    import collections
    import locale

    locale.setlocale(locale.LC_ALL, '')  # set locale to default to get thousands separators
    PULARGE_INTEGER = ctypes.POINTER(ctypes.c_ulonglong)  # Pointer to large unsigned integer
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
    kernel32.GetDiskFreeSpaceExW.argtypes = (ctypes.c_wchar_p,) + (PULARGE_INTEGER,) * 3

    class UsageTuple(collections.namedtuple('UsageTuple', 'total, used, free')):
        def __str__(self):
            # Add thousands separator to numbers displayed
            return self.__class__.__name__ + '(total={:n}, used={:n}, free={:n})'.format(*self)

    def is_dir_symlink(path):
        FILE_ATTRIBUTE_REPARSE_POINT = 0x0400
        return os.path.isdir(path) and (
            ctypes.windll.kernel32.GetFileAttributesW(str(path)) & FILE_ATTRIBUTE_REPARSE_POINT)

    def get_free_space_mb(path):
        if sys.version_info < (3,):  # Python 2?
            saved_conversion_mode = ctypes.set_conversion_mode('mbcs', 'strict')
        else:
            try:
                path = os.fsdecode(path)  # allows str or bytes (or os.PathLike in Python 3.6+)
            except AttributeError:  # fsdecode() not added until Python 3.2
                pass
        # Define variables to receive results when passed as "by reference" arguments
        _, total, free = ctypes.c_ulonglong(), ctypes.c_ulonglong(), ctypes.c_ulonglong()
        success = kernel32.GetDiskFreeSpaceExW(
            path, ctypes.byref(_), ctypes.byref(total), ctypes.byref(free))
        if not success:
            error_code = ctypes.get_last_error()
        if sys.version_info < (3,):  # Python 2?
            ctypes.set_conversion_mode(*saved_conversion_mode)  # restore conversion mode
        if not success:
            windows_error_message = ctypes.FormatError(error_code)
            raise ctypes.WinError(error_code, '{} {!r}'.format(windows_error_message, path))
        used = total.value - free.value
        return free.value / 1024 / 1024  # for now
else:
    def get_free_space_mb(dirname):
        st = os.statvfs(dirname)
        return st.f_bavail * st.f_frsize / 1024 / 1024

def error(message):
    print(message)
    exit(1)


@contextmanager
def timeout(duration, timeout_message):
    timer = threading.Timer(duration, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        print(timeout_message)
        raise TimeoutError
    finally:
        # If the action ends within the specified time, the timer is canceled
        timer.cancel()
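# Illustrative use of the timeout() helper above (wait_for_something is a
# hypothetical placeholder, not part of this script): the body is interrupted via
# KeyboardInterrupt if it runs longer than the given number of seconds.
#
#   with timeout(30, 'ERROR: Timeout reached waiting for the device.'):
#       wait_for_something()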

def print_drives():
    if os.name == 'nt':
        drives_before = win32api.GetLogicalDriveStrings()
        drives_before = drives_before.split('\000')[:-1]
        print(drives_before)

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--action', dest="action", help="Action (mount|unmount|delete)")
    parser.add_argument('-snapshot-hint', '--snapshot-hint', dest="snapshot_hint", help="Build snapshot to attempt to use")
    parser.add_argument('-repository_name', '--repository_name', dest="repository_name", help="Repository name")
    parser.add_argument('-project', '--project', dest="project", help="Project")
    parser.add_argument('-pipe', '--pipeline', dest="pipeline", help="Pipeline")
    parser.add_argument('-b', '--branch', dest="branch", help="Branch")
    parser.add_argument('-plat', '--platform', dest="platform", help="Platform")
    parser.add_argument('-c', '--build_type', dest="build_type", help="Build type")
    parser.add_argument('-ds', '--disk_size', dest="disk_size", type=int,  # keep comparable to the integer sizes EC2 reports
                        help=f"Disk size in Gigabytes (defaults to {DEFAULT_DISK_SIZE})", default=DEFAULT_DISK_SIZE)
    parser.add_argument('-dt', '--disk_type', dest="disk_type", help=f"Disk type (defaults to {DEFAULT_DISK_TYPE})",
                        default=DEFAULT_DISK_TYPE)
    args = parser.parse_args()

    # Input validation
    if args.action is None:
        error('No action specified')
    args.action = args.action.lower()
    if args.action != 'unmount':
        if args.repository_name is None:
            error('No repository specified')
        if args.project is None:
            error('No project specified')
        if args.pipeline is None:
            error('No pipeline specified')
        if args.branch is None:
            error('No branch specified')
        if args.platform is None:
            error('No platform specified')
        if args.build_type is None:
            error('No build_type specified')
    return args
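# Illustrative invocations (argument values are hypothetical; only the flags are
# defined by parse_args() above):
#
#   python incremental_build_util.py --action mount --repository_name o3de \
#       --project AutomatedTesting --pipeline default --branch development \
#       --platform Windows --build_type profile --disk_size 300 --disk_type gp2
#
#   python incremental_build_util.py --action unmount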

def get_mount_name(repository_name, project, pipeline, branch, platform, build_type):
    mount_name = f"{repository_name}_{project}_{pipeline}_{branch}_{platform}_{build_type}"
    mount_name = mount_name.replace('/', '_').replace('\\', '_')
    return mount_name


def get_pipeline_and_branch(pipeline, branch):
    pipeline_and_branch = f"{pipeline}_{branch}"
    pipeline_and_branch = pipeline_and_branch.replace('/', '_').replace('\\', '_')
    return pipeline_and_branch
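# For example (hypothetical values), get_mount_name('o3de', 'AutomatedTesting',
# 'default', 'dev/main', 'Windows', 'profile') yields
# 'o3de_AutomatedTesting_default_dev_main_Windows_profile'; slashes are replaced
# so the value is safe to use as an EBS Name tag and snapshot lookup key.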

def get_region_name():
    session = boto3.session.Session()
    region = session.region_name
    if region is None:
        region = DEFAULT_REGION
    return region


def get_ec2_client(region):
    client_config = Config(
        region_name=region,
        retries={
            'mode': 'standard'
        }
    )
    client = boto3.client('ec2', config=client_config)
    return client


def get_ec2_resource(region):
    resource_config = Config(
        region_name=region,
        retries={
            'mode': 'standard'
        }
    )
    resource = boto3.resource('ec2', config=resource_config)
    return resource

def get_ec2_instance_id():
    # IMDSFetcher is an internal botocore helper; it is used here to make an
    # IMDSv2 token-authenticated request to the instance metadata service.
    try:
        token = IMDSFetcher()._fetch_metadata_token()
        instance_id = IMDSFetcher()._get_request("/latest/meta-data/instance-id", None, token).text
        return instance_id
    except Exception as e:
        print(e)
        error('No EC2 metadata! Check if you are running this script on an EC2 instance.')


def get_availability_zone():
    try:
        token = IMDSFetcher()._fetch_metadata_token()
        availability_zone = IMDSFetcher()._get_request("/latest/meta-data/placement/availability-zone", None, token).text
        return availability_zone
    except Exception as e:
        print(e)
        error('No EC2 metadata! Check if you are running this script on an EC2 instance.')

def kill_processes(workspace='/data'):
    """
    Kills all processes that have open file paths associated with the workspace.
    Uses PSUtil for cross-platform compatibility
    """
    print('Checking for any stuck processes...')
    for proc in psutil.process_iter():
        try:
            if workspace in str(proc.open_files()):
                print(f"{proc.name()} has open files in {proc.open_files()}. Terminating")
                proc.kill()
                time.sleep(1)  # Just to make sure a parent process has time to close
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue

def delete_volume(ec2_client, volume_id):
    ec2_client.delete_volume(VolumeId=volume_id)
    print(f'Volume {volume_id} deleted')

def find_snapshot_id(ec2_client, snapshot_hint, repository_name, project, pipeline, platform, build_type, disk_size):
    # The snapshot hint stands in for the branch when building the tag name to look up.
    mount_name = get_mount_name(repository_name, project, pipeline, snapshot_hint, platform, build_type)
    response = ec2_client.describe_snapshots(Filters=[{
        'Name': 'tag:Name', 'Values': [mount_name]
    }])
    snapshot_id = None
    if 'Snapshots' in response and len(response['Snapshots']) > 0:
        snapshot_start_time_max = None  # find the latest snapshot
        for snapshot in response['Snapshots']:
            if snapshot['State'] == 'completed' and snapshot['VolumeSize'] == disk_size:
                snapshot_start_time = snapshot['StartTime']
                if not snapshot_start_time_max or snapshot_start_time > snapshot_start_time_max:
                    snapshot_start_time_max = snapshot_start_time
                    snapshot_id = snapshot['SnapshotId']
    return snapshot_id

def offline_drive(disk_number=1):
    """Use diskpart to offline a Windows drive"""
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(f"""
select disk {disk_number}
offline disk
""".encode('utf-8'))
    subprocess.run(['diskpart', '/s', f.name])
    os.unlink(f.name)

def create_volume(ec2_client, availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    pipeline_and_branch = get_pipeline_and_branch(pipeline, branch)
    parameters = dict(
        AvailabilityZone=availability_zone,
        VolumeType=disk_type,
        Encrypted=True,
        TagSpecifications=[{
            'ResourceType': 'volume',
            'Tags': [
                {'Key': 'Name', 'Value': mount_name},
                {'Key': 'RepositoryName', 'Value': repository_name},
                {'Key': 'Project', 'Value': project},
                {'Key': 'Pipeline', 'Value': pipeline},
                {'Key': 'BranchName', 'Value': branch},
                {'Key': 'Platform', 'Value': platform},
                {'Key': 'BuildType', 'Value': build_type},
                # Used so the snapshotting job can easily identify which volumes to snapshot
                {'Key': 'PipelineAndBranch', 'Value': pipeline_and_branch},
            ]
        }]
    )
    # The actual EBS default calculation for IOPS is a floating point number;
    # the closest approximation is 4x the disk size, for simplicity.
    if 'io1' in disk_type.lower():
        parameters['Iops'] = 4 * disk_size
    snapshot_id = find_snapshot_id(ec2_client, snapshot_hint, repository_name, project, pipeline, platform, build_type, disk_size)
    if snapshot_id:
        parameters['SnapshotId'] = snapshot_id
        created = False
    else:
        # If no snapshot id, we need to specify the size
        parameters['Size'] = disk_size
        created = True
    response = ec2_client.create_volume(**parameters)
    volume_id = response['VolumeId']
    # give some time for the creation call to complete
    time.sleep(1)
    response = ec2_client.describe_volumes(VolumeIds=[volume_id, ])
    with timeout(DEFAULT_TIMEOUT, 'ERROR: Timeout reached trying to create EBS.'):
        while response['Volumes'][0]['State'] != 'available':
            time.sleep(1)
            response = ec2_client.describe_volumes(VolumeIds=[volume_id, ])
    print(f"Volume {volume_id} created\n\tSnapshot: {snapshot_id}\n\tRepository {repository_name}\n\t"
          f"Project {project}\n\tPipeline {pipeline}\n\tBranch {branch}\n\tPlatform: {platform}\n\tBuild type: {build_type}")
    return volume_id, created

def mount_volume_to_device(created):
    print('Mounting volume...')
    if os.name == 'nt':
        # Verify drive is in an offline state.
        # Some Windows configs will automatically set new drives as online causing diskpart setup script to fail.
        offline_drive()
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write("""
select disk 1
online disk
attribute disk clear readonly
""".encode('utf-8'))  # assume disk # for now
            if created:
                print('Creating filesystem on new volume')
                f.write("""
create partition primary
select partition 1
format quick fs=ntfs
assign
active
""".encode('utf-8'))
        subprocess.call(['diskpart', '/s', f.name])
        time.sleep(5)
        print_drives()
        os.unlink(f.name)
        time.sleep(1)
    else:
        device_name = '/dev/xvdf'
        nvme_device_name = '/dev/nvme1n1'
        if os.path.exists(nvme_device_name):
            device_name = nvme_device_name
        subprocess.call(['file', '-s', device_name])
        if created:
            subprocess.call(['mkfs', '-t', 'ext4', device_name])
        subprocess.call(['mount', device_name, MOUNT_PATH])
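# Illustrative only: for a brand-new volume (created=True) the temporary script
# handed to diskpart above ends up containing, in order:
#   select disk 1
#   online disk
#   attribute disk clear readonly
#   create partition primary
#   select partition 1
#   format quick fs=ntfs
#   assign
#   active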

def attach_volume_to_ec2_instance(volume, volume_id, instance_id, timeout_duration=DEFAULT_TIMEOUT):
    print(f'Attaching volume {volume_id} to instance {instance_id}')
    volume.attach_to_instance(Device='xvdf',
                              InstanceId=instance_id,
                              VolumeId=volume_id)
    # give a little bit of time for the aws call to process
    time.sleep(2)
    # reload the volume just in case
    volume.load()
    with timeout(timeout_duration, 'ERROR: Timeout reached trying to mount EBS.'):
        while len(volume.attachments) and volume.attachments[0]['State'] != 'attached':
            time.sleep(1)
            volume.load()
    volume.create_tags(
        Tags=[
            {
                'Key': 'AttachDate',
                'Value': str(datetime.datetime.today().date())
            },
        ]
    )
    print(f'Volume {volume_id} has been attached to instance {instance_id}')

def unmount_volume_from_device():
    print('Unmounting EBS volume from device...')
    if os.name == 'nt':
        kill_processes(MOUNT_PATH + 'workspace')
        offline_drive()
    else:
        kill_processes(MOUNT_PATH)
        subprocess.call(['umount', '-fl', MOUNT_PATH])

def detach_volume_from_ec2_instance(volume, ec2_instance_id, force, timeout_duration=DEFAULT_TIMEOUT):
    print(f'Detaching volume {volume.volume_id} from instance {ec2_instance_id}')
    volume.detach_from_instance(Device='xvdf',
                                Force=force,
                                InstanceId=ec2_instance_id,
                                VolumeId=volume.volume_id)
    try:
        with timeout(timeout_duration, 'ERROR: Timeout reached trying to unmount EBS.'):
            while len(volume.attachments) and volume.attachments[0]['State'] != 'detached':
                time.sleep(1)
                volume.load()
    except TimeoutError:
        print('Force detaching EBS.')
        volume.detach_from_instance(Device='xvdf', Force=True, InstanceId=ec2_instance_id, VolumeId=volume.volume_id)
    print(f'Volume {volume.volume_id} has been detached from instance {ec2_instance_id}')
    volume.load()
    if len(volume.attachments):
        print('Volume still has attachments')
        for attachment in volume.attachments:
            print(f"Volume {attachment['VolumeId']} {attachment['State']} to instance {attachment['InstanceId']}")

def mount_ebs(snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    region = get_region_name()
    ec2_client = get_ec2_client(region)
    ec2_instance_id = get_ec2_instance_id()
    ec2_availability_zone = get_availability_zone()
    ec2_resource = get_ec2_resource(region)
    ec2_instance = ec2_resource.Instance(ec2_instance_id)
    for volume in ec2_instance.volumes.all():
        for attachment in volume.attachments:
            print(f"attachment device: {attachment['Device']}")
            if 'xvdf' in attachment['Device'] and attachment['State'] != 'detached':
                print('A device is already attached to xvdf. This likely means a previous build failed to detach its '
                      'build volume. This volume is considered orphaned and will be detached from this instance.')
                unmount_volume_from_device()
                # Force unmounts should not be used here, as that will cause the EBS block device driver to fail the remount
                detach_volume_from_ec2_instance(volume, ec2_instance_id, False)
    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    response = ec2_client.describe_volumes(Filters=[{
        'Name': 'tag:Name', 'Values': [mount_name]
    }])
    created = False
    if 'Volumes' in response and not len(response['Volumes']):
        # volume doesn't exist, create it
        print(f"Volume for {mount_name} doesn't exist, creating it...")
        volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)
    else:
        volume = response['Volumes'][0]
        volume_id = volume['VolumeId']
        print(f"Current volume {volume_id} is a {volume['Size']} GB {volume['VolumeType']}")
        if volume['Size'] != disk_size or volume['VolumeType'] != disk_type:
            print(f'Disk attribute overrides do not match the existing volume, deleting {volume_id} and replacing the volume')
            delete_volume(ec2_client, volume_id)
            volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)
        if len(volume['Attachments']):
            # this is bad: we shouldn't be attached, we should have detached at the end of the previous build
            attachment = volume['Attachments'][0]
            print(f'Volume already has attachment {attachment}, detaching...')
            detach_volume_from_ec2_instance(ec2_resource.Volume(volume_id), attachment['InstanceId'], True)
    volume = ec2_resource.Volume(volume_id)
    print_drives()
    attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id)
    mount_volume_to_device(created)
    print_drives()
    free_space_mb = get_free_space_mb(MOUNT_PATH)
    print(f'Free disk space {free_space_mb}MB')
    if free_space_mb < LOW_EBS_DISK_SPACE_LIMIT:
        print(f'Volume is running below the EBS free disk space threshold {LOW_EBS_DISK_SPACE_LIMIT}MB. Recreating volume and running clean build.')
        unmount_volume_from_device()
        detach_volume_from_ec2_instance(volume, ec2_instance_id, False)
        delete_volume(ec2_client, volume_id)
        new_disk_size = int(volume.size * 1.25)
        if new_disk_size > MAX_EBS_DISK_SIZE:
            print(f'Error: EBS disk size reached the allowed maximum disk size {MAX_EBS_DISK_SIZE} GB, please contact ly-infra@ and ly-build@ to investigate.')
            exit(1)
        print(f'Recreating the EBS with disk size {new_disk_size}')
        volume_id, created = create_volume(ec2_client, ec2_availability_zone, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, new_disk_size, disk_type)
        volume = ec2_resource.Volume(volume_id)
        attach_volume_to_ec2_instance(volume, volume_id, ec2_instance_id)
        mount_volume_to_device(created)

def unmount_ebs():
    region = get_region_name()
    ec2_instance_id = get_ec2_instance_id()
    ec2_resource = get_ec2_resource(region)
    ec2_instance = ec2_resource.Instance(ec2_instance_id)
    if os.path.isfile('envinject.properties'):
        os.remove('envinject.properties')
    volume = None
    for attached_volume in ec2_instance.volumes.all():
        for attachment in attached_volume.attachments:
            print(f"attachment device: {attachment['Device']}")
            if attachment['Device'] == 'xvdf':
                volume = attached_volume
    if not volume:
        # volume is not mounted
        print('Volume is not mounted')
    else:
        unmount_volume_from_device()
        detach_volume_from_ec2_instance(volume, ec2_instance_id, False)

def delete_ebs(repository_name, project, pipeline, branch, platform, build_type):
    unmount_ebs()
    region = get_region_name()
    ec2_client = get_ec2_client(region)
    mount_name = get_mount_name(repository_name, project, pipeline, branch, platform, build_type)
    response = ec2_client.describe_volumes(Filters=[
        {'Name': 'tag:Name', 'Values': [mount_name]}
    ])
    if 'Volumes' in response and len(response['Volumes']):
        volume = response['Volumes'][0]
        volume_id = volume['VolumeId']
        delete_volume(ec2_client, volume_id)

def main(action, snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type):
    if action == 'mount':
        mount_ebs(snapshot_hint, repository_name, project, pipeline, branch, platform, build_type, disk_size, disk_type)
    elif action == 'unmount':
        unmount_ebs()
    elif action == 'delete':
        delete_ebs(repository_name, project, pipeline, branch, platform, build_type)


if __name__ == "__main__":
    args = parse_args()
    ret = main(args.action, args.snapshot_hint, args.repository_name, args.project, args.pipeline, args.branch, args.platform, args.build_type, args.disk_size, args.disk_type)
    sys.exit(ret)
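# Note: the calling instance needs access to the EC2 instance metadata service and
# an instance profile allowing the EC2 operations used above (DescribeVolumes,
# DescribeSnapshots, CreateVolume, DeleteVolume, AttachVolume, DetachVolume,
# CreateTags); the exact IAM policy is deployment-specific and not defined here.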