PTLib  Version 2.10.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
threadpool.h
Go to the documentation of this file.
1 /*
2  * threadpool.h
3  *
4  * Generalised Thread Pooling functions
5  *
6  * Portable Tools Library
7  *
8  * Copyright (C) 2009 Post Increment
9  *
10  * The contents of this file are subject to the Mozilla Public License
11  * Version 1.0 (the "License"); you may not use this file except in
12  * compliance with the License. You may obtain a copy of the License at
13  * http://www.mozilla.org/MPL/
14  *
15  * Software distributed under the License is distributed on an "AS IS"
16  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17  * the License for the specific language governing rights and limitations
18  * under the License.
19  *
20  * The Original Code is Portable Windows Library.
21  *
22  * The Initial Developer of the Original Code is Post Increment
23  *
24  * Portions of this code were written with the financial assistance of
25  * Metreos Corporation (http://www.metros.com).
26  *
27  * Contributor(s): ______________________________________.
28  *
29  * $Revision: 25362 $
30  * $Author: rjongbloed $
31  * $Date: 2011-03-20 18:27:31 -0500 (Sun, 20 Mar 2011) $
32  */
33 
34 
35 #ifndef PTLIB_THREADPOOL_H
36 #define PTLIB_THREADPOOL_H
37 
38 #ifdef P_USE_PRAGMA
39 #pragma interface
40 #endif
41 
42 #include <map>
43 #include <queue>
44 
45 
150 class PThreadPoolBase : public PObject
151 {
152  public:
153  class WorkerThreadBase : public PThread
154  {
155  public:
157  : PThread(100, NoAutoDeleteThread, priority, "Pool")
158  , m_shutdown(false)
159  { }
160 
161  virtual void Shutdown() = 0;
162  virtual unsigned GetWorkSize() const = 0;
163 
166  };
167 
169  {
170  public:
     // Record the optional group name for a unit of work.
     // A NULL `group` is accepted: m_group is then left as the
     // default-constructed (empty) std::string.
171  InternalWorkBase(const char * group)
172  {
173  if (group != NULL)
174  m_group = group;
175  }
176  std::string m_group;
177  };
178 
180 
181  virtual WorkerThreadBase * CreateWorkerThread() = 0;
182  virtual WorkerThreadBase * AllocateWorker();
183  virtual WorkerThreadBase * NewWorker();
184 
185  unsigned GetMaxWorkers() const { return m_maxWorkerCount; }
186 
188  unsigned count
189  ) { m_maxWorkerCount = count; }
190 
191  unsigned GetMaxUnits() const { return m_maxWorkUnitCount; }
192 
194  unsigned count
195  ) { m_maxWorkUnitCount = count; }
196 
197  protected:
198  PThreadPoolBase(unsigned maxWorkerCount = 10, unsigned maxWorkUnitCount = 0);
199 
200  virtual bool CheckWorker(WorkerThreadBase * worker);
201  void StopWorker(WorkerThreadBase * worker);
203 
204  typedef std::vector<WorkerThreadBase *> WorkerList_t;
206 
209 };
210 
211 
214 template <class Work_T>
216 {
217  PCLASSINFO(PThreadPool, PThreadPoolBase);
218  public:
219  //
220  // constructor
221  //
222  PThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0)
223  : PThreadPoolBase(maxWorkers, maxWorkUnits)
224  { }
225 
226  //
227  // define the ancestor of the worker thread
228  //
229  class WorkerThread : public WorkerThreadBase
230  {
231  public:
232  WorkerThread(PThreadPool & pool, Priority priority = NormalPriority)
233  : WorkerThreadBase(priority)
234  , m_pool(pool)
235  {
236  }
237 
238  virtual void AddWork(Work_T * work) = 0;
239  virtual void RemoveWork(Work_T * work) = 0;
240  virtual void Main() = 0;
241 
242  protected:
244  };
245 
246  //
247  // define internal worker wrapper class
248  //
249  class InternalWork : public InternalWorkBase
250  {
251  public:
252  InternalWork(WorkerThread * worker, Work_T * work, const char * group)
253  : InternalWorkBase(group)
254  , m_worker(worker)
255  , m_work(work)
256  {
257  }
258 
260  Work_T * m_work;
261  };
262 
263  //
264  // define map for external work units to internal work
265  //
266  typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
268 
269 
270  //
271  // define class for storing group information
272  //
273  struct GroupInfo {
274  unsigned m_count;
276  };
277 
278 
279  //
280  // define map for group ID to group information
281  //
282  typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
284 
285 
286  //
287  // add a new unit of work to a worker thread
     //
     // Worker selection:
     //   - `group` NULL or empty: AllocateWorker() chooses the worker.
     //   - `group` given and already present in m_groupInfoMap: the worker
     //     recorded for that group is reused.
     //   - `group` given but unknown: AllocateWorker() chooses the worker and
     //     the group is registered against it with a count of 1.
     //
     // Returns false, without recording anything, when no worker could be
     // obtained; otherwise the unit is recorded in m_externalToInternalWorkMap,
     // handed to worker->AddWork() and true is returned.
     //
     // NOTE(review): listing line 291 is absent from this extract, so no lock
     // acquisition is visible here - confirm m_listMutex is taken as it is in
     // RemoveWork().
288  //
289  bool AddWork(Work_T * work, const char * group = NULL)
290  {
292 
293  // allocate by group if specified
294  // else allocate to least busy
295  WorkerThread * worker;
296  if ((group == NULL) || (strlen(group) == 0)) {
     // AllocateWorker() is declared to return WorkerThreadBase *, hence the
     // downcast. Presumably every worker is created via CreateWorkerThread()
     // as a WorkerThread - TODO confirm.
297  worker = (WorkerThread *)AllocateWorker();
298  }
299  else {
300 
301  // find the worker thread with the matching group ID
302  // if no matching Id, then create a new thread
303  typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group);
304  if (g == m_groupInfoMap.end())
305  worker = (WorkerThread *)AllocateWorker();
306  else {
307  worker = g->second.m_worker;
308  PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group);
309  }
310  }
311 
312  // if cannot allocate worker, return
313  if (worker == NULL)
314  return false;
315 
316  // create internal work structure
317  InternalWork internalWork(worker, work, group);
318 
319  // add work to external to internal map
     // NOTE(review): std::map::insert leaves an existing entry untouched, so
     // adding a `work` pointer that is already registered does not update the
     // map, yet the group count below is still incremented and the worker is
     // still handed the unit. Confirm callers never add the same pointer twice.
320  m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork));
321 
322  // add group ID to map
     // (m_group is empty when `group` was NULL or "", so ungrouped work skips this)
323  if (!internalWork.m_group.empty()) {
324  typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
325  if (r != m_groupInfoMap.end())
326  ++r->second.m_count;
327  else {
328  GroupInfo info;
329  info.m_count = 1;
330  info.m_worker = worker;
331  m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info));
332  }
333  }
334 
335  // give the work to the worker
336  worker->AddWork(work);
337 
338  return true;
339  }
340 
341  //
342  // remove a unit of work from a worker thread
     //
     // Looks up `work` in m_externalToInternalWorkMap and returns false,
     // changing nothing, when the pointer is not registered. Otherwise:
     //   - if removeFromWorker is true, calls RemoveWork(work) on the worker
     //     recorded for this unit;
     //   - if the unit carries a group name, decrements that group's count and
     //     erases the group entry once the count reaches zero;
     //   - calls CheckWorker() on the worker, erases the map entry and
     //     returns true.
     // m_listMutex is held, through the PWaitAndSignal guard, until the
     // function returns.
343  //
344  bool RemoveWork(Work_T * work, bool removeFromWorker = true)
345  {
346  PWaitAndSignal m(m_listMutex);
347 
348  // find worker with work unit to remove
349  typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work);
350  if (iterWork == m_externalToInternalWorkMap.end())
351  return false;
352 
     // reference into the map entry itself: only valid until the erase() at
     // the end of this function
353  InternalWork & internalWork = iterWork->second;
354 
355  // tell worker to stop processing work
356  if (removeFromWorker)
357  internalWork.m_worker->RemoveWork(work);
358 
359  // update group information
360  if (!internalWork.m_group.empty()) {
361  typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group);
362  PAssert(iterGroup != m_groupInfoMap.end(), "Attempt to find thread from unknown work group");
     // the check is repeated after the assertion so that execution continuing
     // past PAssert never dereferences end()
363  if (iterGroup != m_groupInfoMap.end()) {
364  if (--iterGroup->second.m_count == 0)
365  m_groupInfoMap.erase(iterGroup);
366  }
367  }
368 
369  // see if workers need reorganising
     // (must precede the erase below, which invalidates internalWork)
370  CheckWorker(internalWork.m_worker);
371 
372  // remove element from work unit map
373  m_externalToInternalWorkMap.erase(iterWork);
374 
375  return true;
376  }
377 };
378 
379 
382 template <class Work_T>
383 class PQueuedThreadPool : public PThreadPool<Work_T>
384 {
385  public:
386  //
387  // constructor
388  //
389  PQueuedThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0)
390  : PThreadPool<Work_T>(maxWorkers, maxWorkUnits)
391  { }
392 
394  {
395  public:
397  : PThreadPool<Work_T>::WorkerThread(pool, priority)
398  , m_available(0, INT_MAX)
399  {
400  }
401 
     // Append `work` to the tail of this worker's queue while holding m_mutex.
     // NOTE(review): listing line 406 is absent from this extract. Main()
     // blocks on m_available, so presumably that semaphore is signalled
     // here - confirm against the real header.
402  void AddWork(Work_T * work)
403  {
404  m_mutex.Wait();
405  m_queue.push(work);
407  m_mutex.Signal();
408  }
409 
     // Pop the unit at the head of the queue (under m_mutex) and delete it
     // once the mutex has been released.
     // The parameter is unnamed and unused: the head of the queue is removed
     // regardless of which pointer the caller passes.
     // NOTE(review): front() is called without an empty() check, so the queue
     // must be non-empty on entry - confirm every caller guarantees this.
410  void RemoveWork(Work_T * )
411  {
412  m_mutex.Wait();
413  Work_T * work = m_queue.front();
414  m_queue.pop();
415  m_mutex.Signal();
416  delete work;
417  }
418 
     // Return the number of work pointers currently in m_queue, narrowed to
     // unsigned. m_mutex is not taken for this read.
419  unsigned GetWorkSize() const
420  {
421  return (unsigned)m_queue.size();
422  }
423 
     // Thread body: loop forever, blocking on m_available each iteration,
     // then run the unit at the head of the queue.
     // The head is only peeked (front(), no pop) while m_mutex is held, and
     // Work() is invoked after the mutex has been released.
     // NOTE(review): listing lines 428 and 437 are absent from this extract.
     // Line 428 is presumably the exit condition guarding the `break`
     // (a shutdown flag?), and line 437 presumably removes the finished unit,
     // since nothing visible here pops the queue - confirm both.
424  void Main()
425  {
426  for (;;) {
427  m_available.Wait();
429  break;
430 
431  m_mutex.Wait();
     // an empty queue yields NULL, so such a wake-up performs no work
432  Work_T * work = m_queue.empty() ? NULL : m_queue.front();
433  m_mutex.Signal();
434 
435  if (work != NULL) {
436  work->Work();
438  }
439  }
440  }
441 
442  void Shutdown()
443  {
446  }
447 
448  protected:
449  typedef std::queue<Work_T *> Queue;
453  };
454 
455 
457  {
458  return new QueuedWorkerThread(*this);
459  }
460 };
461 
462 
463 #endif // PTLIB_THREADPOOL_H
464 
465 
466 // End Of File ///////////////////////////////////////////////////////////////
virtual bool CheckWorker(WorkerThreadBase *worker)
This class waits for the semaphore on construction and automatically signals the semaphore on destruction.
Definition: psync.h:86
void SetMaxUnits(unsigned count)
Definition: threadpool.h:193
This class defines a thread synchronisation object.
Definition: semaphor.h:78
Definition: threadpool.h:249
unsigned GetMaxUnits() const
Definition: threadpool.h:191
PMutex m_mutex
Definition: threadpool.h:451
Work_T * m_work
Definition: threadpool.h:260
PThreadPool(unsigned maxWorkers=10, unsigned maxWorkUnits=0)
Definition: threadpool.h:222
std::vector< WorkerThreadBase * > WorkerList_t
Definition: threadpool.h:204
void SetMaxWorkers(unsigned count)
Definition: threadpool.h:187
Definition: threadpool.h:393
WorkerList_t m_workers
Definition: threadpool.h:205
Definition: threadpool.h:168
virtual void Main()=0
unsigned m_count
Definition: threadpool.h:274
void StopWorker(WorkerThreadBase *worker)
PThreadPoolBase(unsigned maxWorkerCount=10, unsigned maxWorkUnitCount=0)
WorkerThreadBase(Priority priority=NormalPriority)
Definition: threadpool.h:156
bool RemoveWork(Work_T *work, bool removeFromWorker=true)
Definition: threadpool.h:344
WorkerThread * m_worker
Definition: threadpool.h:275
ExternalToInternalWorkMap_T m_externalToInternalWorkMap
Definition: threadpool.h:267
Priority
Codes for thread priorities.
Definition: thread.h:74
#define PTRACE(level, args)
Output trace.
Definition: object.h:530
GroupInfoMap_t m_groupInfoMap
Definition: threadpool.h:283
virtual void Signal()
If there are waiting (blocked) threads then unblock the first one that was blocked.
std::map< Work_T *, InternalWork > ExternalToInternalWorkMap_T
Definition: threadpool.h:266
virtual void Wait()
If the semaphore count is > 0, decrement the semaphore and return.
PMutex m_workerMutex
Definition: threadpool.h:165
virtual void RemoveWork(Work_T *work)=0
unsigned m_maxWorkUnitCount
Definition: threadpool.h:208
High Level (queued work item) thread pool.
Definition: threadpool.h:383
unsigned GetWorkSize() const
Definition: threadpool.h:419
These classes and templates implement a generic thread pooling mechanism.
Definition: threadpool.h:150
void AddWork(Work_T *work)
Definition: threadpool.h:402
virtual PThreadPoolBase::WorkerThreadBase * CreateWorkerThread()
Definition: threadpool.h:456
bool m_shutdown
Definition: threadpool.h:164
WorkerThread * m_worker
Definition: threadpool.h:259
void Shutdown()
Definition: threadpool.h:442
virtual WorkerThreadBase * CreateWorkerThread()=0
PQueuedThreadPool(unsigned maxWorkers=10, unsigned maxWorkUnits=0)
Definition: threadpool.h:389
Queue m_queue
Definition: threadpool.h:450
Don't delete thread as it may not be on heap.
Definition: thread.h:94
std::map< std::string, GroupInfo > GroupInfoMap_t
Definition: threadpool.h:282
Definition: threadpool.h:153
Definition: threadpool.h:273
PThreadPool & m_pool
Definition: threadpool.h:243
This class defines a thread of execution in the system.
Definition: thread.h:66
void Main()
Definition: threadpool.h:424
QueuedWorkerThread(PThreadPool< Work_T > &pool, PThread::Priority priority=PThread::NormalPriority)
Definition: threadpool.h:396
virtual WorkerThreadBase * AllocateWorker()
bool AddWork(Work_T *work, const char *group=NULL)
Definition: threadpool.h:289
PMutex m_listMutex
Definition: threadpool.h:202
unsigned m_maxWorkerCount
Definition: threadpool.h:207
virtual WorkerThreadBase * NewWorker()
WorkerThread(PThreadPool &pool, Priority priority=NormalPriority)
Definition: threadpool.h:232
Synonym for PTimedMutex.
#define PAssert(b, msg)
This macro is used to assert that a condition must be true.
Definition: object.h:192
void RemoveWork(Work_T *)
Definition: threadpool.h:410
InternalWorkBase(const char *group)
Definition: threadpool.h:171
std::string m_group
Definition: threadpool.h:176
InternalWork(WorkerThread *worker, Work_T *work, const char *group)
Definition: threadpool.h:252
virtual void AddWork(Work_T *work)=0
Definition: threadpool.h:229
PSemaphore m_available
Definition: threadpool.h:452
Ultimate parent class for all objects in the class library.
Definition: object.h:1118
unsigned GetMaxWorkers() const
Definition: threadpool.h:185
Normal priority for a thread.
Definition: thread.h:79
std::queue< Work_T * > Queue
Definition: threadpool.h:449
Low Level thread pool.
Definition: threadpool.h:215
virtual unsigned GetWorkSize() const =0