# quakes_from_db.py
  1. # -*- coding: utf-8 -*-
  2. from typing import Tuple, List, NamedTuple
  3. from mysql.connector import connect, Error # type: ignore
  4. import config
  5. from exceptions import ConnectDatabaseError
  6. from quake_structures import Quake, Sta
  7. from datetime import datetime
class QueryParams(NamedTuple):
    """User-supplied filter values for a quake query.

    All fields arrive as raw strings (e.g. from a form/CLI) and are
    parsed downstream by `_get_sql_query` / `_filter_quakes`.
    """
    from_dt: str   # start of time window, 'YYYY-MM-DD HH:MM:SS' (treated as UTC downstream)
    to_dt: str     # end of time window, same format
    sta: str       # space-separated station names, or 'all' for no station filter
    from_mag: str  # minimum magnitude, parsed with float()
    to_mag: str    # maximum magnitude, parsed with float()
    comment: str   # space-separated keywords matched against origin COMMENTS
  15. def _get_sql_query(params: QueryParams) -> str:
  16. from_dt = datetime.strptime(params.from_dt + '+0000', '%Y-%m-%d %H:%M:%S%z')
  17. from_dt_timestamp = from_dt.timestamp()
  18. to_dt = datetime.strptime(params.to_dt + '+0000', '%Y-%m-%d %H:%M:%S%z')
  19. to_dt_timestamp = to_dt.timestamp()
  20. key_words = params.comment.split()
  21. key_words_count = len(key_words)
  22. if key_words_count > 1:
  23. comment_query = "%' OR o.COMMENTS LIKE '%".join(key_words)
  24. elif key_words_count == 1:
  25. comment_query = key_words[0]
  26. else:
  27. comment_query = ""
  28. return f"SELECT" \
  29. f" o.EVENTID, o.ORIGINTIME, ROUND(o.LAT, 2), ROUND(o.LON, 2)," \
  30. f" o.`DEPTH`," \
  31. f" CONCAT(SUBSTR(o.COMMENTS, 1, INSTR(o.COMMENTS, '.') - 3),"\
  32. f" SUBSTR(o.COMMENTS, INSTR(o.COMMENTS, ':') + 4))," \
  33. f" a.ITIME, a.STA, ROUND(a.DIST, 2)," \
  34. f" ROUND(a.AZIMUTH, 2), a.IPHASE, CONCAT(a.IM_EM, a.FM)," \
  35. f" ROUND(a.AMPL, 4), ROUND(a.PER, 2)," \
  36. f" ROUND(a.ML, 1), ROUND(a.MPSP, 1) " \
  37. f"FROM origin o " \
  38. f"INNER JOIN arrival a ON a.EVENTID = o.EVENTID " \
  39. f"WHERE" \
  40. f" (o.COMMENTS LIKE '%{comment_query}%')" \
  41. f" AND" \
  42. f" (a.ITIME BETWEEN '{from_dt_timestamp}' AND " \
  43. f" '{to_dt_timestamp}')" \
  44. f" ORDER BY o.EVENTID"
  45. def get_data(params: QueryParams) -> List[tuple]:
  46. """Returns data of quakes from DB"""
  47. sql = _get_sql_query(params)
  48. try:
  49. with connect(**config.DB, connection_timeout=2) as conn:
  50. with conn.cursor() as cursor:
  51. cursor.execute(sql)
  52. return cursor.fetchall()
  53. except Error as exc:
  54. raise ConnectDatabaseError(exc.msg)
  55. def get_quakes(params: QueryParams) -> Tuple[Quake, ...]:
  56. """Return tuple of Quake from db records"""
  57. quakes = []
  58. stations: List[Sta] = []
  59. origin_dt = datetime(year=1, month=1, day=1)
  60. _id, origin_dtime, lat, lon, depth, reg = \
  61. '', 0.0, 0.0, 0.0, 0.0, ''
  62. quake_records = get_data(params)
  63. for quake_record in quake_records:
  64. if quake_record[0] != _id:
  65. if len(stations) != 0:
  66. stations = _filter_stations(stations)
  67. quake = Quake(_id, origin_dt, lat,
  68. lon, depth, reg, tuple(stations))
  69. quakes.append(quake)
  70. stations.clear()
  71. _id, origin_dtime, lat, lon, depth, reg = quake_record[:6]
  72. origin_dt = datetime.utcfromtimestamp(origin_dtime) \
  73. if origin_dtime is not None else datetime.min
  74. sta_dt = datetime.utcfromtimestamp(quake_record[6])
  75. sta = Sta(sta_dt, *quake_record[7:])
  76. stations.append(sta)
  77. stations = _filter_stations(stations)
  78. quakes.append(Quake(_id, origin_dt, lat, lon,
  79. depth, reg, tuple(stations)))
  80. return tuple(_filter_quakes(quakes, params))
  81. def _add_sta(sta: Sta, stations: List[Sta], prev_sta: Sta | None) -> Sta:
  82. if sta.name in config.STA_RENAME:
  83. sta.name += 'R'
  84. if prev_sta is not None and sta.name == prev_sta.name:
  85. if sta.dist is not None:
  86. prev_sta.dist = sta.dist
  87. else:
  88. sta.dist = prev_sta.dist
  89. if prev_sta is None or (sta.phase_dt != prev_sta.phase_dt) \
  90. or (sta.name != prev_sta.name):
  91. stations.append(sta)
  92. out_sta = sta
  93. else:
  94. if sta.azimuth is not None:
  95. prev_sta.azimuth = sta.azimuth
  96. if sta.ampl is not None:
  97. prev_sta.ampl = sta.ampl
  98. if sta.period is not None:
  99. prev_sta.period = sta.period
  100. if sta.mag_ML is not None:
  101. prev_sta.mag_ML = sta.mag_ML
  102. if sta.mag_MPSP is not None:
  103. prev_sta.mag_MPSP = sta.mag_MPSP
  104. out_sta = prev_sta
  105. return out_sta
  106. def _filter_stations(stations: List[Sta]) -> List[Sta]:
  107. sorted_by_time = sorted(stations, key=lambda x: x.phase_dt)
  108. sorted_by_name = sorted(sorted_by_time, key=lambda x: x.name)
  109. res: List[Sta] = []
  110. prev_sta: Sta | None = None
  111. for sta in sorted_by_name:
  112. prev_sta = _add_sta(sta, res, prev_sta)
  113. sorted_by_dist = sorted(res,
  114. key=lambda x: x.dist if x.dist is not None else 0.0)
  115. return sorted_by_dist
  116. def _filter_quakes(quakes: List[Quake], params: QueryParams) -> List[Quake]:
  117. sta_set = set(params.sta.split())
  118. from_mag, to_mag = float(params.from_mag), float(params.to_mag)
  119. res: List[Quake] = []
  120. for quake in quakes:
  121. mag_ml, mag_mpsp = quake.magnitude
  122. mag = mag_ml if mag_ml != 0.0 else mag_mpsp
  123. if from_mag <= mag <= to_mag:
  124. res.append(quake)
  125. if params.sta.lower() != 'all':
  126. res = [quake for quake in res
  127. if sta_set.issubset(quake.stations_name)]
  128. return sorted(res, key=lambda x: x.stations[0].phase_dt)