data.py

  1. """
  2. Contains utilities to download recent data needed to run heuristics
  3. on tornado cash data.
  4. This script relies on `bq`. Make sure that this is setup on the instance
  5. running this script.
  6. Most of the heuristics require access to all of tornado cash at once.
  7. As such, we cannot just download a daily slice (an increment) and
  8. process that. The best we can do is the following:
  9. 1) Keep a table of Tornado Cash trace and transaction data in BigQuery.
  10. We will update this table daily.
  11. 2) Once updated, the table will be exported a Google cloud bucket. We
  12. will delete prior contents in bucket before exporting.
  13. 3) Download the trace and transactions buckets locally and rename to
  14. expected tornado_trace.csv and tornado_transaction.csv.
  15. 4) Run the preprocessing script to generate complete_withdraw_tx.csv
  16. and complete_deposit_tx.csv.
  17. 5) Save to disk (overwrite old files).
  18. """
  19. import os
  20. import sys
  21. import numpy as np
  22. import pandas as pd
  23. from os.path import join
  24. from typing import Tuple, Optional, List, Dict, Any
  25. from live import utils
  26. from live.bq_utils import make_bq_delete, make_bq_load, make_bq_query
  27. from src.tcash.data import decode_transactions
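# For reference, everything this script reads and writes lives under
# `utils.CONSTANTS['data_path']`. Roughly (derived from the save_file() and
# pd.read_csv() calls below; exact contents depend on your deployment):
#
#   {data_path}/live/tornado_cash/
#       tornado_traces.csv, tornado_transactions.csv, miner_txs.csv,
#       deposit_txs.csv, withdraw_txs.csv, external_txs.csv,
#       deposit_addrs.csv, withdraw_addrs.csv
#   {data_path}/static/tcash/
#       tornado_contract_abi.csv, tornado_proxy_abi.csv
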
def get_last_block():
    """
    Read the current transactions dataframe to see what the latest block is.
    We will grab all data from that block and after.
    """
    data_path: str = utils.CONSTANTS['data_path']
    tcash_path: str = join(data_path, 'live/tornado_cash')
    transactions: pd.DataFrame = pd.read_csv(
        join(tcash_path, 'tornado_transactions.csv'))
    last_block: int = int(transactions.block_number.max())
    return last_block

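# Note: get_last_block() assumes a previous run has already written
# live/tornado_cash/tornado_transactions.csv. On a fresh deployment, run the
# script with --scratch so main() skips this call and starts from block 0.
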
def update_bigquery(
    start_block: int,
    delete_before: bool = False) -> Tuple[bool, Dict[str, Any]]:
    """
    Run SQL queries against BigQuery to insert the most recent data into
    the following tables:

        tornado_transactions.traces
        tornado_transactions.transactions

    We assume your BigQuery project has a `tornado_transactions` dataset
    with the two tables already existing. If not, please make them prior
    to running this script.

    We intentionally use `bq` instead of the Python BigQuery library as
    `bq` is orders of magnitude faster.
    """
    project: str = utils.CONSTANTS['bigquery_project']
    bq_trace: str = 'bigquery-public-data.crypto_ethereum.traces'
    bq_transaction: str = 'bigquery-public-data.crypto_ethereum.transactions'
    contract_sql: str = f'select address from {project}.tornado_transactions.tornadocontracts'
    subtrace_sql: str = f'select transaction_hash from {project}.tornado_transactions.traces'
    trace_table: str = 'tornado_transactions.traces'
    transaction_table: str = 'tornado_transactions.transactions'
    miner_table: str = 'tornado_transactions.miner_transactions'
    flags: List[str] = ['--use_legacy_sql=false']

    if delete_before:
        trace_init: str = make_bq_delete(trace_table, flags=flags)
        transaction_init: str = make_bq_delete(transaction_table, flags=flags)
        miner_init: str = make_bq_delete(miner_table, flags=flags)
        trace_init_success: bool = utils.execute_bash(trace_init)
        transaction_init_success: bool = utils.execute_bash(transaction_init)
        miner_init_success: bool = utils.execute_bash(miner_init)
    else:  # nothing to do
        trace_init_success, transaction_init_success, miner_init_success = True, True, True

    trace_query: str = make_bq_query(
        f'insert into {project}.{trace_table} select * from {bq_trace}',
        where_clauses=[
            f'to_address in ({contract_sql})',
            'substr(input, 1, 10) in ("0xb214faa5", "0x21a0adb6")',
            f'block_number > {start_block}',
        ],
        flags=flags,
    )
    transaction_query: str = make_bq_query(
        f'insert into {project}.{transaction_table} select * from {bq_transaction} as b',
        where_clauses=[
            f'b.hash in ({subtrace_sql})',
            f'b.block_number > {start_block}',
        ],
        flags=flags,
    )
    # This is for the TORN mining heuristic -- we need the miner's transactions.
    miner_query: str = make_bq_query(
        f'insert into {project}.{miner_table} select * from {bq_transaction}',
        where_clauses=[
            'to_address = "0x746aebc06d2ae31b71ac51429a19d54e797878e9"',
            f'block_number > {start_block}',
        ],
        flags=flags,
    )
    trace_query_success: bool = utils.execute_bash(trace_query)
    transaction_query_success: bool = utils.execute_bash(transaction_query)
    miner_query_success: bool = utils.execute_bash(miner_query)

    success: bool = (trace_init_success and trace_query_success) and \
        (transaction_init_success and transaction_query_success) and \
        (miner_init_success and miner_query_success)
    return success, {}

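# For intuition, the command strings built above should look roughly like the
# following (a sketch only -- the exact quoting and clause joining is whatever
# make_bq_query in live/bq_utils produces):
#
#   bq query --use_legacy_sql=false 'insert into <project>.tornado_transactions.traces
#     select * from bigquery-public-data.crypto_ethereum.traces
#     where to_address in (select address from <project>.tornado_transactions.tornadocontracts)
#       and substr(input, 1, 10) in ("0xb214faa5", "0x21a0adb6")
#       and block_number > <start_block>'
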
def empty_bucket() -> Tuple[bool, Dict[str, Any]]:
    """
    Make sure nothing is in the bucket (we want to overwrite).
    """
    trace_success: bool = utils.delete_bucket_contents('tornado-trace')
    transaction_success: bool = utils.delete_bucket_contents('tornado-transaction')
    miner_success: bool = utils.delete_bucket_contents('tornado-miner-transaction')
    success: bool = trace_success and transaction_success and miner_success
    return success, {}

def update_bucket() -> Tuple[bool, Dict[str, Any]]:
    """
    Move the updated bigquery data to the bucket.
    """
    project: str = utils.CONSTANTS['bigquery_project']
    trace_success: bool = utils.export_bigquery_table_to_cloud_bucket(
        f'{project}.tornado_transactions',
        'traces',
        'tornado-trace',
    )
    transaction_success: bool = utils.export_bigquery_table_to_cloud_bucket(
        f'{project}.tornado_transactions',
        'transactions',
        'tornado-transaction',
    )
    miner_success: bool = utils.export_bigquery_table_to_cloud_bucket(
        f'{project}.tornado_transactions',
        'miner_transactions',
        'tornado-miner-transaction',
    )
    success: bool = trace_success and transaction_success and miner_success
    return success, {}

def download_bucket() -> Tuple[bool, Any]:
    """
    Download the exported bucket contents locally as CSV chunks.
    """
    data_path: str = utils.CONSTANTS['data_path']
    out_dir: str = join(data_path, 'live/tornado_cash')
    trace_success, trace_files = utils.export_cloud_bucket_to_csv('tornado-trace', out_dir)
    transaction_success, transaction_files = utils.export_cloud_bucket_to_csv('tornado-transaction', out_dir)
    miner_success, miner_files = utils.export_cloud_bucket_to_csv('tornado-miner-transaction', out_dir)
    success: bool = trace_success and transaction_success and miner_success
    data: Dict[str, List[str]] = {
        'trace': trace_files,
        'transaction': transaction_files,
        'miner': miner_files,
    }
    return success, data

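# The cloud export typically yields many CSV chunks per table rather than one
# file, so download_bucket() returns the list of local chunk paths per table;
# main() merges them with utils.load_data_from_chunks() and then deletes the
# chunks once the combined CSVs are saved.
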
def get_deposit_and_withdraw(
    trace_df: pd.DataFrame,
    transaction_df: pd.DataFrame) -> Tuple[bool, Dict[str, pd.DataFrame]]:
    data_path: str = utils.CONSTANTS['data_path']
    contract_dir: str = join(data_path, 'static/tcash')
    try:
        address_df: pd.DataFrame = pd.read_csv(
            join(contract_dir, 'tornado_contract_abi.csv'),
            names=['address', 'token', 'value', 'name', 'abi'],
            sep='|')
        proxy_df: pd.DataFrame = pd.read_csv(
            join(contract_dir, 'tornado_proxy_abi.csv'),
            names=['address', 'abi'],
            sep='|')
        deposit_df, withdraw_df = decode_transactions(
            address_df, proxy_df, transaction_df, trace_df)
        success: bool = True
        data: Dict[str, pd.DataFrame] = {'withdraw': withdraw_df, 'deposit': deposit_df}
    except Exception:
        success: bool = False
        data: Dict[str, pd.DataFrame] = {}
    return success, data

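# The two ABI files read above are pipe-separated and headerless: each row of
# tornado_contract_abi.csv holds (address, token, value, name, abi) and each
# row of tornado_proxy_abi.csv holds (address, abi), matching the `names=`
# arguments passed to pd.read_csv.
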
def external_pipeline(
    start_block: int,
    deposit_df: pd.DataFrame,
    withdraw_df: pd.DataFrame,
    delete_before: bool = False) -> Tuple[bool, Dict[str, Any]]:
    """
    We need to update another bigquery table for external transactions
    between TornadoCash users. We must do this separately because we
    need `deposit_df` and `withdraw_df`.

    This function also moves the table to a google bucket, downloads
    that bucket locally, then combines, sorts, and saves the result.
    """
    deposit_addresses: List[str] = deposit_df.from_address.unique().tolist()
    withdraw_addresses: List[str] = withdraw_df.recipient_address.unique().tolist()
    deposit_address_df: pd.DataFrame = pd.DataFrame.from_dict(
        {'address': deposit_addresses})
    withdraw_address_df: pd.DataFrame = pd.DataFrame.from_dict(
        {'address': withdraw_addresses})
    deposit_file: str = save_file(deposit_address_df, 'deposit_addrs.csv')
    withdraw_file: str = save_file(withdraw_address_df, 'withdraw_addrs.csv')

    # upload files to bigquery
    flags: List[str] = ['--use_legacy_sql=false']
    deposit_address_table: str = 'tornado_transactions.deposit_addresses'
    withdraw_address_table: str = 'tornado_transactions.withdraw_addresses'

    if delete_before:
        withdraw_init: str = make_bq_delete(withdraw_address_table, flags=flags)
        deposit_init: str = make_bq_delete(deposit_address_table, flags=flags)
        deposit_init_success: bool = utils.execute_bash(deposit_init)
        withdraw_init_success: bool = utils.execute_bash(withdraw_init)
    else:
        deposit_init_success, withdraw_init_success = True, True

    deposit_query: str = make_bq_load(deposit_address_table, deposit_file, 'address:string')
    withdraw_query: str = make_bq_load(withdraw_address_table, withdraw_file, 'address:string')
    deposit_query_success: bool = utils.execute_bash(deposit_query)
    withdraw_query_success: bool = utils.execute_bash(withdraw_query)

    success: bool = (deposit_init_success and deposit_query_success) and \
        (withdraw_init_success and withdraw_query_success)
    if not success:
        return success, {}

    project: str = utils.CONSTANTS['bigquery_project']
    external_table: str = 'tornado_transactions.external_transactions'
    insert: str = f'insert into {project}.{external_table}'
    select: str = 'select * from bigquery-public-data.crypto_ethereum.transactions'
    deposit_select: str = f'select address from {deposit_address_table}'
    withdraw_select: str = f'select address from {withdraw_address_table}'
    address_clauses: List[str] = [
        f'(from_address in ({deposit_select})) and (to_address in ({withdraw_select}))',
        f'(from_address in ({withdraw_select})) and (to_address in ({deposit_select}))',
    ]
    # Wrap the two direction clauses in parentheses so the block_number filter
    # applies to both of them (in SQL, `and` binds tighter than `or`).
    address_clause: str = '(' + ' or '.join(address_clauses) + ')'
    where_clause: str = ' and '.join([
        address_clause,
        f'block_number > {start_block}',
    ])
    query: str = f"bq query {' '.join(flags)} '{insert} {select} where {where_clause}'"

    if delete_before:
        init: str = make_bq_delete(external_table, flags=flags)
        success_init: bool = utils.execute_bash(init)
    else:
        success_init: bool = True
    success_query: bool = utils.execute_bash(query)
    success: bool = success_init and success_query
    if not success:
        return success, {}

    # clear google bucket
    success: bool = utils.delete_bucket_contents('tornado-external-transaction')
    if not success:
        return success, {}

    # now move to google cloud bucket
    project: str = utils.CONSTANTS['bigquery_project']
    success: bool = utils.export_bigquery_table_to_cloud_bucket(
        f'{project}.tornado_transactions',
        'external_transactions',
        'tornado-external-transaction',
    )
    if not success:
        return success, {}

    # download google cloud bucket
    data_path: str = utils.CONSTANTS['data_path']
    out_dir: str = join(data_path, 'live/tornado_cash')
    success, files = utils.export_cloud_bucket_to_csv('tornado-external-transaction', out_dir)
    if not success:
        return success, {}

    external_df: pd.DataFrame = utils.load_data_from_chunks(files)
    delete_files(files)
    save_file(external_df, 'external_txs.csv')
    return True, {}

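# The assembled external-transaction query filters the public transactions
# table down to transfers that move directly between a known deposit address
# and a known withdraw address (in either direction) after `start_block`,
# i.e. a where clause of the form:
#
#   ((from_address in (<deposit addrs>) and to_address in (<withdraw addrs>))
#     or (from_address in (<withdraw addrs>) and to_address in (<deposit addrs>)))
#   and block_number > <start_block>
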
def save_file(df: pd.DataFrame, name: str):
    data_path: str = utils.CONSTANTS['data_path']
    out_dir: str = join(data_path, 'live/tornado_cash')
    out_file: str = join(out_dir, name)
    df.to_csv(out_file, index=False)
    return out_file


def delete_files(paths: List[str]):
    for path in paths:
        if os.path.isfile(path):
            os.remove(path)

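# main() glues the steps together. With --scratch the BigQuery tables are
# wiped and rebuilt from block 0; otherwise only blocks after the newest
# locally saved transaction are appended. The daily order is:
#   update_bigquery -> empty_bucket -> update_bucket -> download_bucket ->
#   get_deposit_and_withdraw -> external_pipeline -> (optionally) postgres COPY.
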
def main(args: Any):
    if not args.db_only:
        log_path: str = utils.CONSTANTS['log_path']
        os.makedirs(log_path, exist_ok=True)
        log_file: str = join(log_path, 'tornadocash-data.log')
        if os.path.isfile(log_file):
            os.remove(log_file)  # remove old file (yesterday's)
        logger = utils.get_logger(log_file)

        if args.scratch:
            logger.info('starting from scratch')
            last_block: int = 0
        else:
            logger.info('entering get_last_block')
            last_block: int = get_last_block()
            logger.info(f'last_block={last_block}')

        logger.info('entering update_bigquery')
        success, _ = update_bigquery(last_block, delete_before=args.scratch)
        if not success:
            logger.error('failed on updating bigquery tables')
            sys.exit(0)

        logger.info('entering empty_bucket')
        success, _ = empty_bucket()
        if not success:
            logger.error('failed on emptying cloud buckets')
            sys.exit(0)

        logger.info('entering update_bucket')
        success, _ = update_bucket()
        if not success:
            logger.error('failed on updating cloud buckets')
            sys.exit(0)

        logger.info('entering download_bucket')
        success, data = download_bucket()
        if not success:
            logger.error('failed on downloading cloud buckets')
            sys.exit(0)

        trace_files: List[str] = data['trace']
        transaction_files: List[str] = data['transaction']
        miner_files: List[str] = data['miner']

        if len(trace_files) == 0:
            logger.error('found 0 files for tornado cash traces')
            sys.exit(0)
        if len(transaction_files) == 0:
            logger.error('found 0 files for tornado cash transactions')
            sys.exit(0)
        if len(miner_files) == 0:
            logger.error('found 0 files for tornado cash miners')
            sys.exit(0)

        logger.info('sorting and combining trace files')
        trace_df: pd.DataFrame = utils.load_data_from_chunks(trace_files)
        logger.info('sorting and combining transaction files')
        transaction_df: pd.DataFrame = utils.load_data_from_chunks(transaction_files)
        logger.info('sorting and combining miner files')
        miner_df: pd.DataFrame = utils.load_data_from_chunks(miner_files)

        # drop duplicates
        trace_df.drop_duplicates('transaction_hash', inplace=True)
        transaction_df.drop_duplicates('hash', inplace=True)
        miner_df.drop_duplicates('hash', inplace=True)

        logger.info('saving trace chunks')
        save_file(trace_df, 'tornado_traces.csv')
        logger.info('saving transaction chunks')
        save_file(transaction_df, 'tornado_transactions.csv')
        logger.info('saving miner chunks')
        save_file(miner_df, 'miner_txs.csv')

        logger.info('deleting trace files')
        delete_files(trace_files)
        logger.info('deleting transaction files')
        delete_files(transaction_files)
        logger.info('deleting miner files')
        delete_files(miner_files)

        logger.info('entering get_deposit_and_withdraw')
        success, data = get_deposit_and_withdraw(trace_df, transaction_df)
        if not success:
            logger.error('failed on computing deposit and withdraw dataframes')
            sys.exit(0)
        deposit_df: pd.DataFrame = data['deposit']
        withdraw_df: pd.DataFrame = data['withdraw']

        logger.info('entering external_pipeline')
        success, _ = external_pipeline(
            last_block, deposit_df, withdraw_df, delete_before=args.scratch)
        if not success:
            logger.error('failed on processing external transactions')
            sys.exit(0)

        # subset columns
        deposit_columns: List[str] = [
            'hash', 'transaction_index', 'from_address', 'to_address', 'gas',
            'gas_price', 'block_number', 'block_hash', 'tornado_cash_address',
        ]
        withdraw_columns: List[str] = [
            'hash', 'transaction_index', 'from_address', 'to_address', 'gas',
            'gas_price', 'block_number', 'block_hash', 'tornado_cash_address',
            'recipient_address',
        ]
        deposit_df: pd.DataFrame = deposit_df[deposit_columns]
        withdraw_df: pd.DataFrame = withdraw_df[withdraw_columns]
        deposit_file: str = save_file(deposit_df, 'deposit_txs.csv')
        withdraw_file: str = save_file(withdraw_df, 'withdraw_txs.csv')
    else:
        data_path: str = utils.CONSTANTS['data_path']
        out_dir: str = join(data_path, 'live/tornado_cash')
        deposit_file: str = join(out_dir, 'deposit_txs.csv')
        withdraw_file: str = join(out_dir, 'withdraw_txs.csv')

    if not args.no_db:
        import psycopg2

        # write csvs into databases
        conn = psycopg2.connect(
            database=utils.CONSTANTS['postgres_db'],
            user=utils.CONSTANTS['postgres_user'],
        )
        cursor = conn.cursor()

        # -- step 1: copy deposits into db
        deposit_columns: List[str] = [
            'hash', 'transaction_index', 'from_address', 'to_address', 'gas',
            'gas_price', 'block_number', 'block_hash', 'tornado_cash_address',
        ]
        deposit_columns: str = ','.join(deposit_columns)
        command: str = f"COPY tornado_deposit({deposit_columns}) FROM '{deposit_file}' DELIMITER ',' CSV HEADER;"
        cursor.execute(command)
        conn.commit()

        # -- step 2: copy withdraws into db
        withdraw_columns: List[str] = [
            'hash', 'transaction_index', 'from_address', 'to_address', 'gas',
            'gas_price', 'block_number', 'block_hash', 'tornado_cash_address',
            'recipient_address',
        ]
        withdraw_columns: str = ','.join(withdraw_columns)
        command: str = f"COPY tornado_withdraw({withdraw_columns}) FROM '{withdraw_file}' DELIMITER ',' CSV HEADER;"
        cursor.execute(command)
        conn.commit()

        # -- step 3: insert deposits into tornado_pool
        select_sql: str = (
            'select b.hash as transaction, b.from_address as address, '
            'b.tornado_cash_address as pool from tornado_deposit as b')
        command: str = f"insert into tornado_pool(transaction, address, pool) ({select_sql}) on conflict do nothing;"
        cursor.execute(command)
        conn.commit()

        cursor.close()
        conn.close()

  401. if __name__ == "__main__":
  402. import argparse
  403. parser = argparse.ArgumentParser()
  404. parser.add_argument('--scratch', action='store_true', default=False)
  405. parser.add_argument('--no-db', action='store_true', default=False,
  406. help='skip the code to edit database (default: False)')
  407. parser.add_argument('--db-only', action='store_true', default=False,
  408. help='only execute the code to edit database (default: False)')
  409. args = parser.parse_args()
  410. main(args)
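
# Typical invocations (the exact module path depends on how the repo is laid
# out; shown here only as a sketch):
#
#   python data.py --scratch   # rebuild BigQuery tables and local CSVs from block 0
#   python data.py             # daily incremental update plus postgres load
#   python data.py --no-db     # refresh CSVs but skip the postgres COPY step
#   python data.py --db-only   # only load the existing CSVs into postgres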