# main.py — sync JSON logs from Yandex Object Storage (S3) to Splunk HEC
  1. import requests
  2. import json
  3. import os
  4. import boto3
  5. import time
  6. import base64
  7. # Function - Get token
  8. def get_token():
  9. response = requests.get('http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token', headers={"Metadata-Flavor":"Google"})
  10. return response.json().get('access_token')
  11. # Function - Decrypt data with KMS key
  12. def decrypt_secret_kms(secret):
  13. token = get_token()
  14. request_suffix = f"{kms_key_id}:decrypt"
  15. request_json_data = {'ciphertext': secret}
  16. response = requests.post('https://kms.yandex/kms/v1/keys/'+request_suffix, data=json.dumps(request_json_data), headers={"Accept":"application/json", "Authorization": "Bearer "+token})
  17. b64_data = response.json().get('plaintext')
  18. return base64.b64decode(b64_data).decode()
  19. # Configuration - Keys
  20. kms_key_id = os.environ['KMS_KEY_ID']
  21. splunk_token = os.environ['SPLUNK_TOKEN_ENCR']
  22. s3_key_encr = os.environ['S3_KEY_ENCR']
  23. s3_secret_encr = os.environ['S3_SECRET_ENCR']
  24. # Configuration - Setting up variables for ElasticSearch
  25. splunk_server = os.environ['SPLUNK_SERVER']
  26. splunk_auth_pw = decrypt_secret_kms(splunk_token)
  27. # Configuration - Setting up variables for S3
  28. s3_key = decrypt_secret_kms(s3_key_encr)
  29. s3_secret = decrypt_secret_kms(s3_secret_encr)
  30. s3_bucket = os.environ['S3_BUCKET']
  31. s3_folder = os.environ['S3_FOLDER']
  32. s3_local = '/tmp/s3'
  33. # Configuration - Sleep time
  34. if(os.getenv('SLEEP_TIME') is not None):
  35. sleep_time = int(os.environ['SLEEP_TIME'])
  36. else:
  37. sleep_time = 240
  38. # State - Setting up S3 client
  39. s3 = boto3.resource('s3',
  40. endpoint_url='https://storage.yandexcloud.net',
  41. aws_access_key_id = s3_key,
  42. aws_secret_access_key = s3_secret
  43. )
  44. # Function - Download JSON logs to local folder
  45. def download_s3_folder(s3_bucket, s3_folder, local_folder=None):
  46. print('JSON download -- STARTED')
  47. bucket = s3.Bucket(s3_bucket)
  48. if not os.path.exists(local_folder):
  49. os.makedirs(local_folder)
  50. for obj in bucket.objects.filter(Prefix=s3_folder):
  51. target = obj.key if local_folder is None \
  52. else os.path.join(local_folder, os.path.relpath(obj.key, s3_folder))
  53. if not os.path.exists(local_folder):
  54. os.makedirs(local_folder)
  55. if obj.key[-1] == '/':
  56. continue
  57. # Downloading JSON logs in a flat-structured way
  58. bucket.download_file(obj.key, local_folder+'/'+target.rsplit('/')[-1])
  59. print('JSON download -- COMPLETE')
  60. # Function - Clean up S3 folder
  61. def delete_objects_s3(s3_bucket, s3_folder):
  62. bucket = s3.Bucket(s3_bucket)
  63. for obj in bucket.objects.filter(Prefix=s3_folder):
  64. if(obj.key != s3_folder+'/'):
  65. bucket.delete_objects(
  66. Delete={
  67. 'Objects': [
  68. {
  69. 'Key': obj.key
  70. },
  71. ]
  72. }
  73. )
  74. print('S3 bucket -- EMPTIED')
  75. # Function - Upload logs to ElasticSearch
  76. def upload_docs_bulk(s3_bucket, s3_folder):
  77. print('JSON upload -- STARTED')
  78. request_suffix = "/services/collector/event"
  79. error_count = 0
  80. for f in os.listdir(s3_local):
  81. if f.endswith(".json"):
  82. with open(f"{s3_local}/{f}", "r") as read_file:
  83. data = json.load(read_file)
  84. result = [json.dumps(record) for record in data]
  85. with open(f"{s3_local}/nd-temp.json", 'w') as obj:
  86. for i in result:
  87. obj.write('{\n')
  88. obj.write('"time":'+' '+ str(time.time()) + ','+ '\n')
  89. obj.write('"event":'+ ' '+i+'\n')
  90. obj.write('}\n')
  91. obj.write('\n')
  92. data_file = open(f"{s3_local}/nd-temp.json", 'rb').read()
  93. response = requests.post(splunk_server+request_suffix, data=data_file, verify=False, headers={"Authorization":"Splunk "+ splunk_auth_pw})
  94. os.remove(s3_local+"/"+f)
  95. if(response.status_code != 200):
  96. error_count += 1
  97. print(response.text)
  98. if(os.path.exists(f"{s3_local}/nd-temp.json")):
  99. os.remove(f"{s3_local}/nd-temp.json")
  100. print(f"JSON upload -- COMPLETE -- {error_count} ERRORS")
  101. if(error_count == 0):
  102. delete_objects_s3(s3_bucket, s3_folder)
  103. # Process - Upload data
  104. def upload_logs():
  105. download_s3_folder(s3_bucket, s3_folder, s3_local)
  106. upload_docs_bulk(s3_bucket, s3_folder)
  107. ### MAIN CONTROL PANEL
  108. upload_logs()
  109. print("Sleep -- STARTED")
  110. time.sleep(sleep_time)