123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- #!/usr/bin/env python3
- import requests
- import os
- import time
- import argparse
- import logging
- import shutil
- import zipfile
# Console logging at INFO and above; each record is suffixed with the source
# file and line that emitted it.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(message)s [%(filename)s:%(lineno)d]",
    level=logging.INFO,
)

# Every setting is read from the environment. The `or` fallback also applies
# when the variable is set but empty.

# Base URL of the task server.
BASE_URL = os.environ.get("BASE_URL") or "http://localhost:5000"

# Bearer token sent with every request.
# NOTE(review): the built-in fallback is a fixed, source-visible secret;
# deployments should always provide SECRET_KEY explicitly.
SECRET_KEY = os.environ.get("SECRET_KEY") or "worldpeace2024"

# Headers attached to every API request.
HEADERS = {"Authorization": f"Bearer {SECRET_KEY}"}

# Maximum number of status polls sign_one_file performs before giving up
# (a poll count, not seconds).
SIGN_TIMEOUT = int(os.environ.get("SIGN_TIMEOUT") or "30")

# Timeout in seconds passed as `timeout=` to each requests call.
TIMEOUT = float(os.environ.get("TIMEOUT") or "900")
def create(task_name, file_path=None):
    """Create a task on the server, optionally attaching a file.

    POSTs to ``{BASE_URL}/tasks/{task_name}``. When *file_path* is given, the
    file is opened in binary mode and sent as the multipart field ``file``.

    Returns:
        The decoded JSON body of the response (via ``get_json``).
    """
    if file_path is not None:
        with open(file_path, "rb") as payload:
            reply = requests.post(
                f"{BASE_URL}/tasks/{task_name}",
                timeout=TIMEOUT,
                headers=HEADERS,
                files={"file": payload},
            )
        return get_json(reply)

    reply = requests.post(
        f"{BASE_URL}/tasks/{task_name}", timeout=TIMEOUT, headers=HEADERS
    )
    return get_json(reply)
def upload_file(task_id, file_path):
    """Attach the file at *file_path* to an existing task.

    The file is opened in binary mode and POSTed to
    ``{BASE_URL}/tasks/{task_id}/files`` as the multipart field ``file``.

    Returns:
        The decoded JSON body of the response (via ``get_json``).
    """
    endpoint = f"{BASE_URL}/tasks/{task_id}/files"
    with open(file_path, "rb") as payload:
        reply = requests.post(
            endpoint, timeout=TIMEOUT, headers=HEADERS, files={"file": payload}
        )
    return get_json(reply)
def get_status(task_id):
    """Return the decoded JSON status document of task *task_id*.

    Issues a GET to ``{BASE_URL}/tasks/{task_id}/status``.
    """
    endpoint = f"{BASE_URL}/tasks/{task_id}/status"
    return get_json(requests.get(endpoint, timeout=TIMEOUT, headers=HEADERS))
def download_files(task_id, output_dir, fn=None):
    """Download the file bundle of task *task_id* into *output_dir*.

    GETs ``{BASE_URL}/tasks/{task_id}/files`` as a stream and, on HTTP 200,
    writes the body to ``output_dir/fn``.

    Args:
        task_id: Identifier of the task whose files are requested.
        output_dir: Existing directory the download is written into.
        fn: Target file name; defaults to ``task_{task_id}_files.zip``.

    Returns:
        ``response.ok`` -- True when the server answered with a status
        below 400. Nothing is written unless the status is exactly 200.
    """
    if fn is None:
        fn = f"task_{task_id}_files.zip"
    # With stream=True the connection stays checked out until the body is
    # fully read or the response is closed. The context manager guarantees
    # the close on the non-200 path and on write errors as well.
    with requests.get(
        f"{BASE_URL}/tasks/{task_id}/files",
        timeout=TIMEOUT,
        headers=HEADERS,
        stream=True,
    ) as response:
        if response.status_code == 200:
            with open(os.path.join(output_dir, fn), "wb") as f:
                # 64 KiB chunks: far fewer Python-level iterations than 1 KiB.
                for chunk in response.iter_content(chunk_size=65536):
                    if chunk:
                        f.write(chunk)
        return response.ok
def download_one_file(task_id, file_id, output_dir):
    """Download a single file of task *task_id* into *output_dir*.

    GETs ``{BASE_URL}/tasks/{task_id}/files/{file_id}`` as a stream and, on
    HTTP 200, writes the body to ``output_dir/file_id``.

    Args:
        task_id: Identifier of the task that owns the file.
        file_id: Name of the file on the server; also used as the local
            file name.
        output_dir: Directory the download is written into.

    Returns:
        ``response.ok`` -- True when the server answered with a status
        below 400. Nothing is written unless the status is exactly 200.
    """
    # With stream=True the connection stays checked out until the body is
    # fully read or the response is closed. The context manager guarantees
    # the close on the non-200 path and on write errors as well.
    with requests.get(
        f"{BASE_URL}/tasks/{task_id}/files/{file_id}",
        timeout=TIMEOUT,
        headers=HEADERS,
        stream=True,
    ) as response:
        if response.status_code == 200:
            with open(os.path.join(output_dir, file_id), "wb") as f:
                # 64 KiB chunks: far fewer Python-level iterations than 1 KiB.
                for chunk in response.iter_content(chunk_size=65536):
                    if chunk:
                        f.write(chunk)
        return response.ok
def fetch(tag=None):
    """Fetch a task from the server, optionally restricted to *tag*.

    GETs ``{BASE_URL}/tasks/fetch_task``. A truthy *tag* is sent as the
    ``tag`` query parameter.

    Returns:
        The decoded JSON body of the response (via ``get_json``).
    """
    response = requests.get(
        f"{BASE_URL}/tasks/fetch_task",
        # Let requests build the query string so that tags containing
        # characters such as "&", "#" or spaces are URL-encoded correctly.
        params={"tag": tag} if tag else None,
        timeout=TIMEOUT,
        headers=HEADERS,
    )
    return get_json(response)
def update_status(task_id, status):
    """Send *status* as the new status of task *task_id*.

    PATCHes ``{BASE_URL}/tasks/{task_id}/status`` with *status* serialized
    as the JSON request body.

    Returns:
        The decoded JSON body of the response (via ``get_json``).
    """
    endpoint = f"{BASE_URL}/tasks/{task_id}/status"
    reply = requests.patch(endpoint, timeout=TIMEOUT, headers=HEADERS, json=status)
    return get_json(reply)
def delete_task(task_id):
    """Delete task *task_id* on the server.

    Issues a DELETE to ``{BASE_URL}/tasks/{task_id}``.

    Returns:
        The decoded JSON body of the response (via ``get_json``).
    """
    endpoint = f"{BASE_URL}/tasks/{task_id}"
    return get_json(requests.delete(endpoint, timeout=TIMEOUT, headers=HEADERS))
def sign(file_path):
    """Sign *file_path* and download the result bundle into ``output/``.

    Creates a ``sign`` task for the file, polls its status once per second
    until it is reported done, downloads the task's files into the
    ``output`` directory (created if missing) and deletes the task.

    Returns:
        None. If the server's reply to the create request carries no task
        id, an error is logged and nothing else happens.
    """
    res = create("sign", file_path)
    # create() returns the decoded JSON body, not a Response object, so the
    # task id has to be read with dict access. The "id" / "state" keys match
    # what sign_one_file reads from the same endpoints.
    task_id = res.get("id") if isinstance(res, dict) else None
    if task_id is None:
        logging.error(f"Failed to create sign task for {file_path}: {res}")
        return

    # download_files() opens its target inside this directory.
    os.makedirs("output", exist_ok=True)

    # NOTE(review): this loop has no upper bound; a task that never reaches
    # "done" keeps the caller polling forever.
    while True:
        status = get_status(task_id)
        if isinstance(status, dict) and status.get("state") == "done":
            download_files(task_id, "output")
            delete_task(task_id)
            break
        time.sleep(1)
def sign_one_file(file_path):
    """Sign one file in place and report whether it succeeded.

    Uploads *file_path* as a ``sign`` task, then polls the task status
    every 6 seconds, at most ``SIGN_TIMEOUT`` times. Once the task state is
    ``done`` the signed file is downloaded next to the original (same
    directory, same base name, overwriting it) and the task is deleted.

    Returns:
        True when the signed file was downloaded; False when the task could
        not be created, did not finish in time, or the download failed.
    """
    logging.info(f"Signing {file_path}")
    res = create("sign", file_path)
    task_id = res.get("id") if isinstance(res, dict) else None
    if task_id is None:
        # Without an id there is nothing to poll or clean up.
        logging.error(f"Failed to create sign task for {file_path}: {res}")
        return False
    logging.info(f"Uploaded {file_path}")

    for _ in range(SIGN_TIMEOUT):
        time.sleep(6)
        status = get_status(task_id)
        if status and status.get("state") == "done":
            downloaded = download_one_file(
                task_id, os.path.basename(file_path), os.path.dirname(file_path)
            )
            delete_task(task_id)
            if not downloaded:
                # The task finished but the signed file never reached disk;
                # do not report the unsigned original as signed.
                logging.error(f"Failed to download signed {file_path}")
                return False
            logging.info(f"Signed {file_path}")
            return True

    # Poll budget exhausted: remove the stale task and report failure.
    delete_task(task_id)
    logging.error(f"Failed to sign {file_path}")
    return False
def get_json(response):
    """Return the JSON-decoded body of *response*.

    Args:
        response: Any object exposing ``json()`` and ``text`` in the manner
            of ``requests.Response``.

    Raises:
        Exception: If the body is not valid JSON. The message is the raw
            response text and the decoding error is chained as
            ``__cause__``.
    """
    try:
        return response.json()
    except ValueError as err:
        # JSON decode errors are ValueError subclasses. Surface the raw body
        # (usually the server's error page) and keep the original cause.
        raise Exception(response.text) from err
# File extensions (lower-case, with leading dot) that sign_files submits
# for signing.
SIGN_EXTENSIONS = [
    ".dll",
    ".exe",
    ".sys",
    ".vxd",
    ".msix",
    ".msixbundle",
    ".appx",
    ".appxbundle",
    ".msi",
    ".msp",
    ".msm",
    ".cab",
    ".ps1",
    ".psm1",
]


def sign_files(dir_path, only_ext=None):
    """Sign every signable file below *dir_path*.

    Walks the directory tree and calls ``sign_one_file`` for each file whose
    extension is listed in ``SIGN_EXTENSIONS``. Extensions are compared
    case-insensitively, so ``SETUP.EXE`` is treated like ``setup.exe``.

    Args:
        dir_path: Root directory to walk.
        only_ext: Optional comma-separated list of extensions (with or
            without the leading dot) that further restricts which files are
            signed. Surrounding whitespace and empty entries are ignored.

    Returns:
        True when every selected file was signed, False as soon as one file
        fails. The walk stops at the first failure.
    """
    wanted = None
    if only_ext:
        wanted = set()
        for raw in only_ext.split(","):
            ext = raw.strip().lower()
            if ext:
                wanted.add(ext if ext.startswith(".") else "." + ext)

    for root, _dirs, files in os.walk(dir_path):
        for name in files:
            file_path = os.path.join(root, name)
            ext = os.path.splitext(file_path)[1].lower()
            if wanted is not None and ext not in wanted:
                continue
            if ext not in SIGN_EXTENSIONS:
                continue
            if not sign_one_file(file_path):
                logging.error(f"Failed to sign {file_path}")
                # Abort the whole walk. A bare `break` would only leave the
                # current directory and carry on with the next one.
                return False
    return True
def main(argv=None):
    """Parse the command line and run the selected task operation.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 when a signing or download
        command reports failure, 2 when no sub-command was given (the help
        text is printed in that case).
    """
    parser = argparse.ArgumentParser(
        description="Command line interface for task operations."
    )
    subparsers = parser.add_subparsers(dest="command")

    # sign_one_file
    sub = subparsers.add_parser("sign_one_file", help="Sign a single file.")
    sub.add_argument("file_path", help="The path of the file to sign.")

    # sign_files
    sub = subparsers.add_parser("sign_files", help="Sign all files in a directory.")
    sub.add_argument(
        "dir_path", help="The path of the directory containing the files to sign."
    )
    sub.add_argument(
        "only_ext", help="The file extension to sign.", default=None, nargs="?"
    )

    # fetch
    sub = subparsers.add_parser("fetch", help="Fetch a task.")
    sub.add_argument("--tag", default=None, help="Only fetch a task with this tag.")

    # update_status
    sub = subparsers.add_parser("update_status", help="Update the status of a task.")
    sub.add_argument("task_id", help="The ID of the task to update.")
    sub.add_argument("status", help="The new status of the task.")

    # delete_task
    sub = subparsers.add_parser("delete_task", help="Delete a task.")
    sub.add_argument("task_id", help="The ID of the task to delete.")

    # create
    sub = subparsers.add_parser("create", help="Create a task.")
    sub.add_argument("task_name", help="The name of the task to create.")
    sub.add_argument(
        "file_path",
        help="The path of the file for the task.",
        default=None,
        nargs="?",
    )

    # upload_file
    sub = subparsers.add_parser("upload_file", help="Upload a file to a task.")
    sub.add_argument("task_id", help="The ID of the task to upload the file to.")
    sub.add_argument("file_path", help="The path of the file to upload.")

    # get_status
    sub = subparsers.add_parser("get_status", help="Get the status of a task.")
    sub.add_argument("task_id", help="The ID of the task to get the status of.")

    # download_files
    sub = subparsers.add_parser("download_files", help="Download files from a task.")
    sub.add_argument("task_id", help="The ID of the task to download files from.")
    sub.add_argument(
        "output_dir", help="The directory to save the downloaded files to."
    )

    args = parser.parse_args(argv)

    if args.command is None:
        # Running without a sub-command used to do nothing at all.
        parser.print_help()
        return 2

    if args.command == "sign_one_file":
        # Propagate failure so build scripts can detect an unsigned file.
        return 0 if sign_one_file(args.file_path) else 1
    if args.command == "sign_files":
        # Only an explicit False counts as failure; None is treated as success.
        return 1 if sign_files(args.dir_path, args.only_ext) is False else 0
    if args.command == "fetch":
        print(fetch(args.tag))
    elif args.command == "update_status":
        print(update_status(args.task_id, args.status))
    elif args.command == "delete_task":
        print(delete_task(args.task_id))
    elif args.command == "create":
        print(create(args.task_name, args.file_path))
    elif args.command == "upload_file":
        print(upload_file(args.task_id, args.file_path))
    elif args.command == "get_status":
        print(get_status(args.task_id))
    elif args.command == "download_files":
        ok = download_files(args.task_id, args.output_dir)
        print(ok)
        return 0 if ok else 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|