#include <algorithm>
#include <cstring>
#include "GameActionsChannel.h"
#include "glwebtools/tool/glwebtools_codec.h"
#ifdef OS_LINUX
#include "zlib.h"
#include <syslog.h>
#include <stdarg.h>
#else
#include "zlib/include/zlib/zlib.h"
#include <cstdio>
#endif
// Counter handed out by details::GetNextActionId(); starts at 1 and is set back to 1 by details::ResetGameActionIds()
std::atomic<int> g_currentGameActionId = { 1 };
namespace details
{
// Emits one printf-style formatted diagnostic message.
//   a_fmt - format string; must be a character array (typically a string literal)
//   args  - values consumed by the conversion specifiers in a_fmt
// With OS_LINUX defined the message goes to syslog at LOG_ERR priority; otherwise it is printed to stdout
// as a single line prefixed with "[GameActionsChannel] ".
template <size_t N, class ...Ts>
void Log(const char (&a_fmt)[N], Ts... args)
{
    // TODO: use something other than printf
    // currently this code is used on the server and client, and RKLOG is not hooked up on the server
#ifndef OS_LINUX
    fputs("[GameActionsChannel] ", stdout);
    printf(a_fmt, args...);
    putchar('\n');
#else
    syslog(LOG_ERR, a_fmt, args...);
#endif
}
// Evaluates cond once; when it is false, logs the text of the failed expression through details::Log
// and makes the enclosing function return false. Only usable in functions returning bool.
// The do { } while (0) wrapper makes the expansion a single statement that requires the trailing ';',
// so the macro is safe as the unbraced body of an if/else.
#define CHECK_RESULT(cond) \
    do \
    { \
        if (!(cond)) \
        { \
            details::Log("Failed check: %s", #cond); \
            return false; \
        } \
    } while (0)
// Deserialize a 32bit value from a stream of bytes
//   data - pointer to at least 4 readable bytes, most significant byte first
// Returns the decoded value.
inline uint32_t GetIntFromBuffer(const uint8_t* data)
{
    // Widen each byte to uint32_t before shifting: a uint8_t would otherwise be promoted to a signed int,
    // and shifting a byte >= 0x80 left by 24 would move a bit in to the sign bit.
    return (static_cast<uint32_t>(data[0]) << 24) |
           (static_cast<uint32_t>(data[1]) << 16) |
           (static_cast<uint32_t>(data[2]) << 8) |
            static_cast<uint32_t>(data[3]);
}
// Serialize a 32bit value in to a stream of bytes
//   data - pointer to at least 4 writable bytes
//   val  - value to encode; the most significant byte is written first
inline void PutIntToBuffer(uint8_t* data, uint32_t val)
{
    for (int index = 0; index < 4; ++index)
    {
        // Byte 0 takes bits 31..24, byte 3 takes bits 7..0; the narrowing cast keeps only the low 8 bits
        const int shift = 8 * (3 - index);
        data[index] = static_cast<uint8_t>(val >> shift);
    }
}
// Fixed-size header stored at the start of every packet.
// It is serialized as three consecutive 32bit values (packet size, decompressed size, padding) using
// PutIntToBuffer / GetIntFromBuffer.
struct PacketHeader
{
    // Builds a header from already known field values
    PacketHeader(uint32_t size, uint32_t unpackedSize, uint32_t padding);
    // Decodes a header from the first sizeof(PacketHeader) bytes at data; the caller must guarantee they are readable.
    // explicit so that a raw byte pointer is never silently converted in to a header.
    explicit PacketHeader(const uint8_t* data);
    // Encodes this header in to the first sizeof(PacketHeader) bytes at data
    void PutToBuffer(uint8_t* data) const;
    // Returns true when every field is within the limits declared in Constants; otherwise logs the first
    // failed check and returns false
    bool IsValid() const;

    uint32_t m_packetSize = 0;       // size of the packet including this packet header
    uint32_t m_decompressedSize = 0; // original size of input data
    uint32_t m_padding = 0;          // padding needed for encryption
};

static_assert(sizeof(PacketHeader) == 12, "Unexpected size of packet header");

// Size limits (in bytes) used to validate packets and messages
enum Constants
{
    MINIMUM_PACKET_SIZE = sizeof(PacketHeader),
    MAXIMUM_PACKET_SIZE = 64 * 1024,
    MINIMUM_PADDING_SIZE = 0,
    MAXIMUM_PADDING_SIZE = 5,
    MINIMUM_MESSAGE_SIZE = 2, // smallest valid json message would be "{}"
    MAXIMUM_MESSAGE_SIZE = 512 * 1024,
    MAXIMUM_PACKET_OVERHEAD = sizeof(PacketHeader) + MAXIMUM_PADDING_SIZE,
};

PacketHeader::PacketHeader(uint32_t size, uint32_t unpackedSize, uint32_t padding)
    : m_packetSize(size), m_decompressedSize(unpackedSize), m_padding(padding)
{
}

PacketHeader::PacketHeader(const uint8_t* data)
{
    // Fields are read back in the same order PutToBuffer writes them
    m_packetSize = GetIntFromBuffer(data);
    m_decompressedSize = GetIntFromBuffer(data + sizeof(uint32_t));
    m_padding = GetIntFromBuffer(data + sizeof(uint32_t) * 2);
}

void PacketHeader::PutToBuffer(uint8_t* data) const
{
    // Put the total packet size (including this header) in to the packet data
    PutIntToBuffer(data, m_packetSize);
    // Put the uncompressed/original packet size in to the packet data
    PutIntToBuffer(data + sizeof(uint32_t), m_decompressedSize);
    // Put the encryption padding overhead required
    PutIntToBuffer(data + sizeof(uint32_t) * 2, m_padding);
}

bool PacketHeader::IsValid() const
{
    CHECK_RESULT(m_packetSize >= MINIMUM_PACKET_SIZE);
    CHECK_RESULT(m_packetSize <= MAXIMUM_PACKET_SIZE);
    CHECK_RESULT(m_decompressedSize >= MINIMUM_MESSAGE_SIZE);
    CHECK_RESULT(m_decompressedSize <= MAXIMUM_MESSAGE_SIZE);
    // Left disabled: m_padding is unsigned and MINIMUM_PADDING_SIZE is 0, so the comparison is always true
    //CHECK_RESULT(m_padding >= MINIMUM_PADDING_SIZE);
    CHECK_RESULT(m_padding <= MAXIMUM_PADDING_SIZE);
    return true;
}
// This is the application's XXTEA symmetric key (passed to EncryptXXTEA in Pack and DecryptXXTEA in Unpack)
// NOTE(review): the key is a constant compiled in to the binary - confirm this is acceptable for the intended threat model
static const unsigned int s_gameXXTEAKey[4] = { 1, 3, 3, 7 };
// Pack the message (compress and encrypt - in that order because compression of encrypted data is very in-effective)
bool Pack(std::vector<uint8_t>& a_outputBuffer, const uint8_t* a_inputBytes, size_t a_intputLength)
{
// zlib compress data
uLongf compressedSize = compressBound(a_intputLength); // upper bound of compressed size
size_t maxPacketSize = compressedSize + MAXIMUM_PACKET_OVERHEAD;
CHECK_RESULT(maxPacketSize <= MAXIMUM_PACKET_SIZE);
a_outputBuffer.resize(maxPacketSize); // Enough space for upper bound compressed data, packet header and padding bytes
uint8_t* dataPtr = a_outputBuffer.data() + sizeof(PacketHeader);
CHECK_RESULT(compress(dataPtr, &compressedSize, a_inputBytes, a_intputLength) == Z_OK);
// encrypt the data
size_t encryptedSize = glwebtools::Codec::GetEncryptedXXTEADataSize(compressedSize);
size_t padding = encryptedSize - compressedSize;
CHECK_RESULT(glwebtools::Codec::EncryptXXTEA(dataPtr, compressedSize, dataPtr, encryptedSize, &details::s_gameXXTEAKey[0]));
// Put the packet header information in to the packet data
PacketHeader pkt(encryptedSize + sizeof(PacketHeader), a_intputLength, padding);
CHECK_RESULT(pkt.IsValid());
a_outputBuffer.resize(pkt.m_packetSize); // trim off any extra padding bytes that didn't end up being needed
pkt.PutToBuffer(a_outputBuffer.data());
return true;
}
// Unpack in the reverse order it was packed (decypt and uncompress)
// Note, the input buffer is clobbered (decryption happens in-place - the decrypt code does a memcpy if not in-place, so it is desirable we do it in-place)
bool Unpack(std::vector<uint8_t>& a_outputBuffer, uint8_t* a_inputBytes, size_t a_intputLength)
{
PacketHeader pkt(a_inputBytes);
// Check pre-conditions
CHECK_RESULT(pkt.m_packetSize == a_intputLength);
CHECK_RESULT(pkt.IsValid());
a_inputBytes += sizeof(PacketHeader);
a_intputLength -= sizeof(PacketHeader); // Skip over header to the payload
// Decyrpt in-place
CHECK_RESULT(glwebtools::Codec::DecryptXXTEA(a_inputBytes, a_intputLength, a_inputBytes, a_intputLength, &details::s_gameXXTEAKey[0]));
// zlib decompress data
uLongf uncompressedSize = pkt.m_decompressedSize;
a_outputBuffer.resize(uncompressedSize);
CHECK_RESULT(uncompress(a_outputBuffer.data(), &uncompressedSize, a_inputBytes, a_intputLength - pkt.m_padding) == Z_OK);
return true;
}
// Each action has a unique action id, this is the function to generate these.
// Returns the current value of g_currentGameActionId and atomically advances the counter by one.
uint32_t GetNextActionId()
{
    return static_cast<uint32_t>(g_currentGameActionId++);
}
// Reset to a fresh state for unit testing purposes: the next id handed out by GetNextActionId() is 1 again
void ResetGameActionIds()
{
    g_currentGameActionId.store(1);
}
}
// Constructs a channel that dispatches received actions to a_eventSink.
// Every item enumerated by RegisterActionFactoryItem::ForEach is entered in to m_actionFactory under its
// name, then the receive state is initialised through Reset().
GameActionsChannel::GameActionsChannel(std::shared_ptr<EventSink> a_eventSink)
    : m_eventSink(a_eventSink)
{
    using DispatchFn = void(*)(EventSink&, bne::JsonValueConstRef);
    using FactoryEntry = std::pair<std::string, DispatchFn>;

    auto registerAction = [this](const RegisterActionFactoryItem& a_actionFactory)
    {
        m_actionFactory.insert(FactoryEntry(a_actionFactory.m_name, a_actionFactory.m_dispatch));
    };
    RegisterActionFactoryItem::ForEach(registerAction);

    Reset();
}
// Puts the receive side of the channel back in to its initial state (waiting for a packet header) and
// makes sure the working buffers have their expected capacity.
// NOTE(review): m_encodedSendBuffer contents and m_offset are not touched here - confirm that keeping
// partially sent data across a reset is intended.
void GameActionsChannel::Reset()
{
    // Receive state: empty buffer, nothing consumed, next thing expected is a packet header
    m_receiveBuffer.clear();
    m_receiveBufferOffset = 0;
    m_receivedPacketHeader = false;
    m_receiveBytesNeeded = details::MINIMUM_PACKET_SIZE;

    // Capacity for the working buffers
    m_receiveBuffer.reserve(details::MAXIMUM_PACKET_SIZE * 2); // Perhaps need a grow strategy like in ZA where it could grow and shrink in a controlled way
    m_encodedSendBuffer.reserve(details::MAXIMUM_PACKET_SIZE);
    m_unpackedPacketBuffer.reserve(details::MAXIMUM_MESSAGE_SIZE);
}
// Common failure path: closes the channel interface (when there is one), logs errorMsg and resets the
// channel state through Reset().
//   errorMsg - nul terminated description of what went wrong
void GameActionsChannel::Error(const char* errorMsg)
{
    if (m_channelInterface != nullptr)
    {
        m_channelInterface->Close();
    }

    details::Log("Error: %s", errorMsg);

    Reset();
}
// Runs one receive pass followed by one send pass.
//   a_timeElapsed - not used by this function
//   a_timeout     - handed to both ReceiveAndDispatchActions and SendQueuedActions
// Returns false when there is no channel interface, otherwise whether the interface reports being
// connected after both passes have run.
bool GameActionsChannel::Update(std::chrono::milliseconds a_timeElapsed, std::chrono::milliseconds a_timeout)
{
    if (m_channelInterface)
    {
        ReceiveAndDispatchActions(a_timeout);
        SendQueuedActions(a_timeout);
        return m_channelInterface->IsConnected();
    }
    return false;
}
void GameActionsChannel::ReceiveAndDispatchActions(std::chrono::milliseconds a_timeout)
{
while (true)
{
size_t currentSize = m_receiveBuffer.size() - m_receiveBufferOffset;
if (currentSize >= m_receiveBytesNeeded)
{
if (m_receivedPacketHeader)
{
// We already have the header and we have read the required bytes needed to unpack a full packet
if (!details::Unpack(m_unpackedPacketBuffer, m_receiveBuffer.data() + m_receiveBufferOffset, m_receiveBytesNeeded))
{
return Error("badly formed packet");
}
// TODO: collect network packet statistics
// Could be data after the packet we need to keep / move
size_t excessBytes = currentSize - m_receiveBytesNeeded;
m_receiveBufferOffset += m_receiveBytesNeeded;
// TODO: remove this debugging when it is working well
//details::Log("got bytes: %s (excess bytes: %d)", m_unpackedPacketBuffer.data(), excessBytes);
bne::JsonValue root;
if (bne::JsonValue::Parse(reinterpret_cast<const char*>(m_unpackedPacketBuffer.data()), root))
{
if (root.isArray())
{
for (bne::JsonValueConstRef item : root)
{
if (item.isObject())
{
for (std::string mem : item.getMemberNames())
{
if (m_actionFactory.count(mem))
{
// Deserializes and dispatchs
if (m_eventSink)
{
m_actionFactory[mem](*m_eventSink.get(), item);
}
}
}
}
}
}
}
// TODO: put this in a receiver function registered for channel state objects
static int pktNum = 0; // Need to get this out of the channel state object
m_channelState.m_ackNumber = pktNum;
pktNum++;
if (excessBytes)
{
// TODO: ensure code coverage that tests these code paths (need to add unit tests)
// We can mock the ChannelInterface in a unit test to cause this to happen
if ((m_receiveBuffer.capacity() - m_receiveBufferOffset) < details::MAXIMUM_PACKET_SIZE)
{
// Not enough space, move the data down
memcpy(m_receiveBuffer.data(), m_receiveBuffer.data() + m_receiveBufferOffset, excessBytes);
m_receiveBuffer.resize(excessBytes);
m_receiveBufferOffset = 0;
}
else
{
// Enough spare space in the buffer, so keep using it
m_receiveBuffer.resize(m_receiveBufferOffset + excessBytes);
}
}
else
{
m_receiveBuffer.resize(0);
m_receiveBufferOffset = 0;
}
m_receivedPacketHeader = false;
m_receiveBytesNeeded = details::MINIMUM_PACKET_SIZE;
}
else // !m_receivedPacketHeader
{
// Decode the packet header
details::PacketHeader pkt(m_receiveBuffer.data() + m_receiveBufferOffset);
m_receiveBytesNeeded = pkt.m_packetSize;; // Get the size of the packet
if (m_receiveBytesNeeded > details::MAXIMUM_MESSAGE_SIZE)
{
return Error("message size is too large, bad packet or out of sync, need to reconnect");
}
m_receivedPacketHeader = true;
}
}
else // Need more data to be able to decode a packet
{
switch (m_channelInterface->SelectAndRead(m_receiveBuffer, a_timeout))
{
case ChannelInterface::Error:
return Error("error while reading : connection closed");
case ChannelInterface::Success:
// While there is still data to read, keep going around the loop
break;
case ChannelInterface::InsufficientCapacity: // There is data to read, but we need space to read it
// This shouldn't happen if the other end only sends validly sized packets, but if it does, deal with it in a failsafe way
// TODO: Coverage tests to attempt to make this happen
m_receiveBuffer.reserve(m_receiveBuffer.capacity() * 2);
break;
case ChannelInterface::DataUnavailable:
// We need to wait till next update to see if there is more data
return;
}
// Only wait on the first call through, then we reset the timeout to 0 so the time spent in this function is bounded.
a_timeout = std::chrono::milliseconds(0);
}
}
}
// Serializes the queued actions in to one json array, packs it in to m_encodedSendBuffer and writes as
// much of that buffer as the channel interface accepts.
//   a_timeout - passed to SelectAndWrite
// While a previously packed packet is still (partly) unsent, no new packet is built; the remaining bytes
// are written first. A packing or write failure goes through Error().
void GameActionsChannel::SendQueuedActions(std::chrono::milliseconds a_timeout)
{
    m_channelState.m_gameTick++;

    // TODO: save the unacknowledged parts of m_encodedSendBuffer to resend if required
    // need to do the work for making the client/server resend anything that wasn't acknowledged in the case of a reconnect.
    // means we need to be saving the data in m_encodedSendBuffer until it gets acknowledged - may require changes to how this
    // buffer is used below

    // Don't attempt to encode in to a packet the queued actions if we still have unsent data from before
    // so that it resends the unsent portion of m_encodedSendBuffer
    if (m_encodedSendBuffer.empty())
    {
        if (m_queuedActions.size())
        {
            m_queuedActionsBuffer.push_back('[');
            // Insert as json stuff like ack and seq num etc, any kind of meta stuff for the GameActionsChannel
            m_channelState.SerializeToJson(m_channelState, m_queuedActionsBuffer);
            m_channelState.m_seqNumber++;
            // Bind by reference: each action is only invoked once and the queue is cleared right after,
            // so there is no need to copy it first
            for (auto&& action : m_queuedActions)
            {
                m_queuedActionsBuffer.push_back(',');
                action(m_queuedActionsBuffer);
            }
            m_queuedActionsBuffer.push_back(']');
            m_queuedActions.clear();
        }

        if (m_queuedActionsBuffer.size())
        {
            m_queuedActionsBuffer.push_back(0); // Append a nul terminator (mainly for debugging purposes)
            // Pack the message (compress and encrypt)
            if (!details::Pack(m_encodedSendBuffer, m_queuedActionsBuffer.data(), m_queuedActionsBuffer.size()))
            {
                // Drop the message that could not be packed. Otherwise its bytes would stay in
                // m_queuedActionsBuffer and be packed again (with later actions appended after the terminator)
                // on every following update, and anything Pack left in m_encodedSendBuffer would be sent as if
                // it were a packet.
                m_queuedActionsBuffer.resize(0);
                m_encodedSendBuffer.resize(0);
                return Error("Unable to pack data");
            }
            m_queuedActionsBuffer.resize(0);
        }
    }
    else
    {
        // What do you do if there is too much data / too many actions to send that the bps rate of the available link? Slowly the game will get more and more behind and
        // backed up with data to send, so need a way to stop or throw up a message box telling the user their networking is too slow (we obviously need to make an effort
        // to make the network usage efficient, but there are limits to the quality of network the game will work with, eg: 1 kbps might not cut it)
        // We can mock the ChannelInterface in a unit test to cause this to happen
    }

    if (m_encodedSendBuffer.size())
    {
        switch (m_channelInterface->SelectAndWrite(m_encodedSendBuffer, m_offset, a_timeout))
        {
        case ChannelInterface::DataUnavailable:
            // nothing left to write
            // fall through - handled exactly like a completed write
        case ChannelInterface::Success:
            // all was written
            m_offset = 0;
            m_encodedSendBuffer.resize(0);
            return;
        case ChannelInterface::InsufficientCapacity:
            // We'll need to write the remainder next time
            // TODO: code coverage to check we test this path - we can mock the ChannelInterface in a unit test to cause this to happen
            return;
        case ChannelInterface::Error:
            // NOTE(review): m_encodedSendBuffer and m_offset are kept as they are here - confirm whether a
            // partially written packet should really be resumed after a reconnect
            return Error("error writing packet");
        }
        // There is a single write per call and no loop, so the time spent in this function stays bounded even
        // if a result that is not handled above is returned
    }
}