diff --git a/invertedlogic/iLUtilities/iLThreadPool.h b/invertedlogic/iLUtilities/iLThreadPool.h new file mode 100644 index 0000000..785578e --- /dev/null +++ b/invertedlogic/iLUtilities/iLThreadPool.h @@ -0,0 +1,95 @@ +#ifndef IL_THREAD_POOL_H +#define IL_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class iLThreadPool { +public: + iLThreadPool(size_t); + ~iLThreadPool(); + + template + auto enqueue(F&& f, Args&&... args) + -> std::future::type>; +private: + // need to keep track of threads so we can join them + std::vector< std::thread > workers; + // the task queue + std::queue< std::function > tasks; + + // synchronization + std::mutex queue_mutex; + std::condition_variable condition; + bool stop; +}; + +// the constructor just launches some amount of workers +inline iLThreadPool::iLThreadPool(size_t threads) + : stop(false) +{ + for (size_t i = 0; i < threads; ++i) + { + workers.emplace_back( + [this] + { + for (;;) + { + std::function task; + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); + if (this->stop && this->tasks.empty()) + return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + task(); + } + }); + } +} + +// the destructor joins all threads +inline iLThreadPool::~iLThreadPool() +{ + { + std::unique_lock lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for (std::thread &worker : workers) + worker.join(); +} + +// add new work item to the pool +template +auto iLThreadPool::enqueue(F&& f, Args&&... args) + -> std::future::type> +{ + using return_type = typename std::result_of::type; + auto task = std::make_shared< std::packaged_task >( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + std::unique_lock lock(queue_mutex); + // don't allow enqueueing after stopping the pool + if (stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); + tasks.emplace([task](){ (*task)(); }); + } + condition.notify_one(); + return res; +} + +#endif // IL_THREAD_POOL_H + diff --git a/invertedlogic/iLUtilities/iLThreadPool.h b/invertedlogic/iLUtilities/iLThreadPool.h new file mode 100644 index 0000000..785578e --- /dev/null +++ b/invertedlogic/iLUtilities/iLThreadPool.h @@ -0,0 +1,95 @@ +#ifndef IL_THREAD_POOL_H +#define IL_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class iLThreadPool { +public: + iLThreadPool(size_t); + ~iLThreadPool(); + + template + auto enqueue(F&& f, Args&&... args) + -> std::future::type>; +private: + // need to keep track of threads so we can join them + std::vector< std::thread > workers; + // the task queue + std::queue< std::function > tasks; + + // synchronization + std::mutex queue_mutex; + std::condition_variable condition; + bool stop; +}; + +// the constructor just launches some amount of workers +inline iLThreadPool::iLThreadPool(size_t threads) + : stop(false) +{ + for (size_t i = 0; i < threads; ++i) + { + workers.emplace_back( + [this] + { + for (;;) + { + std::function task; + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); + if (this->stop && this->tasks.empty()) + return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + task(); + } + }); + } +} + +// the destructor joins all threads +inline iLThreadPool::~iLThreadPool() +{ + { + std::unique_lock lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for (std::thread &worker : workers) + worker.join(); +} + +// add new work item to the pool +template +auto iLThreadPool::enqueue(F&& f, Args&&... args) + -> std::future::type> +{ + using return_type = typename std::result_of::type; + auto task = std::make_shared< std::packaged_task >( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + std::unique_lock lock(queue_mutex); + // don't allow enqueueing after stopping the pool + if (stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); + tasks.emplace([task](){ (*task)(); }); + } + condition.notify_one(); + return res; +} + +#endif // IL_THREAD_POOL_H + diff --git a/invertedlogic/iLUtilities/iLUtilities.pro b/invertedlogic/iLUtilities/iLUtilities.pro new file mode 100644 index 0000000..b5dbe58 --- /dev/null +++ b/invertedlogic/iLUtilities/iLUtilities.pro @@ -0,0 +1,8 @@ + +QMAKE_CXXFLAGS += -std=c++11 -pthread + +TEMPLATE = app + +SOURCES += test.cpp + + diff --git a/invertedlogic/iLUtilities/iLThreadPool.h b/invertedlogic/iLUtilities/iLThreadPool.h new file mode 100644 index 0000000..785578e --- /dev/null +++ b/invertedlogic/iLUtilities/iLThreadPool.h @@ -0,0 +1,95 @@ +#ifndef IL_THREAD_POOL_H +#define IL_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class iLThreadPool { +public: + iLThreadPool(size_t); + ~iLThreadPool(); + + template + auto enqueue(F&& f, Args&&... args) + -> std::future::type>; +private: + // need to keep track of threads so we can join them + std::vector< std::thread > workers; + // the task queue + std::queue< std::function > tasks; + + // synchronization + std::mutex queue_mutex; + std::condition_variable condition; + bool stop; +}; + +// the constructor just launches some amount of workers +inline iLThreadPool::iLThreadPool(size_t threads) + : stop(false) +{ + for (size_t i = 0; i < threads; ++i) + { + workers.emplace_back( + [this] + { + for (;;) + { + std::function task; + { + std::unique_lock lock(this->queue_mutex); + this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); + if (this->stop && this->tasks.empty()) + return; + task = std::move(this->tasks.front()); + this->tasks.pop(); + } + task(); + } + }); + } +} + +// the destructor joins all threads +inline iLThreadPool::~iLThreadPool() +{ + { + std::unique_lock lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for (std::thread &worker : workers) + worker.join(); +} + +// add new work item to the pool +template +auto iLThreadPool::enqueue(F&& f, Args&&... args) + -> std::future::type> +{ + using return_type = typename std::result_of::type; + auto task = std::make_shared< std::packaged_task >( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + std::unique_lock lock(queue_mutex); + // don't allow enqueueing after stopping the pool + if (stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); + tasks.emplace([task](){ (*task)(); }); + } + condition.notify_one(); + return res; +} + +#endif // IL_THREAD_POOL_H + diff --git a/invertedlogic/iLUtilities/iLUtilities.pro b/invertedlogic/iLUtilities/iLUtilities.pro new file mode 100644 index 0000000..b5dbe58 --- /dev/null +++ b/invertedlogic/iLUtilities/iLUtilities.pro @@ -0,0 +1,8 @@ + +QMAKE_CXXFLAGS += -std=c++11 -pthread + +TEMPLATE = app + +SOURCES += test.cpp + + diff --git a/invertedlogic/iLUtilities/test.cpp b/invertedlogic/iLUtilities/test.cpp new file mode 100644 index 0000000..d10c63c --- /dev/null +++ b/invertedlogic/iLUtilities/test.cpp @@ -0,0 +1,10 @@ +#include "iLThreadPool.h" + + +int main() +{ + iLThreadPool tp(10); + return 0; +} + +