123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 |
- /*
- * Copyright (c) Contributors to the Open 3D Engine Project.
- * For complete copyright and license terms please see the LICENSE at the root of this distribution.
- *
- * SPDX-License-Identifier: Apache-2.0 OR MIT
- *
- */
- #include "connectionworker.h"
- #include "native/utilities/assetUtils.h"
- #include <native/utilities/ByteArrayStream.h>
- #include <QThread>
- #include <QTimer>
- #include <QCoreApplication>
- #include <QThread>
- #include <AzFramework/API/ApplicationAPI.h>
- // enable this to debug negotiation - it enables a huge delay so that when a debugger attaches we don't fail.
- //#define DEBUG_NEGOTIATION
- #undef SendMessage
- namespace AssetProcessor {
- ConnectionWorker::ConnectionWorker(qintptr /*socketDescriptor*/, QObject* parent)
- : QObject(parent)
- , m_terminate(false)
- {
- #ifdef DEBUG_NEGOTIATION
- m_waitDelay = 60 * 10 * 1000; // 10 min in debug, in ms
- #endif
- connect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged, Qt::QueuedConnection);
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "Connection::ConnectionWorker created for socket %p: %p", socketDescriptor, this);
- #endif
- }
- ConnectionWorker::~ConnectionWorker()
- {
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::~: %p", this);
- #endif
- thread()->quit();
- }
- bool ConnectionWorker::ReadMessage(QTcpSocket& socket, AssetProcessor::Message& message)
- {
- const qint64 sizeOfHeader = static_cast<qint64>(sizeof(AssetProcessor::MessageHeader));
- qint64 bytesAvailable = socket.bytesAvailable();
- if (bytesAvailable == 0 || bytesAvailable < sizeOfHeader)
- {
- return false;
- }
- // read header
- if (!ReadData(socket, (char*)&message.header, sizeOfHeader))
- {
- DisconnectSockets();
- return false;
- }
- // Prepare the payload buffer
- message.payload.resize(message.header.size);
- // read payload
- if (!ReadData(socket, message.payload.data(), message.header.size))
- {
- DisconnectSockets();
- return false;
- }
- return true;
- }
- bool ConnectionWorker::ReadData(QTcpSocket& socket, char* buffer, qint64 size)
- {
- qint64 bytesRemaining = size;
- while (bytesRemaining > 0)
- {
- // check first, or Qt will throw a warning if we try to do this on an already-disconnected-socket
- if (socket.state() != QAbstractSocket::ConnectedState)
- {
- return false;
- }
- qint64 bytesRead = socket.read(buffer, bytesRemaining);
- if (bytesRead == -1)
- {
- return false;
- }
- buffer += bytesRead;
- bytesRemaining -= bytesRead;
- if (bytesRemaining > 0)
- {
- socket.waitForReadyRead();
- }
- }
- return true;
- }
- bool ConnectionWorker::WriteMessage(QTcpSocket& socket, const AssetProcessor::Message& message)
- {
- const qint64 sizeOfHeader = static_cast<qint64>(sizeof(AssetProcessor::MessageHeader));
- AZ_Assert(message.header.size == aznumeric_cast<decltype(message.header.size)>(message.payload.size()), "Message header size does not match payload size");
- // Write header
- if (!WriteData(socket, (char*)&message.header, sizeOfHeader))
- {
- DisconnectSockets();
- return false;
- }
- // write payload
- if (!WriteData(socket, message.payload.data(), message.payload.size()))
- {
- DisconnectSockets();
- return false;
- }
- return true;
- }
- bool ConnectionWorker::WriteData(QTcpSocket& socket, const char* buffer, qint64 size)
- {
- qint64 bytesRemaining = size;
- while (bytesRemaining > 0)
- {
- // check first, or Qt will throw a warning if we try to do this on an already-disconnected-socket
- if (socket.state() != QAbstractSocket::ConnectedState)
- {
- return false;
- }
- qint64 bytesWritten = socket.write(buffer, bytesRemaining);
- if (bytesWritten == -1)
- {
- return false;
- }
- buffer += bytesWritten;
- bytesRemaining -= bytesWritten;
- }
- return true;
- }
- void ConnectionWorker::EngineSocketHasData()
- {
- if (m_terminate)
- {
- return;
- }
- while (m_engineSocket.bytesAvailable() > 0)
- {
- AssetProcessor::Message message;
- if (ReadMessage(m_engineSocket, message))
- {
- Q_EMIT ReceiveMessage(message.header.type, message.header.serial, message.payload);
- }
- else
- {
- break;
- }
- }
- }
- void ConnectionWorker::SendMessage(unsigned int type, unsigned int serial, QByteArray payload)
- {
- AssetProcessor::Message message;
- message.header.type = type;
- message.header.serial = serial;
- message.header.size = payload.size();
- message.payload = payload;
- WriteMessage(m_engineSocket, message);
- }
- namespace Detail
- {
- template <class N>
- bool WriteNegotiation(ConnectionWorker* worker, QTcpSocket& socket, const N& negotiation, unsigned int serial = AzFramework::AssetSystem::NEGOTIATION_SERIAL)
- {
- AssetProcessor::Message message;
- bool packed = AssetProcessor::PackMessage(negotiation, message.payload);
- if (packed)
- {
- message.header.type = negotiation.GetMessageType();
- message.header.serial = serial;
- message.header.size = message.payload.size();
- return worker->WriteMessage(socket, message);
- }
- return false;
- }
- template <class N>
- bool ReadNegotiation(ConnectionWorker* worker, int waitDelay, QTcpSocket& socket, N& negotiation, unsigned int* serial = nullptr)
- {
- if (socket.bytesAvailable() == 0)
- {
- socket.waitForReadyRead(waitDelay);
- }
- AssetProcessor::Message message;
- if (!worker->ReadMessage(socket, message))
- {
- return false;
- }
- if (serial)
- {
- *serial = message.header.serial;
- }
- return AssetProcessor::UnpackMessage(message.payload, negotiation);
- }
- }
- // Negotiation directly with a game or downstream AssetProcessor:
- // if the connection is initiated from this end:
- // 1) Send AP Info to downstream engine
- // 2) Get downstream engine info
- // if there is an incoming connection
- // 1) Get downstream engine info
- // 2) Send AP Info
- bool ConnectionWorker::NegotiateDirect(bool initiate)
- {
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: %p", this);
- #endif
- using Detail::ReadNegotiation;
- using Detail::WriteNegotiation;
- using namespace AzFramework::AssetSystem;
- AZStd::string azBranchToken;
- AzFramework::ApplicationRequests::Bus::Broadcast(&AzFramework::ApplicationRequests::CalculateBranchTokenForEngineRoot, azBranchToken);
- QString branchToken(azBranchToken.c_str());
- QString projectName = AssetUtilities::ComputeProjectName();
-
- NegotiationMessage myInfo;
- char processId[20];
- azsnprintf(processId, 20, "%lld", QCoreApplication::applicationPid());
- myInfo.m_identifier = "ASSETPROCESSOR";
- myInfo.m_negotiationInfoMap.insert(AZStd::make_pair(NegotiationInfo_ProcessId, AZ::OSString(processId)));
- myInfo.m_negotiationInfoMap.insert(AZStd::make_pair(NegotiationInfo_BranchIndentifier, AZ::OSString(azBranchToken.c_str())));
- myInfo.m_negotiationInfoMap.insert(AZStd::make_pair(NegotiationInfo_ProjectName, AZ::OSString(projectName.toUtf8().constData())));
- NegotiationMessage engineInfo;
- if (initiate)
- {
- if (!WriteNegotiation(this, m_engineSocket, myInfo))
- {
- Q_EMIT ErrorMessage("Unable to send negotiation message");
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- if (!ReadNegotiation(this, m_waitDelay, m_engineSocket, engineInfo))
- {
- Q_EMIT ErrorMessage("Unable to read negotiation message");
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- }
- else
- {
- unsigned int serial = 0;
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: Reading negotiation from engine socket %p", this);
- #endif
- if (!ReadNegotiation(this, m_waitDelay, m_engineSocket, engineInfo, &serial))
- {
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: no negotation arrived %p", this);
- #endif
- Q_EMIT ErrorMessage("Unable to read engine negotiation message");
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: writing negotiation to engine socket %p", this);
- #endif
- if (!WriteNegotiation(this, m_engineSocket, myInfo, serial))
- {
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: no negotation sent %p", this);
- #endif
- Q_EMIT ErrorMessage("Unable to send negotiation message");
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- }
- // Skip the process Id validation during negotiation if the identifier is UNITTEST
- if (engineInfo.m_identifier != "UNITTEST")
- {
- if (strncmp(engineInfo.m_negotiationInfoMap[NegotiationInfo_ProcessId].c_str(), processId, strlen(processId)) == 0)
- {
- Q_EMIT ErrorMessage("Attempted to negotiate with self");
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- }
- if (engineInfo.m_apiVersion != myInfo.m_apiVersion)
- {
- Q_EMIT ErrorMessage("Negotiation Failed.Version Mismatch.");
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- QString incomingBranchToken(engineInfo.m_negotiationInfoMap[NegotiationInfo_BranchIndentifier].c_str());
- if (QString::compare(incomingBranchToken, branchToken, Qt::CaseInsensitive) != 0)
- {
- // if we are here it means that the editor/game which is negotiating is running on a different branch
- // note that it could have just read nothing from the engine or a repeat packet, in that case, discard it silently and try again.
- AZ_TracePrintf(AssetProcessor::ConsoleChannel, "ConnectionWorker::NegotiateDirect: branch token mismatch from %s - %p - %s vs %s\n", engineInfo.m_identifier.c_str(), this, incomingBranchToken.toUtf8().data(), branchToken.toUtf8().data());
- AssetProcessor::MessageInfoBus::Broadcast(&AssetProcessor::MessageInfoBus::Events::NegotiationFailed);
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- QString incomingProjectName(engineInfo.m_negotiationInfoMap[NegotiationInfo_ProjectName].c_str());
- // Do a case-insensitive compare for the project name because some (case-sensitive) platforms will lower-case the incoming project name
- if(QString::compare(incomingProjectName, projectName, Qt::CaseInsensitive) != 0)
- {
- AZ_TracePrintf(AssetProcessor::ConsoleChannel, "ConnectionWorker::NegotiateDirect: project name mismatch from %s - %p - %s vs %s\n", engineInfo.m_identifier.c_str(), this, incomingProjectName.toUtf8().constData(), projectName.toUtf8().constData());
- AssetProcessor::MessageInfoBus::Broadcast(&AssetProcessor::MessageInfoBus::Events::NegotiationFailed);
- QTimer::singleShot(0, this, SLOT(DisconnectSockets()));
- return false;
- }
- Q_EMIT Identifier(engineInfo.m_identifier.c_str());
- Q_EMIT AssetPlatformsString(engineInfo.m_negotiationInfoMap[NegotiationInfo_Platform].c_str());
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::NegotiateDirect: negotation complete %p", this);
- #endif
- Q_EMIT ConnectionEstablished(m_engineSocket.peerAddress().toString(), m_engineSocket.peerPort());
- connect(&m_engineSocket, &QTcpSocket::readyRead, this, &ConnectionWorker::EngineSocketHasData);
- // force the socket to evaluate any data recv'd between negotiation and now
- QTimer::singleShot(0, this, SLOT(EngineSocketHasData()));
- return true;
- }
- // RequestTerminate can be called from anywhere, so we queue the actual
- // termination to ensure it happens in the worker's thread
- void ConnectionWorker::RequestTerminate()
- {
- if (!m_alreadySentTermination)
- {
- m_terminate = true;
- m_alreadySentTermination = true;
- QMetaObject::invokeMethod(this, "TerminateConnection", Qt::BlockingQueuedConnection);
- }
- }
- void ConnectionWorker::TerminateConnection()
- {
- disconnect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged);
- DisconnectSockets();
- deleteLater();
- }
- void ConnectionWorker::ConnectSocket(qintptr socketDescriptor)
- {
- AZ_Assert(socketDescriptor != -1, "ConectionWorker::ConnectSocket: Supplied socket is invalid");
- if (socketDescriptor != -1)
- {
- // calling setSocketDescriptor will cause it to invoke EngineSocketStateChanged instantly, which we don't want, so disconnect it temporarily.
- disconnect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged);
- m_engineSocket.setSocketDescriptor(socketDescriptor, QAbstractSocket::ConnectedState, QIODevice::ReadWrite);
- Q_EMIT IsAddressInAllowedList(m_engineSocket.peerAddress(), reinterpret_cast<void*>(this));
- }
- }
- void ConnectionWorker::AddressIsInAllowedList(void* token, bool result)
- {
- if (reinterpret_cast<void*>(this) == token)
- {
- if (result)
- {
- // this address has been approved, connect and proceed
- connect(&m_engineSocket, &QTcpSocket::stateChanged, this, &ConnectionWorker::EngineSocketStateChanged);
- EngineSocketStateChanged(QAbstractSocket::ConnectedState);
- }
- else
- {
- // this address has been rejected, disconnect immediately!!!
- AZ_TracePrintf(AssetProcessor::ConsoleChannel, " A connection attempt was ignored because it is not in the allowed list. Please consider adding allowed_list=(IP ADDRESS),localhost to the bootstrap.cfg");
- disconnect(&m_engineSocket, &QTcpSocket::readyRead, this, &ConnectionWorker::EngineSocketHasData);
- DisconnectSockets();
- }
- }
- }
- void ConnectionWorker::ConnectToEngine(QString ipAddress, quint16 port)
- {
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, " ConnectionWorker::ConnectToEngine");
- #endif
- m_terminate = false;
- if (m_engineSocket.state() == QAbstractSocket::UnconnectedState)
- {
- m_initiatedConnection = true;
- m_engineSocket.connectToHost(ipAddress, port, QIODevice::ReadWrite);
- }
- }
- void ConnectionWorker::EngineSocketStateChanged(QAbstractSocket::SocketState socketState)
- {
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, " ConnectionWorker::EngineSocketStateChanged to %i", (int)socketState);
- #endif
- if (m_terminate)
- {
- return;
- }
- if (socketState == QAbstractSocket::ConnectedState)
- {
- m_engineSocket.setSocketOption(QAbstractSocket::KeepAliveOption, 1);
- m_engineSocket.setSocketOption(QAbstractSocket::LowDelayOption, 1); //disable nagles algorithm
- m_engineSocketIsConnected = true;
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::EngineSocketStateChanged: %p connected now (%s)", this, m_engineSocketIsConnected ? "True" : "False");
- #endif
- QMetaObject::invokeMethod(this, "NegotiateDirect", Qt::QueuedConnection, Q_ARG(bool, m_initiatedConnection));
- }
- else if (socketState == QAbstractSocket::UnconnectedState)
- {
- m_engineSocketIsConnected = false;
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, "ConnectionWorker::EngineSocketStateChanged: %p unconnected, now (%s)", this, m_engineSocketIsConnected ? "True" : "False");
- #endif
- disconnect(&m_engineSocket, &QTcpSocket::readyRead, 0, 0);
- DisconnectSockets();
- }
- }
- void ConnectionWorker::DisconnectSockets()
- {
- #if defined(DEBUG_NEGOTIATION)
- AZ_TracePrintf(AssetProcessor::DebugChannel, " ConnectionWorker::DisconnectSockets");
- #endif
- m_engineSocket.abort();
- m_engineSocket.close();
-
- Q_EMIT ConnectionDisconnected();
- }
- void ConnectionWorker::Reset()
- {
- m_terminate = false;
- }
- bool ConnectionWorker::Terminate()
- {
- return m_terminate;
- }
- QTcpSocket& ConnectionWorker::GetSocket()
- {
- return m_engineSocket;
- }
- bool ConnectionWorker::InitiatedConnection() const
- {
- return m_initiatedConnection;
- }
- } // namespace AssetProcessor
|