#include <algorithm>
#include <iterator>
#include "ASyncTask.h"
#if defined(WIN32) || defined(_WIN32)
# ifndef snprintf
# define snprintf _snprintf
# endif
#endif
#define REQMAN_TAG "AsyncTaskManager"
namespace ASyncTask
{
// TODO: possibly this will be used in the server so we might not want an RKLog dependency here
/// Forwards a single buffered task log entry to the RKLog macro that matches its level.
///
/// @param a_entry  Log entry to emit: `first` holds the log level, `second` holds the message text.
///
/// The message is always passed as the argument of a "%s" format, never as the format itself.
/// Entries whose level is none of LL_FATAL / LL_ERROR / LL_WARN / LL_INFO / LL_TRACE are dropped.
static void OutputLogEntry(const Task::LogEntry& a_entry)
{
    const auto level = a_entry.first;

    if (level == LL_FATAL)
    {
        RKLOGt_FATAL(REQMAN_TAG, "%s", a_entry.second.c_str());
    }
    else if (level == LL_ERROR)
    {
        RKLOGt_ERROR(REQMAN_TAG, "%s", a_entry.second.c_str());
    }
    else if (level == LL_WARN)
    {
        RKLOGt_WARN(REQMAN_TAG, "%s", a_entry.second.c_str());
    }
    else if (level == LL_INFO)
    {
        RKLOGt_INFO(REQMAN_TAG, "%s", a_entry.second.c_str());
    }
    else if (level == LL_TRACE)
    {
        RKLOGt_TRACE(REQMAN_TAG, "%s", a_entry.second.c_str());
    }
    // Any other level: nothing is logged.
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Constructs the manager and immediately launches its worker threads.
///
/// Every worker runs the same loop:
///   1. block on the condition variable until a task is queued or shutdown has been requested;
///   2. exit if shutdown has been requested and no queued task is left;
///   3. otherwise take the oldest queued task and call Task::Process() on it with the queue mutex released;
///   4. move the processed task into the finished queue (under the same mutex), where Update() picks it up.
///
/// @param a_threadCount  Number of worker threads to start.
ASyncTaskManager::ASyncTaskManager(size_t a_threadCount)
{
    const auto workerLoop = [this]
    {
        while (true)
        {
            std::unique_ptr<Task> current;
            {
                std::unique_lock<std::mutex> guard(m_queueMutex);
                m_condition.wait(guard, [this]{ return m_shuttingDown || !m_tasks.empty(); });

                // Woken for shutdown with nothing left to do: end this worker.
                // Queued tasks are still drained before a worker exits.
                if (m_tasks.empty() && m_shuttingDown)
                {
                    return;
                }

                current = std::move(m_tasks.front());
                m_tasks.pop();
            }

            // The (potentially slow) processing step runs without holding the queue mutex.
            current->Process();

            {
                std::lock_guard<std::mutex> guard(m_queueMutex);
                m_finishedTasks.emplace(std::move(current));
            }
        }
    };

    for (size_t remaining = a_threadCount; remaining > 0; --remaining)
    {
        m_workers.emplace_back(workerLoop);
    }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Destroys the manager by delegating to Shutdown(), which stops the worker threads
/// and runs the continuations of the tasks that have finished.
ASyncTaskManager::~ASyncTaskManager()
{
    this->Shutdown();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Stops the manager: refuses new tasks, waits for the workers to finish and runs the
/// continuations of all finished tasks on the calling thread.
///
/// Sequence:
///   1. set m_shuttingDown under the queue mutex so StartNewRequest() rejects further tasks;
///   2. wake every worker; workers exit once the pending queue is empty;
///   3. join the worker threads;
///   4. call Update() repeatedly until the finished queue is empty.
///
/// This function may be called more than once (for example explicitly and then again from the
/// destructor): threads that were already joined are skipped instead of being joined a second time.
void ASyncTaskManager::Shutdown()
{
    // mark that we are shutting down - don't accept more tasks
    {
        std::lock_guard<std::mutex> lock(m_queueMutex);
        m_shuttingDown = true;
    }

    // stop all the threads
    m_condition.notify_all();
    for (std::thread& worker : m_workers)
    {
        // join() on a thread that is no longer joinable throws std::system_error; since the
        // destructor always calls Shutdown(), an earlier explicit call would otherwise terminate
        // the program here.
        if (worker.joinable())
        {
            worker.join();
        }
    }

    // call all the continuations
    // All workers have been joined, so no worker is adding to m_finishedTasks any more.
    // Update() handles a bounded number of tasks per call, hence the loop.
    while (!m_finishedTasks.empty())
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        Update(std::chrono::milliseconds(100));
    }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void ASyncTaskManager::Update(std::chrono::milliseconds a_elapsedTime)
{
m_queueMutex.lock();
size_t finishedTasksCount = m_finishedTasks.size();
m_queueMutex.unlock();
int max_count = std::min<int>(finishedTasksCount, m_maxRequestsAtOnce);
for (int count = 0; count < max_count; count++)
{
std::unique_ptr<Task> task;
{
std::unique_lock<std::mutex> lock(m_queueMutex);
if (m_finishedTasks.empty())
{
break;
}
task = std::move(m_finishedTasks.front());
m_finishedTasks.pop();
}
for (const Task::LogEntry& logEntry : task.get()->GetLogging())
{
OutputLogEntry(logEntry);
}
task.get()->Continuation();
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Queues a task for processing by one of the worker threads.
///
/// @param a_task  Task to run; ownership is transferred to the manager when the task is accepted.
/// @return true if the task was queued; false if it was rejected because the manager is
///         shutting down or because a_task is null.
bool ASyncTaskManager::StartNewRequest(std::unique_ptr<Task> a_task)
{
    // A null task would be dereferenced by a worker thread (Process()) and later by Update()
    // (GetLogging()/Continuation()), so reject it here instead of queueing it.
    if (!a_task)
    {
        return false;
    }

    {
        std::lock_guard<std::mutex> lock(m_queueMutex);
        if (m_shuttingDown)
        {
            return false;
        }
        m_tasks.emplace(std::move(a_task));
    }

    // Notify after releasing the mutex so the woken worker can acquire it immediately.
    m_condition.notify_one();
    return true;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Formats a printf-style message and appends it, together with its level, to the task's
/// buffered log (m_logging).
///
/// @param a_level   Severity stored with the message.
/// @param a_format  printf-style format string; must not be null.
/// @param a_args    Arguments consumed by a_format.
///
/// If formatting fails (snprintf reports an error), the unformatted a_format text is stored
/// instead so the entry is not lost and no unterminated buffer is ever read.
///
/// NOTE(review): this member template is defined in a source file rather than the header;
/// callers in other translation units presumably need the definition visible or an explicit
/// instantiation - confirm.
template<typename ... Args>
void Task::Log(LogLevel a_level, const char* a_format, Args ... a_args)
{
    std::string message;
    bool formatted = false;

    // First pass: ask for the required length (excluding the terminating '\0').
    // A negative result signals a formatting error and must not be turned into a buffer size.
    const int required = snprintf(nullptr, 0, a_format, a_args ...);
    if (required >= 0)
    {
        const size_t size = static_cast<size_t>(required) + 1; // Extra space for '\0'
        // Value-initialized so the buffer is null-terminated even if the second pass writes nothing.
        std::unique_ptr<char[]> buf(new char[size]());
        if (snprintf(buf.get(), size, a_format, a_args ...) >= 0)
        {
            message.assign(buf.get()); // We don't want the '\0' inside
            formatted = true;
        }
    }

    if (!formatted)
    {
        // Best effort: keep the raw format text rather than dropping the entry.
        message.assign(a_format);
    }

    m_logging.emplace_back(std::make_pair(a_level, std::move(message)));
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
}