connectionworker.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. /*
  2. * Copyright (c) Contributors to the Open 3D Engine Project.
  3. * For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. *
  5. * SPDX-License-Identifier: Apache-2.0 OR MIT
  6. *
  7. */
  8. #include "connectionworker.h"
  9. #include "native/utilities/assetUtils.h"
  10. #include <native/utilities/ByteArrayStream.h>
  11. #include <QThread>
  12. #include <QTimer>
  13. #include <QCoreApplication>
  14. #include <QThread>
  15. #include <AzFramework/API/ApplicationAPI.h>
  16. // enable this to debug negotiation - it enables a huge delay so that when a debugger attaches we don't fail.
  17. //#define DEBUG_NEGOTIATION
  18. #undef SendMessage
  19. namespace AssetProcessor {
  20. ConnectionWorker::ConnectionWorker(qintptr /*socketDescriptor*/, QObject* parent)
  21. : QObject(parent)
  22. , m_terminate(false)
  23. {
  24. #ifdef DEBUG_NEGOTIATION
  25. m_waitDelay = 60 * 10 * 1000; // 10 min in debug, in ms
  26. #endif
  27. connect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged, Qt::QueuedConnection);
  28. #if defined(DEBUG_NEGOTIATION)
  29. AZ_TracePrintf(AssetProcessor::DebugChannel, "Connection::ConnectionWorker created for socket %p: %p", socketDescriptor, this);
  30. #endif
  31. }
  32. ConnectionWorker::~ConnectionWorker()
  33. {
  34. #if defined(DEBUG_NEGOTIATION)
  35. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::~: %p", this);
  36. #endif
  37. thread()->quit();
  38. }
  39. bool ConnectionWorker::ReadMessage(QTcpSocket& socket, AssetProcessor::Message& message)
  40. {
  41. const qint64 sizeOfHeader = static_cast<qint64>(sizeof(AssetProcessor::MessageHeader));
  42. qint64 bytesAvailable = socket.bytesAvailable();
  43. if (bytesAvailable == 0 || bytesAvailable < sizeOfHeader)
  44. {
  45. return false;
  46. }
  47. // read header
  48. if (!ReadData(socket, (char*)&message.header, sizeOfHeader))
  49. {
  50. DisconnectSockets();
  51. return false;
  52. }
  53. // Prepare the payload buffer
  54. message.payload.resize(message.header.size);
  55. // read payload
  56. if (!ReadData(socket, message.payload.data(), message.header.size))
  57. {
  58. DisconnectSockets();
  59. return false;
  60. }
  61. return true;
  62. }
  63. bool ConnectionWorker::ReadData(QTcpSocket& socket, char* buffer, qint64 size)
  64. {
  65. qint64 bytesRemaining = size;
  66. while (bytesRemaining > 0)
  67. {
  68. // check first, or Qt will throw a warning if we try to do this on an already-disconnected-socket
  69. if (socket.state() != QAbstractSocket::ConnectedState)
  70. {
  71. return false;
  72. }
  73. qint64 bytesRead = socket.read(buffer, bytesRemaining);
  74. if (bytesRead == -1)
  75. {
  76. return false;
  77. }
  78. buffer += bytesRead;
  79. bytesRemaining -= bytesRead;
  80. if (bytesRemaining > 0)
  81. {
  82. socket.waitForReadyRead();
  83. }
  84. }
  85. return true;
  86. }
  87. bool ConnectionWorker::WriteMessage(QTcpSocket& socket, const AssetProcessor::Message& message)
  88. {
  89. const qint64 sizeOfHeader = static_cast<qint64>(sizeof(AssetProcessor::MessageHeader));
  90. AZ_Assert(message.header.size == aznumeric_cast<decltype(message.header.size)>(message.payload.size()), "Message header size does not match payload size");
  91. // Write header
  92. if (!WriteData(socket, (char*)&message.header, sizeOfHeader))
  93. {
  94. DisconnectSockets();
  95. return false;
  96. }
  97. // write payload
  98. if (!WriteData(socket, message.payload.data(), message.payload.size()))
  99. {
  100. DisconnectSockets();
  101. return false;
  102. }
  103. return true;
  104. }
  105. bool ConnectionWorker::WriteData(QTcpSocket& socket, const char* buffer, qint64 size)
  106. {
  107. qint64 bytesRemaining = size;
  108. while (bytesRemaining > 0)
  109. {
  110. // check first, or Qt will throw a warning if we try to do this on an already-disconnected-socket
  111. if (socket.state() != QAbstractSocket::ConnectedState)
  112. {
  113. return false;
  114. }
  115. qint64 bytesWritten = socket.write(buffer, bytesRemaining);
  116. if (bytesWritten == -1)
  117. {
  118. return false;
  119. }
  120. buffer += bytesWritten;
  121. bytesRemaining -= bytesWritten;
  122. }
  123. return true;
  124. }
  125. void ConnectionWorker::EngineSocketHasData()
  126. {
  127. if (m_terminate)
  128. {
  129. return;
  130. }
  131. while (m_engineSocket.bytesAvailable() > 0)
  132. {
  133. AssetProcessor::Message message;
  134. if (ReadMessage(m_engineSocket, message))
  135. {
  136. Q_EMIT ReceiveMessage(message.header.type, message.header.serial, message.payload);
  137. }
  138. else
  139. {
  140. break;
  141. }
  142. }
  143. }
  144. void ConnectionWorker::SendMessage(unsigned int type, unsigned int serial, QByteArray payload)
  145. {
  146. AssetProcessor::Message message;
  147. message.header.type = type;
  148. message.header.serial = serial;
  149. message.header.size = payload.size();
  150. message.payload = payload;
  151. WriteMessage(m_engineSocket, message);
  152. }
  153. namespace Detail
  154. {
  155. template <class N>
  156. bool WriteNegotiation(ConnectionWorker* worker, QTcpSocket& socket, const N& negotiation, unsigned int serial = AzFramework::AssetSystem::NEGOTIATION_SERIAL)
  157. {
  158. AssetProcessor::Message message;
  159. bool packed = AssetProcessor::PackMessage(negotiation, message.payload);
  160. if (packed)
  161. {
  162. message.header.type = negotiation.GetMessageType();
  163. message.header.serial = serial;
  164. message.header.size = message.payload.size();
  165. return worker->WriteMessage(socket, message);
  166. }
  167. return false;
  168. }
  169. template <class N>
  170. bool ReadNegotiation(ConnectionWorker* worker, int waitDelay, QTcpSocket& socket, N& negotiation, unsigned int* serial = nullptr)
  171. {
  172. if (socket.bytesAvailable() == 0)
  173. {
  174. socket.waitForReadyRead(waitDelay);
  175. }
  176. AssetProcessor::Message message;
  177. if (!worker->ReadMessage(socket, message))
  178. {
  179. return false;
  180. }
  181. if (serial)
  182. {
  183. *serial = message.header.serial;
  184. }
  185. return AssetProcessor::UnpackMessage(message.payload, negotiation);
  186. }
  187. }
  188. // Negotiation directly with a game or downstream AssetProcessor:
  189. // if the connection is initiated from this end:
  190. // 1) Send AP Info to downstream engine
  191. // 2) Get downstream engine info
  192. // if there is an incoming connection
  193. // 1) Get downstream engine info
  194. // 2) Send AP Info
  195. bool ConnectionWorker::NegotiateDirect(bool initiate)
  196. {
  197. #if defined(DEBUG_NEGOTIATION)
  198. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: %p", this);
  199. #endif
  200. using Detail::ReadNegotiation;
  201. using Detail::WriteNegotiation;
  202. using namespace AzFramework::AssetSystem;
  203. AZStd::string azBranchToken;
  204. AzFramework::ApplicationRequests::Bus::Broadcast(&AzFramework::ApplicationRequests::CalculateBranchTokenForEngineRoot, azBranchToken);
  205. QString branchToken(azBranchToken.c_str());
  206. QString projectName = AssetUtilities::ComputeProjectName();
  207. NegotiationMessage myInfo;
  208. char processId[20];
  209. azsnprintf(processId, 20, "%lld", QCoreApplication::applicationPid());
  210. myInfo.m_identifier = "ASSETPROCESSOR";
  211. myInfo.m_negotiationInfoMap.insert(AZStd::make_pair(NegotiationInfo_ProcessId, AZ::OSString(processId)));
  212. myInfo.m_negotiationInfoMap.insert(AZStd::make_pair(NegotiationInfo_BranchIndentifier, AZ::OSString(azBranchToken.c_str())));
  213. myInfo.m_negotiationInfoMap.insert(AZStd::make_pair(NegotiationInfo_ProjectName, AZ::OSString(projectName.toUtf8().constData())));
  214. NegotiationMessage engineInfo;
  215. if (initiate)
  216. {
  217. if (!WriteNegotiation(this, m_engineSocket, myInfo))
  218. {
  219. Q_EMIT ErrorMessage("Unable to send negotiation message");
  220. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  221. return false;
  222. }
  223. if (!ReadNegotiation(this, m_waitDelay, m_engineSocket, engineInfo))
  224. {
  225. Q_EMIT ErrorMessage("Unable to read negotiation message");
  226. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  227. return false;
  228. }
  229. }
  230. else
  231. {
  232. unsigned int serial = 0;
  233. #if defined(DEBUG_NEGOTIATION)
  234. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: Reading negotiation from engine socket %p", this);
  235. #endif
  236. if (!ReadNegotiation(this, m_waitDelay, m_engineSocket, engineInfo, &serial))
  237. {
  238. #if defined(DEBUG_NEGOTIATION)
  239. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: no negotation arrived %p", this);
  240. #endif
  241. Q_EMIT ErrorMessage("Unable to read engine negotiation message");
  242. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  243. return false;
  244. }
  245. #if defined(DEBUG_NEGOTIATION)
  246. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: writing negotiation to engine socket %p", this);
  247. #endif
  248. if (!WriteNegotiation(this, m_engineSocket, myInfo, serial))
  249. {
  250. #if defined(DEBUG_NEGOTIATION)
  251. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: no negotation sent %p", this);
  252. #endif
  253. Q_EMIT ErrorMessage("Unable to send negotiation message");
  254. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  255. return false;
  256. }
  257. }
  258. // Skip the process Id validation during negotiation if the identifier is UNITTEST
  259. if (engineInfo.m_identifier != "UNITTEST")
  260. {
  261. if (strncmp(engineInfo.m_negotiationInfoMap[NegotiationInfo_ProcessId].c_str(), processId, strlen(processId)) == 0)
  262. {
  263. Q_EMIT ErrorMessage("Attempted to negotiate with self");
  264. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  265. return false;
  266. }
  267. }
  268. if (engineInfo.m_apiVersion != myInfo.m_apiVersion)
  269. {
  270. Q_EMIT ErrorMessage("Negotiation Failed.Version Mismatch.");
  271. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  272. return false;
  273. }
  274. QString incomingBranchToken(engineInfo.m_negotiationInfoMap[NegotiationInfo_BranchIndentifier].c_str());
  275. if (QString::compare(incomingBranchToken, branchToken, Qt::CaseInsensitive) != 0)
  276. {
  277. // if we are here it means that the editor/game which is negotiating is running on a different branch
  278. // note that it could have just read nothing from the engine or a repeat packet, in that case, discard it silently and try again.
  279. AZ_TracePrintf(AssetProcessor::ConsoleChannel, "ConnectionWorker::NegotiateDirect: branch token mismatch from %s - %p - %s vs %s\n", engineInfo.m_identifier.c_str(), this, incomingBranchToken.toUtf8().data(), branchToken.toUtf8().data());
  280. AssetProcessor::MessageInfoBus::Broadcast(&AssetProcessor::MessageInfoBus::Events::NegotiationFailed);
  281. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  282. return false;
  283. }
  284. QString incomingProjectName(engineInfo.m_negotiationInfoMap[NegotiationInfo_ProjectName].c_str());
  285. // Do a case-insensitive compare for the project name because some (case-sensitive) platforms will lower-case the incoming project name
  286. if(QString::compare(incomingProjectName, projectName, Qt::CaseInsensitive) != 0)
  287. {
  288. AZ_TracePrintf(AssetProcessor::ConsoleChannel, "ConnectionWorker::NegotiateDirect: project name mismatch from %s - %p - %s vs %s\n", engineInfo.m_identifier.c_str(), this, incomingProjectName.toUtf8().constData(), projectName.toUtf8().constData());
  289. AssetProcessor::MessageInfoBus::Broadcast(&AssetProcessor::MessageInfoBus::Events::NegotiationFailed);
  290. QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
  291. return false;
  292. }
  293. Q_EMIT Identifier(engineInfo.m_identifier.c_str());
  294. Q_EMIT AssetPlatformsString(engineInfo.m_negotiationInfoMap[NegotiationInfo_Platform].c_str());
  295. #if defined(DEBUG_NEGOTIATION)
  296. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: negotation complete %p", this);
  297. #endif
  298. Q_EMIT ConnectionEstablished(m_engineSocket.peerAddress().toString(), m_engineSocket.peerPort());
  299. connect(&m_engineSocket, &QTcpSocket::readyRead, this, &ConnectionWorker::EngineSocketHasData);
  300. // force the socket to evaluate any data recv'd between negotiation and now
  301. QTimer::singleShot(0, this, SLOT(EngineSocketHasData()));
  302. return true;
  303. }
  304. // RequestTerminate can be called from anywhere, so we queue the actual
  305. // termination to ensure it happens in the worker's thread
  306. void ConnectionWorker::RequestTerminate()
  307. {
  308. if (!m_alreadySentTermination)
  309. {
  310. m_terminate = true;
  311. m_alreadySentTermination = true;
  312. QMetaObject::invokeMethod(this, "TerminateConnection", Qt::BlockingQueuedConnection);
  313. }
  314. }
  315. void ConnectionWorker::TerminateConnection()
  316. {
  317. disconnect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged);
  318. DisconnectSockets();
  319. deleteLater();
  320. }
  321. void ConnectionWorker::ConnectSocket(qintptr socketDescriptor)
  322. {
  323. AZ_Assert(socketDescriptor != -1, "ConectionWorker::ConnectSocket: Supplied socket is invalid");
  324. if (socketDescriptor != -1)
  325. {
  326. // calling setSocketDescriptor will cause it to invoke EngineSocketStateChanged instantly, which we don't want, so disconnect it temporarily.
  327. disconnect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged);
  328. m_engineSocket.setSocketDescriptor(socketDescriptor, QAbstractSocket::ConnectedState, QIODevice::ReadWrite);
  329. Q_EMIT IsAddressInAllowedList(m_engineSocket.peerAddress(), reinterpret_cast<void*>(this));
  330. }
  331. }
  332. void ConnectionWorker::AddressIsInAllowedList(void* token, bool result)
  333. {
  334. if (reinterpret_cast<void*>(this) == token)
  335. {
  336. if (result)
  337. {
  338. // this address has been approved, connect and proceed
  339. connect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged);
  340. EngineSocketStateChanged(QAbstractSocket::ConnectedState);
  341. }
  342. else
  343. {
  344. // this address has been rejected, disconnect immediately!!!
  345. AZ_TracePrintf(AssetProcessor::ConsoleChannel, " A connection attempt was ignored because it is not in the allowed list. Please consider adding allowed_list=(IP ADDRESS),localhost to the bootstrap.cfg");
  346. disconnect(&m_engineSocket, &QTcpSocket::readyRead, this, &ConnectionWorker::EngineSocketHasData);
  347. DisconnectSockets();
  348. }
  349. }
  350. }
  351. void ConnectionWorker::ConnectToEngine(QString ipAddress, quint16 port)
  352. {
  353. #if defined(DEBUG_NEGOTIATION)
  354. AZ_TracePrintf(AssetProcessor::DebugChannel, " ConnectionWorker::ConnectToEngine");
  355. #endif
  356. m_terminate = false;
  357. if (m_engineSocket.state() == QAbstractSocket::UnconnectedState)
  358. {
  359. m_initiatedConnection = true;
  360. m_engineSocket.connectToHost(ipAddress, port, QIODevice::ReadWrite);
  361. }
  362. }
  363. void ConnectionWorker::EngineSocketStateChanged(QAbstractSocket::SocketState socketState)
  364. {
  365. #if defined(DEBUG_NEGOTIATION)
  366. AZ_TracePrintf(AssetProcessor::DebugChannel, " ConnectionWorker::EngineSocketStateChanged to %i", (int)socketState);
  367. #endif
  368. if (m_terminate)
  369. {
  370. return;
  371. }
  372. if (socketState == QAbstractSocket::ConnectedState)
  373. {
  374. m_engineSocket.setSocketOption(QAbstractSocket::KeepAliveOption, 1);
  375. m_engineSocket.setSocketOption(QAbstractSocket::LowDelayOption, 1); //disable nagles algorithm
  376. m_engineSocketIsConnected = true;
  377. #if defined(DEBUG_NEGOTIATION)
  378. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::EngineSocketStateChanged: %p connected now (%s)", this, m_engineSocketIsConnected ? "True" : "False");
  379. #endif
  380. QMetaObject::invokeMethod(this, "NegotiateDirect", Qt::QueuedConnection, Q_ARG(bool, m_initiatedConnection));
  381. }
  382. else if (socketState == QAbstractSocket::UnconnectedState)
  383. {
  384. m_engineSocketIsConnected = false;
  385. #if defined(DEBUG_NEGOTIATION)
  386. AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::EngineSocketStateChanged: %p unconnected, now (%s)", this, m_engineSocketIsConnected ? "True" : "False");
  387. #endif
  388. disconnect(&m_engineSocket, &QTcpSocket::readyRead, 0, 0);
  389. DisconnectSockets();
  390. }
  391. }
  392. void ConnectionWorker::DisconnectSockets()
  393. {
  394. #if defined(DEBUG_NEGOTIATION)
  395. AZ_TracePrintf(AssetProcessor::DebugChannel, " ConnectionWorker::DisconnectSockets");
  396. #endif
  397. m_engineSocket.abort();
  398. m_engineSocket.close();
  399. Q_EMIT ConnectionDisconnected();
  400. }
  401. void ConnectionWorker::Reset()
  402. {
  403. m_terminate = false;
  404. }
  405. bool ConnectionWorker::Terminate()
  406. {
  407. return m_terminate;
  408. }
  409. QTcpSocket& ConnectionWorker::GetSocket()
  410. {
  411. return m_engineSocket;
  412. }
  413. bool ConnectionWorker::InitiatedConnection() const
  414. {
  415. return m_initiatedConnection;
  416. }
  417. } // namespace AssetProcessor