ECTPropsPurge.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <kopano/platform.h>
  18. #include <kopano/lockhelper.hpp>
  19. #include <kopano/stringutil.h>
  20. #include <kopano/ECLogger.h>
  21. #include <kopano/ECConfig.h>
  22. #include "ECSession.h"
  23. #include "ECSessionManager.h"
  24. #include "ECDatabaseFactory.h"
  25. #include "ECStatsCollector.h"
  26. #include <kopano/kcodes.h>
  27. #include "ECTPropsPurge.h"
  28. namespace KC {
  29. extern ECStatsCollector* g_lpStatsCollector;
  30. ECTPropsPurge::ECTPropsPurge(ECConfig *lpConfig,
  31. ECDatabaseFactory *lpDatabaseFactory) :
  32. m_lpConfig(lpConfig), m_lpDatabaseFactory(lpDatabaseFactory)
  33. {
  34. // Start our purge thread
  35. pthread_create(&m_hThread, NULL, Thread, (void *)this);
  36. set_thread_name(m_hThread, "TPropsPurge");
  37. }
  38. ECTPropsPurge::~ECTPropsPurge()
  39. {
  40. // Signal thread to exit
  41. ulock_normal l_exit(m_hMutexExit);
  42. m_bExit = true;
  43. m_hCondExit.notify_all();
  44. l_exit.unlock();
  45. // Wait for the thread to exit
  46. pthread_join(m_hThread, NULL);
  47. }
  48. /**
  49. * This is just a pthread_create() wrapper which calls PurgeThread()
  50. *
  51. * @param param Pthread context param
  52. * @return pthread return code, 0 on success, 1 on error
  53. */
  54. void * ECTPropsPurge::Thread(void *param)
  55. {
  56. static_cast<ECTPropsPurge *>(param)->PurgeThread();
  57. return NULL;
  58. }
  59. /**
  60. * Main TProps purger loop
  61. *
  62. * This is a constantly running loop that checks the number of deferred updates in the
  63. * deferredupdate table, and starts purging them if it goes over a certain limit. The purged
  64. * items are from the largest folder first; A folder with 20 deferredupdates will be purged
  65. * before a folder with only 10 deferred updates.
  66. *
  67. * The loop (thread) will exit ASAP when m_bExit is set to TRUE.
  68. *
  69. * @return result
  70. */
  71. ECRESULT ECTPropsPurge::PurgeThread()
  72. {
  73. ECRESULT er = erSuccess;
  74. ECDatabase *lpDatabase = NULL;
  75. while(1) {
  76. // Run in a loop constantly checking our deferred update table
  77. if(!lpDatabase) {
  78. er = GetThreadLocalDatabase(this->m_lpDatabaseFactory, &lpDatabase);
  79. if(er != erSuccess) {
  80. ec_log_crit("Unable to get database connection for delayed purge!");
  81. Sleep(60000);
  82. continue;
  83. }
  84. }
  85. // Wait a while before repolling the count, unless we are requested to exit
  86. {
  87. ulock_normal l_exit(m_hMutexExit);
  88. if (m_bExit)
  89. break;
  90. m_hCondExit.wait_for(l_exit, std::chrono::seconds(10));
  91. if (m_bExit)
  92. break;
  93. }
  94. PurgeOverflowDeferred(lpDatabase); // Ignore error, just retry
  95. }
  96. // Don't touch anything in *this from this point, we may have been delete()d by this time
  97. return er;
  98. }
  99. /**
  100. * Purge deferred updates
  101. *
  102. * This purges deferred updates until the total number of deferred updates drops below
  103. * the limit in max_deferred_records.
  104. *
  105. * @param lpDatabase Database to use
  106. * @return Result
  107. */
  108. ECRESULT ECTPropsPurge::PurgeOverflowDeferred(ECDatabase *lpDatabase)
  109. {
  110. ECRESULT er;
  111. unsigned int ulCount = 0;
  112. unsigned int ulFolderId = 0;
  113. unsigned int ulMaxDeferred = atoi(m_lpConfig->GetSetting("max_deferred_records"));
  114. if(ulMaxDeferred > 0) {
  115. while(!m_bExit) {
  116. er = GetDeferredCount(lpDatabase, &ulCount);
  117. if(er != erSuccess)
  118. return er;
  119. if(ulCount < ulMaxDeferred)
  120. break;
  121. er = lpDatabase->Begin();
  122. if(er != erSuccess)
  123. return er;
  124. er = GetLargestFolderId(lpDatabase, &ulFolderId);
  125. if(er != erSuccess) {
  126. lpDatabase->Rollback();
  127. return er;
  128. }
  129. er = PurgeDeferredTableUpdates(lpDatabase, ulFolderId);
  130. if(er != erSuccess) {
  131. lpDatabase->Rollback();
  132. return er;
  133. }
  134. er = lpDatabase->Commit();
  135. if(er != erSuccess)
  136. return er;
  137. }
  138. }
  139. return erSuccess;
  140. }
  141. /**
  142. * Get the deferred record count
  143. *
  144. * This gets the total number of deferred records
  145. *
  146. * @param[in] lpDatabase Database pointer
  147. * @param[out] Number of records
  148. * @return Result
  149. */
  150. ECRESULT ECTPropsPurge::GetDeferredCount(ECDatabase *lpDatabase, unsigned int *lpulCount)
  151. {
  152. ECRESULT er = erSuccess;
  153. DB_RESULT lpResult;
  154. DB_ROW lpRow = NULL;
  155. er = lpDatabase->DoSelect("SELECT count(*) FROM deferredupdate", &lpResult);
  156. if(er != erSuccess)
  157. return er;
  158. lpRow = lpDatabase->FetchRow(lpResult);
  159. if(!lpRow || !lpRow[0]) {
  160. ec_log_err("ECTPropsPurge::GetDeferredCount(): row or column null");
  161. return KCERR_DATABASE_ERROR;
  162. }
  163. *lpulCount = atoui(lpRow[0]);
  164. return erSuccess;
  165. }
  166. /**
  167. * Get the folder with the most deferred items in it
  168. *
  169. * Retrieves the hierarchy ID of the folder with the most deferred records in it. If two or more
  170. * folders tie, then one of these folders is returned. It is undefined exactly which one will be returned.
  171. *
  172. * @param[in] lpDatabase Database pointer
  173. * @param[out] lpulFolderId Hierarchy ID of folder
  174. * @return Result
  175. */
  176. ECRESULT ECTPropsPurge::GetLargestFolderId(ECDatabase *lpDatabase, unsigned int *lpulFolderId)
  177. {
  178. ECRESULT er = erSuccess;
  179. DB_RESULT lpResult;
  180. DB_ROW lpRow = NULL;
  181. er = lpDatabase->DoSelect("SELECT folderid, COUNT(*) as c FROM deferredupdate GROUP BY folderid ORDER BY c DESC LIMIT 1", &lpResult);
  182. if(er != erSuccess)
  183. return er;
  184. lpRow = lpDatabase->FetchRow(lpResult);
  185. if (lpRow == nullptr || lpRow[0] == nullptr)
  186. // Could be that there are no deferred updates, so give an appropriate error
  187. return KCERR_NOT_FOUND;
  188. *lpulFolderId = atoui(lpRow[0]);
  189. return erSuccess;
  190. }
  191. /**
  192. * Purge deferred table updates stored for folder ulFolderId
  193. *
  194. * This purges deferred records for hierarchy and contents tables of ulFolderId, and removes
  195. * them from the deferredupdate table.
  196. *
  197. * @param[in] lpDatabase Database pointer
  198. * @param[in] Hierarchy ID of folder to purge
  199. * @return Result
  200. */
  201. // @todo, multiple threads call this function, which will cause problems
  202. ECRESULT ECTPropsPurge::PurgeDeferredTableUpdates(ECDatabase *lpDatabase, unsigned int ulFolderId)
  203. {
  204. ECRESULT er = erSuccess;
  205. unsigned int ulAffected;
  206. DB_RESULT lpDBResult;
  207. DB_ROW lpDBRow = NULL;
  208. std::string strQuery;
  209. std::string strIn;
  210. // This makes sure that we lock the record in the hierarchy *first*. This helps in serializing access and avoiding deadlocks.
  211. strQuery = "SELECT hierarchyid FROM deferredupdate WHERE folderid=" + stringify(ulFolderId);
  212. er = lpDatabase->DoSelect(strQuery, &lpDBResult);
  213. if(er != erSuccess)
  214. return er;
  215. if(lpDatabase->GetNumRows(lpDBResult) == 0)
  216. return erSuccess;
  217. while((lpDBRow = lpDatabase->FetchRow(lpDBResult)) != NULL) {
  218. strIn += lpDBRow[0];
  219. strIn += ",";
  220. }
  221. strIn.resize(strIn.size()-1);
  222. strQuery = "SELECT id FROM hierarchy WHERE id IN(";
  223. strQuery += strIn;
  224. strQuery += ") FOR UPDATE";
  225. er = lpDatabase->DoSelect(strQuery, &lpDBResult);
  226. if(er != erSuccess)
  227. return er;
  228. strQuery = "REPLACE INTO tproperties (folderid, hierarchyid, tag, type, val_ulong, val_string, val_binary, val_double, val_longint, val_hi, val_lo) ";
  229. strQuery += "SELECT " + stringify(ulFolderId) + ", p.hierarchyid, p.tag, p.type, val_ulong, LEFT(val_string, " + stringify(TABLE_CAP_STRING) + "), LEFT(val_binary, " + stringify(TABLE_CAP_BINARY) + "), val_double, val_longint, val_hi, val_lo FROM properties AS p FORCE INDEX(primary) JOIN deferredupdate FORCE INDEX(folderid) ON deferredupdate.hierarchyid=p.hierarchyid WHERE tag NOT IN(0x1009, 0x1013) AND deferredupdate.folderid = " + stringify(ulFolderId);
  230. er = lpDatabase->DoInsert(strQuery);
  231. if(er != erSuccess)
  232. return er;
  233. strQuery = "DELETE FROM deferredupdate WHERE hierarchyid IN(" + strIn + ")";
  234. er = lpDatabase->DoDelete(strQuery, &ulAffected);
  235. if(er != erSuccess)
  236. return er;
  237. g_lpStatsCollector->Increment(SCN_DATABASE_MERGES);
  238. g_lpStatsCollector->Increment(SCN_DATABASE_MERGED_RECORDS, (int)ulAffected);
  239. return erSuccess;
  240. }
  241. ECRESULT ECTPropsPurge::GetDeferredCount(ECDatabase *lpDatabase, unsigned int ulFolderId, unsigned int *lpulCount)
  242. {
  243. ECRESULT er = erSuccess;
  244. DB_RESULT lpDBResult;
  245. DB_ROW lpDBRow = NULL;
  246. unsigned int ulCount = 0;
  247. std::string strQuery;
  248. strQuery = "SELECT count(*) FROM deferredupdate WHERE folderid = " + stringify(ulFolderId);
  249. er = lpDatabase->DoSelect(strQuery, &lpDBResult);
  250. if(er != erSuccess)
  251. return er;
  252. lpDBRow = lpDatabase->FetchRow(lpDBResult);
  253. if(!lpDBRow || !lpDBRow[0])
  254. ulCount = 0;
  255. else
  256. ulCount = atoui(lpDBRow[0]);
  257. *lpulCount = ulCount;
  258. return erSuccess;
  259. }
  260. /**
  261. * Add a deferred update
  262. *
  263. * Adds a deferred update to the deferred updates table and purges the deferred updates for the folder if necessary.
  264. *
  265. * @param[in] lpSession Session that created the change
  266. * @param[in] lpDatabase Database handle
  267. * @param[in] ulFolderId Folder ID to add a deferred update to
  268. * @param[in] ulOldFolderId Previous folder ID if the message was moved (may be 0)
  269. * @param[in] ulObjId Object ID that should be added
  270. * @return result
  271. */
  272. ECRESULT ECTPropsPurge::AddDeferredUpdate(ECSession *lpSession, ECDatabase *lpDatabase, unsigned int ulFolderId, unsigned int ulOldFolderId, unsigned int ulObjId)
  273. {
  274. ECRESULT er;
  275. er = AddDeferredUpdateNoPurge(lpDatabase, ulFolderId, ulOldFolderId, ulObjId);
  276. if (er != erSuccess)
  277. return er;
  278. return NormalizeDeferredUpdates(lpSession, lpDatabase, ulFolderId);
  279. }
  280. /**
  281. * Add a deferred update
  282. *
  283. * Adds a deferred update to the deferred updates table but never purges the deferred updates for the folder.
  284. *
  285. * @param[in] lpDatabase Database handle
  286. * @param[in] ulFolderId Folder ID to add a deferred update to
  287. * @param[in] ulOldFolderId Previous folder ID if the message was moved (may be 0)
  288. * @param[in] ulObjId Object ID that should be added
  289. * @return result
  290. */
  291. ECRESULT ECTPropsPurge::AddDeferredUpdateNoPurge(ECDatabase *lpDatabase, unsigned int ulFolderId, unsigned int ulOldFolderId, unsigned int ulObjId)
  292. {
  293. std::string strQuery;
  294. if (ulOldFolderId)
  295. // Message has moved into a new folder. If the record is already there then just update the existing record so that srcfolderid from a previous move remains untouched
  296. strQuery = "INSERT INTO deferredupdate(hierarchyid, srcfolderid, folderid) VALUES(" + stringify(ulObjId) + "," + stringify(ulOldFolderId) + "," + stringify(ulFolderId) + ") ON DUPLICATE KEY UPDATE folderid = " + stringify(ulFolderId);
  297. else
  298. // Message has modified. If there is already a record for this message, we don't need to do anything
  299. strQuery = "INSERT IGNORE INTO deferredupdate(hierarchyid, srcfolderid, folderid) VALUES(" + stringify(ulObjId) + "," + stringify(ulFolderId) + "," + stringify(ulFolderId) + ")";
  300. return lpDatabase->DoInsert(strQuery);
  301. }
  302. /**
  303. * Purge the deferred updates table if the count for the folder exceeds max_deferred_records_folder
  304. *
  305. * Purges the deferred updates for the folder if necessary.
  306. *
  307. * @param[in] lpSession Session that created the change
  308. * @param[in] lpDatabase Database handle
  309. * @param[in] ulFolderId Folder ID to add a deferred update to
  310. * @return result
  311. */
  312. ECRESULT ECTPropsPurge::NormalizeDeferredUpdates(ECSession *lpSession, ECDatabase *lpDatabase, unsigned int ulFolderId)
  313. {
  314. ECRESULT er;
  315. unsigned int ulMaxDeferred = 0;
  316. unsigned int ulCount = 0;
  317. ulMaxDeferred = atoui(lpSession->GetSessionManager()->GetConfig()->GetSetting("max_deferred_records_folder"));
  318. if (ulMaxDeferred) {
  319. er = GetDeferredCount(lpDatabase, ulFolderId, &ulCount);
  320. if (er != erSuccess)
  321. return er;
  322. if (ulCount >= ulMaxDeferred) {
  323. er = PurgeDeferredTableUpdates(lpDatabase, ulFolderId);
  324. if (er != erSuccess)
  325. return er;
  326. }
  327. }
  328. return erSuccess;
  329. }
  330. } /* namespace */