#include "QueueProcessor.h" //----------------------------------------------------------------------- // C-style call to drive the QueueProcessor //----------------------------------------------------------------------- void* execute(void* args) { cout << "> Call into driveQueue()\n"; QueueProcessor* p = (QueueProcessor*)args; p->processTasks(); cout << "< QueueProcessor::processTasks() returned.\n"; return NULL; } //----------------------------------------------------------------------- // QueueProcessor //----------------------------------------------------------------------- QueueProcessor::QueueProcessor () { pthread_cond_init(&_condition, NULL); pthread_mutex_init(&_mutex, NULL); _stopped = false; _queue = new list(); cout << "QueueProcessor sucessfully constructed.\n"; } QueueProcessor::~QueueProcessor () { pthread_cond_destroy(&_condition); pthread_mutex_destroy(&_mutex); delete _queue; cout << "QueueProcessor sucessfully destroyed.\n"; } void QueueProcessor::appendTask (Task* t) { if (!_stopped) { pthread_mutex_lock(&_mutex); _queue->push_back(t); pthread_cond_signal(&_condition); pthread_mutex_unlock(&_mutex); cout << "Appended "<< t << "\n"; } else { cout << "WARNING: " << t << " added after the QueueProcessor was "; cout << "stopped.\n"; } } void QueueProcessor::start () { cout << " --> pthread_create()\n"; int status = pthread_create(&_thread, NULL, execute, NULL); cout << " <-- pthread_create()\n"; if (status != 0) { cout << "ERROR: Thread creation failed (status=" << status << ").\n"; } else { cout << "QueueProcessor started successfully.\n"; } void* threadExitStatus; cout << " --> joining against the queue thread...\n"; pthread_join(&_thread, &threadExitStatus); cout << " <-- join() returned... This shouldn't happen!!!!\n"; } void QueueProcessor::stop () { pthread_mutex_lock(&_mutex); _stopped = true; pthread_cond_signal(&_condition); pthread_mutex_unlock(&_mutex); void* threadExitStatus; pthread_join(&_thread, &threadExitStatus); cout << "QueueProcessor shut down successfully.\n"; } void QueueProcessor::processTasks () { cout << "--> processTasks()\n"; Task* t; while (!_stopped) { cout << "----> !_stopped\n"; pthread_mutex_lock(&_mutex); while (_queue->size() == 0 && !_stopped) { pthread_cond_wait(&_condition, &_mutex); } pthread_mutex_unlock(&_mutex); if (_queue->size()>0 && !_stopped) { t=_queue->front(); _queue->pop_front(); try { t->execute(); } catch (int i) { if (i == ProcessingException) { cout << "Couldn't process "<< t << "\n"; } else { throw i; } } } } cout << "<-- processTasks()\n"; }