submit_metrics.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. #
  2. # Copyright (c) Contributors to the Open 3D Engine Project.
  3. # For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. #
  5. # SPDX-License-Identifier: Apache-2.0 OR MIT
  6. #
  7. #
  8. import argparse
  9. import logging
  10. import json
  11. import socket
  12. from datetime import datetime
# Timeout, in seconds, applied to the Filebeat socket via settimeout().
SOCKET_TIMEOUT = 60
# strptime/strftime pattern used to validate and re-emit the payload's 'timestamp' field.
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
# Value placed in the "pipeline" field of every event sent to Filebeat.
FILEBEAT_PIPELINE = "filebeat"
# Default address and port of the Filebeat listener, used when the CLI or caller gives none.
FILEBEAT_DEFAULT_IP = "127.0.0.1"
FILEBEAT_DEFAULT_PORT = 9000
  18. def parse_args():
  19. parser = argparse.ArgumentParser(
  20. prog="submit_metrics.py",
  21. description="Pushes a JSON document via Filebeat.",
  22. add_help=False
  23. )
  24. def file_arg(arg):
  25. try:
  26. with open(arg) as json_file:
  27. return json.load(json_file)
  28. except ValueError:
  29. raise argparse.ArgumentTypeError("Invalid json file '%s'" % arg)
  30. parser.add_argument("-f", "--file", default=None, type=file_arg, help="File containing JSON data to upload.")
  31. parser.add_argument("-i", "--index", default=None, help="Index to use when sending the data")
  32. parser.add_argument("-ip", "--filebeat_ip", default=FILEBEAT_DEFAULT_IP, help="IP address where filebeat service is listening")
  33. parser.add_argument("-port", "--filebeat_port", default=FILEBEAT_DEFAULT_PORT, help="Port where filebeat service is listening")
  34. return parser.parse_args()
  35. def submit(index, payload, filebeat_ip = FILEBEAT_DEFAULT_IP, filebeat_port = FILEBEAT_DEFAULT_PORT):
  36. try:
  37. filebeat_address = filebeat_ip, filebeat_port
  38. logging.debug(f"Connecting to Filebeat on '{filebeat_address[0]}:{filebeat_address[1]}'")
  39. fb_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  40. fb_socket.settimeout(SOCKET_TIMEOUT)
  41. fb_socket.connect(filebeat_address)
  42. event = {
  43. "index": index,
  44. "timestamp": datetime.strptime(payload['timestamp'], DATE_FORMAT).strftime(DATE_FORMAT),
  45. "pipeline": FILEBEAT_PIPELINE,
  46. "payload": json.dumps(payload),
  47. }
  48. # Serialise event, add new line and encode as UTF-8 before sending to Filebeat.
  49. data = json.dumps(event) + "\n"
  50. data = data.encode()
  51. total_sent = 0
  52. logging.debug(f"Sending JSON data")
  53. while total_sent < len(data):
  54. try:
  55. sent = fb_socket.send(data[total_sent:])
  56. except BrokenPipeError:
  57. print("An exception occurred while sending data")
  58. fb_socket.close()
  59. total_sent = 0
  60. else:
  61. total_sent = total_sent + sent
  62. logging.debug("JSON data sent")
  63. fb_socket.close()
  64. logging.debug(f"Disconnected from Filebeat on '{filebeat_address[0]}:{filebeat_address[1]}'")
  65. except (ConnectionError, socket.timeout):
  66. logging.error("Failed to connect to Filebeat")
  67. return False
  68. return True
  69. if __name__ == "__main__":
  70. # Parse CLI arguments.
  71. args = parse_args()
  72. if not args.index:
  73. logging.error(f"Index not specified")
  74. exit(1)
  75. if not submit(args.index, json.dumps(args.file), args.filebeat_ip, args.filebeat_port):
  76. exit(1)