123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- #
- # Copyright (c) Contributors to the Open 3D Engine Project.
- # For complete copyright and license terms please see the LICENSE at the root of this distribution.
- #
- # SPDX-License-Identifier: Apache-2.0 OR MIT
- #
- #
- import argparse
- import logging
- import json
- import socket
- from datetime import datetime
- SOCKET_TIMEOUT = 60
- DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
- FILEBEAT_PIPELINE = "filebeat"
- FILEBEAT_DEFAULT_IP = "127.0.0.1"
- FILEBEAT_DEFAULT_PORT = 9000
- def parse_args():
- parser = argparse.ArgumentParser(
- prog="submit_metrics.py",
- description="Pushes a JSON document via Filebeat.",
- add_help=False
- )
-
- def file_arg(arg):
- try:
- with open(arg) as json_file:
- return json.load(json_file)
- except ValueError:
- raise argparse.ArgumentTypeError("Invalid json file '%s'" % arg)
- parser.add_argument("-f", "--file", default=None, type=file_arg, help="File containing JSON data to upload.")
- parser.add_argument("-i", "--index", default=None, help="Index to use when sending the data")
- parser.add_argument("-ip", "--filebeat_ip", default=FILEBEAT_DEFAULT_IP, help="IP address where filebeat service is listening")
- parser.add_argument("-port", "--filebeat_port", default=FILEBEAT_DEFAULT_PORT, help="Port where filebeat service is listening")
- return parser.parse_args()
- def submit(index, payload, filebeat_ip = FILEBEAT_DEFAULT_IP, filebeat_port = FILEBEAT_DEFAULT_PORT):
- try:
- filebeat_address = filebeat_ip, filebeat_port
- logging.debug(f"Connecting to Filebeat on '{filebeat_address[0]}:{filebeat_address[1]}'")
- fb_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- fb_socket.settimeout(SOCKET_TIMEOUT)
- fb_socket.connect(filebeat_address)
- event = {
- "index": index,
- "timestamp": datetime.strptime(payload['timestamp'], DATE_FORMAT).strftime(DATE_FORMAT),
- "pipeline": FILEBEAT_PIPELINE,
- "payload": json.dumps(payload),
- }
- # Serialise event, add new line and encode as UTF-8 before sending to Filebeat.
- data = json.dumps(event) + "\n"
- data = data.encode()
-
- total_sent = 0
- logging.debug(f"Sending JSON data")
- while total_sent < len(data):
- try:
- sent = fb_socket.send(data[total_sent:])
- except BrokenPipeError:
- print("An exception occurred while sending data")
- fb_socket.close()
- total_sent = 0
- else:
- total_sent = total_sent + sent
- logging.debug("JSON data sent")
- fb_socket.close()
- logging.debug(f"Disconnected from Filebeat on '{filebeat_address[0]}:{filebeat_address[1]}'")
- except (ConnectionError, socket.timeout):
- logging.error("Failed to connect to Filebeat")
- return False
- return True
- if __name__ == "__main__":
- # Parse CLI arguments.
- args = parse_args()
- if not args.index:
- logging.error(f"Index not specified")
- exit(1)
-
- if not submit(args.index, json.dumps(args.file), args.filebeat_ip, args.filebeat_port):
- exit(1)
|