pdb.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. import os
  2. import asyncpg as apg
  3. import config
  4. from loader import logger
  5. from asyncpg.exceptions import UndefinedTableError, InterfaceError
  6. import time
  7. import json
  8. import asyncio
  9. async def convert_data(data: list, fetchone=False):
  10. res = []
  11. for x in data:
  12. res.append(dict(x))
  13. if len(res) == 0:
  14. if not fetchone:
  15. return []
  16. return None
  17. if fetchone:
  18. return res[0]
  19. return res
  20. class PostSQLDB:
  21. def __init__(self, _db_name, _db_user, _db_pass, _db_host):
  22. self.db_name = _db_name
  23. self.db_user = _db_user
  24. self.db_pass = _db_pass
  25. self.db_host = _db_host
  26. self.connection = None
  27. self.users_table_columns = (
  28. ('user_id', 'BIGINT PRIMARY KEY'),
  29. ('status', 'VARCHAR(255)'),
  30. ('role', 'VARCHAR(255)'),
  31. ('time_registered', 'FLOAT'),
  32. )
  33. self.personal_table_columns = (
  34. ('personal_id', 'SERIAL PRIMARY KEY'),
  35. ('status', 'VARCHAR(255)'),
  36. ('first_name', 'VARCHAR(255)'),
  37. ('last_name', 'VARCHAR(255)'),
  38. ('middle_name', 'VARCHAR(255)'),
  39. ('full_name', 'VARCHAR(765)'),
  40. ('project', 'VARCHAR(255)'),
  41. ('position', 'VARCHAR(255)'),
  42. ('avatar', 'VARCHAR(255)'),
  43. ('time_join', 'FLOAT'),
  44. ('time_registered', 'FLOAT'),
  45. )
  46. # ====== SERVICE METHODS ======
  47. async def connect(self):
  48. self.connection = await apg.connect(database=self.db_name, user=self.db_user, password=self.db_pass,
  49. host=self.db_host)
  50. async def check_db_structure(self):
  51. async def repair_table(table_name, columns_data):
  52. try:
  53. await self.connection.execute(f'SELECT 1 FROM {table_name};')
  54. for name, typ in columns_data[1::]:
  55. await self.connection.execute(f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS {name} {typ};")
  56. except UndefinedTableError:
  57. logger.info(f'[DB_REPAIR] Table {table_name} not found, creating new table...')
  58. columns = ''
  59. for name, typ in columns_data:
  60. columns = columns + name + ' ' + typ + ', '
  61. await self.connection.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns[0:-2]});")
  62. logger.info(f'[DB_REPAIR] Table {table_name} successfully created')
  63. # Check / Repair all tables
  64. await repair_table('users', self.users_table_columns)
  65. await repair_table('personal', self.personal_table_columns)
  66. logger.info('Successfully check db structure')
  67. async def execute(self, sql, rerun=False, fetch=False, fetchone=False):
  68. try:
  69. if fetch:
  70. return await convert_data(await self.connection.fetch(sql), fetchone)
  71. else:
  72. return await self.connection.execute(sql)
  73. except UndefinedTableError as ex:
  74. logger.warning(f'Table not found error, trying to solve... (errmsg: {ex}, sql: {sql})')
  75. await self.check_db_structure()
  76. if not rerun:
  77. await self.execute(sql, rerun=True, fetch=fetch)
  78. else:
  79. logger.error(
  80. f'Cant solve db error, please check if database works correctly. (errmsg: {ex}, sql: {sql})')
  81. return None
  82. except InterfaceError as ex:
  83. if 'another operation is in progress' in str(ex):
  84. logger.info(f"another db operation in progress, waiting 0.2 sec and trying again... (sql: {sql})")
  85. await asyncio.sleep(0.2)
  86. await self.execute(sql, fetch=fetch)
  87. except Exception as ex:
  88. logger.error(f'Error while executing db method. (errmsg: {ex}, sql: {sql})', exc_info=True)
  89. raise
  90. async def fetch(self, sql):
  91. return await self.execute(sql, fetch=True)
  92. async def fetchone(self, sql):
  93. return await self.execute(sql, fetch=True, fetchone=True)
  94. # ====== DB METHODS ======
  95. # === Users ===
  96. async def get_user(self, user_id: int):
  97. return await self.fetchone(f"SELECT * FROM users WHERE user_id = {user_id};")
  98. async def get_users(self):
  99. return await self.fetch(f"SELECT * FROM users;")
  100. async def add_user(self, user_id: int):
  101. if user_id in config.ADMINS:
  102. role = "admin"
  103. else:
  104. role = "user"
  105. return await self.execute(
  106. f"INSERT INTO users (user_id, time_registered, status, role) "
  107. f"VALUES({user_id}, {time.time()}, 'active', {role});")
  108. async def set_user_status(self, user_id: int, value: str):
  109. return await self.execute(f"UPDATE users SET status = '{value}' WHERE user_id = {user_id};")
  110. async def set_user_role(self, user_id: int, value: str):
  111. return await self.execute(f"UPDATE users SET role = '{value}' WHERE user_id = {user_id};")
  112. # === Personal ===
  113. async def get_personal(self, active: bool = True, project=False, position=False):
  114. add = ""
  115. if active:
  116. add = " WHERE status = 'active'"
  117. if project:
  118. if 'WHERE' in add:
  119. add += " AND"
  120. else:
  121. add += " WHERE"
  122. add += f" project = '{project}'"
  123. if position:
  124. if 'WHERE' in add:
  125. add += " AND"
  126. else:
  127. add += " WHERE"
  128. add += f" position = '{position}'"
  129. return await self.fetch(f"SELECT * FROM personal {add};")
  130. async def get_personal_by_id(self, personal_id: int):
  131. return await self.fetchone(f"SELECT * FROM personal WHERE personal_id = {personal_id};")
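    # Original author's note: no extra SQL-injection protection is needed, because the database
    # library has it built in (https://github.com/MagicStack/asyncpg/issues/822).
    # NOTE(review): presumably that holds only for values passed as query arguments ($1, $2, ...),
    # not for values formatted directly into the SQL text — confirm before relying on it.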
  134. async def get_personal_by_name(self, name: str, active: bool = True, project=False, position=False,
  135. time_search: tuple[float] = (0, 0)):
  136. sql = f"SELECT * FROM personal WHERE (first_name LIKE '{name.lower()}' " \
  137. f"OR last_name LIKE '{name.lower()}' " \
  138. f"OR middle_name LIKE '{name.lower()}' " \
  139. f"OR full_name LIKE '%{name.lower()}%')"
  140. add = ''
  141. if active:
  142. add += " AND status = 'active'"
  143. if project:
  144. add += f" AND LOWER(project) = '{project}'"
  145. if position:
  146. add += f" AND LOWER(position) = '{position}'"
  147. add += f" AND time_join BETWEEN {time_search[0]} AND {time_search[1]}"
  148. return await self.fetch(sql + add + ';')
  149. async def get_personal_by_time(self, active: bool = True, project=False, position=False,
  150. time_search: tuple[float] = (0, 0)):
  151. add = ''
  152. if active:
  153. add += " WHERE status = 'active'"
  154. if 'WHERE' in add:
  155. add += f" AND time_join BETWEEN {time_search[0]} AND {time_search[1]}"
  156. return await self.fetch(f"SELECT * FROM personal {add};")
  157. async def add_personal(self, first_name: str, last_name: str, middle_name: str, full_name: str, position: str,
  158. project: str, time_join: float, avatar: str = 'Null'):
  159. return await self.fetchone(f"INSERT INTO personal (first_name, last_name, middle_name, full_name, position, "
  160. f"project, time_join, avatar, status) VALUES('{first_name.lower()}', '{last_name.lower()}', "
  161. f"'{middle_name.lower()}', '{full_name.lower()}', "
  162. f"'{position}', '{project}', {time_join}, '{avatar}', 'active') RETURNING personal_id;")
  163. async def change_personal(self, personal_id: int, parameter: str, value):
  164. add = ('', '')
  165. if isinstance(value, str):
  166. add = ("'", "'")
  167. elif isinstance(value, list):
  168. add = ('ARRAY', '')
  169. return await self.execute(f"UPDATE personal SET {parameter} = {add[0]}{value}{add[1]} "
  170. f"WHERE personal_id = {personal_id};")
  171. async def del_personal(self, personal_id: int):
  172. return await self.execute(f"UPDATE personal SET status = 'deleted' WHERE personal_id = {personal_id}")
  173. async def get_projects_personal(self) -> dict:
  174. personal = await self.get_personal(active=True)
  175. res = {}
  176. for pers in personal:
  177. if pers['project'] not in res:
  178. res[pers['project']] = [pers['personal_id']]
  179. else:
  180. res[pers['project']].append(pers['personal_id'])
  181. return res
  182. async def get_positions_personal(self) -> dict:
  183. personal = await self.get_personal(active=True)
  184. res = {}
  185. for pers in personal:
  186. if pers['position'] not in res:
  187. res[pers['position']] = [pers['personal_id']]
  188. else:
  189. res[pers['position']].append(pers['personal_id'])
  190. return res