quake_storages.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. # -*- coding: utf-8 -*-
  2. from datetime import datetime
  3. from pathlib import Path
  4. from typing import Protocol, Sequence, Iterable, Tuple, List, Callable
  5. import openpyxl
  6. from openpyxl.styles import Alignment
  7. from openpyxl.worksheet.worksheet import Worksheet
  8. import config
  9. from exceptions import FormatToStrError
  10. from quake_structures import Quake
  11. class QuakesStorage(Protocol):
  12. """Interface of any storage for saving info of quakes"""
  13. def save(self, quakes: Iterable[Quake]) -> None:
  14. raise NotImplementedError
  15. def save_quakes(quakes: Iterable[Quake], storage: QuakesStorage) -> None:
  16. """Save quakes in the storage"""
  17. storage.save(quakes)
  18. class CatalogStorage(QuakesStorage):
  19. """Store some quakes info as a catalog in an Excel file"""
  20. def __init__(self, file: Path):
  21. self._file = file
  22. self._init_storage()
  23. self.wb = openpyxl.load_workbook(self._file)
  24. self._del_init_empty_worksheet()
  25. self.sheet: Worksheet
  26. def save(self, quakes: Iterable[Quake]) -> None:
  27. for quake in quakes:
  28. if quake.lat is None or quake.lon is None:
  29. continue
  30. self._init_sheet(quake)
  31. self._add_values_in_sheet(quake)
  32. self._format_cells()
  33. self.wb.save(self._file)
  34. def _init_storage(self) -> None:
  35. if not self._file.exists():
  36. wb = openpyxl.Workbook()
  37. wb.save(self._file)
  38. def _add_values_in_sheet(self, quake: Quake) -> None:
  39. origin_dt = _format_common_attrs(quake)[0]
  40. origin_d, origin_t = origin_dt.split()
  41. lat = quake.lat if quake.lat else '-'
  42. lon = quake.lon if quake.lon else '-'
  43. mag = quake.magnitude
  44. avg_ml = mag.ML if mag.ML != 0.0 else '-'
  45. avg_mpsp = mag.MPSP if mag.MPSP != 0.0 else '-'
  46. depth = quake.depth if quake.depth else '-'
  47. stations_name = ', '.join(quake.stations_name)
  48. row = (origin_d, origin_t, lat, lon, depth,
  49. quake.reg, avg_ml, avg_mpsp, stations_name)
  50. self.sheet.append(row)
  51. def _format_cells(self) -> None:
  52. for sheet in self.wb.sheetnames:
  53. rows_of_cells = self.wb[sheet][self.wb[sheet].dimensions]
  54. for row in rows_of_cells:
  55. for cell in row:
  56. if cell.column in (3, 4, 5):
  57. # digital format for lat, lon, depth
  58. cell.number_format = '0.00'
  59. elif cell.column in (7, 8):
  60. cell.number_format = '0.0'
  61. cell.alignment = Alignment(horizontal='center',
  62. vertical='center')
  63. def _init_sheet(self, quake: Quake) -> None:
  64. month_num = quake.origin_dt.month - 1
  65. sheet_name = config.MONTHS[month_num]
  66. if sheet_name not in self.wb.sheetnames:
  67. self.sheet = self.wb.create_sheet(sheet_name, month_num)
  68. self.sheet.append(config.CATALOG_HEADER)
  69. else:
  70. self.sheet = self.wb.get_sheet_by_name(sheet_name)
  71. def _del_init_empty_worksheet(self):
  72. if 'Sheet' in self.wb.sheetnames and \
  73. self.wb['Sheet'].dimensions == 'A1:A1':
  74. del self.wb['Sheet']
  75. class BulletinStorage(QuakesStorage):
  76. """Store some quakes info as a bulletin in a plain text file"""
  77. def __init__(self, file: Path):
  78. self._file = file
  79. self.origin_dt = ''
  80. self.lat = ''
  81. self.lon = ''
  82. self.mag = ''
  83. self.avg_ml = ''
  84. self.avg_mpsp = ''
  85. self.mag_type = '-'
  86. self.depth = ''
  87. def save(self, quakes: Iterable[Quake]) -> None:
  88. with self._file.open('w', encoding='utf8') as f:
  89. amnt_quakes = 0
  90. for quake in quakes:
  91. (self.origin_dt, self.lat, self.lon, self.mag, self.avg_ml,
  92. self.avg_mpsp, self.depth,
  93. self.mag_type) = _format_common_attrs(quake)
  94. rows = self._get_rows(quake)
  95. f.write('\n'.join(rows))
  96. amnt_quakes += 1
  97. f.write(f'\nTotal: {amnt_quakes}')
  98. def _get_rows(self, quake: Quake) -> Iterable[str]:
  99. quake_hdr_describe = self._get_quake_hdr_describe()
  100. quake_hdr = self._get_quake_hdr(quake) + '\n'
  101. sta_hdr_describe = \
  102. _format_to_str(config.STATION_HEADER_DESCRIBE,
  103. config.AMNT_COLUMN_SYMBOLS['sta_hdr'])
  104. sta_strings = self._get_stations_string(quake)
  105. return ('#' + quake.id, quake_hdr_describe, quake_hdr,
  106. sta_hdr_describe, sta_strings)
  107. def _get_quake_hdr_describe(self) -> str:
  108. mag_type = 'Mag' if self.mag_type == '-' else self.mag_type
  109. columns_data = config.QUAKE_HEADER_DESCRIBE[:]
  110. columns_data.insert(5, mag_type)
  111. return _format_to_str(columns_data,
  112. config.AMNT_COLUMN_SYMBOLS['quake_hdr'])
  113. def _get_quake_hdr(self, quake: Quake) -> str:
  114. amnt_sta = str(len(quake.stations_name))
  115. return _format_to_str(
  116. columns_data=(self.origin_dt, self.lat, self.lon, self.depth,
  117. amnt_sta, self.mag, quake.reg),
  118. hdr_type_config=config.AMNT_COLUMN_SYMBOLS['quake_hdr'])
  119. def _get_stations_string(self, quake: Quake) -> str:
  120. res = ''
  121. for sta in quake.stations:
  122. phase_dt = datetime.strftime(sta.phase_dt,
  123. '%d.%m.%Y %H:%M:%S.%f')[:-3]
  124. dist = f'{sta.dist:.2f}' if sta.dist else '-'
  125. az = f'{sta.azimuth:.2f}' if sta.azimuth else '-'
  126. ampl = f'{sta.ampl:.4f}' if sta.ampl else '-'
  127. period = f'{sta.period:.2f}' if sta.period else '-'
  128. mag = f'{sta.mag_ML:.1f}' if sta.mag_ML else \
  129. f'{sta.mag_MPSP:.1f}' if sta.mag_MPSP else '-'
  130. mag_type = 'ML' if sta.mag_ML else 'MPSP' if sta.mag_MPSP else '-'
  131. sta_data = (sta.name, dist, az, sta.phase, sta.entry, phase_dt,
  132. ampl, period, mag, mag_type)
  133. res += _format_to_str(sta_data,
  134. config.AMNT_COLUMN_SYMBOLS['sta_hdr']) + '\n'
  135. return res + '\n'
  136. class NASBulletinStorage(QuakesStorage):
  137. """Store some info of each quake as a bulletin for NAS program
  138. in a separate plain text file with ext (*.bltn)"""
  139. def __init__(self, path: Path):
  140. self._path = path.joinpath(*path.parts[:-1])
  141. self.bltn_strings: List[str, ] = []
  142. def save(self, quakes: Iterable[Quake]) -> None:
  143. for quake in quakes:
  144. self._get_rows(quake)
  145. if self.bltn_strings:
  146. f_name = datetime.strftime(quake.origin_dt, '%Y%m%d_%H%M%S')
  147. full_path = self._path.joinpath(f_name).with_suffix('.bltn')
  148. data = '\n'.join(self.bltn_strings)
  149. full_path.write_text(data, encoding='utf8')
  150. def _get_rows(self, quake: Quake) -> None:
  151. self.bltn_strings.clear()
  152. if (quake.lat is not None and quake.lon is not None) \
  153. or len(quake.stations_name) > 4:
  154. dt, lat, lon = _format_common_attrs(quake,
  155. '%Y %m %d %H %M %S.%f')[:3]
  156. self.bltn_strings.append(f'Fi={lat} LD={lon} T0={dt}')
  157. for sta in quake.stations:
  158. phase_dt = datetime.strftime(sta.phase_dt,
  159. '%Y %m %d %H %M %S.%f')[:-3]
  160. self.bltn_strings.append(
  161. f'{sta.name} {sta.phase}={phase_dt}')
  162. class ArcGisStorage(QuakesStorage):
  163. """Store some quakes info in a plain text file
  164. for further processing in ArcGis program"""
  165. def __init__(self, file: Path):
  166. self._file = file
  167. def save(self, quakes: Iterable[Quake]) -> None:
  168. with self._file.open('w', encoding='utf8') as f:
  169. f.write(' '.join(config.ArcGIS_HEADER) + '\n')
  170. for quake in quakes:
  171. if columns := self._get_column_values(quake):
  172. row = ' '.join(columns)
  173. f.write(row + '\n')
  174. def _get_column_values(self, quake: Quake) -> Iterable[str]:
  175. origin_dt, lat, lon, mag = _format_common_attrs(quake)[:4]
  176. if lat == '-' or lon == '-':
  177. return ()
  178. columns = origin_dt, lat, lon, '0.0', '1'
  179. if mag != '-':
  180. for _range in config.MAGNITUDE_RANGES:
  181. if _range[0] < float(mag) < _range[1]:
  182. columns = origin_dt, lat, lon, mag, \
  183. config.MAGNITUDE_RANGES[_range]
  184. return columns
  185. def _format_common_attrs(quake: Quake,
  186. date_fmt='%d.%m.%Y %H:%M:%S.%f') -> Tuple[str, ...]:
  187. origin_dt = datetime.strftime(quake.origin_dt, date_fmt)[:-3] \
  188. if quake.origin_dt != datetime.min else '-'
  189. lat = f'{quake.lat:.2f}' if quake.lat else '-'
  190. lon = f'{quake.lon:.2f}' if quake.lon else '-'
  191. mag = quake.magnitude
  192. avg_ml = f'{mag.ML:.1f}' if mag.ML else '-'
  193. avg_mpsp = f'{mag.MPSP:.1f}' if mag.MPSP else '-'
  194. preferred_mag = avg_ml if avg_ml != '-' else avg_mpsp
  195. depth = f'{quake.depth:.2f}' if quake.depth else '-'
  196. mag_type = 'ML' if avg_ml != '-' else 'MPSP' if avg_mpsp != '-' else '-'
  197. return origin_dt, lat, lon, preferred_mag, avg_ml, avg_mpsp, depth, mag_type
  198. def _format_to_str(columns_data: Sequence, hdr_type_config: Sequence) -> str:
  199. """Return formatted string accordingly layout of hdr_type_config"""
  200. if len(columns_data) != len(hdr_type_config):
  201. raise FormatToStrError('len(columns_data) is not '
  202. 'equal len(hdr_type_config)')
  203. res = ''
  204. for i in range(len(hdr_type_config)):
  205. data = columns_data[i] if columns_data[i] is not None else '-'
  206. res += f'{data:<{hdr_type_config[i]}}'
  207. return res
  208. def get_storage(ext: str) -> Callable:
  209. storage = {'.txt': BulletinStorage,
  210. '.bltn': NASBulletinStorage,
  211. '.xlsx': CatalogStorage,
  212. '.GIS': ArcGisStorage}
  213. return storage[ext]