Newer
Older
Import / projects / Gameloft / core / Network / GameActionsChannel.cpp
#include <algorithm>
#include "GameActionsChannel.h"
#include "glwebtools/tool/glwebtools_codec.h"


#ifdef OS_LINUX
#include "zlib.h"
#include <syslog.h>
#include <stdarg.h>
#else
#include "zlib/include/zlib/zlib.h"
#include <cstdio>
#endif


// Process-wide source of action ids, consumed by details::GetNextActionId().
// The first id handed out is 1.
std::atomic<int> g_currentGameActionId{ 1 };


namespace details
{

  // Minimal printf-style logger shared by the client and server builds.
  // a_fmt is a character array (in practice a string literal) used as the
  // format string; args are passed through as its variadic arguments.
  //  - OS_LINUX builds: one syslog entry at LOG_ERR priority.
  //  - other builds: stdout, prefixed with "[GameActionsChannel] " and
  //    terminated with a newline.
  template <size_t FormatLength, class ...Args>
  void Log(const char (&a_fmt)[FormatLength], Args... args)
  {
    // TODO: use something other than printf
    // (RKLOG is not hooked up on the server, and this code runs on both sides)
#ifdef OS_LINUX
    syslog(LOG_ERR, a_fmt, args...);
#else
    fputs("[GameActionsChannel] ", stdout);
    printf(a_fmt, args...);
    putchar('\n');
#endif
  }


// Early-out helper for functions (or lambdas) returning bool: when `cond`
// evaluates to false it logs the stringified condition through details::Log
// and returns false from the enclosing function.
// The body is wrapped in do { } while (0) so the expansion is exactly one
// statement: it requires the trailing ';' at the call site and composes safely
// with an unbraced if/else around it.
#define CHECK_RESULT(cond) \
  do \
  { \
    if (!(cond)) \
    { \
      details::Log("Failed check: %s", #cond); \
      return false; \
    } \
  } while (0)


  // Deserialize a 32-bit value from a stream of bytes, most significant byte
  // first. The caller must guarantee that four bytes are readable at `data`.
  inline uint32_t GetIntFromBuffer(const uint8_t* data)
  {
    // Widen each byte to uint32_t *before* shifting: a uint8_t operand is
    // promoted to (signed) int, so `data[0] << 24` would shift into the sign
    // bit whenever the top byte is >= 0x80.
    return (static_cast<uint32_t>(data[0]) << 24) |
           (static_cast<uint32_t>(data[1]) << 16) |
           (static_cast<uint32_t>(data[2]) <<  8) |
            static_cast<uint32_t>(data[3]);
  }


  // Serialize a 32-bit value into four bytes at `data`, most significant byte
  // first (the inverse of GetIntFromBuffer).
  inline void PutIntToBuffer(uint8_t* data, uint32_t val)
  {
    for (int byteIndex = 0; byteIndex < 4; ++byteIndex)
    {
      // Byte 0 takes bits 31..24, byte 3 takes bits 7..0
      const int shift = 8 * (3 - byteIndex);
      data[byteIndex] = static_cast<uint8_t>(val >> shift);
    }
  }


  // Fixed-size header placed in front of every packet payload.
  // On the wire it is three consecutive 32-bit fields (see PutToBuffer).
  struct PacketHeader
  {
    // Fields, in serialization order
    uint32_t m_packetSize = 0;        // total size of the packet, this header included
    uint32_t m_decompressedSize = 0;  // size of the original (uncompressed) input data
    uint32_t m_padding = 0;           // padding bytes required by the encryption

    // Build a header from explicit field values
    PacketHeader(uint32_t size, uint32_t unpackedSize, uint32_t padding);
    // Decode a header from the serialized bytes at `data`
    PacketHeader(uint8_t* data);

    // Write the three fields to `data`
    void PutToBuffer(uint8_t* data);
    // True when every field is within the limits defined in Constants
    bool IsValid();
  };
  static_assert(sizeof(PacketHeader) == 3 * sizeof(uint32_t), "Unexpected size of packet header");


  // Size limits, in bytes, used by the packet framing code in this file
  // (PacketHeader::IsValid, Pack and the receive loop).
  enum Constants
  {
    // A packet is never smaller than its header
    MINIMUM_PACKET_SIZE     = sizeof(PacketHeader),
    // Upper bound on PacketHeader::m_packetSize (header + encrypted payload)
    MAXIMUM_PACKET_SIZE     = 64 * 1024,
    // Lower bound on PacketHeader::m_padding (only referenced by a disabled check in IsValid)
    MINIMUM_PADDING_SIZE    = 0,
    // Upper bound on PacketHeader::m_padding
    MAXIMUM_PADDING_SIZE    = 5,
    // Lower bound on PacketHeader::m_decompressedSize
    MINIMUM_MESSAGE_SIZE    = 2,  // smallest valid json message would be "{}"
    // Upper bound on PacketHeader::m_decompressedSize
    MAXIMUM_MESSAGE_SIZE    = 512 * 1024,
    // Bytes Pack() adds on top of the compressed data in the worst case: header plus maximum padding
    MAXIMUM_PACKET_OVERHEAD = sizeof(PacketHeader) + MAXIMUM_PADDING_SIZE,
  };


  // Construct a header directly from its three field values.
  // No validation is done here; call IsValid() for that.
  PacketHeader::PacketHeader(uint32_t size, uint32_t unpackedSize, uint32_t padding)
  {
    m_packetSize = size;
    m_decompressedSize = unpackedSize;
    m_padding = padding;
  }


  // Decode a header from its serialized form: three consecutive 32-bit fields
  // (packet size, decompressed size, padding), each read with GetIntFromBuffer.
  // `data` must reference at least sizeof(PacketHeader) readable bytes.
  PacketHeader::PacketHeader(uint8_t* data)
    : m_packetSize(GetIntFromBuffer(&data[0 * sizeof(uint32_t)]))
    , m_decompressedSize(GetIntFromBuffer(&data[1 * sizeof(uint32_t)]))
    , m_padding(GetIntFromBuffer(&data[2 * sizeof(uint32_t)]))
  {
  }


  // Serialize the header into the first sizeof(PacketHeader) bytes at `data`,
  // in the same field order the decoding constructor reads them back.
  void PacketHeader::PutToBuffer(uint8_t* data)
  {
    uint8_t* cursor = data;

    // Total size of the packet, this header included
    PutIntToBuffer(cursor, m_packetSize);
    cursor += sizeof(uint32_t);

    // Size of the original (uncompressed) message
    PutIntToBuffer(cursor, m_decompressedSize);
    cursor += sizeof(uint32_t);

    // Padding overhead required by the encryption
    PutIntToBuffer(cursor, m_padding);
  }


  // Range-check every header field against the limits in Constants.
  // Returns true when all checks pass. On the first failing check CHECK_RESULT
  // logs the stringified condition and returns false, so the log names the
  // exact limit that was violated.
  bool PacketHeader::IsValid()
  {
    // Packet size (header included) must lie in [MINIMUM_PACKET_SIZE, MAXIMUM_PACKET_SIZE]
    CHECK_RESULT(m_packetSize >= MINIMUM_PACKET_SIZE);
    CHECK_RESULT(m_packetSize <= MAXIMUM_PACKET_SIZE);
    // Original message size must lie in [MINIMUM_MESSAGE_SIZE, MAXIMUM_MESSAGE_SIZE]
    CHECK_RESULT(m_decompressedSize >= MINIMUM_MESSAGE_SIZE);
    CHECK_RESULT(m_decompressedSize <= MAXIMUM_MESSAGE_SIZE);
    // Lower-bound check left disabled: m_padding is unsigned and MINIMUM_PADDING_SIZE is 0,
    // so the comparison would always be true
    //CHECK_RESULT(m_padding >= MINIMUM_PADDING_SIZE);
    CHECK_RESULT(m_padding <= MAXIMUM_PADDING_SIZE);
    return true;
  }


  // The application's XXTEA symmetric key (four words), used by Pack() and Unpack().
  // NOTE(review): the key is a compile-time constant in this source file, so
  // anyone with the binary can presumably recover it - confirm this is only
  // meant as obfuscation and not as real confidentiality.
  static constexpr unsigned int s_gameXXTEAKey[4] = { 1u, 3u, 3u, 7u };


  // Pack the message (compress and encrypt - in that order because compression of encrypted data is very in-effective)
  bool Pack(std::vector<uint8_t>& a_outputBuffer, const uint8_t* a_inputBytes, size_t a_intputLength)
  {
    // zlib compress data
    uLongf compressedSize = compressBound(a_intputLength); // upper bound of compressed size
    size_t maxPacketSize = compressedSize + MAXIMUM_PACKET_OVERHEAD;
    CHECK_RESULT(maxPacketSize <= MAXIMUM_PACKET_SIZE);
    a_outputBuffer.resize(maxPacketSize); // Enough space for upper bound compressed data, packet header and padding bytes
    uint8_t* dataPtr = a_outputBuffer.data() + sizeof(PacketHeader);
    CHECK_RESULT(compress(dataPtr, &compressedSize, a_inputBytes, a_intputLength) == Z_OK);

    // encrypt the data
    size_t encryptedSize = glwebtools::Codec::GetEncryptedXXTEADataSize(compressedSize);
    size_t padding = encryptedSize - compressedSize;
    CHECK_RESULT(glwebtools::Codec::EncryptXXTEA(dataPtr, compressedSize, dataPtr, encryptedSize, &details::s_gameXXTEAKey[0]));

    // Put the packet header information in to the packet data
    PacketHeader pkt(encryptedSize + sizeof(PacketHeader), a_intputLength, padding);
    CHECK_RESULT(pkt.IsValid());
    a_outputBuffer.resize(pkt.m_packetSize); // trim off any extra padding bytes that didn't end up being needed
    pkt.PutToBuffer(a_outputBuffer.data());
    return true;
  }


  // Unpack in the reverse order it was packed (decypt and uncompress)
  // Note, the input buffer is clobbered (decryption happens in-place - the decrypt code does a memcpy if not in-place, so it is desirable we do it in-place)
  bool Unpack(std::vector<uint8_t>& a_outputBuffer, uint8_t* a_inputBytes, size_t a_intputLength)
  {
    PacketHeader pkt(a_inputBytes);
    // Check pre-conditions
    CHECK_RESULT(pkt.m_packetSize == a_intputLength);
    CHECK_RESULT(pkt.IsValid());
    a_inputBytes += sizeof(PacketHeader);
    a_intputLength -= sizeof(PacketHeader); // Skip over header to the payload
    // Decyrpt in-place
    CHECK_RESULT(glwebtools::Codec::DecryptXXTEA(a_inputBytes, a_intputLength, a_inputBytes, a_intputLength, &details::s_gameXXTEAKey[0]));
    // zlib decompress data
    uLongf uncompressedSize = pkt.m_decompressedSize;
    a_outputBuffer.resize(uncompressedSize);
    CHECK_RESULT(uncompress(a_outputBuffer.data(), &uncompressedSize, a_inputBytes, a_intputLength - pkt.m_padding) == Z_OK);
    return true;
  }


  // Hand out the next action id. Each call atomically post-increments the
  // global counter and returns the value it held before the increment.
  uint32_t GetNextActionId()
  {
    const int idBeforeIncrement = g_currentGameActionId++;
    return static_cast<uint32_t>(idBeforeIncrement);
  }


  // Unit-test helper: put the action id counter back to its initial value (1)
  void ResetGameActionIds()
  {
    g_currentGameActionId.store(1);
  }

}


// Construct a channel that dispatches received actions to a_eventSink.
// Every registered action factory item is indexed by its name in
// m_actionFactory, then the receive state is initialised via Reset().
GameActionsChannel::GameActionsChannel(std::shared_ptr<EventSink> a_eventSink)
  : m_eventSink(a_eventSink)
{
  using DispatchFn = void(*)(EventSink&, bne::JsonValueConstRef);

  auto registerFactory = [this](const RegisterActionFactoryItem& a_item)
  {
    // insert() keeps the first entry if the same name is registered twice
    m_actionFactory.insert(std::pair<std::string, DispatchFn>(a_item.m_name, a_item.m_dispatch));
  };
  RegisterActionFactoryItem::ForEach(registerFactory);

  Reset();
}


// Put the receive side back into its initial "waiting for a packet header"
// state and make sure the working buffers have their capacity reserved.
// The contents of m_encodedSendBuffer are not touched here (reserve only).
void GameActionsChannel::Reset()
{
  // Capacity only - reserve() never changes a buffer's size
  m_encodedSendBuffer.reserve(details::MAXIMUM_PACKET_SIZE);
  m_unpackedPacketBuffer.reserve(details::MAXIMUM_MESSAGE_SIZE);
  // Perhaps need a grow strategy like in ZA where it could grow and shrink in a controlled way
  m_receiveBuffer.reserve(details::MAXIMUM_PACKET_SIZE * 2);

  // Drop any partially received data
  m_receiveBuffer.clear();
  m_receiveBufferOffset = 0;

  // The next thing expected on the wire is a packet header
  m_receiveBytesNeeded = details::MINIMUM_PACKET_SIZE;
  m_receivedPacketHeader = false;
}


// Common failure path: close the channel interface (when one is attached),
// log errorMsg and reset the receive state.
void GameActionsChannel::Error(const char* errorMsg)
{
  auto& channel = m_channelInterface;
  if (channel)
  {
    channel->Close();
  }

  details::Log("Error: %s", errorMsg);

  Reset();
}


// Pump the channel once: receive and dispatch incoming actions, then send the
// queued outgoing ones. a_timeout is handed to both steps; a_timeElapsed is
// currently unused.
// Returns false when no channel interface is attached, otherwise whether the
// interface reports itself connected after the pump.
bool GameActionsChannel::Update(std::chrono::milliseconds a_timeElapsed, std::chrono::milliseconds a_timeout)
{
  if (m_channelInterface)
  {
    ReceiveAndDispatchActions(a_timeout);
    SendQueuedActions(a_timeout);
    return m_channelInterface->IsConnected();
  }
  return false;
}


void GameActionsChannel::ReceiveAndDispatchActions(std::chrono::milliseconds a_timeout)
{
  while (true)
  {
    size_t currentSize = m_receiveBuffer.size() - m_receiveBufferOffset;
    if (currentSize >= m_receiveBytesNeeded)
    {
      if (m_receivedPacketHeader)
      {
        // We already have the header and we have read the required bytes needed to unpack a full packet
        if (!details::Unpack(m_unpackedPacketBuffer, m_receiveBuffer.data() + m_receiveBufferOffset, m_receiveBytesNeeded))
        {
          return Error("badly formed packet");
        }

        // TODO: collect network packet statistics

        // Could be data after the packet we need to keep / move
        size_t excessBytes = currentSize - m_receiveBytesNeeded;
        m_receiveBufferOffset += m_receiveBytesNeeded;


        // TODO: remove this debugging when it is working well
        //details::Log("got bytes: %s   (excess bytes: %d)", m_unpackedPacketBuffer.data(), excessBytes);


        bne::JsonValue root;
        if (bne::JsonValue::Parse(reinterpret_cast<const char*>(m_unpackedPacketBuffer.data()), root))
        {
          if (root.isArray())
          {
            for (bne::JsonValueConstRef item : root)
            {
              if (item.isObject())
              {
                for (std::string mem : item.getMemberNames())
                {
                  if (m_actionFactory.count(mem))
                  {
                    // Deserializes and dispatchs
                    if (m_eventSink)
                    {
                      m_actionFactory[mem](*m_eventSink.get(), item);
                    }
                  }
                }
              }
            }
          }
        }

        // TODO: put this in a receiver function registered for channel state objects
        static int pktNum = 0; // Need to get this out of the channel state object
        m_channelState.m_ackNumber = pktNum;
        pktNum++;

        if (excessBytes)
        {
          // TODO: ensure code coverage that tests these code paths (need to add unit tests)
          // We can mock the ChannelInterface in a unit test to cause this to happen
          if ((m_receiveBuffer.capacity() - m_receiveBufferOffset) < details::MAXIMUM_PACKET_SIZE)
          {
            // Not enough space, move the data down
            memcpy(m_receiveBuffer.data(), m_receiveBuffer.data() + m_receiveBufferOffset, excessBytes);
            m_receiveBuffer.resize(excessBytes);
            m_receiveBufferOffset = 0;
          }
          else
          {
            // Enough spare space in the buffer, so keep using it
            m_receiveBuffer.resize(m_receiveBufferOffset + excessBytes);
          }
        }
        else
        {
          m_receiveBuffer.resize(0);
          m_receiveBufferOffset = 0;
        }
        m_receivedPacketHeader = false;
        m_receiveBytesNeeded = details::MINIMUM_PACKET_SIZE;

      }
      else // !m_receivedPacketHeader
      {
        // Decode the packet header
        details::PacketHeader pkt(m_receiveBuffer.data() + m_receiveBufferOffset);
        m_receiveBytesNeeded = pkt.m_packetSize;; // Get the size of the packet
        if (m_receiveBytesNeeded > details::MAXIMUM_MESSAGE_SIZE)
        {
          return Error("message size is too large, bad packet or out of sync, need to reconnect");
        }
        m_receivedPacketHeader = true;
      }
    }
    else // Need more data to be able to decode a packet
    {
      switch (m_channelInterface->SelectAndRead(m_receiveBuffer, a_timeout))
      {
        case ChannelInterface::Error:
          return Error("error while reading : connection closed");
        case ChannelInterface::Success:
          // While there is still data to read, keep going around the loop
          break;
        case ChannelInterface::InsufficientCapacity: // There is data to read, but we need space to read it
          // This shouldn't happen if the other end only sends validly sized packets, but if it does, deal with it in a failsafe way
          // TODO: Coverage tests to attempt to make this happen
          m_receiveBuffer.reserve(m_receiveBuffer.capacity() * 2);
          break;
        case ChannelInterface::DataUnavailable:
          // We need to wait till next update to see if there is more data
          return;
      }
      // Only wait on the first call through, then we reset the timeout to 0 so the time spent in this function is bounded.
      a_timeout = std::chrono::milliseconds(0);
    }
  }
}


// Encode the queued actions into one packet and write it to the channel.
//
// Each call advances m_channelState.m_gameTick. When no previously encoded
// data is pending, the queued actions are serialized as a JSON array (channel
// state first, then one element per action), NUL terminated, and packed into
// m_encodedSendBuffer. Whatever is in m_encodedSendBuffer is then written from
// m_offset; if the channel cannot take all of it, the remainder stays in the
// buffer and is retried on the next call before anything new is encoded.
//
// a_timeout is passed to the channel write. Pack or write failures end in
// Error(), which closes the channel.
void GameActionsChannel::SendQueuedActions(std::chrono::milliseconds a_timeout)
{
  m_channelState.m_gameTick++;

  // TODO: save the unacknowledged parts of m_encodedSendBuffer to resend if required
  // need to do the work for making the client/server resend anything that wasn't acknowledged in the case of a reconnect.
  // means we need to be saving the data in m_encodedSendBuffer until it gets acknowledged - may require changes to how this
  // buffer is used below

  // Don't attempt to encode in to a packet the queued actions if we still have unsent data from before
  // so that it resends the unsent portion of m_encodedSendBuffer
  if (m_encodedSendBuffer.empty())
  {
    if (!m_queuedActions.empty())
    {
      m_queuedActionsBuffer.push_back('[');
      // Insert as json stuff like ack and seq num etc, any kind of meta stuff for the GameActionsChannel
      m_channelState.SerializeToJson(m_channelState, m_queuedActionsBuffer);
      m_channelState.m_seqNumber++;
      // Iterate by reference: the actions are only invoked, no need to copy each one
      for (auto& action : m_queuedActions)
      {
        m_queuedActionsBuffer.push_back(',');
        action(m_queuedActionsBuffer);
      }
      m_queuedActionsBuffer.push_back(']');
      m_queuedActions.clear();
    }

    if (!m_queuedActionsBuffer.empty())
    {
      m_queuedActionsBuffer.push_back(0); // Append a nul terminator (mainly for debugging purposes)

      // Pack the message (compress and encrypt)
      const bool packed = details::Pack(m_encodedSendBuffer, m_queuedActionsBuffer.data(), m_queuedActionsBuffer.size());
      // The serialized text is consumed either way. Keeping it after a failure
      // would make every later call append to a buffer that already contains a
      // NUL and then fail (or send garbage) again.
      m_queuedActionsBuffer.resize(0);
      if (!packed)
      {
        // Make sure no half-built packet is mistaken for pending data next time
        m_encodedSendBuffer.resize(0);
        return Error("Unable to pack data");
      }
    }
  }
  else
  {
    // What do you do if there is too much data / too many actions to send that the bps rate of the available link? Slowly the game will get more and more behind and
    // backed up with data to send, so need a way to stop or throw up a message box telling the user their networking is too slow (we obviously need to make an effort
    // to make the network usage efficient, but there are limits to the quality of network the game will work with, eg: 1 kbps might not cut it)
    // We can mock the ChannelInterface in a unit test to cause this to happen
  }

  if (!m_encodedSendBuffer.empty())
  {
    switch (m_channelInterface->SelectAndWrite(m_encodedSendBuffer, m_offset, a_timeout))
    {
      case ChannelInterface::DataUnavailable:
        // nothing left to write
        // fall through - handled exactly like a completed write
      case ChannelInterface::Success:
        // all was written
        m_offset = 0;
        m_encodedSendBuffer.resize(0);
        return;
      case ChannelInterface::InsufficientCapacity:
        // We'll need to write the remainder next time
        // TODO: code coverage to check we test this path - we can mock the ChannelInterface in a unit test to cause this to happen
        return;
      case ChannelInterface::Error:
        return Error("error writing packet");
    }
    // All code paths above should return, but if in the future the switch is changed, ensure that the time in this function is bounded
    a_timeout = std::chrono::milliseconds(0);
  }
}