This is the mail archive of the cygwin@cygwin.com mailing list for the Cygwin project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

pthread_create -- no callback?


Thanks for the help inre: getting a linkable pthreads library.  I have
1.3.5 installed and can link (using pthreads, pthread_cond, and pthread_mutex
calls).  However, I can't get pthread_create to call back into my startup
function...

I'm writing a QueueProcessor (producer-consumer pattern) that accepts
implementations of a Task interface.  The QueueProcessor is essentially
a thread pulling Tasks off an STL list + the cond/mutex code to handle the
wait/notify inter-thread communication.

Attached is the source for the QueueProcessor/Task and a test driver.  I've
also attached the output to illustrate the behavior.  Is there something I'm
doing wrong in the pthread_create call?  Notice that none of the messages in
the static driveQueue() call appear, nor anything from
QueueProcessor::processTasks().  The cond/mutex code
QueueProcessor::appendTask() and ::processTasks() is purely conjecture, as I
have no experience w/ POSIX thread wait/notify patterns (too used to the more
OO Java approach).

Ideas?  Any good online pthreads references/faq's that you'd recommend?


regards,
Evan


***
Here's the output:

------------------
QueueProcessor sucessfully constructed.
  --> pthread_create()
  <-- pthread_create()
QueueProcessor started successfully.
  --> joining against the queue thread...
  <-- join() returned...  This shouldn't happen!!!!
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
Appended Task[TestTask]
QueueProcessor shut down successfully.





--- Robert Collins <robert.collins@itdomain.com.au> wrote:
> From: "Robert Collins" <robert.collins@itdomain.com.au>
> To: "Evan Pollan" <evan_pollan@yahoo.com>,
> 	"Gerrit P. Haase" <cygwin@cygwin.com>
> Subject: Re: Cygwin && C++ POSIX threads support?
> Date: Sun, 11 Nov 2001 15:15:22 +1100
> 
> ----- Original Message -----
> From: "Evan Pollan" <evan_pollan@yahoo.com>
> To: "Gerrit P. Haase" <cygwin@cygwin.com>
> Sent: Sunday, November 11, 2001 3:12 PM
> Subject: Re: Cygwin && C++ POSIX threads support?
> 
> 
> > Great -- I'll pick up the kit tommorow over a more suitable
> > connection.  What's the preferred threading library?  POSIX?  Or
> > something a little more tuned to the Win32 env?
> >
> 
> Just link with no explicit library - the pthreads functions are in libc.
> 
> Rob
> 


__________________________________________________
Do You Yahoo!?
Find the one for you at Yahoo! Personals
http://personals.yahoo.com
#include <iostream>
#include <list>
#include <pthread.h>

#define ProcessingException 37

//-----------------------------------------------------------------------
// C-style call to drive the QueueProcessor
//-----------------------------------------------------------------------
extern "C" void* driveQueue (void*);

//-----------------------------------------------------------------------
// Task
//-----------------------------------------------------------------------

class Task {
 public:
    Task () {};

    virtual void execute () throw (int) = 0;

    virtual char* getDescription () {
        return "BaseTask";
    };
    
    friend ostream& operator<< (ostream& stream, Task* t) {
        stream << "Task[" << t->getDescription() << "]";
        return stream;
    };
};

//-----------------------------------------------------------------------
// QueueProcessor
//-----------------------------------------------------------------------

class QueueProcessor {
 private:
    bool _stopped;
    pthread_cond_t _condition;
    pthread_mutex_t _mutex;
    pthread_t _thread;
    list<Task*>* _queue;
    
 public:
    QueueProcessor ();
    ~QueueProcessor ();

    void appendTask (Task*);
    void start ();
    void stop ();
    void processTasks ();
};

#include "QueueProcessor.h"


//-----------------------------------------------------------------------
// C-style call to drive the QueueProcessor
//-----------------------------------------------------------------------

void* execute(void* args) {
    cout << "> Call into driveQueue()\n";
    QueueProcessor* p = (QueueProcessor*)args;
    p->processTasks();
    cout << "< QueueProcessor::processTasks() returned.\n";
    return NULL;
}

//-----------------------------------------------------------------------
// QueueProcessor
//-----------------------------------------------------------------------

QueueProcessor::QueueProcessor () {
    pthread_cond_init(&_condition, NULL);
    pthread_mutex_init(&_mutex, NULL);
    _stopped = false;
    _queue = new list<Task*>();
    cout << "QueueProcessor sucessfully constructed.\n";
}

QueueProcessor::~QueueProcessor () {
    pthread_cond_destroy(&_condition);
    pthread_mutex_destroy(&_mutex);
    delete _queue;
    cout << "QueueProcessor sucessfully destroyed.\n";
}

void QueueProcessor::appendTask (Task* t) {
    if (!_stopped) {

        pthread_mutex_lock(&_mutex);
        _queue->push_back(t);
        pthread_cond_signal(&_condition);
        pthread_mutex_unlock(&_mutex);

        cout << "Appended "<< t << "\n";
    } else {
        cout << "WARNING:  " << t << " added after the QueueProcessor was ";
        cout << "stopped.\n";
    }
}

void QueueProcessor::start () {
    cout << "  --> pthread_create()\n";
    int status = pthread_create(&_thread, NULL, execute, NULL);
    cout << "  <-- pthread_create()\n";

    if (status != 0) {
        cout << "ERROR: Thread creation failed (status=" << status << ").\n";
    } else {
        cout << "QueueProcessor started successfully.\n";
    }

    void* threadExitStatus;
    cout << "  --> joining against the queue thread...\n";
    pthread_join(&_thread, &threadExitStatus);
    cout << "  <-- join() returned...  This shouldn't happen!!!!\n";
}

void QueueProcessor::stop () {
    pthread_mutex_lock(&_mutex);
    _stopped = true;
    pthread_cond_signal(&_condition);
    pthread_mutex_unlock(&_mutex);
    
    void* threadExitStatus;
    pthread_join(&_thread, &threadExitStatus);
    cout << "QueueProcessor shut down successfully.\n";
}

void QueueProcessor::processTasks () {
    cout << "--> processTasks()\n";

    Task* t;
    while (!_stopped) {

        cout << "----> !_stopped\n";
        pthread_mutex_lock(&_mutex);
        while (_queue->size() == 0 && !_stopped) {
            pthread_cond_wait(&_condition, &_mutex);
        }
        pthread_mutex_unlock(&_mutex);

        if (_queue->size()>0 && !_stopped) {
            t=_queue->front();
            _queue->pop_front();
            try {
                t->execute();
            } catch (int i) {
                if (i == ProcessingException) {
                    cout << "Couldn't process "<< t << "\n";
                } else {
                    throw i;
                }
            }
        }
    }
    cout << "<-- processTasks()\n";
}

#include "QueueProcessor.h"

class TestTask: public Task {
  private:
    int _size;

  public:
    TestTask (int i) {
        _size = i;
    }

    char* getDescription () {
        return "TestTask";
    }

    void execute () {
        cout << "Executing TestTask(" << _size << ")\n";
    }
};

int main (int, char*[]) {

    // Create and startup the QueueProcessor
    QueueProcessor* p = new QueueProcessor();

    p->start();

    // Add the tasks, allowing them to be processed asynchronously by the
    // QueueProcessor
    Task* t;
    for (int i=0; i<10; i++) {
        t = new TestTask(i);
        p->appendTask(t);
    }

    // Shutdown the QueueProcessor
    p->stop();
}

--
Unsubscribe info:      http://cygwin.com/ml/#unsubscribe-simple
Bug reporting:         http://cygwin.com/bugs.html
Documentation:         http://cygwin.com/docs.html
FAQ:                   http://cygwin.com/faq/

Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]