ModErn Text Analysis
META Enumerates Textual Applications
thread_pool.h
Go to the documentation of this file.
1 
13 #ifndef META_THREAD_POOL_H_
14 #define META_THREAD_POOL_H_
15 
16 #include <condition_variable>
17 #include <mutex>
18 #include <thread>
19 #include <vector>
20 #include <functional>
21 #include <queue>
22 #include <future>
23 
24 namespace meta
25 {
26 namespace parallel
27 {
28 
34 {
35  public:
40  thread_pool(size_t num_threads = std::thread::hardware_concurrency())
41  : running_(true)
42  {
43  for (size_t i = 0; i < num_threads; ++i)
44  threads_.push_back(
45  std::thread{std::bind(&thread_pool::worker, this)});
46  }
47 
52  {
53  {
54  std::unique_lock<std::mutex> lock(mutex_);
55  running_ = false;
56  }
57  cond_.notify_all();
58  for (auto& thread : threads_)
59  thread.join();
60  }
61 
68  template <class Function>
69  std::future<typename std::result_of<Function()>::type>
70  submit_task(Function func)
71  {
72  using result_type = typename std::result_of<Function()>::type;
73 
74  std::unique_ptr<concrete_task<result_type>> task(
75  new concrete_task<result_type>(func));
76 
77  auto future = task->get_future();
78  {
79  std::unique_lock<std::mutex> lock(mutex_);
80  tasks_.push(std::move(task));
81  }
82  cond_.notify_one();
83  return future;
84  }
85 
89  std::vector<std::thread::id> thread_ids() const
90  {
91  std::vector<std::thread::id> ids;
92  for (auto& t : threads_)
93  ids.emplace_back(t.get_id());
94  return ids;
95  }
96 
100  size_t tasks() const
101  {
102  std::unique_lock<std::mutex> lock(mutex_);
103  return tasks_.size();
104  }
105 
106  private:
110  struct task
111  {
115  virtual void run() = 0;
116 
120  virtual ~task() = default;
121  };
122 
126  template <class R>
128  {
134  template <class Function>
135  concrete_task(const Function& f)
136  : task_(f)
137  {
138  }
139 
143  virtual ~concrete_task() = default;
144 
145  virtual void run() override
146  {
147  task_();
148  }
149 
153  std::future<R> get_future()
154  {
155  return task_.get_future();
156  }
157 
159  std::packaged_task<R()> task_;
160  };
161 
166  void worker()
167  {
168  while (true)
169  {
170  std::unique_ptr<task> task;
171  {
172  std::unique_lock<std::mutex> lock(mutex_);
173  while (running_ && tasks_.empty())
174  cond_.wait(lock);
175  if (!running_ && tasks_.empty())
176  return;
177  task = std::move(tasks_.front());
178  tasks_.pop();
179  }
180  task->run();
181  }
182  }
183 
185  std::vector<std::thread> threads_;
187  std::queue<std::unique_ptr<task>> tasks_;
188 
190  bool running_;
191 
193  mutable std::mutex mutex_;
195  std::condition_variable cond_;
196 };
197 }
198 }
199 
200 #endif
size_t tasks() const
Definition: thread_pool.h:100
virtual ~concrete_task()=default
Virtual destructor to support deletion from base pointers.
std::future< typename std::result_of< Function()>::type > submit_task(Function func)
Adds a task to the thread_pool.
Definition: thread_pool.h:70
thread_pool(size_t num_threads=std::thread::hardware_concurrency())
Definition: thread_pool.h:40
std::condition_variable cond_
the condition variable that workers sleep on when waiting for work
Definition: thread_pool.h:195
A generic task object.
Definition: thread_pool.h:110
~thread_pool()
Destructor; joins all threads.
Definition: thread_pool.h:51
virtual void run()=0
Runs the given task.
std::vector< std::thread::id > thread_ids() const
Definition: thread_pool.h:89
virtual ~task()=default
Virtual destructor to support deletion from base pointers.
A concrete task is templated with a result type.
Definition: thread_pool.h:127
std::packaged_task< R()> task_
the internal task representation
Definition: thread_pool.h:159
The ModErn Text Analysis toolkit is a suite of natural language processing, classification, information retreival, data mining, and other applications of text processing.
Definition: analyzer.h:24
std::queue< std::unique_ptr< task > > tasks_
the queue containing the tasks to be run
Definition: thread_pool.h:187
Represents a collection of a fixed number of threads, which tasks can be added to.
Definition: thread_pool.h:33
void worker()
Function invoked by the worker threads to process tasks off the internal queue.
Definition: thread_pool.h:166
virtual void run() override
Runs the given task.
Definition: thread_pool.h:145
std::mutex mutex_
the mutex to wrap queue operations
Definition: thread_pool.h:193
std::future< R > get_future()
Definition: thread_pool.h:153
std::vector< std::thread > threads_
the threads in the pool
Definition: thread_pool.h:185
bool running_
whether or not the pool is currently running
Definition: thread_pool.h:190
concrete_task(const Function &f)
Constructs a new concrete task.
Definition: thread_pool.h:135