//  Import/applications/HighwayDash/ports/Framework/Tasks.cpp
//  BlockyFroggy
//  Copyright © 2017 John Ryland.
//  All rights reserved.
#include "Tasks.h"
#include "Log.h"
#include "Common.h"
#include "Utilities.h"


// Creates the task manager and starts a_numberOfThreads worker threads.
//
// Each worker loops while m_running is set: it takes the next task from
// m_queuedTasks, calls the task's process() via lck.unlocked() (presumably
// with m_mutex released for the duration of the call - confirm against
// ScopeLock), and then moves the task to m_pendingContinuation where update()
// picks it up.
//
// a_numberOfThreads: how many worker threads to create.
TaskManager::TaskManager(size_t a_numberOfThreads)
{
  m_running = true;
  // size_t index so the comparison with a_numberOfThreads is not signed/unsigned
  for (size_t i = 0; i < a_numberOfThreads; i++)
  {
    Log(LL_Debug, "creating thread");
    m_threads.push_back(std::thread([this]()
    {
      AllocationTracking::g_enableAllocTracing = true;
      Log(LL_Debug, "started thread");
      ScopeLock lck(m_mutex);
      while (m_running)
      {
        // Only block when there is genuinely nothing to do. Waiting
        // unconditionally would lose the notification for any task queued
        // before this thread reached the wait, leaving that task stuck until
        // some later queueTask() call.
        if (m_queuedTasks.size() == 0)
        {
          Log(LL_Debug, "thread waiting for work");
          m_cond.wait(lck);
          // Re-evaluate m_running and the queue after every wake-up; this
          // also copes with spurious wake-ups.
          continue;
        }
        std::unique_ptr<Task> t = std::move(m_queuedTasks.front());
        m_queuedTasks.pop();
        lck.unlocked([&](){ t->process(); });
        // Hand the processed task over so update() can run its continuation.
        m_pendingContinuation.push(std::move(t));
      }
      Log(LL_Debug, "stopping thread");
    }));
  }
}


// Stops the worker threads and waits for each of them to exit.
//
// Tasks still sitting in m_queuedTasks when the destructor runs are not
// processed: the workers leave their loop as soon as they observe m_running
// being false.
TaskManager::~TaskManager()
{
  {
    // Clear the flag while holding the mutex. A worker tests m_running with
    // the mutex held and only gives it up inside m_cond.wait(), so taking the
    // lock here guarantees no worker sits between "saw m_running == true" and
    // "blocked in wait" when the notification is sent. Without this the
    // notify_all() could be missed and join() below would never return.
    ScopeLock lck(m_mutex);
    m_running = false;
  }
  // The lock is released before notifying/joining so that woken workers can
  // re-acquire the mutex and exit.
  m_cond.notify_all();
  for (std::thread& t : m_threads)
  {
    if (t.joinable())
    {
      t.join();
    }
  }
}


void TaskManager::update()
{
  ScopeLock lck(m_mutex);
  while (m_pendingContinuation.size())
  {
    std::unique_ptr<Task> t = std::move(m_pendingContinuation.front());
    m_pendingContinuation.pop();
    lck.unlocked([&](){ t->continuation(); });
  }
}


void TaskManager::queueTask(std::unique_ptr<Task> t)
{
  ScopeLock lck(m_mutex);
  m_queuedTasks.push(std::move(t));
  lck.unlocked([&](){ m_cond.notify_one(); });
}