#include <algorithm>
#include <iterator>
#include "ASyncTask.h"


#if defined(WIN32) || defined(_WIN32)
#  ifndef snprintf
#    define snprintf _snprintf
#  endif
#endif
#define REQMAN_TAG "AsyncTaskManager"


namespace ASyncTask
{

  // TODO: possibly this will be used in the server so we might not want an RKLog dependency here
  /// Forwards one recorded task log entry to the RKLog macro that matches its
  /// level, tagged with REQMAN_TAG.
  ///
  /// @param a_entry  Pair of (log level, message text). Entries whose level is
  ///                 not one of the handled LL_* values are ignored.
  static void OutputLogEntry(const Task::LogEntry& a_entry)
  {
    // The text is always passed through "%s" so that it is never interpreted
    // as a format string itself.
    const char* message = a_entry.second.c_str();

    switch (a_entry.first)
    {
    case LL_FATAL:
      RKLOGt_FATAL(REQMAN_TAG, "%s", message);
      break;
    case LL_ERROR:
      RKLOGt_ERROR(REQMAN_TAG, "%s", message);
      break;
    case LL_WARN:
      RKLOGt_WARN(REQMAN_TAG, "%s", message);
      break;
    case LL_INFO:
      RKLOGt_INFO(REQMAN_TAG, "%s", message);
      break;
    case LL_TRACE:
      RKLOGt_TRACE(REQMAN_TAG, "%s", message);
      break;
    default:
      // unknown level: nothing is logged
      break;
    }
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

  /// Creates the manager and starts its worker threads.
  ///
  /// Each worker repeatedly takes the oldest queued task, runs its Process()
  /// outside the queue lock, and then moves the task to the finished queue so
  /// that Update() can run its continuation.
  ///
  /// @param a_threadCount  Number of worker threads to start.
  ASyncTaskManager::ASyncTaskManager(size_t a_threadCount)
  {
    // Body executed by every worker thread.
    auto workerLoop = [this]
    {
      while (true)
      {
        std::unique_ptr<Task> current;
        {
          std::unique_lock<std::mutex> queueLock(m_queueMutex);

          // Sleep until there is work to do or a shutdown was requested.
          while (!m_shuttingDown && m_tasks.empty())
          {
            m_condition.wait(queueLock);
          }

          // Leaving the wait with an empty queue can only mean shutdown; the
          // worker exits only after the pending tasks have been drained.
          if (m_tasks.empty())
          {
            return;
          }

          current = std::move(m_tasks.front());
          m_tasks.pop();
        }

        // Run the task without holding the lock so other workers can proceed.
        current->Process();

        {
          std::unique_lock<std::mutex> queueLock(m_queueMutex);
          m_finishedTasks.emplace(std::move(current));
        }
      }
    };

    // launch the requested amount of worker threads
    for (size_t remaining = a_threadCount; remaining > 0; --remaining)
    {
      m_workers.emplace_back(workerLoop);
    }
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

  /// Destroys the manager; all teardown work is delegated to Shutdown().
  ASyncTaskManager::~ASyncTaskManager()
  {
    this->Shutdown();
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

  /// Stops the manager: refuses new tasks, waits for the worker threads to
  /// drain the pending queue and exit, then runs the continuations of every
  /// finished task via Update().
  ///
  /// May be called more than once (for example explicitly and then again from
  /// the destructor): worker threads that were already joined are skipped.
  void ASyncTaskManager::Shutdown()
  {
    // mark that we are shutting down - don't accept more tasks
    {
      std::lock_guard<std::mutex> lock(m_queueMutex);
      m_shuttingDown = true;
    }

    // wake every worker so it can observe the flag, then wait for each to exit
    m_condition.notify_all();
    for (std::thread &worker : m_workers)
    {
      // join() on a thread that is not joinable (e.g. already joined by an
      // earlier Shutdown() call) throws std::system_error, which would
      // terminate the process when it escapes the destructor.
      if (worker.joinable())
      {
        worker.join();
      }
    }

    // call all the continuations
    // NOTE(review): this loop never ends if Update() handles zero tasks per
    // call (m_maxRequestsAtOnce <= 0) - confirm that value is always positive.
    while (m_finishedTasks.size())
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      Update(std::chrono::milliseconds(100));
    }
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

  void ASyncTaskManager::Update(std::chrono::milliseconds a_elapsedTime)
  {
    m_queueMutex.lock();
    size_t finishedTasksCount = m_finishedTasks.size();
    m_queueMutex.unlock();
    int max_count = std::min<int>(finishedTasksCount, m_maxRequestsAtOnce);
    for (int count = 0; count < max_count; count++)
    {
      std::unique_ptr<Task> task;
      {
        std::unique_lock<std::mutex> lock(m_queueMutex);
        if (m_finishedTasks.empty())
        {
          break;
        }
        task = std::move(m_finishedTasks.front());
        m_finishedTasks.pop();
      }
      for (const Task::LogEntry& logEntry : task.get()->GetLogging())
      {
        OutputLogEntry(logEntry);
      }
      task.get()->Continuation();
    }
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

  /// Queues a task for execution on one of the worker threads.
  ///
  /// @param a_task  Task to run; ownership is transferred to the manager when
  ///                the task is accepted.
  /// @return true if the task was queued; false if a_task is null or the
  ///         manager is shutting down (the task is then destroyed unrun).
  bool ASyncTaskManager::StartNewRequest(std::unique_ptr<Task> a_task)
  {
    // A null task would be dereferenced by the worker that picks it up, so it
    // is rejected here instead of being queued.
    if (!a_task)
    {
      return false;
    }

    {
      std::unique_lock<std::mutex> lock(m_queueMutex);
      if (m_shuttingDown)
      {
        return false;
      }
      m_tasks.emplace(std::move(a_task));
    }

    // Notify after releasing the lock so the woken worker can take it at once.
    m_condition.notify_one();
    return true;
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

  /// Formats a printf-style message and appends it to the task's log entries.
  ///
  /// If formatting fails (snprintf reports an error), the unformatted
  /// a_format text is recorded instead so that the entry is not lost.
  ///
  /// @param a_level   Level stored with the entry.
  /// @param a_format  printf-style format string; must not be null.
  /// @param a_args    Arguments matching the conversions in a_format.
  template<typename ... Args>
  void Task::Log(LogLevel a_level, const char* a_format, Args ... a_args)
  {
    // First pass only measures the output; a negative result means failure
    // and must not be turned into a buffer size.
    const int length = snprintf(nullptr, 0, a_format, a_args ...);
    if (length >= 0)
    {
      const size_t size = static_cast<size_t>(length) + 1; // Extra space for '\0'
      std::unique_ptr<char[]> buf(new char[size]);
      const int written = snprintf(buf.get(), size, a_format, a_args ...);
      if (written >= 0 && static_cast<size_t>(written) < size)
      {
        m_logging.emplace_back(std::make_pair(a_level, std::string(buf.get()))); // We don't want the '\0' inside
        return;
      }
    }

    // Formatting failed: fall back to the raw format string.
    m_logging.emplace_back(std::make_pair(a_level, std::string(a_format)));
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

}

