AssetProcessorConnection.cpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include "FrameworkApplicationFixture.h"
  9. #include <atomic>
  10. #include <gtest/gtest.h>
  11. #include <AzCore/Casting/lossy_cast.h>
  12. #include <AzCore/std/parallel/binary_semaphore.h>
  13. #include <AzFramework/Network/AssetProcessorConnection.h>
  14. #include <AzFramework/Asset/AssetProcessorMessages.h>
  15. #include <AzFramework/Asset/AssetSystemComponent.h>
  16. // This is the type and payload sent from A to B
  17. const AZ::u32 ABType = 0x86;
  18. const char* ABPayload = "Hello World";
  19. const AZ::u32 ABPayloadSize = azlossy_caster(strlen(ABPayload));
  20. // This is the type sent from A to B with no payload
  21. const AZ::u32 ABNoPayloadType = 0x69;
  22. // This is the type and payload sent from B to A
  23. const AZ::u32 BAType = 0xffff0000;
  24. const char* BAPayload = "When in the Course of human events it becomes necessary for one people to dissolve the political bands which have connected them with another and to assume among the powers of the earth, the separate and equal station to which the Laws of Nature and of Nature's God entitle them, a decent respect to the opinions of mankind requires that they should declare the causes which impel them to the separation.";
  25. const AZ::u32 BAPayloadSize = azlossy_caster(strlen(BAPayload));
  26. // how long before tests fail when expecting a connection.
  27. // normally, connections to localhost happen immediately (microseconds), so this is just for when things
  28. // go wrong. In normal test runs, we'll be yielding and waiting very short
  29. // amounts of time (milliseconds) instead of the full 15 seconds.
  30. const int secondsMaxConnectionAttempt = 15;
  31. // the longest time it should be conceivable for a message to take to send.
  32. // most messages will arrive within microseconds, but if the machine is really busy it could take
  33. // a couple orders of magnitude longer. Nothing in these tests waits for this full duration
  34. // unless a test is failing, so the actual runtime of the tests should be milliseconds.
  35. const int millisecondsForSend = 5000;
  36. class APConnectionTest
  37. : public UnitTest::FrameworkApplicationFixture
  38. {
  39. protected:
  40. void SetUp() override
  41. {
  42. FrameworkApplicationFixture::SetUp();
  43. }
  44. void TearDown() override
  45. {
  46. FrameworkApplicationFixture::TearDown();
  47. }
  48. bool WaitForConnectionStateToBeEqual(AzFramework::AssetSystem::AssetProcessorConnection& connectionObject, AzFramework::SocketConnection::EConnectionState desired)
  49. {
  50. // The connection state must be copied to a local variable as once the condition
  51. // matches in the loop condition it could later not match in the return statement
  52. // as the state is being updated on another thread
  53. AzFramework::SocketConnection::EConnectionState connectionState;
  54. auto started = AZStd::chrono::steady_clock::now();
  55. for (connectionState = connectionObject.GetConnectionState(); connectionState != desired;
  56. connectionState = connectionObject.GetConnectionState())
  57. {
  58. auto seconds_passed = AZStd::chrono::duration_cast<AZStd::chrono::seconds>(AZStd::chrono::steady_clock::now() - started).count();
  59. if (seconds_passed > secondsMaxConnectionAttempt)
  60. {
  61. break;
  62. }
  63. AZStd::this_thread::yield();
  64. }
  65. return connectionState == desired;
  66. }
  67. bool WaitForConnectionStateToNotBeEqual(AzFramework::AssetSystem::AssetProcessorConnection& connectionObject, AzFramework::SocketConnection::EConnectionState notDesired)
  68. {
  69. // The connection state must be copied to a local variable as once the condition
  70. // matches in the loop condition it could later not match in the return statement
  71. // as the state is being updated on another thread
  72. AzFramework::SocketConnection::EConnectionState connectionState;
  73. auto started = AZStd::chrono::steady_clock::now();
  74. for (connectionState = connectionObject.GetConnectionState(); connectionState == notDesired;
  75. connectionState = connectionObject.GetConnectionState())
  76. {
  77. auto seconds_passed = AZStd::chrono::duration_cast<AZStd::chrono::seconds>(AZStd::chrono::steady_clock::now() - started).count();
  78. if (seconds_passed > secondsMaxConnectionAttempt)
  79. {
  80. break;
  81. }
  82. AZStd::this_thread::yield();
  83. }
  84. return connectionState != notDesired;
  85. }
  86. };
  87. TEST_F(APConnectionTest, TestAddRemoveCallbacks)
  88. {
  89. using namespace AzFramework;
  90. // This is connection A
  91. AssetSystem::AssetProcessorConnection apConnection;
  92. apConnection.m_unitTesting = true;
  93. // This is connection B
  94. AssetSystem::AssetProcessorConnection apListener;
  95. apListener.m_unitTesting = true;
  96. std::atomic_uint BAMessageCallbackCount;
  97. BAMessageCallbackCount = 0;
  98. std::atomic_uint ABMessageCallbackCount;
  99. ABMessageCallbackCount = 0;
  100. AZStd::binary_semaphore messageArrivedSemaphore;
  101. // once we disconnect, we'll set this atomic to ensure no message arrives after disconnection
  102. AZStd::atomic_bool failIfMessageArrivesAB = {false};
  103. AZStd::atomic_bool failIfMessageArrivesBA = {false};
  104. // Connection A is expecting the above type and payload from B, therefore it is B->A, BA
  105. auto BACallbackHandle = apConnection.AddMessageHandler(BAType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  106. {
  107. EXPECT_FALSE(failIfMessageArrivesBA.load());
  108. EXPECT_EQ(typeId, BAType);
  109. EXPECT_EQ(BAPayloadSize, dataLength);
  110. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), BAPayload, dataLength));
  111. ++BAMessageCallbackCount;
  112. messageArrivedSemaphore.release();
  113. });
  114. // Connection B is expecting the above type and payload from A, therefore it is A->B, AB
  115. auto ABCallbackHandle = apListener.AddMessageHandler(ABType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  116. {
  117. EXPECT_FALSE(failIfMessageArrivesAB.load());
  118. EXPECT_EQ(typeId, ABType);
  119. EXPECT_EQ(ABPayloadSize, dataLength);
  120. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), ABPayload, dataLength));
  121. ++ABMessageCallbackCount;
  122. messageArrivedSemaphore.release();
  123. });
  124. // Test listening
  125. EXPECT_EQ(apListener.GetConnectionState(), SocketConnection::EConnectionState::Disconnected);
  126. bool listenResult = apListener.Listen(11112);
  127. EXPECT_TRUE(listenResult);
  128. // Wait some time for the connection to start listening, since it doesn't actually call listen() immediately.
  129. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Listening));
  130. EXPECT_EQ(apListener.GetConnectionState(), SocketConnection::EConnectionState::Listening);
  131. // Test connect success
  132. EXPECT_EQ(apConnection.GetConnectionState(), SocketConnection::EConnectionState::Disconnected);
  133. // This is blocking, should connect
  134. bool connectResult = apConnection.Connect("127.0.0.1", 11112);
  135. EXPECT_TRUE(connectResult);
  136. // Wait some time for the connection to negotiate, only after negotiation succeeds is it actually considered connected,,
  137. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  138. // Check listener for success - by this time the listener should also be considered connected.
  139. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Connected));
  140. //
  141. // Send first set, ensure we got 1 each
  142. //
  143. // Send message from A to B
  144. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  145. // Wait some time to allow message to send
  146. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  147. EXPECT_EQ(ABMessageCallbackCount, 1);
  148. // Send message from B to A
  149. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  150. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  151. EXPECT_EQ(BAMessageCallbackCount, 1);
  152. //
  153. // Send second set, ensure we got 2 each (didn't auto-remove or anything crazy)
  154. //
  155. // Send message from A to B
  156. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  157. // Wait some time to allow message to send
  158. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  159. EXPECT_EQ(ABMessageCallbackCount, 2);
  160. // Send message from B to A
  161. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  162. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  163. EXPECT_EQ(BAMessageCallbackCount, 2);
  164. // Remove callbacks
  165. // after removing a listener, we expect no further messages to arrive.
  166. apConnection.RemoveMessageHandler(BAType, BACallbackHandle);
  167. failIfMessageArrivesBA = true;
  168. apListener.RemoveMessageHandler(ABType, ABCallbackHandle);
  169. failIfMessageArrivesAB = true;
  170. // the below 2 lines send a message while nobody is connected as a listener.
  171. // it may not fail immediately but will cause a cascade later, which is better than
  172. // waiting for some large timeout in the test.
  173. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  174. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  175. // Disconnect A
  176. // which flushes and will cause any traps to spring.
  177. bool disconnectResult = apConnection.Disconnect(true);
  178. EXPECT_TRUE(disconnectResult);
  179. // Disconnect B
  180. disconnectResult = apListener.Disconnect(true);
  181. EXPECT_TRUE(disconnectResult);
  182. // Verify A and B are disconnected
  183. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Disconnected));
  184. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  185. }
  186. TEST_F(APConnectionTest, TestAddRemoveCallbacks_RemoveDuringCallback_DoesNotCrash)
  187. {
  188. using namespace AzFramework;
  189. // This is connection A
  190. AssetSystem::AssetProcessorConnection apConnection;
  191. apConnection.m_unitTesting = true;
  192. // This is connection B
  193. AssetSystem::AssetProcessorConnection apListener;
  194. apListener.m_unitTesting = true;
  195. std::atomic_uint BAMessageCallbackCount;
  196. BAMessageCallbackCount = 0;
  197. std::atomic_uint ABMessageCallbackCount;
  198. ABMessageCallbackCount = 0;
  199. AZStd::binary_semaphore messageArrivedSemaphore;
  200. // establish connection
  201. EXPECT_TRUE(apListener.Listen(11112));
  202. EXPECT_TRUE(apConnection.Connect("127.0.0.1", 11112));
  203. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  204. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Connected));
  205. //
  206. // Now try adding listeners that remove themselves during callback
  207. //
  208. // Connection A is expecting the above type and payload from B, therefore it is B->A, BA
  209. // we set a trap here - after we first get this message, we are removing the handler
  210. // so that it should not ever fire again, and we assert that its false.
  211. AZStd::atomic_bool failIfWeGetCalledAgainBA = {false};
  212. SocketConnection::TMessageCallbackHandle SelfRemovingBACallbackHandle = SocketConnection::s_invalidCallbackHandle;
  213. SelfRemovingBACallbackHandle = apConnection.AddMessageHandler(BAType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  214. {
  215. EXPECT_FALSE(failIfWeGetCalledAgainBA.load());
  216. EXPECT_EQ(typeId, BAType);
  217. EXPECT_EQ(BAPayloadSize, dataLength);
  218. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), BAPayload, dataLength));
  219. ++BAMessageCallbackCount;
  220. apConnection.RemoveMessageHandler(BAType, SelfRemovingBACallbackHandle);
  221. failIfWeGetCalledAgainBA = true;
  222. messageArrivedSemaphore.release();
  223. });
  224. // Connection B is expecting the above type and payload from A, therefore it is A->B, AB
  225. AZStd::atomic_bool failIfWeGetCalledAgainAB = {false};
  226. SocketConnection::TMessageCallbackHandle SelfRemovingABCallbackHandle = SocketConnection::s_invalidCallbackHandle;
  227. SelfRemovingABCallbackHandle = apListener.AddMessageHandler(ABType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  228. {
  229. EXPECT_FALSE(failIfWeGetCalledAgainAB.load());
  230. EXPECT_EQ(typeId, ABType);
  231. EXPECT_EQ(ABPayloadSize, dataLength);
  232. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), ABPayload, dataLength));
  233. ++ABMessageCallbackCount;
  234. apListener.RemoveMessageHandler(ABType, SelfRemovingABCallbackHandle);
  235. failIfWeGetCalledAgainAB = true;
  236. messageArrivedSemaphore.release();
  237. });
  238. // Send message, should be at 1 each
  239. // Send message from A to B
  240. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  241. // Wait some time to allow message to send
  242. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  243. EXPECT_EQ(ABMessageCallbackCount, 1);
  244. // Send message from B to A
  245. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  246. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  247. EXPECT_EQ(BAMessageCallbackCount, 1);
  248. // the callback has disconnected, so sending additional messages should NOT result in the callback
  249. // being called.
  250. // we send some additional messages, as a "trap", if the callbacks fire, then the
  251. // above callback functions will trigger their asserts.
  252. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  253. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  254. // disconnect fully, which flushes sender queue and reciever queue and will cause any traps to spring!
  255. EXPECT_TRUE(apConnection.Disconnect(true));
  256. EXPECT_TRUE(apListener.Disconnect(true));
  257. // Verify A and B are disconnected
  258. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Disconnected));
  259. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  260. }
  261. TEST_F(APConnectionTest, TestAddRemoveCallbacks_AddDuringCallback_DoesNotCrash)
  262. {
  263. using namespace AzFramework;
  264. // This is connection A
  265. AssetSystem::AssetProcessorConnection apConnection;
  266. apConnection.m_unitTesting = true;
  267. // This is connection B
  268. AssetSystem::AssetProcessorConnection apListener;
  269. apListener.m_unitTesting = true;
  270. std::atomic_uint BAMessageCallbackCount;
  271. BAMessageCallbackCount = 0;
  272. std::atomic_uint ABMessageCallbackCount;
  273. ABMessageCallbackCount = 0;
  274. AZStd::binary_semaphore messageArrivedSemaphore;
  275. // establish connection
  276. EXPECT_TRUE(apListener.Listen(11112));
  277. EXPECT_TRUE(apConnection.Connect("127.0.0.1", 11112));
  278. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  279. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Connected));
  280. //
  281. // Now try adding listeners that add more listeners during callback
  282. //
  283. // Connection A is expecting the above type and payload from B, therefore it is B->A, BA
  284. SocketConnection::TMessageCallbackHandle SecondAddedBACallbackHandle = SocketConnection::s_invalidCallbackHandle;
  285. SocketConnection::TMessageCallbackHandle AddingBACallbackHandle = SocketConnection::s_invalidCallbackHandle;
  286. // set some traps so that if things call more than once, its a failure:
  287. AZStd::atomic_bool AddingBACallbackFailIfCalledAgain = {false};
  288. AZStd::atomic_bool AddingBACallbackFailIfCalledAgain_inner = {false};
  289. AddingBACallbackHandle = apConnection.AddMessageHandler(BAType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  290. {
  291. EXPECT_FALSE(AddingBACallbackFailIfCalledAgain.load());
  292. EXPECT_EQ(typeId, BAType);
  293. EXPECT_EQ(BAPayloadSize, dataLength);
  294. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), BAPayload, dataLength));
  295. ++BAMessageCallbackCount;
  296. apConnection.RemoveMessageHandler(BAType, AddingBACallbackHandle);
  297. AddingBACallbackFailIfCalledAgain = true;
  298. SecondAddedBACallbackHandle = apConnection.AddMessageHandler(BAType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  299. {
  300. EXPECT_FALSE(AddingBACallbackFailIfCalledAgain_inner.load());
  301. EXPECT_EQ(typeId, BAType);
  302. EXPECT_EQ(BAPayloadSize, dataLength);
  303. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), BAPayload, dataLength));
  304. ++BAMessageCallbackCount;
  305. apConnection.RemoveMessageHandler(BAType, SecondAddedBACallbackHandle);
  306. AddingBACallbackFailIfCalledAgain_inner = true;
  307. messageArrivedSemaphore.release();
  308. });
  309. messageArrivedSemaphore.release();
  310. });
  311. // Connection B is expecting the above type and payload from A, therefore it is A->B, AB
  312. AZStd::atomic_bool AddingABCallbackFailIfCalledAgain = {false};
  313. AZStd::atomic_bool AddingABCallbackFailIfCalledAgain_inner = {false};
  314. SocketConnection::TMessageCallbackHandle SecondAddedABCallbackHandle = SocketConnection::s_invalidCallbackHandle;
  315. SocketConnection::TMessageCallbackHandle AddingABCallbackHandle = SocketConnection::s_invalidCallbackHandle;
  316. AddingABCallbackHandle = apListener.AddMessageHandler(ABType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  317. {
  318. EXPECT_FALSE(AddingABCallbackFailIfCalledAgain.load());
  319. EXPECT_EQ(typeId, ABType);
  320. EXPECT_EQ(ABPayloadSize, dataLength);
  321. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), ABPayload, dataLength));
  322. ++ABMessageCallbackCount;
  323. apListener.RemoveMessageHandler(ABType, AddingABCallbackHandle);
  324. AddingABCallbackFailIfCalledAgain = true;
  325. messageArrivedSemaphore.release();
  326. SecondAddedABCallbackHandle = apListener.AddMessageHandler(ABType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  327. {
  328. EXPECT_FALSE(AddingABCallbackFailIfCalledAgain_inner.load());
  329. EXPECT_EQ(typeId, ABType);
  330. EXPECT_EQ(ABPayloadSize, dataLength);
  331. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), ABPayload, dataLength));
  332. ++ABMessageCallbackCount;
  333. apListener.RemoveMessageHandler(ABType, SecondAddedABCallbackHandle);
  334. AddingABCallbackFailIfCalledAgain_inner = true;
  335. messageArrivedSemaphore.release();
  336. });
  337. });
  338. // Send message, should be at 1 each
  339. // Send message from A to B
  340. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  341. // Wait some time to allow message to send
  342. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  343. EXPECT_EQ(ABMessageCallbackCount, 1);
  344. // Send message from B to A
  345. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  346. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  347. EXPECT_EQ(BAMessageCallbackCount, 1);
  348. // Send message, should be at 2 each since the handlers
  349. // have been replaced with the ones created in the callback
  350. // Send message from A to B
  351. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  352. // Wait some time to allow message to send
  353. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  354. EXPECT_EQ(ABMessageCallbackCount, 2);
  355. // Send message from B to A
  356. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  357. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  358. EXPECT_EQ(BAMessageCallbackCount, 2);
  359. // Send message, we don't wait for these as our listeners should be disconnected, but there
  360. // are traps set to make sure they dont call.
  361. // since we flush on disconnect, these traps will activate.
  362. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  363. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  364. // Disconnect A
  365. // which flushes and will cause any traps to spring.
  366. bool disconnectResult = apConnection.Disconnect(true);
  367. EXPECT_TRUE(disconnectResult);
  368. // Disconnect B
  369. disconnectResult = apListener.Disconnect(true);
  370. EXPECT_TRUE(disconnectResult);
  371. // Verify A and B are disconnected
  372. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Disconnected));
  373. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  374. }
  375. TEST_F(APConnectionTest, TestConnection)
  376. {
  377. using namespace AzFramework;
  378. // This is connection A
  379. AssetSystem::AssetProcessorConnection apConnection;
  380. apConnection.m_unitTesting = true;
  381. // This is connection B
  382. AssetSystem::AssetProcessorConnection apListener;
  383. apListener.m_unitTesting = true;
  384. bool ABMessageSuccess = false;
  385. bool ABNoPayloadMessageSuccess = false;
  386. bool BAMessageSuccess = false;
  387. AZStd::binary_semaphore messageArrivedSemaphore;
  388. // Connection A is expecting the above type and payload from B, therefore it is B->A, BA
  389. apConnection.AddMessageHandler(BAType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  390. {
  391. EXPECT_EQ(typeId, BAType);
  392. EXPECT_EQ(BAPayloadSize, dataLength);
  393. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), BAPayload, dataLength));
  394. BAMessageSuccess = true;
  395. messageArrivedSemaphore.release();
  396. });
  397. // Connection B is expecting the above type and payload from A, therefore it is A->B, AB
  398. apListener.AddMessageHandler(ABType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* data, AZ::u32 dataLength) -> void
  399. {
  400. EXPECT_EQ(typeId, ABType);
  401. EXPECT_EQ(ABPayloadSize, dataLength);
  402. EXPECT_TRUE(!strncmp(reinterpret_cast<const char*>(data), ABPayload, dataLength));
  403. ABMessageSuccess = true;
  404. messageArrivedSemaphore.release();
  405. });
  406. // Connection B is expecting the above type and no payload from A, therefore it is A->B, AB
  407. apListener.AddMessageHandler(ABNoPayloadType, [&](AZ::u32 typeId, AZ::u32 /*serial*/, const void* /*data*/, AZ::u32 dataLength) -> void
  408. {
  409. EXPECT_EQ(typeId, ABNoPayloadType);
  410. EXPECT_EQ(dataLength, 0);
  411. ABNoPayloadMessageSuccess = true;
  412. messageArrivedSemaphore.release();
  413. });
  414. // Test connection coming online first
  415. EXPECT_TRUE(apConnection.GetConnectionState() == SocketConnection::EConnectionState::Disconnected);
  416. bool connectResult = apConnection.Connect("127.0.0.1", 11120);
  417. EXPECT_TRUE(connectResult);
  418. // during the connect/disconnect/reconnect loop, the status of the connection rapidly oscillates
  419. // between "connecting" and "disconnecting" as it tries, fails, and sets up to try again.
  420. // Since the connection attempt starts as disconnected (checked before the connect), here we will
  421. // check that it transitions to a state different than disconnected (connecting/disconnecting) and then
  422. // it gets back to disconnected once the attempts are exhausted
  423. EXPECT_TRUE(WaitForConnectionStateToNotBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  424. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  425. // Test listening on separate port. This is NOT the port that the connecting one is trying to reach.
  426. EXPECT_TRUE(apListener.GetConnectionState() == SocketConnection::EConnectionState::Disconnected);
  427. EXPECT_TRUE(apListener.Listen(54321));
  428. // we should end up listening, not connected.
  429. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Listening));
  430. // since we listened on the 'wrong' port, we should not see a successful connection:
  431. EXPECT_NE(apConnection.GetConnectionState(), SocketConnection::EConnectionState::Connected);
  432. // Disconnect listener from wrong port
  433. EXPECT_TRUE(apListener.Disconnect());
  434. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Disconnected));
  435. // Listen with correct port
  436. EXPECT_TRUE(apListener.Listen(11120));
  437. // Wait some time for apConnection to connect (it has to finish negotiation)
  438. // Also the listener needs to tick and connect
  439. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  440. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Connected));
  441. // Send message from A to B
  442. apConnection.SendMsg(ABType, ABPayload, ABPayloadSize);
  443. // Wait some time to allow message to send
  444. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  445. EXPECT_TRUE(ABMessageSuccess);
  446. // Send message from B to A
  447. apListener.SendMsg(BAType, BAPayload, BAPayloadSize);
  448. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  449. EXPECT_TRUE(BAMessageSuccess);
  450. // Send no payload message from A to B
  451. apConnection.SendMsg(ABNoPayloadType, nullptr, 0);
  452. messageArrivedSemaphore.try_acquire_for(AZStd::chrono::milliseconds(millisecondsForSend));
  453. EXPECT_TRUE(ABNoPayloadMessageSuccess);
  454. EXPECT_TRUE(apConnection.Disconnect(true));
  455. EXPECT_TRUE(apListener.Disconnect(true));
  456. // Verify they've disconnected
  457. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  458. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Disconnected));
  459. }
  460. TEST_F(APConnectionTest, TestReconnect)
  461. {
  462. using namespace AzFramework;
  463. // This is connection A
  464. AssetSystem::AssetProcessorConnection apConnection;
  465. apConnection.m_unitTesting = true;
  466. // This is connection B
  467. AssetSystem::AssetProcessorConnection apListener;
  468. apListener.m_unitTesting = true;
  469. // Test listening - listen takes a moment to actually start listening:
  470. EXPECT_TRUE(apListener.GetConnectionState() == SocketConnection::EConnectionState::Disconnected);
  471. bool listenResult = apListener.Listen(11120);
  472. EXPECT_TRUE(listenResult);
  473. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Listening));
  474. // Test connect success
  475. EXPECT_TRUE(apConnection.GetConnectionState() == SocketConnection::EConnectionState::Disconnected);
  476. bool connectResult = apConnection.Connect("127.0.0.1", 11120);
  477. EXPECT_TRUE(connectResult);
  478. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  479. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Connected));
  480. // Disconnect B
  481. bool disconnectResult = apListener.Disconnect();
  482. EXPECT_TRUE(disconnectResult);
  483. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Disconnected));
  484. // disconncting the listener should kick out the other end:
  485. // note that the listener was the ONLY one we told to disconnect
  486. // the other end (the apConnection) is likely to be in a retry state - so it wont be connected, but it also won't necessarily
  487. // be disconnected, connecting, etc.
  488. EXPECT_TRUE(WaitForConnectionStateToNotBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  489. // start listening again
  490. listenResult = apListener.Listen(11120);
  491. EXPECT_TRUE(listenResult);
  492. // once we start listening, the ap connection should autoconnect very shortly:
  493. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  494. // at that point, both sides should consider themselves cyonnected (the listener, too)
  495. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Connected));
  496. // now disconnect A without waiting for it to finish
  497. disconnectResult = apConnection.Disconnect();
  498. EXPECT_TRUE(disconnectResult);
  499. // wait for it to finish
  500. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  501. // ensure that B rebinds and starts listening again
  502. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Listening));
  503. // reconnect manually from A -> B
  504. connectResult = apConnection.Connect("127.0.0.1", 11120);
  505. EXPECT_TRUE(connectResult);
  506. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  507. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Connected));
  508. // disconnect everything, starting with B to ensure that reconnect thread exits on disconnect
  509. disconnectResult = apListener.Disconnect();
  510. EXPECT_TRUE(disconnectResult);
  511. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apListener, SocketConnection::EConnectionState::Disconnected));
  512. // note that the listener was the ONLY one we told to disconnect!
  513. // the other end (the apConnection) is likely to be in a retry state (ie, one of the states that is not connected)
  514. EXPECT_TRUE(WaitForConnectionStateToNotBeEqual(apConnection, SocketConnection::EConnectionState::Connected));
  515. // disconnect A
  516. disconnectResult = apConnection.Disconnect(true); // we're not going to wait, so we do a final disconnect here (true)
  517. EXPECT_TRUE(disconnectResult);
  518. EXPECT_TRUE(WaitForConnectionStateToBeEqual(apConnection, SocketConnection::EConnectionState::Disconnected));
  519. }