SUMO - Simulation of Urban MObility
FXWorkerThread.h
Go to the documentation of this file.
1 /****************************************************************************/
7 // A thread class together with a pool and a task for parallelized computation
8 /****************************************************************************/
9 // SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
10 // Copyright (C) 2004-2015 DLR (http://www.dlr.de/) and contributors
11 /****************************************************************************/
12 //
13 // This file is part of SUMO.
14 // SUMO is free software: you can redistribute it and/or modify
15 // it under the terms of the GNU General Public License as published by
16 // the Free Software Foundation, either version 3 of the License, or
17 // (at your option) any later version.
18 //
19 /****************************************************************************/
20 
21 #ifndef FXWorkerThread_h
22 #define FXWorkerThread_h
23 
24 
25 // ===========================================================================
26 // included modules
27 // ===========================================================================
28 #ifdef _MSC_VER
29 #include <windows_config.h>
30 #else
31 #include <config.h>
32 #endif
33 
34 #include <list>
35 #include <vector>
36 #include <fx.h>
37 #include <FXThread.h>
38 
39 
40 // ===========================================================================
41 // class definitions
42 // ===========================================================================
47 class FXWorkerThread : public FXThread {
48 
49 public:
54  class Task {
55  public:
57  virtual ~Task() {};
58 
67  virtual void run(FXWorkerThread* context) = 0;
68 
75  void setIndex(const int newIndex) {
76  myIndex = newIndex;
77  }
78  private:
80  int myIndex;
81  };
82 
87  class Pool {
88  public:
95  Pool(int numThreads = 0) : myRunningIndex(0), myNumFinished(0) {
96  while (numThreads > 0) {
97  new FXWorkerThread(*this);
98  numThreads--;
99  }
100  }
101 
106  virtual ~Pool() {
107  clear();
108  }
109 
112  void clear() {
113  for (std::vector<FXWorkerThread*>::iterator it = myWorkers.begin(); it != myWorkers.end(); ++it) {
114  delete *it;
115  }
116  myWorkers.clear();
117  }
118 
123  void addWorker(FXWorkerThread* const w) {
124 // if (myWorkers.empty()) std::cout << "created pool at " << SysUtils::getCurrentMillis() << std::endl;
125  myWorkers.push_back(w);
126  }
127 
132  void add(Task* const t) {
133  t->setIndex(myRunningIndex++);
134  myWorkers[myRunningIndex % myWorkers.size()]->add(t);
135  }
136 
143  void addFinished(Task* const t) {
144  myMutex.lock();
145  myNumFinished++;
146  myFinishedTasks.push_back(t);
147  myCondition.signal();
148  myMutex.unlock();
149  }
150 
152  void waitAll() {
153  myMutex.lock();
154  while (myNumFinished < myRunningIndex) {
155  myCondition.wait(myMutex);
156  }
157 // if (myRunningIndex > 0) std::cout << "finished waiting for " << myRunningIndex << " tasks at " << SysUtils::getCurrentMillis() << std::endl;
158  for (std::list<Task*>::iterator it = myFinishedTasks.begin(); it != myFinishedTasks.end(); ++it) {
159  delete *it;
160  }
161  myFinishedTasks.clear();
162  myRunningIndex = 0;
163  myNumFinished = 0;
164  myMutex.unlock();
165  }
166 
174  bool isFull() const {
175  return myRunningIndex - myNumFinished >= size();
176  }
177 
182  int size() const {
183  return (int)myWorkers.size();
184  }
185 
187  void lock() {
188  myPoolMutex.lock();
189  }
190 
192  void unlock() {
193  myPoolMutex.unlock();
194  }
195 
196  private:
198  std::vector<FXWorkerThread*> myWorkers;
200  FXMutex myMutex;
202  FXMutex myPoolMutex;
204  FXCondition myCondition;
206  std::list<Task*> myFinishedTasks;
211  };
212 
213 public:
220  FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false), myCounter(0) {
221  pool.addWorker(this);
222  start();
223  }
224 
229  virtual ~FXWorkerThread() {
230  stop();
231  }
232 
237  void add(Task* t) {
238  myMutex.lock();
239  myTasks.push_back(t);
240  myCondition.signal();
241  myMutex.unlock();
242  }
243 
250  FXint run() {
251  while (!myStopped) {
252  myMutex.lock();
253  while (!myStopped && myTasks.empty()) {
254  myCondition.wait(myMutex);
255  }
256  if (myStopped) {
257  myMutex.unlock();
258  break;
259  }
260  Task* t = myTasks.front();
261  myTasks.pop_front();
262  myMutex.unlock();
263  t->run(this);
264  myCounter++;
265 // if (myCounter % 1000 == 0) std::cout << (size_t)this << " ran " << myCounter << " tasks " << std::endl;
266  myPool.addFinished(t);
267  }
268 // std::cout << "ran " << myCounter << " tasks " << std::endl;
269  return 0;
270  }
271 
276  void stop() {
277  myMutex.lock();
278  myStopped = true;
279  myCondition.signal();
280  myMutex.unlock();
281  join();
282  }
283 
284 private:
288  FXMutex myMutex;
290  FXCondition myCondition;
292  std::list<Task*> myTasks;
294  bool myStopped;
297 };
298 
299 
300 #endif
std::vector< FXWorkerThread * > myWorkers
the current worker threads
int myNumFinished
the number of finished tasks (is reset when the pool runs empty)
bool isFull() const
Checks whether there are currently more pending tasks than threads.
virtual ~FXWorkerThread()
Destructor.
FXCondition myCondition
the semaphore when waiting for new tasks
std::list< Task * > myTasks
the list of pending tasks
void add(Task *const t)
Gives a number to the given task and assigns it to a randomly chosen worker.
int myRunningIndex
the running index for the next task
FXWorkerThread(Pool &pool)
Constructor.
virtual ~Pool()
Destructor.
void addFinished(Task *const t)
Adds the given task to the list of finished tasks and assigns it to a randomly chosen worker...
void add(Task *t)
Adds the given task to this thread to be calculated.
FXMutex myMutex
the internal mutex for the task list
FXMutex myPoolMutex
the pool mutex for external sync
FXint run()
Main execution method of this thread.
void lock()
locks the pool mutex
bool myStopped
whether we are still running
void waitAll()
waits for all tasks to be finished
int myIndex
the index of the task, valid only after the task has been added to the pool
int myCounter
counting completed tasks for debugging / profiling
std::list< Task * > myFinishedTasks
list of finished tasks
Pool(int numThreads=0)
Constructor.
FXMutex myMutex
the mutex for the task list
void clear()
Stops and deletes all worker threads.
virtual ~Task()
Desctructor.
int size() const
Returns the number of threads in the pool.
A pool of worker threads which distributes the tasks and collects the results.
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
Pool & myPool
the pool for this thread
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
void unlock()
unlocks the pool mutex
void stop()
Stops the thread.
Abstract superclass of a task to be run with an index to keep track of pending tasks.
A thread repeatingly calculating incoming tasks.
FXCondition myCondition
the semaphore to wait on for finishing all tasks
void setIndex(const int newIndex)
Sets the running index of this task.