bq_utils.py 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. from typing import List
  2. from live import utils
  3. def make_bq_delete(table: str, flags: List[str]) -> str:
  4. project: str = utils.CONSTANTS['bigquery_project']
  5. flags: str = ' '.join(flags)
  6. statement: str = f'delete from {project}.{table} where true'
  7. query: str = f"bq query {flags} '{statement}'"
  8. return query
  9. def make_bq_query(
  10. select: str, where_clauses: List[str] = [], flags: List[str] = []) -> str:
  11. flags: str = ' '.join(flags)
  12. where_clauses: List[str] = [f'({clause})' for clause in where_clauses]
  13. where_clauses: str = ' and '.join(where_clauses)
  14. query: str = f"bq query {flags} '{select} where {where_clauses}'"
  15. return query
  16. def make_bq_load(table: str, csv_path: str, schema: str) -> str:
  17. project: str = utils.CONSTANTS['bigquery_project']
  18. flags: List[str] = [
  19. "--skip_leading_rows=1",
  20. "--field_delimiter='\t'",
  21. "--source_format=CSV",
  22. ]
  23. flags: str = ' '.join(flags)
  24. command: str = f"bq load {flags} {table} {csv_path} {schema}"
  25. return command