public inbox for pthreads-win32@sourceware.org
 help / color / mirror / Atom feed
* POSIX Message Queues for Win32!
@ 2000-10-13  7:18 aurelio.medina
  0 siblings, 0 replies; 4+ messages in thread
From: aurelio.medina @ 2000-10-13  7:18 UTC (permalink / raw)
  To: rpj; +Cc: jramirez, pthreads-win32

[-- Attachment #1: Type: text/plain, Size: 3315 bytes --]

All,

Since I never actually tested my Win32 port of POSIX Message Queues I had no
idea how close I actually was.  Much to my surprise, I am glad to announce
that the source code provided below is now indeed very functional.  I have
tested this code with the file 'tester.c' using of course the PThreads-Win32
library and it worked great.

I'd like to provide a little history and notes about this source code.

1) I used Richard Steven's (May he RIP) POSIX MQ implementation using UNIX
memory mapped I/O as the base source code.
2) I then had to make a few very minor changes to port it to HP-UX 10.x.
3) I used this code in an in-house UNIX application at work that heavily
tested this code and it never once broke.
4) Thinking I would also need to port this application to Windows NT I
ported the POSIX MQ library to Win32 using Win32 memory mapped I/O.  Since I
never did port my application to Windows NT I never got to test my ported
code.
5) Finally I decided to see just how close I was to having functional code.
In my opinion this code is about 90% functional and can be made available as
is.  See "Things left to be done" below.
6) This code can be used on Win32 as well as UNIX platforms.
7) This code was compiled using MS Visual C++ 6.0.

POSIX Message Queue functions implemented:
mq_open()
mq_close()
mq_unlink()
mq_getattr()
mq_setattr()
mq_receive()
mq_send()

Thing left to be done:
1) Integrate into the current PThread-Win32 library.
2) Implement mq_notify(), this could be tough.  Anyone up for the challenge?
3) Provide more test cases.
4) Build using other compilers?

Finally, given the fact that most of this MQ source code has been heavily
tested by my in-house application on UNIX, I am fairly confident that the
Win32 port should be stable.  But please do test some more.

 <<mqueue.h>>  <<mqueue.c>>  <<tester.c>> 

Aurelio Medina


> -----Original Message-----
> From:	Ross Johnson [SMTP:rpj@ise.canberra.edu.au]
> Sent:	Wednesday, October 11, 2000 6:55 PM
> To:	Medina, Aurelio
> Cc:	jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> Subject:	Re: Message queues
> 
> Hi,
> 
> I'm going to make a special note to myself to include Aurelio's
> starting code in the next snapshot, which is what I should have done
> already - sorry. And I'll try to get it out as soon as I can - say,
> by next monday.
> 
> Ross
> 
> aurelio.medina@bankofamerica.com wrote:
> > 
> > Julio,
> > 
> > At one point I sent in my draft code implementing POSIX Message Queues
> for
> > Win32 hoping that someone can complete it.  I may be able to finish it
> in a
> > month or two but my work schedule makes it tough to do.
> > 
> > Has anyone touched the source code that I sent to the PThreads-Win32
> mailing
> > list.  If there is enough interest in POSIX MQs for Win32 I may try to
> > complete it.  I have benefitted from this effort and  I might be able to
> > contribute more if there is enough interest.
> > 
> > Aurelio Medina
> > 
> > > -----Original Message-----
> > > From: Julio [SMTP:jramirez@tct.hut.fi]
> > > Sent: Wednesday, October 11, 2000 5:29 AM
> > > To:   'Pthreads-win32'
> > > Subject:      Message queues
> > >
> > > Sorry, this may be offtopic.
> > >
> > > Could someone tell me where can I find Win32 POSIX libraries for
> message
> > >
> > > queues??
> > >
> > > Thnx.

[-- Attachment #2: mqueue.c --]
[-- Type: text/x-c, Size: 15860 bytes --]

/*****************************************************************************
 *
 * POSIX Message Queue for Windows NT
 *
 *****************************************************************************/
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#if defined(WIN32)
#   include <io.h>
#endif
#include "mqueue.h"

#if defined(WIN32)
#   define S_IXUSR  0000100
#   define sleep(a) Sleep((a)*1000)

    typedef unsigned short mode_t;
#endif

#define MAX_TRIES   10
struct mq_attr defattr = { 0, 128, 1024, 0 };

int mq_close(mqd_t mqd)
{
    long            msgsize, filesize;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    attr = &mqhdr->mqh_attr;

    if (mq_notify(mqd, NULL) != 0)        /* unregister calling process */
        return(-1);

    msgsize = MSGSIZE(attr->mq_msgsize);
    filesize = sizeof(struct mq_hdr) + (attr->mq_maxmsg *
                      (sizeof(struct msg_hdr) + msgsize));
#if defined(WIN32)
    if (!UnmapViewOfFile(mqinfo->mqi_hdr) || !CloseHandle(mqinfo->mqi_fmap))
#else
    if (munmap(mqinfo->mqi_hdr, filesize) == -1)
#endif
        return(-1);

    mqinfo->mqi_magic = 0;          /* just in case */
    free(mqinfo);
    return(0);
}

int mq_getattr(mqd_t mqd, struct mq_attr *mqstat)
{
    int             n;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    mqstat->mq_flags = mqinfo->mqi_flags;   /* per-open */
    mqstat->mq_maxmsg = attr->mq_maxmsg;    /* remaining three per-queue */
    mqstat->mq_msgsize = attr->mq_msgsize;
    mqstat->mq_curmsgs = attr->mq_curmsgs;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);
}

int mq_notify(mqd_t mqd, const struct sigevent *notification)
{
#if !defined(WIN32)
    int             n;
    pid_t           pid;
    struct mq_hdr  *mqhdr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    pid = getpid();
    if (notification == NULL) {
        if (mqhdr->mqh_pid == pid) {
            mqhdr->mqh_pid = 0;     /* unregister calling process */
        }                           /* no error if c aller not registered */
    } else {
        if (mqhdr->mqh_pid != 0) {
            if (kill(mqhdr->mqh_pid, 0) != -1 || errno != ESRCH) {
                errno = EBUSY;
                goto err;
            }
        }
        mqhdr->mqh_pid = pid;
        mqhdr->mqh_event = *notification;
    }
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);

err:
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(-1);
#else
    errno = EINVAL;
    return(-1);
#endif
}

mqd_t mq_open(const char *pathname, int oflag, ...)
{
    int                  i, fd, nonblock, created, save_errno;
    long                 msgsize, filesize, index;
    va_list              ap;
    mode_t               mode;
    char                *mptr;
    struct stat          statbuff;
    struct mq_hdr       *mqhdr;
    struct msg_hdr      *msghdr;
    struct mq_attr      *attr;
    struct mq_info      *mqinfo;
    pthread_mutexattr_t  mattr;
    pthread_condattr_t   cattr;
#if defined(WIN32)
    HANDLE fmap;

    mptr = NULL;
#else
    mptr = (char *) MAP_FAILED;
#endif
    created = 0;
    nonblock = oflag & O_NONBLOCK;
    oflag &= ~O_NONBLOCK;
    mqinfo = NULL;
again:
    if (oflag & O_CREAT) {
        va_start(ap, oflag); /* init ap to final named argument */
        mode = va_arg(ap, mode_t) & ~S_IXUSR;
        attr = va_arg(ap, struct mq_attr *);
        va_end(ap);

        /* open and specify O_EXCL and user-execute */
        fd = open(pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
        if (fd < 0) {
            if (errno == EEXIST && (oflag & O_EXCL) == 0)
                goto exists;            /* already exists, OK */
            else
                return((mqd_t) -1);
            }
            created = 1;
                        /* first one to create the file initializes it */
            if (attr == NULL)
                attr = &defattr;
            else {
                if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
                    errno = EINVAL;
                    goto err;
                }
            }
            /* calculate and set the file size */
            msgsize = MSGSIZE(attr->mq_msgsize);
            filesize = sizeof(struct mq_hdr) + (attr->mq_maxmsg *
                               (sizeof(struct msg_hdr) + msgsize));
            if (lseek(fd, filesize - 1, SEEK_SET) == -1)
                goto err;
            if (write(fd, "", 1) == -1)
                goto err;

            /* memory map the file */
#if defined(WIN32)
            fmap = CreateFileMapping((HANDLE)_get_osfhandle(fd), NULL, 
                                     PAGE_READWRITE, 0, 0, NULL);
            if (fmap == NULL)
                goto err;
            mptr = MapViewOfFile(fmap, FILE_MAP_WRITE, 0, 0, filesize);
            if (mptr == NULL)
#else
            mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
                                        MAP_SHARED, fd, 0);
            if (mptr == MAP_FAILED)
#endif
                goto err;

            /* allocate one mq_info{} for the queue */
            if ( (mqinfo = malloc(sizeof(struct mq_info))) == NULL)
                goto err;
#if defined(WIN32)
            mqinfo->mqi_fmap = fmap;
#endif
            mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
            mqinfo->mqi_magic = MQI_MAGIC;
            mqinfo->mqi_flags = nonblock;

            /* initialize header at beginning of file */
            /* create free list with all messages on it */
            mqhdr->mqh_attr.mq_flags = 0;
            mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
            mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
            mqhdr->mqh_attr.mq_curmsgs = 0;
            mqhdr->mqh_nwait = 0;
            mqhdr->mqh_pid = 0;
            mqhdr->mqh_head = 0;
            index = sizeof(struct mq_hdr);
            mqhdr->mqh_free = index;
            for (i = 0; i < attr->mq_maxmsg - 1; i++) {
                msghdr = (struct msg_hdr *) &mptr[index];
                index += sizeof(struct msg_hdr) + msgsize;
                msghdr->msg_next = index;
            }
            msghdr = (struct msg_hdr *) &mptr[index];
            msghdr->msg_next = 0;           /* end of free list */

            /* initialize mutex & condition variable */
            if ( (i = pthread_mutexattr_init(&mattr)) != 0)
                goto pthreaderr;
            pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
            i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
            pthread_mutexattr_destroy(&mattr);      /* be sure to destroy */
            if (i != 0)
                goto pthreaderr;

            if ( (i = pthread_condattr_init(&cattr)) != 0)
                goto pthreaderr;
            pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
            i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
            pthread_condattr_destroy(&cattr);       /* be sure to destroy */
            if (i != 0)
                goto pthreaderr;

            /* initialization complete, turn off user-execute bit */
#if defined(WIN32)
            if (chmod(pathname, mode) == -1)
#else
            if (fchmod(fd, mode) == -1)
#endif
                goto err;
            close(fd);
            return((mqd_t) mqinfo);
    }
exists:
    /* open the file then memory map */
    if ( (fd = open(pathname, O_RDWR)) < 0) {
        if (errno == ENOENT && (oflag & O_CREAT))
            goto again;
        goto err;
    }

    /* make certain initialization is complete */
    for (i = 0; i < MAX_TRIES; i++) {
        if (stat(pathname, &statbuff) == -1) {
            if (errno == ENOENT && (oflag & O_CREAT)) {
                close(fd);
                goto again;
            }
            goto err;
        }
        if ((statbuff.st_mode & S_IXUSR) == 0)
            break;
        sleep(1);
    }
    if (i == MAX_TRIES) {
        errno = ETIMEDOUT;
        goto err;
    }

    filesize = statbuff.st_size;
#if defined(WIN32)
    fmap = CreateFileMapping((HANDLE)_get_osfhandle(fd), NULL, PAGE_READWRITE, 
                             0, 0, NULL);                             
    if (fmap == NULL)
        goto err;
    mptr = MapViewOfFile(fmap, FILE_MAP_WRITE, 0, 0, filesize);
    if (mptr == NULL)
#else
    mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (mptr == MAP_FAILED)
#endif
        goto err;
    close(fd);

    /* allocate one mq_info{} for each open */
    if ( (mqinfo = malloc(sizeof(struct mq_info))) == NULL)
        goto err;
    mqinfo->mqi_hdr = (struct mq_hdr *) mptr;
    mqinfo->mqi_magic = MQI_MAGIC;
    mqinfo->mqi_flags = nonblock;
    return((mqd_t) mqinfo);
pthreaderr:
    errno = i;
err:
    /* don't let following function calls change errno */
    save_errno = errno;
    if (created)
        unlink(pathname);
#if defined(WIN32)
    if (fmap != NULL) {
        if (mptr != NULL) {
            UnmapViewOfFile(mptr);
        }
        CloseHandle(fmap);
    }
#else
    if (mptr != MAP_FAILED)
        munmap(mptr, filesize);
#endif
    if (mqinfo != NULL)
        free(mqinfo);
    close(fd);
    errno = save_errno;
    return((mqd_t) -1);
}

ssize_t mq_receive(mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
{
    int             n;
    long            index;
    char           *mptr;
    ssize_t         len;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct msg_hdr *msghdr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;        /* struct pointer */
    mptr = (char *) mqhdr;          /* byte pointer */
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    if (maxlen < (size_t)attr->mq_msgsize) {
        errno = EMSGSIZE;
        goto err;
    }
    if (attr->mq_curmsgs == 0) {            /* queue is empty */
        if (mqinfo->mqi_flags & O_NONBLOCK) {
            errno = EAGAIN;
            goto err;
        }
        /* wait for a message to be placed onto queue */
        mqhdr->mqh_nwait++;
        while (attr->mq_curmsgs == 0)
            pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
        mqhdr->mqh_nwait--;
    }

    if ( (index = mqhdr->mqh_head) == 0) {
        fprintf(stderr, "mq_receive: curmsgs = %ld; head = 0",attr->mq_curmsgs);
        abort();
    }

    msghdr = (struct msg_hdr *) &mptr[index];
    mqhdr->mqh_head = msghdr->msg_next;     /* new head of list */
    len = msghdr->msg_len;
    memcpy(ptr, msghdr + 1, len);           /* copy the message itself */
    if (priop != NULL)
        *priop = msghdr->msg_prio;

    /* just-read message goes to front of free list */
    msghdr->msg_next = mqhdr->mqh_free;
    mqhdr->mqh_free = index;

    /* wake up anyone blocked in mq_send waiting for room */
    if (attr->mq_curmsgs == attr->mq_maxmsg)
        pthread_cond_signal(&mqhdr->mqh_wait);
    attr->mq_curmsgs--;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(len);

err:
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(-1);
}

int mq_send(mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
{
    int              n;
    long             index, freeindex;
    char            *mptr;
    struct sigevent *sigev;
    struct mq_hdr   *mqhdr;
    struct mq_attr  *attr;
    struct msg_hdr  *msghdr, *nmsghdr, *pmsghdr;
    struct mq_info  *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;        /* struct pointer */
    mptr = (char *) mqhdr;          /* byte pointer */
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    if (len > (size_t)attr->mq_msgsize) {
        errno = EMSGSIZE;
        goto err;
    }
    if (attr->mq_curmsgs == 0) {
        if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) {
            sigev = &mqhdr->mqh_event;
#if !defined(WIN32)
            if (sigev->sigev_notify == SIGEV_SIGNAL) {
                sigqueue(mqhdr->mqh_pid, sigev->sigev_signo,
                                         sigev->sigev_value);
            }
#endif
            mqhdr->mqh_pid = 0;             /* unregister */
        }
    } else if (attr->mq_curmsgs >= attr->mq_maxmsg) {
        /* queue is full */
        if (mqinfo->mqi_flags & O_NONBLOCK) {
            errno = EAGAIN;
            goto err;
        }
        /* wait for room for one message on the queue */
        while (attr->mq_curmsgs >= attr->mq_maxmsg)
            pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
    }
    /* nmsghdr will point to new message */
    if ( (freeindex = mqhdr->mqh_free) == 0) {
        fprintf(stderr, "mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
    }

    nmsghdr = (struct msg_hdr *) &mptr[freeindex];
    nmsghdr->msg_prio = prio;
    nmsghdr->msg_len = len;
    memcpy(nmsghdr + 1, ptr, len);          /* copy message from caller */
    mqhdr->mqh_free = nmsghdr->msg_next;    /* new freelist head */

    /* find right place for message in linked list */
    index = mqhdr->mqh_head;
    pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
    while (index != 0) {
        msghdr = (struct msg_hdr *) &mptr[index];
        if (prio > msghdr->msg_prio) {
            nmsghdr->msg_next = index;
            pmsghdr->msg_next = freeindex;
            break;
        }
        index = msghdr->msg_next;
        pmsghdr = msghdr;
    }
    if (index == 0) {
        /* queue was empty or new goes at end of list */
        pmsghdr->msg_next = freeindex;
        nmsghdr->msg_next = 0;
    }
    /* wake up anyone blocked in mq_receive waiting for a message */ 
    if (attr->mq_curmsgs == 0)
        pthread_cond_signal(&mqhdr->mqh_wait);
    attr->mq_curmsgs++;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);

err:
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(-1);
}

int mq_setattr(mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
{
    int             n;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    if (omqstat != NULL) {
        omqstat->mq_flags = mqinfo->mqi_flags;  /* previous attributes */
        omqstat->mq_maxmsg = attr->mq_maxmsg;
        omqstat->mq_msgsize = attr->mq_msgsize;
        omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
    }

    if (mqstat->mq_flags & O_NONBLOCK)
        mqinfo->mqi_flags |= O_NONBLOCK;
    else
        mqinfo->mqi_flags &= ~O_NONBLOCK;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);
}

int mq_unlink(const char *pathname)
{
    if (unlink(pathname) == -1)
        return(-1);
    return(0);
}

[-- Attachment #3: mqueue.h --]
[-- Type: text/x-c, Size: 3120 bytes --]

/*****************************************************************************
 *
 * POSIX Message Queue library implemented using memory mapped files
 *
 *****************************************************************************/
#ifndef __mqueue_h
#define __mqueue_h
#include <pthread.h>

#if defined(WIN32)
#   include <fcntl.h>
#   define EMSGSIZE    4200
#   define O_NONBLOCK  0200000

    union sigval {
        int           sival_int;     /* integer value */
        void          *sival_ptr;    /* pointer value */
    };
    struct sigevent {
        int           sigev_notify;  /* notification type */
        int           sigev_signo;   /* signal number */
        union sigval  sigev_value;   /* signal value */
    };
    typedef int pid_t;
    typedef int ssize_t;
#endif

/*****************************************************************************/

typedef struct mq_info *mqd_t;       /* opaque datatype */

struct mq_attr {
    long mq_flags;     /* message queue flag: O_NONBLOCK */
    long mq_maxmsg;    /* max number of messages allowed on queue */
    long mq_msgsize;   /* max size of a message (in bytes) */
    long mq_curmsgs;   /* number of messages currently on queue */
};

/* one mq_hdr{} per queue, at beginning of mapped file */
struct mq_hdr {
    struct mq_attr    mqh_attr;  /* the queue's attributes */
    long              mqh_head;  /* index of first message */
    long              mqh_free;  /* index of first free message */
    long              mqh_nwait; /* #threads blocked in mq_receive() */
    pid_t             mqh_pid;   /* nonzero PID if mqh_event set */
    struct sigevent   mqh_event; /* for mq_notify() */
    pthread_mutex_t   mqh_lock;  /* mutex lock */
    pthread_cond_t    mqh_wait;  /* and condition variable */
};

/* one msg_hdr{} at the front of each message in the mapped file */
struct msg_hdr {
    long            msg_next;    /* index of next on linked list */
                                 /* msg_next must be first member in struct */
    ssize_t         msg_len;     /* actual length */
    unsigned int    msg_prio;    /* priority */
};

/* one mq_info{} malloc'ed per process per mq_open() */
struct mq_info {
#if defined(WIN32)
    HANDLE         mqi_fmap;     /* file mapping object */
#endif
    struct mq_hdr *mqi_hdr;      /* start of mmap'ed region */
    long           mqi_magic;    /* magic number if open */
    int            mqi_flags;    /* flags for this process */
};
#define MQI_MAGIC  0x98765432

/* size of message in file is rounded up for alignment */
#define MSGSIZE(i) ((((i) + sizeof(long)-1) / sizeof(long)) * sizeof(long))

/* message queue functions */
extern int     mq_close(mqd_t);
extern int     mq_getattr(mqd_t, struct mq_attr *);
extern int     mq_notify(mqd_t, const struct sigevent *);
extern mqd_t   mq_open(const char *, int, ...);
extern ssize_t mq_receive(mqd_t, char *, size_t, unsigned int *);
extern int     mq_send(mqd_t, const char *, size_t, unsigned int);
extern int     mq_setattr(mqd_t, const struct mq_attr *, struct mq_attr *);
extern int     mq_unlink(const char *name);

#endif

[-- Attachment #4: tester.c --]
[-- Type: text/x-c, Size: 2409 bytes --]

#include <stdio.h>
#include "mqueue.h"

#define PERCENT(n, d) (((float)(n) / ((d) ? (d) : 1)) * 100)

#define MSG_COUNT   500             

#define MQ_PATH     "/tester.mq"
#define MQ_MAXMSG   100
static mqd_t mqd = (mqd_t)-1;

void *producer(void *arg)
{
    int delay = (unsigned long)arg;
    int msg;

    printf("Producer: Delay = 0 to %d ms\n", delay);
    for (msg = 1; msg <= MSG_COUNT; msg++) {
        if (mq_send(mqd, (char *)&msg, sizeof(int), 1)) {
            perror("Unable to send message");
            return(NULL);
        }
        printf("Producer: Put message #%d in queue\n", msg);
        Sleep((delay) ? (rand() % delay) : 0);
    }
    return(NULL);
}

void *consumer(void *arg)
{
    int delay = (unsigned long)arg;
    int msg;
    int n;
    struct mq_attr mqattr;

    printf("Consumer: Delay = 0 to %d ms\n", delay);
    for (n = 0; n < MSG_COUNT; n++) {
        if (mq_receive(mqd, (char *)&msg, sizeof(int), NULL) < 0) {
            perror("Unable to recv message");
            return(NULL);
        }
        if (mq_getattr(mqd, &mqattr)) {
            perror("Unable to get message queue attributes");
            return(NULL);
        }
        printf("Consumer: Got message #%d from queue [Queue is %.1f%% full]\n", 
               msg, PERCENT(mqattr.mq_curmsgs, mqattr.mq_maxmsg));
        Sleep((delay) ? (rand() % delay) : 0);
    }
    
    return(NULL);
}

int main(int argc, char *argv[])
{
    pthread_t ptid, ctid;
    struct mq_attr mqattr = {0, MQ_MAXMSG, sizeof(int), 0};
    long pdelay = 250; /* Milliseconds */
    long cdelay = 500; /* Milliseconds */
    void *tstat;

    switch (argc) {
    case 3: cdelay = atol(argv[2]);
    case 2: pdelay = atol(argv[1]);
    default:
        break;
    }
    srand((unsigned)time(NULL));

    mq_unlink(MQ_PATH);
    if ((mqd = mq_open(MQ_PATH, O_CREAT|O_EXCL|O_RDWR, 0660, &mqattr)) ==
        (mqd_t)-1) {
        perror("Unable to open message queue");
        return(1);
    }
    if (pthread_create(&ctid, NULL, consumer, (void *)cdelay)) {
        mq_close(mqd);
        perror("Unable to create consumer thread");
        return(1);
    }

    if (pthread_create(&ptid, NULL, producer, (void *)pdelay)) {
        mq_close(mqd);
        perror("Unable to create producer thread");
        return(1);
    }

    pthread_join(ptid, &tstat);
    pthread_join(ctid, &tstat);
    mq_close(mqd);

    return(0);
}

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: POSIX Message Queues for Win32!
  2000-10-13  8:42 aurelio.medina
@ 2000-10-13 22:13 ` Ross Johnson
  0 siblings, 0 replies; 4+ messages in thread
From: Ross Johnson @ 2000-10-13 22:13 UTC (permalink / raw)
  To: aurelio.medina; +Cc: John.Bossom, jramirez, pthreads-win32

I did a quick search for existing Win32 POSIX.4 projects and
could not find any. Therefore, in the absence of a better
place for it, I will keep the code in a separate subtree under
pthreads-win32 and generate a separate DLL. Separation should
not be a problem if a more suitable project is found or started
later on.

One advantage:
Keeping the code with pthreads-win32 may help promote continued
development of both libraries. For example there is still more
that needs to be done within pthread.dll to fully support this
code - such as implement process shared mutexes.

Ross

aurelio.medina@bankofamerica.com wrote:
> 
> John,
> 
> Those are good points and I pretty much agree.  My only recommendation is
> that it stays associated with the PThreads-Win32 library since it is a
> dependency.  If a new project is created who will maintain it?
> 
> Aurelio
> 
> > -----Original Message-----
> > From: Bossom, John [SMTP:John.Bossom@Cognos.COM]
> > Sent: Friday, October 13, 2000 10:08 AM
> > To:   Medina, Aurelio; rpj@ise.canberra.edu.au
> > Cc:   jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> > Subject:      RE: POSIX Message Queues for Win32!
> >
> > First of all, I think it is great that you are contributing
> > this MQ code to open source.
> >
> > As I have pointed out to Ross, however, I don't believe that the
> > MQ source should be integrated into the pthreads-win32 library.
> > Instead, I believe it should be it's own library that requires the
> > pthreads-win32.
> >
> > I have two reasons for this:
> >
> > 1) MQs are not part of PThreads. The PThreads-win32 project was
> >    to provide an implementation of POSIX-1003.1c-1995 (i.e. POSIX.4a)
> >    with the addition of semaphores, from POSIX 1003.1b-1993.
> >    MQs are part of POSIX.4
> >    Adding MQ's to pthreads-win32 will basically permut the project
> >    into a repository for all POSIX.4 features; thus it will no
> >    longer be focused on PThreads, specifically.
> >
> > 2) It prevents users from picking and choosing what components
> >    they wish/need. A PThreads client may not want MQ's.
> >    Even on Solaris, the message queues are part of the posix4 library
> >    while the pthreads library is separate.
> >    i.e. to link against both PThreads and MQ's on Solaris, you would
> >         have to specify
> >                   -lposix4 -lpthread
> >
> > My recommendation is to start a new project that provides MQ's
> >
> > John
> >
> > -----Original Message-----
> > From: aurelio.medina@bankofamerica.com
> > [ mailto:aurelio.medina@bankofamerica.com ]
> > Sent: Friday, October 13, 2000 10:19 AM
> > To: rpj@ise.canberra.edu.au
> > Cc: jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> > Subject: POSIX Message Queues for Win32!
> >
> >
> > All,
> >
> > Since I never actually tested my Win32 port of POSIX Message Queues I had
> > no
> > idea how close I actually was.  Much to my surprise, I am glad to announce
> > that the source code provided below is now indeed very functional.  I have
> > tested this code with the file 'tester.c' using of course the
> > PThreads-Win32
> > library and it worked great.
> >
> > I'd like to provide a little history and notes about this source code.
> >
> > 1) I used Richard Steven's (May he RIP) POSIX MQ implementation using UNIX
> > memory mapped I/O as the base source code.
> > 2) I then had to make a few very minor changes to port it to HP-UX 10.x.
> > 3) I used this code in an in-house UNIX application at work that heavily
> > tested this code and it never once broke.
> > 4) Thinking I would also need to port this application to Windows NT I
> > ported the POSIX MQ library to Win32 using Win32 memory mapped I/O.  Since
> > I
> > never did port my application to Windows NT I never got to test my ported
> > code.
> > 5) Finally I decided to see just how close I was to having functional
> > code.
> > In my opinion this code is about 90% functional and can be made available
> > as
> > is.  See "Things left to be done" below.
> > 6) This code can be used on Win32 as well as UNIX platforms.
> > 7) This code was compiled using MS Visual C++ 6.0.
> >
> > POSIX Message Queue functions implemented:
> > mq_open()
> > mq_close()
> > mq_unlink()
> > mq_getattr()
> > mq_setattr()
> > mq_receive()
> > mq_send()
> >
> > Thing left to be done:
> > 1) Integrate into the current PThread-Win32 library.
> > 2) Implement mq_notify(), this could be tough.  Anyone up for the
> > challenge?
> > 3) Provide more test cases.
> > 4) Build using other compilers?
> >
> > Finally, given the fact that most of this MQ source code has been heavily
> > tested by my in-house application on UNIX, I am fairly confident that the
> > Win32 port should be stable.  But please do test some more.
> >
> >  <<mqueue.h>>  <<mqueue.c>>  <<tester.c>>
> >
> > Aurelio Medina
> >
> >
> > > -----Original Message-----
> > > From:       Ross Johnson [SMTP:rpj@ise.canberra.edu.au]
> > > Sent:       Wednesday, October 11, 2000 6:55 PM
> > > To: Medina, Aurelio
> > > Cc: jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> > > Subject:    Re: Message queues
> > >
> > > Hi,
> > >
> > > I'm going to make a special note to myself to include Aurelio's
> > > starting code in the next snapshot, which is what I should have done
> > > already - sorry. And I'll try to get it out as soon as I can - say,
> > > by next monday.
> > >
> > > Ross
> > >
> > > aurelio.medina@bankofamerica.com wrote:
> > > >
> > > > Julio,
> > > >
> > > > At one point I sent in my draft code implementing POSIX Message Queues
> > > for
> > > > Win32 hoping that someone can complete it.  I may be able to finish it
> > > in a
> > > > month or two but my work schedule makes it tough to do.
> > > >
> > > > Has anyone touched the source code that I sent to the PThreads-Win32
> > > mailing
> > > > list.  If there is enough interest in POSIX MQs for Win32 I may try to
> > > > complete it.  I have benefitted from this effort and  I might be able
> > to
> > > > contribute more if there is enough interest.
> > > >
> > > > Aurelio Medina
> > > >
> > > > > -----Original Message-----
> > > > > From: Julio [SMTP:jramirez@tct.hut.fi]
> > > > > Sent: Wednesday, October 11, 2000 5:29 AM
> > > > > To:   'Pthreads-win32'
> > > > > Subject:      Message queues
> > > > >
> > > > > Sorry, this may be offtopic.
> > > > >
> > > > > Could someone tell me where can I find Win32 POSIX libraries for
> > > message
> > > > >
> > > > > queues??
> > > > >
> > > > > Thnx.

^ permalink raw reply	[flat|nested] 4+ messages in thread

* RE: POSIX Message Queues for Win32!
@ 2000-10-13  8:42 aurelio.medina
  2000-10-13 22:13 ` Ross Johnson
  0 siblings, 1 reply; 4+ messages in thread
From: aurelio.medina @ 2000-10-13  8:42 UTC (permalink / raw)
  To: John.Bossom, rpj; +Cc: jramirez, pthreads-win32

John,

Those are good points and I pretty much agree.  My only recommendation is
that it stays associated with the PThreads-Win32 library since it is a
dependency.  If a new project is created who will maintain it?

Aurelio

> -----Original Message-----
> From:	Bossom, John [SMTP:John.Bossom@Cognos.COM]
> Sent:	Friday, October 13, 2000 10:08 AM
> To:	Medina, Aurelio; rpj@ise.canberra.edu.au
> Cc:	jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> Subject:	RE: POSIX Message Queues for Win32!
> 
> First of all, I think it is great that you are contributing
> this MQ code to open source.
> 
> As I have pointed out to Ross, however, I don't believe that the
> MQ source should be integrated into the pthreads-win32 library.
> Instead, I believe it should be it's own library that requires the
> pthreads-win32.
> 
> I have two reasons for this:
> 
> 1) MQs are not part of PThreads. The PThreads-win32 project was
>    to provide an implementation of POSIX-1003.1c-1995 (i.e. POSIX.4a)
>    with the addition of semaphores, from POSIX 1003.1b-1993.
>    MQs are part of POSIX.4
>    Adding MQ's to pthreads-win32 will basically permut the project
>    into a repository for all POSIX.4 features; thus it will no
>    longer be focused on PThreads, specifically.
>       
> 2) It prevents users from picking and choosing what components
>    they wish/need. A PThreads client may not want MQ's.
>    Even on Solaris, the message queues are part of the posix4 library
>    while the pthreads library is separate.
>    i.e. to link against both PThreads and MQ's on Solaris, you would
>         have to specify
>                   -lposix4 -lpthread
> 
> My recommendation is to start a new project that provides MQ's
> 
> John
> 
> -----Original Message-----
> From: aurelio.medina@bankofamerica.com
> [ mailto:aurelio.medina@bankofamerica.com ]
> Sent: Friday, October 13, 2000 10:19 AM
> To: rpj@ise.canberra.edu.au
> Cc: jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> Subject: POSIX Message Queues for Win32!
> 
> 
> All,
> 
> Since I never actually tested my Win32 port of POSIX Message Queues I had
> no
> idea how close I actually was.  Much to my surprise, I am glad to announce
> that the source code provided below is now indeed very functional.  I have
> tested this code with the file 'tester.c' using of course the
> PThreads-Win32
> library and it worked great.
> 
> I'd like to provide a little history and notes about this source code.
> 
> 1) I used Richard Steven's (May he RIP) POSIX MQ implementation using UNIX
> memory mapped I/O as the base source code.
> 2) I then had to make a few very minor changes to port it to HP-UX 10.x.
> 3) I used this code in an in-house UNIX application at work that heavily
> tested this code and it never once broke.
> 4) Thinking I would also need to port this application to Windows NT I
> ported the POSIX MQ library to Win32 using Win32 memory mapped I/O.  Since
> I
> never did port my application to Windows NT I never got to test my ported
> code.
> 5) Finally I decided to see just how close I was to having functional
> code.
> In my opinion this code is about 90% functional and can be made available
> as
> is.  See "Things left to be done" below.
> 6) This code can be used on Win32 as well as UNIX platforms.
> 7) This code was compiled using MS Visual C++ 6.0.
> 
> POSIX Message Queue functions implemented:
> mq_open()
> mq_close()
> mq_unlink()
> mq_getattr()
> mq_setattr()
> mq_receive()
> mq_send()
> 
> Thing left to be done:
> 1) Integrate into the current PThread-Win32 library.
> 2) Implement mq_notify(), this could be tough.  Anyone up for the
> challenge?
> 3) Provide more test cases.
> 4) Build using other compilers?
> 
> Finally, given the fact that most of this MQ source code has been heavily
> tested by my in-house application on UNIX, I am fairly confident that the
> Win32 port should be stable.  But please do test some more.
> 
>  <<mqueue.h>>  <<mqueue.c>>  <<tester.c>> 
> 
> Aurelio Medina
> 
> 
> > -----Original Message-----
> > From:	Ross Johnson [SMTP:rpj@ise.canberra.edu.au]
> > Sent:	Wednesday, October 11, 2000 6:55 PM
> > To:	Medina, Aurelio
> > Cc:	jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> > Subject:	Re: Message queues
> > 
> > Hi,
> > 
> > I'm going to make a special note to myself to include Aurelio's
> > starting code in the next snapshot, which is what I should have done
> > already - sorry. And I'll try to get it out as soon as I can - say,
> > by next monday.
> > 
> > Ross
> > 
> > aurelio.medina@bankofamerica.com wrote:
> > > 
> > > Julio,
> > > 
> > > At one point I sent in my draft code implementing POSIX Message Queues
> > for
> > > Win32 hoping that someone can complete it.  I may be able to finish it
> > in a
> > > month or two but my work schedule makes it tough to do.
> > > 
> > > Has anyone touched the source code that I sent to the PThreads-Win32
> > mailing
> > > list.  If there is enough interest in POSIX MQs for Win32 I may try to
> > > complete it.  I have benefitted from this effort and  I might be able
> to
> > > contribute more if there is enough interest.
> > > 
> > > Aurelio Medina
> > > 
> > > > -----Original Message-----
> > > > From: Julio [SMTP:jramirez@tct.hut.fi]
> > > > Sent: Wednesday, October 11, 2000 5:29 AM
> > > > To:   'Pthreads-win32'
> > > > Subject:      Message queues
> > > >
> > > > Sorry, this may be offtopic.
> > > >
> > > > Could someone tell me where can I find Win32 POSIX libraries for
> > message
> > > >
> > > > queues??
> > > >
> > > > Thnx.

^ permalink raw reply	[flat|nested] 4+ messages in thread

* RE: POSIX Message Queues for Win32!
@ 2000-10-13  8:07 Bossom, John
  0 siblings, 0 replies; 4+ messages in thread
From: Bossom, John @ 2000-10-13  8:07 UTC (permalink / raw)
  To: 'aurelio.medina@bankofamerica.com', rpj; +Cc: jramirez, pthreads-win32

First of all, I think it is great that you are contributing
this MQ code to open source.

As I have pointed out to Ross, however, I don't believe that the
MQ source should be integrated into the pthreads-win32 library.
Instead, I believe it should be it's own library that requires the
pthreads-win32.

I have two reasons for this:

1) MQs are not part of PThreads. The PThreads-win32 project was
   to provide an implementation of POSIX-1003.1c-1995 (i.e. POSIX.4a)
   with the addition of semaphores, from POSIX 1003.1b-1993.
   MQs are part of POSIX.4
   Adding MQ's to pthreads-win32 will basically permut the project
   into a repository for all POSIX.4 features; thus it will no
   longer be focused on PThreads, specifically.
      
2) It prevents users from picking and choosing what components
   they wish/need. A PThreads client may not want MQ's.
   Even on Solaris, the message queues are part of the posix4 library
   while the pthreads library is separate.
   i.e. to link against both PThreads and MQ's on Solaris, you would
        have to specify
                  -lposix4 -lpthread

My recommendation is to start a new project that provides MQ's

John

-----Original Message-----
From: aurelio.medina@bankofamerica.com
[ mailto:aurelio.medina@bankofamerica.com ]
Sent: Friday, October 13, 2000 10:19 AM
To: rpj@ise.canberra.edu.au
Cc: jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
Subject: POSIX Message Queues for Win32!


All,

Since I never actually tested my Win32 port of POSIX Message Queues I had no
idea how close I actually was.  Much to my surprise, I am glad to announce
that the source code provided below is now indeed very functional.  I have
tested this code with the file 'tester.c' using of course the PThreads-Win32
library and it worked great.

I'd like to provide a little history and notes about this source code.

1) I used Richard Steven's (May he RIP) POSIX MQ implementation using UNIX
memory mapped I/O as the base source code.
2) I then had to make a few very minor changes to port it to HP-UX 10.x.
3) I used this code in an in-house UNIX application at work that heavily
tested this code and it never once broke.
4) Thinking I would also need to port this application to Windows NT I
ported the POSIX MQ library to Win32 using Win32 memory mapped I/O.  Since I
never did port my application to Windows NT I never got to test my ported
code.
5) Finally I decided to see just how close I was to having functional code.
In my opinion this code is about 90% functional and can be made available as
is.  See "Things left to be done" below.
6) This code can be used on Win32 as well as UNIX platforms.
7) This code was compiled using MS Visual C++ 6.0.

POSIX Message Queue functions implemented:
mq_open()
mq_close()
mq_unlink()
mq_getattr()
mq_setattr()
mq_receive()
mq_send()

Thing left to be done:
1) Integrate into the current PThread-Win32 library.
2) Implement mq_notify(), this could be tough.  Anyone up for the challenge?
3) Provide more test cases.
4) Build using other compilers?

Finally, given the fact that most of this MQ source code has been heavily
tested by my in-house application on UNIX, I am fairly confident that the
Win32 port should be stable.  But please do test some more.

 <<mqueue.h>>  <<mqueue.c>>  <<tester.c>> 

Aurelio Medina


> -----Original Message-----
> From:	Ross Johnson [SMTP:rpj@ise.canberra.edu.au]
> Sent:	Wednesday, October 11, 2000 6:55 PM
> To:	Medina, Aurelio
> Cc:	jramirez@tct.hut.fi; pthreads-win32@sourceware.cygnus.com
> Subject:	Re: Message queues
> 
> Hi,
> 
> I'm going to make a special note to myself to include Aurelio's
> starting code in the next snapshot, which is what I should have done
> already - sorry. And I'll try to get it out as soon as I can - say,
> by next monday.
> 
> Ross
> 
> aurelio.medina@bankofamerica.com wrote:
> > 
> > Julio,
> > 
> > At one point I sent in my draft code implementing POSIX Message Queues
> for
> > Win32 hoping that someone can complete it.  I may be able to finish it
> in a
> > month or two but my work schedule makes it tough to do.
> > 
> > Has anyone touched the source code that I sent to the PThreads-Win32
> mailing
> > list.  If there is enough interest in POSIX MQs for Win32 I may try to
> > complete it.  I have benefitted from this effort and  I might be able to
> > contribute more if there is enough interest.
> > 
> > Aurelio Medina
> > 
> > > -----Original Message-----
> > > From: Julio [SMTP:jramirez@tct.hut.fi]
> > > Sent: Wednesday, October 11, 2000 5:29 AM
> > > To:   'Pthreads-win32'
> > > Subject:      Message queues
> > >
> > > Sorry, this may be offtopic.
> > >
> > > Could someone tell me where can I find Win32 POSIX libraries for
> message
> > >
> > > queues??
> > >
> > > Thnx.

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2000-10-13 22:13 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2000-10-13  7:18 POSIX Message Queues for Win32! aurelio.medina
2000-10-13  8:07 Bossom, John
2000-10-13  8:42 aurelio.medina
2000-10-13 22:13 ` Ross Johnson

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).