ECNotifyMaster.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <kopano/platform.h>
  18. #include <algorithm>
  19. #include <new>
  20. #include <kopano/lockhelper.hpp>
  21. #include <kopano/memory.hpp>
  22. #include <mapidefs.h>
  23. #include "ECNotifyClient.h"
  24. #include "ECNotifyMaster.h"
  25. #include "ECSessionGroupManager.h"
  26. #include <kopano/stringutil.h>
  27. #include "SOAPUtils.h"
  28. #include "WSTransport.h"
  29. #include <sys/signal.h>
  30. #include <sys/types.h>
  /*
   * Apply a pointer-to-member to an object. The result is callable, e.g.
   * CALL_MEMBER_FN(obj, pmf)(arg1, arg2).
   */
  #define CALL_MEMBER_FN(obj, memberFnPtr) ((obj).*(memberFnPtr))
  32. inline ECNotifySink::ECNotifySink(ECNotifyClient *lpClient, NOTIFYCALLBACK fnCallback)
  33. : m_lpClient(lpClient)
  34. , m_fnCallback(fnCallback)
  35. { }
  36. inline HRESULT ECNotifySink::Notify(ULONG ulConnection,
  37. const NOTIFYLIST &lNotifications) const
  38. {
  39. return CALL_MEMBER_FN(*m_lpClient, m_fnCallback)(ulConnection, lNotifications);
  40. }
  41. inline bool ECNotifySink::IsClient(const ECNotifyClient *lpClient) const
  42. {
  43. return lpClient == m_lpClient;
  44. }
  45. ECNotifyMaster::ECNotifyMaster(SessionGroupData *lpData) :
  46. m_lpSessionGroupData(lpData /* no addref */)
  47. {
  48. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::ECNotifyMaster", "");
  49. memset(&m_hThread, 0, sizeof(m_hThread));
  50. m_ulConnection = 1;
  51. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::ECNotifyMaster", "");
  52. }
  53. ECNotifyMaster::~ECNotifyMaster(void)
  54. {
  55. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::~ECNotifyMaster", "");
  56. assert(m_listNotifyClients.empty());
  57. /* Disable Notifications */
  58. StopNotifyWatch();
  59. if (m_lpSessionGroupData)
  60. m_lpSessionGroupData = NULL; /* DON'T Release() */
  61. if (m_lpTransport)
  62. m_lpTransport->Release();
  63. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::~ECNotifyMaster", "");
  64. }
  65. HRESULT ECNotifyMaster::Create(SessionGroupData *lpData, ECNotifyMaster **lppMaster)
  66. {
  67. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::Create", "");
  68. HRESULT hr = hrSuccess;
  69. auto lpMaster = new(std::nothrow) ECNotifyMaster(lpData);
  70. if (lpMaster == nullptr)
  71. return MAPI_E_NOT_ENOUGH_MEMORY;
  72. lpMaster->AddRef();
  73. *lppMaster = lpMaster;
  74. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::Create", "hr=0x%08X", hr);
  75. return hr;
  76. }
  77. HRESULT ECNotifyMaster::ConnectToSession()
  78. {
  79. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::ConnectToSession", "");
  80. HRESULT hr = hrSuccess;
  81. scoped_rlock biglock(m_hMutex);
  82. /* This function can be called from NotifyWatch, and could race against StopNotifyWatch */
  83. if (m_bThreadExit) {
  84. hr = MAPI_E_END_OF_SESSION;
  85. goto exit;
  86. }
  87. /*
  88. * Cancel connection IO operations before switching Transport.
  89. */
  90. if (m_lpTransport) {
  91. hr = m_lpTransport->HrCancelIO();
  92. if (hr != hrSuccess)
  93. goto exit;
  94. m_lpTransport->Release();
  95. m_lpTransport = NULL;
  96. }
  97. /* Open notification transport */
  98. hr = m_lpSessionGroupData->GetTransport(&m_lpTransport);
  99. if (hr != hrSuccess)
  100. goto exit;
  101. exit:
  102. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::ConnectToSession", "hr=0x%08X", hr);
  103. return hr;
  104. }
  105. HRESULT ECNotifyMaster::AddSession(ECNotifyClient* lpClient)
  106. {
  107. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::AddSession", "");
  108. scoped_rlock biglock(m_hMutex);
  109. m_listNotifyClients.push_back(lpClient);
  110. /* Enable Notifications */
  111. if (StartNotifyWatch() != hrSuccess)
  112. assert(false);
  113. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::AddSession", "");
  114. return hrSuccess;
  115. }
  116. struct findConnectionClient
  117. {
  118. ECNotifyClient* lpClient;
  119. findConnectionClient(ECNotifyClient* lpClient) : lpClient(lpClient) {}
  120. bool operator()(const NOTIFYCONNECTIONCLIENTMAP::value_type &entry) const
  121. {
  122. return entry.second.IsClient(lpClient);
  123. }
  124. };
  125. HRESULT ECNotifyMaster::ReleaseSession(ECNotifyClient* lpClient)
  126. {
  127. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::ReleaseSession", "");
  128. HRESULT hr = hrSuccess;
  129. scoped_rlock biglock(m_hMutex);
  130. /* Remove all connections attached to client */
  131. auto iter = m_mapConnections.cbegin();
  132. while (true) {
  133. iter = find_if(iter, m_mapConnections.cend(), findConnectionClient(lpClient));
  134. if (iter == m_mapConnections.cend())
  135. break;
  136. m_mapConnections.erase(iter++);
  137. }
  138. /* Remove client from list */
  139. m_listNotifyClients.remove(lpClient);
  140. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::ReleaseSession", "");
  141. return hr;
  142. }
  143. HRESULT ECNotifyMaster::ReserveConnection(ULONG *lpulConnection)
  144. {
  145. scoped_rlock lock(m_hMutex);
  146. *lpulConnection = m_ulConnection++;
  147. return hrSuccess;
  148. }
  149. HRESULT ECNotifyMaster::ClaimConnection(ECNotifyClient* lpClient, NOTIFYCALLBACK fnCallback, ULONG ulConnection)
  150. {
  151. scoped_rlock lock(m_hMutex);
  152. m_mapConnections.insert(NOTIFYCONNECTIONCLIENTMAP::value_type(ulConnection, ECNotifySink(lpClient, fnCallback)));
  153. return hrSuccess;
  154. }
  155. HRESULT ECNotifyMaster::DropConnection(ULONG ulConnection)
  156. {
  157. scoped_rlock lock(m_hMutex);
  158. m_mapConnections.erase(ulConnection);
  159. return hrSuccess;
  160. }
  161. HRESULT ECNotifyMaster::StartNotifyWatch()
  162. {
  163. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::StartNotifyWatch", "");
  164. HRESULT hr = hrSuccess;
  165. /* Thread is already running */
  166. if (m_bThreadRunning)
  167. goto exit;
  168. hr = ConnectToSession();
  169. if (hr != hrSuccess)
  170. goto exit;
  171. /* Make thread joinable which we need during shutdown */
  172. pthread_attr_t m_hAttrib;
  173. pthread_attr_init(&m_hAttrib);
  174. pthread_attr_setdetachstate(&m_hAttrib, PTHREAD_CREATE_JOINABLE);
  175. /* 1Mb of stack space per thread */
  176. if (pthread_attr_setstacksize(&m_hAttrib, 1024 * 1024)) {
  177. hr = MAPI_E_CALL_FAILED;
  178. goto exit;
  179. }
  180. if (pthread_create(&m_hThread, &m_hAttrib, NotifyWatch, (void *)this)) {
  181. hr = MAPI_E_CALL_FAILED;
  182. goto exit;
  183. }
  184. pthread_attr_destroy(&m_hAttrib);
  185. set_thread_name(m_hThread, "NotifyThread");
  186. m_bThreadRunning = TRUE;
  187. exit:
  188. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::StartNotifyWatch", "hr=0x%08X", hr);
  189. return hr;
  190. }
  191. HRESULT ECNotifyMaster::StopNotifyWatch()
  192. {
  193. TRACE_NOTIFY(TRACE_ENTRY, "ECNotifyMaster::StopNotifyWatch", "");
  194. HRESULT hr = hrSuccess;
  195. KCHL::object_ptr<WSTransport> lpTransport;
  196. ulock_rec biglock(m_hMutex, std::defer_lock_t());
  197. /* Thread was already halted, or connection is broken */
  198. if (!m_bThreadRunning)
  199. goto exit;
  200. /* Let the thread exit during its busy looping */
  201. biglock.lock();
  202. m_bThreadExit = TRUE;
  203. if (m_lpTransport) {
  204. /* Get another transport so we can tell the server to end the session. We
  205. * can't use our own m_lpTransport since it is probably in a blocking getNextNotify()
  206. * call. Seems like a bit of a shame to open a new connection, but there's no
  207. * other option */
  208. hr = m_lpTransport->HrClone(&~lpTransport);
  209. if (hr != hrSuccess) {
  210. biglock.unlock();
  211. goto exit;
  212. }
  213. lpTransport->HrLogOff();
  214. /* Cancel any pending IO if the network transport is down, causing the logoff to fail */
  215. m_lpTransport->HrCancelIO();
  216. }
  217. biglock.unlock();
  218. if (pthread_join(m_hThread, NULL) != 0)
  219. TRACE_NOTIFY(TRACE_WARNING, "ECNotifyMaster::StopNotifyWatch", "Invalid thread join");
  220. m_bThreadRunning = FALSE;
  221. exit:
  222. TRACE_NOTIFY(TRACE_RETURN, "ECNotifyMaster::StopNotifyWatch", "hr=0x%08X", hr);
  223. return hr;
  224. }
  225. void* ECNotifyMaster::NotifyWatch(void *pTmpNotifyMaster)
  226. {
  227. TRACE_NOTIFY(TRACE_ENTRY, "NotifyWatch", "");
  228. auto pNotifyMaster = static_cast<ECNotifyMaster *>(pTmpNotifyMaster);
  229. assert(pNotifyMaster != NULL);
  230. HRESULT hr = hrSuccess;
  231. NOTIFYCONNECTIONMAP mapNotifications;
  232. notifyResponse notifications;
  233. bool bReconnect = false;
  234. /* Ignore SIGPIPE which may be caused by HrGetNotify writing to the closed socket */
  235. signal(SIGPIPE, SIG_IGN);
  236. while (!pNotifyMaster->m_bThreadExit) {
  237. memset(&notifications, 0, sizeof(notifications));
  238. if (pNotifyMaster->m_bThreadExit)
  239. goto exit;
  240. /* 'exitable' sleep before reconnect */
  241. if (bReconnect) {
  242. for (ULONG i = 10; i > 0; --i) {
  243. Sleep(100);
  244. if (pNotifyMaster->m_bThreadExit)
  245. goto exit;
  246. }
  247. }
  248. /*
  249. * Request notification (Blocking Call)
  250. */
  251. notificationArray *pNotifyArray = NULL;
  252. hr = pNotifyMaster->m_lpTransport->HrGetNotify(&pNotifyArray);
  253. if (static_cast<unsigned int>(hr) == KCWARN_CALL_KEEPALIVE) {
  254. if (bReconnect) {
  255. TRACE_NOTIFY(TRACE_WARNING, "NotifyWatch::Reconnection", "OK connection: %d", pNotifyMaster->m_ulConnection);
  256. bReconnect = false;
  257. }
  258. continue;
  259. } else if (hr == MAPI_E_NETWORK_ERROR) {
  260. bReconnect = true;
  261. TRACE_NOTIFY(TRACE_WARNING, "NotifyWatch::Reconnection", "for connection: %d", pNotifyMaster->m_ulConnection);
  262. continue;
  263. } else if (hr != hrSuccess) {
  264. /*
  265. * Session was killed by server, try to start a new login.
  266. * This is not a foolproof recovery because 3 things might have happened:
  267. * 1) WSTransport has been logged off during StopNotifyWatch().
  268. * 2) Notification Session on server has died
  269. * 3) SessionGroup on server has died
  270. * If (1) m_bThreadExit will be set to TRUE, which means this thread is no longer desired. No
  271. * need to make a big deal out of it.
  272. * If (2) it is not a disaster (but it is a bad situation), the simple logon should do the trick
  273. * of restoring the notification retreival for all sessions for this group. Some notifications
  274. * might have arrived later then we might want, but that shouldn't be a total loss (the notificataions
  275. * themselves will not have disappeared since they have been queued on the server).
  276. * If (3) the problem is that _all_ sessions attached to the server has died and we have lost some
  277. * notifications. The main issue however is that a new login for the notification session will not
  278. * reanimate the other sessions belonging to this group and neither can we inform all ECNotifyClients
  279. * that they now belong to a dead session. A new login is important however, when new sessions are
  280. * attached to this group we must ensure that they will get notifications as expected.
  281. */
  282. if (!pNotifyMaster->m_bThreadExit) {
  283. TRACE_NOTIFY(TRACE_WARNING, "NotifyWatch::End of session", "reconnect");
  284. while (pNotifyMaster->ConnectToSession() != hrSuccess &&
  285. !pNotifyMaster->m_bThreadExit)
  286. // On Windows, the ConnectToSession() takes a while .. the windows kernel does 3 connect tries
  287. // But on linux, this immediately returns a connection error when the server socket is closed
  288. // so we wait before we try again
  289. Sleep(1000);
  290. }
  291. if (pNotifyMaster->m_bThreadExit)
  292. goto exit;
  293. else {
  294. // We have a new session ID, notify reload
  295. scoped_rlock lock(pNotifyMaster->m_hMutex);
  296. for (auto ptr : pNotifyMaster->m_listNotifyClients)
  297. ptr->NotifyReload();
  298. continue;
  299. }
  300. }
  301. if (bReconnect) {
  302. TRACE_NOTIFY(TRACE_WARNING, "NotifyWatch::Reconnection", "OK connection: %d", pNotifyMaster->m_ulConnection);
  303. bReconnect = false;
  304. }
  305. /* This is when the connection is interupted */
  306. if (pNotifyArray == NULL)
  307. continue;
  308. TRACE_NOTIFY(TRACE_ENTRY, "NotifyWatch::GetNotify", "%d", pNotifyArray->__size);
  309. /*
  310. * Loop through all notifications and sort them by connection number
  311. * with these mappings we can later send all notifications per connection to the appropriate client.
  312. */
  313. for (gsoap_size_t item = 0; item < pNotifyArray->__size; ++item) {
  314. ULONG ulConnection = pNotifyArray->__ptr[item].ulConnection;
  315. // No need to do a find before an insert with a default object.
  316. auto iterNotifications =
  317. mapNotifications.insert(NOTIFYCONNECTIONMAP::value_type(ulConnection, NOTIFYLIST())).first;
  318. iterNotifications->second.push_back(&pNotifyArray->__ptr[item]);
  319. }
  320. for (const auto &p : mapNotifications) {
  321. /*
  322. * Check if we have a client registered for this connection
  323. * Be careful when locking this, Client->m_hMutex has priority over Master->m_hMutex
  324. * which means we should NEVER call a Client function while holding the Master->m_hMutex!
  325. */
  326. scoped_rlock lock(pNotifyMaster->m_hMutex);
  327. auto iterClient = pNotifyMaster->m_mapConnections.find(p.first);
  328. if (iterClient == pNotifyMaster->m_mapConnections.cend())
  329. continue;
  330. iterClient->second.Notify(p.first, p.second);
  331. /*
  332. * All access to map completed, mutex is unlocked (end
  333. * of scope), and send notification to client.
  334. */
  335. }
  336. /* We're done, clean the map for next round */
  337. mapNotifications.clear();
  338. /* Cleanup */
  339. if (pNotifyArray != NULL) {
  340. FreeNotificationArrayStruct(pNotifyArray, true);
  341. pNotifyArray = NULL;
  342. }
  343. }
  344. exit:
  345. TRACE_NOTIFY(TRACE_RETURN, "NotifyWatch", "");
  346. return NULL;
  347. }