public inbox for pthreads-win32@sourceware.org
* Last version of message queues.
@ 2001-08-02  7:27 Le Coent Yannick
  0 siblings, 0 replies; 5+ messages in thread
From: Le Coent Yannick @ 2001-08-02  7:27 UTC (permalink / raw)
  To: pthreads-win32

[-- Warning: decoded text below may be mangled, UTF-8 assumed --]
[-- Attachment #1: Type: text/plain, Size: 354 bytes --]

Hello,

Can anyone tell me where I could find the latest sources for the message queues?

Thanks a lot,
Yannick LE COENT
SIEMENS Réseaux Informatiques et Télécommunications
3, rue Blaise Pascal
F - 22300 LANNION

Tel.:	+33-2-96 48 74 26
Fax:	+33-2-96 48 74 73
E-mail:	Yannick.LeCoent@srit.siemens.fr < mailto:Yannick.LeCoent@srit.siemens.fr > 


^ permalink raw reply	[flat|nested] 5+ messages in thread

* RE: Last version of message queues.
@ 2001-08-03  6:19 Aurelio Medina
  0 siblings, 0 replies; 5+ messages in thread
From: Aurelio Medina @ 2001-08-03  6:19 UTC (permalink / raw)
  To: 'Ross Johnson', 'Le Coent Yannick', pthreads-win32

Ross,

I am using the 2000-12-29 snapshot.  Do you return an error in newer
snapshots?  If so, let me know and I can update the code.

Aurelio
-----Original Message-----
From: Ross Johnson [ mailto:rpj@ise.canberra.edu.au ]
Sent: Thursday, August 02, 2001 11:49 PM
To: Aurelio Medina; 'Le Coent Yannick';
pthreads-win32@sourceware.cygnus.com
Subject: Re: Last version of message queues.


Aurelio,

Ross Johnson wrote:
> 
> Hi Aurelio,
> 
> Last time I looked you needed PTHREAD_PROCESS_SHARED mutexes
> and condition variables. The code you just sent does the same
> as last time.
> 
> _POSIX_THREAD_PROCESS_SHARED isn't defined in pthreads-win32, nor
> in Linux I believe (although I could be out of date there).
> 
> For example, in  mq_open() you have the following code:
> 
>         if ( (i = pthread_mutexattr_init(&mattr)) != 0)
>            goto pthreaderr;
>         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
>         i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
>         pthread_mutexattr_destroy(&mattr);      /* be sure to destroy */
>         if (i != 0)
>            goto pthreaderr;
> 
>         if ( (i = pthread_condattr_init(&cattr)) != 0)
>            goto pthreaderr;
>         pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
>         i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
> 
> The *attr_setpshared() calls will be returning ENOSYS errors.
> 

I apologise - it's pthread_{mutex,cond}_init() that should be
returning ENOSYS, not *attr_setpshared(). Since you're catching
that I'm now confused so I'll go take a longer harder look.

Sorry.
Ross

^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: Last version of message queues.
  2001-08-02 18:19 ` Ross Johnson
@ 2001-08-02 21:48   ` Ross Johnson
  0 siblings, 0 replies; 5+ messages in thread
From: Ross Johnson @ 2001-08-02 21:48 UTC (permalink / raw)
  To: Aurelio Medina, 'Le Coent Yannick', pthreads-win32

Aurelio,

Ross Johnson wrote:
> 
> Hi Aurelio,
> 
> Last time I looked you needed PTHREAD_PROCESS_SHARED mutexes
> and condition variables. The code you just sent does the same
> as last time.
> 
> _POSIX_THREAD_PROCESS_SHARED isn't defined in pthreads-win32, nor
> in Linux I believe (although I could be out of date there).
> 
> For example, in  mq_open() you have the following code:
> 
>         if ( (i = pthread_mutexattr_init(&mattr)) != 0)
>            goto pthreaderr;
>         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
>         i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
>         pthread_mutexattr_destroy(&mattr);      /* be sure to destroy */
>         if (i != 0)
>            goto pthreaderr;
> 
>         if ( (i = pthread_condattr_init(&cattr)) != 0)
>            goto pthreaderr;
>         pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
>         i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
> 
> The *attr_setpshared() calls will be returning ENOSYS errors.
> 

I apologise - it's pthread_{mutex,cond}_init() that should be
returning ENOSYS, not *attr_setpshared(). Since you're catching
that I'm now confused so I'll go take a longer harder look.
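
A minimal sketch of checking both results instead of discarding the one
from *attr_setpshared(). The helper name init_shared_mutex() is
hypothetical and is not part of mqueue.c. Which of the two calls reports
the failure, and with which error code, is left open here: the sketch
simply hands the first non-zero result back to the caller.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical helper (not in mqueue.c): initialise a mutex intended to
 * live in shared memory and report every failure to the caller. */
int init_shared_mutex(pthread_mutex_t *mutex)
{
    pthread_mutexattr_t mattr;
    int rc;

    assert(mutex != NULL);

    if ((rc = pthread_mutexattr_init(&mattr)) != 0)
        return rc;

    /* Keep this result: a library without process-shared support
     * may report the problem here ... */
    rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
    if (rc == 0) {
        /* ... or only here, when the attribute is actually used. */
        rc = pthread_mutex_init(mutex, &mattr);
    }

    pthread_mutexattr_destroy(&mattr);  /* destroy the attr on every path */
    return rc;                          /* 0 on success, else an errno value */
}
```

In mq_open() the result would be assigned to i and followed by
"goto pthreaderr" when non-zero, the same way the existing
pthread_mutex_init() result is handled. The condition variable could be
treated the same way with pthread_condattr_setpshared().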

Sorry.
Ross

^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: Last version of message queues.
  2001-08-02  7:36 Aurelio Medina
@ 2001-08-02 18:19 ` Ross Johnson
  2001-08-02 21:48   ` Ross Johnson
  0 siblings, 1 reply; 5+ messages in thread
From: Ross Johnson @ 2001-08-02 18:19 UTC (permalink / raw)
  To: Aurelio Medina; +Cc: 'Le Coent Yannick', pthreads-win32

[-- Attachment #1: Type: text/plain, Size: 2174 bytes --]

Hi Aurelio,

Last time I looked you needed PTHREAD_PROCESS_SHARED mutexes
and condition variables. The code you just sent does the same
as last time.

_POSIX_THREAD_PROCESS_SHARED isn't defined in pthreads-win32, nor
in Linux I believe (although I could be out of date there).

For example, in  mq_open() you have the following code:

	if ( (i = pthread_mutexattr_init(&mattr)) != 0)
	   goto pthreaderr;
	pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
	i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
	pthread_mutexattr_destroy(&mattr);      /* be sure to destroy */
	if (i != 0)
	   goto pthreaderr;
 
	if ( (i = pthread_condattr_init(&cattr)) != 0)
	   goto pthreaderr;
	pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
	i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);

The *attr_setpshared() calls will be returning ENOSYS errors.

Ross

Aurelio Medina wrote:
> 
> Here is my latest port of the POSIX message queue interface.  I contributed
> my code to PThreads-Win32 some time ago.  I'm not sure if it's available on
> the Web Site.
> 
> Anyhow,  I'm using this implementation along with Pthreads-Win32 in a
> production real-time trading application with no problems.
> 
> Hope this helps,
> Aurelio Medina
> 
> -----Original Message-----
> From: Le Coent Yannick [ mailto:Yannick.LeCoent@srit.siemens.fr ]
> Sent: Thursday, August 02, 2001 9:27 AM
> To: pthreads-win32@sourceware.cygnus.com
> Subject: Last version of message queues.
> 
> Hello,
> 
> Can anyone tell me where I could find the latest sources for the message
> queues?
> 
> Thanks a lot,
> Yannick LE COENT
> SIEMENS Réseaux Informatiques et Télécommunications
> 3, rue Blaise Pascal
> F - 22300 LANNION
> 
> Tel.:   +33-2-96 48 74 26
> Fax:    +33-2-96 48 74 73
> E-mail: Yannick.LeCoent@srit.siemens.fr
> < mailto:Yannick.LeCoent@srit.siemens.fr >
> 
>   ------------------------------------------------------------------------
>    mqueue.c    Name: mqueue.c
>                Type: unspecified type (application/octet-stream)
>            Encoding: quoted-printable
> 
>    mqueue.h    Name: mqueue.h
>                Type: unspecified type (application/octet-stream)

^ permalink raw reply	[flat|nested] 5+ messages in thread

* RE: Last version of message queues.
@ 2001-08-02  7:36 Aurelio Medina
  2001-08-02 18:19 ` Ross Johnson
  0 siblings, 1 reply; 5+ messages in thread
From: Aurelio Medina @ 2001-08-02  7:36 UTC (permalink / raw)
  To: 'Le Coent Yannick', pthreads-win32

[-- Attachment #1: Type: text/plain, Size: 901 bytes --]

Here is my latest port of the POSIX message queue interface.  I contributed
my code to PThreads-Win32 some time ago.  I'm not sure if it's available on
the Web Site.

Anyhow,  I'm using this implementation along with Pthreads-Win32 in a
production real-time trading application with no problems.

Hope this helps,
Aurelio Medina

-----Original Message-----
From: Le Coent Yannick [ mailto:Yannick.LeCoent@srit.siemens.fr ]
Sent: Thursday, August 02, 2001 9:27 AM
To: pthreads-win32@sourceware.cygnus.com
Subject: Last version of message queues.


Hello,

Can anyone tell me where I could find the latest sources for the message
queues?

Thanks a lot,
Yannick LE COENT
SIEMENS Réseaux Informatiques et Télécommunications
3, rue Blaise Pascal
F - 22300 LANNION

Tel.:	+33-2-96 48 74 26
Fax:	+33-2-96 48 74 73
E-mail:	Yannick.LeCoent@srit.siemens.fr
< mailto:Yannick.LeCoent@srit.siemens.fr > 



[-- Attachment #2: mqueue.c --]
[-- Type: text/x-c, Size: 15921 bytes --]

/*****************************************************************************
 *
 * POSIX Message Queue for Windows NT
 *
 *****************************************************************************/
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>     /* memcpy() in mq_receive() and mq_send() */
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#if defined(WIN32)
#   include <io.h>
#endif
#include "mqueue.h"

#if defined(WIN32)
#   define S_IXUSR  0000100
#   define sleep(a) Sleep((a)*1000)

    typedef unsigned short mode_t;
#endif

#define MAX_TRIES   10
struct mq_attr defattr = { 0, 128, 1024, 0 };

int mq_close(mqd_t mqd)
{
    long            msgsize, filesize;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    attr = &mqhdr->mqh_attr;

    if (mq_notify(mqd, NULL) != 0)        /* unregister calling process */
        return(-1);

    msgsize = MSGSIZE(attr->mq_msgsize);
    filesize = sizeof(struct mq_hdr) + (attr->mq_maxmsg *
                      (sizeof(struct msg_hdr) + msgsize));
#if defined(WIN32)
    if (!UnmapViewOfFile(mqinfo->mqi_hdr) || !CloseHandle(mqinfo->mqi_fmap))
#else
    if (munmap(mqinfo->mqi_hdr, filesize) == -1)
#endif
        return(-1);

    mqinfo->mqi_magic = 0;          /* just in case */
    free(mqinfo);
    return(0);
}

int mq_getattr(mqd_t mqd, struct mq_attr *mqstat)
{
    int             n;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    mqstat->mq_flags = mqinfo->mqi_flags;   /* per-open */
    mqstat->mq_maxmsg = attr->mq_maxmsg;    /* remaining three per-queue */
    mqstat->mq_msgsize = attr->mq_msgsize;
    mqstat->mq_curmsgs = attr->mq_curmsgs;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);
}

int mq_notify(mqd_t mqd, const struct sigevent *notification)
{
#if !defined(WIN32)
    int             n;
    pid_t           pid;
    struct mq_hdr  *mqhdr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    pid = getpid();
    if (notification == NULL) {
        if (mqhdr->mqh_pid == pid) {
            mqhdr->mqh_pid = 0;     /* unregister calling process */
        }                           /* no error if caller not registered */
    } else {
        if (mqhdr->mqh_pid != 0) {
            if (kill(mqhdr->mqh_pid, 0) != -1 || errno != ESRCH) {
                errno = EBUSY;
                goto err;
            }
        }
        mqhdr->mqh_pid = pid;
        mqhdr->mqh_event = *notification;
    }
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);

err:
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(-1);
#else
    if (notification == NULL) {
        return(0);
    }
    errno = EINVAL;
    return(-1);
#endif
}

mqd_t mq_open(const char *pathname, int oflag, ...)
{
    int                  i, fd, nonblock, created, save_errno;
    long                 msgsize, filesize, index;
    va_list              ap;
    mode_t               mode;
    char                *mptr;
    struct stat          statbuff;
    struct mq_hdr       *mqhdr;
    struct msg_hdr      *msghdr;
    struct mq_attr      *attr;
    struct mq_info      *mqinfo;
    pthread_mutexattr_t  mattr;
    pthread_condattr_t   cattr;
#if defined(WIN32)
    HANDLE fmap = NULL;     /* the err: path tests this before it may be set */

    mptr = NULL;
#else
    mptr = (char *) MAP_FAILED;
#endif
    created = 0;
    nonblock = oflag & O_NONBLOCK;
    oflag &= ~O_NONBLOCK;
    mqinfo = NULL;

again:
    if (oflag & O_CREAT) {
        va_start(ap, oflag); /* init ap to final named argument */
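        /* mode_t may be narrower than int (unsigned short on WIN32);
           variadic arguments are promoted, so fetch the value as int */
        mode = (mode_t) va_arg(ap, int) & ~S_IXUSR;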
        attr = va_arg(ap, struct mq_attr *);
        va_end(ap);

        /* open and specify O_EXCL and user-execute */
        fd = open(pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
        if (fd < 0) {
            if (errno == EEXIST && (oflag & O_EXCL) == 0)
                goto exists;            /* already exists, OK */
            else
                return((mqd_t) -1);
            }
            created = 1;
                        /* first one to create the file initializes it */
            if (attr == NULL)
                attr = &defattr;
            else {
                if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
                    errno = EINVAL;
                    goto err;
                }
            }
            /* calculate and set the file size */
            msgsize = MSGSIZE(attr->mq_msgsize);
            filesize = sizeof(struct mq_hdr) + (attr->mq_maxmsg *
                               (sizeof(struct msg_hdr) + msgsize));
            if (lseek(fd, filesize - 1, SEEK_SET) == -1)
                goto err;
            if (write(fd, "", 1) == -1)
                goto err;

            /* memory map the file */
#if defined(WIN32)
            fmap = CreateFileMapping((HANDLE)_get_osfhandle(fd), NULL, 
                                     PAGE_READWRITE, 0, 0, NULL);
            if (fmap == NULL)
                goto err;
            mptr = MapViewOfFile(fmap, FILE_MAP_WRITE, 0, 0, filesize);
            if (mptr == NULL)
#else
            mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
                                        MAP_SHARED, fd, 0);
            if (mptr == MAP_FAILED)
#endif
                goto err;

            /* allocate one mq_info{} for the queue */
            if ( (mqinfo = malloc(sizeof(struct mq_info))) == NULL)
                goto err;
#if defined(WIN32)
            mqinfo->mqi_fmap = fmap;
#endif
            mqinfo->mqi_hdr = mqhdr = (struct mq_hdr *) mptr;
            mqinfo->mqi_magic = MQI_MAGIC;
            mqinfo->mqi_flags = nonblock;

            /* initialize header at beginning of file */
            /* create free list with all messages on it */
            mqhdr->mqh_attr.mq_flags = 0;
            mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
            mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
            mqhdr->mqh_attr.mq_curmsgs = 0;
            mqhdr->mqh_nwait = 0;
            mqhdr->mqh_pid = 0;
            mqhdr->mqh_head = 0;
            index = sizeof(struct mq_hdr);
            mqhdr->mqh_free = index;
            for (i = 0; i < attr->mq_maxmsg - 1; i++) {
                msghdr = (struct msg_hdr *) &mptr[index];
                index += sizeof(struct msg_hdr) + msgsize;
                msghdr->msg_next = index;
            }
            msghdr = (struct msg_hdr *) &mptr[index];
            msghdr->msg_next = 0;           /* end of free list */

            /* initialize mutex & condition variable */
            if ( (i = pthread_mutexattr_init(&mattr)) != 0)
                goto pthreaderr;
            pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
            i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
            pthread_mutexattr_destroy(&mattr);      /* be sure to destroy */
            if (i != 0)
                goto pthreaderr;

            if ( (i = pthread_condattr_init(&cattr)) != 0)
                goto pthreaderr;
            pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
            i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
            pthread_condattr_destroy(&cattr);       /* be sure to destroy */
            if (i != 0)
                goto pthreaderr;

            /* initialization complete, turn off user-execute bit */
#if defined(WIN32)
            if (chmod(pathname, mode) == -1)
#else
            if (fchmod(fd, mode) == -1)
#endif
                goto err;
            close(fd);
            return((mqd_t) mqinfo);
    }
exists:
    /* open the file then memory map */
    if ( (fd = open(pathname, O_RDWR)) < 0) {
        if (errno == ENOENT && (oflag & O_CREAT))
            goto again;
        goto err;
    }

    /* make certain initialization is complete */
    for (i = 0; i < MAX_TRIES; i++) {
        if (stat(pathname, &statbuff) == -1) {
            if (errno == ENOENT && (oflag & O_CREAT)) {
                close(fd);
                goto again;
            }
            goto err;
        }
        if ((statbuff.st_mode & S_IXUSR) == 0)
            break;
        sleep(1);
    }
    if (i == MAX_TRIES) {
        errno = ETIMEDOUT;
        goto err;
    }

    filesize = statbuff.st_size;
#if defined(WIN32)
    fmap = CreateFileMapping((HANDLE)_get_osfhandle(fd), NULL, PAGE_READWRITE, 
                             0, 0, NULL);                             
    if (fmap == NULL)
        goto err;
    mptr = MapViewOfFile(fmap, FILE_MAP_WRITE, 0, 0, filesize);
    if (mptr == NULL)
#else
    mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (mptr == MAP_FAILED)
#endif
        goto err;
    close(fd);

    /* allocate one mq_info{} for each open */
    if ( (mqinfo = malloc(sizeof(struct mq_info))) == NULL)
        goto err;
#if defined(WIN32)
    mqinfo->mqi_fmap = fmap;        /* mq_close() passes this to CloseHandle() */
#endif
    mqinfo->mqi_hdr = (struct mq_hdr *) mptr;
    mqinfo->mqi_magic = MQI_MAGIC;
    mqinfo->mqi_flags = nonblock;
    return((mqd_t) mqinfo);
pthreaderr:
    errno = i;
err:
    /* don't let following function calls change errno */
    save_errno = errno;
    if (created)
        unlink(pathname);
#if defined(WIN32)
    if (fmap != NULL) {
        if (mptr != NULL) {
            UnmapViewOfFile(mptr);
        }
        CloseHandle(fmap);
    }
#else
    if (mptr != MAP_FAILED)
        munmap(mptr, filesize);
#endif
    if (mqinfo != NULL)
        free(mqinfo);
    close(fd);
    errno = save_errno;
    return((mqd_t) -1);
}

ssize_t mq_receive(mqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
{
    int             n;
    long            index;
    char           *mptr;
    ssize_t         len;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct msg_hdr *msghdr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;        /* struct pointer */
    mptr = (char *) mqhdr;          /* byte pointer */
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    if (maxlen < (size_t)attr->mq_msgsize) {
        errno = EMSGSIZE;
        goto err;
    }
    if (attr->mq_curmsgs == 0) {            /* queue is empty */
        if (mqinfo->mqi_flags & O_NONBLOCK) {
            errno = EAGAIN;
            goto err;
        }
        /* wait for a message to be placed onto queue */
        mqhdr->mqh_nwait++;
        while (attr->mq_curmsgs == 0)
            pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
        mqhdr->mqh_nwait--;
    }

    if ( (index = mqhdr->mqh_head) == 0) {
        fprintf(stderr, "mq_receive: curmsgs = %ld; head = 0\n",
                attr->mq_curmsgs);
        abort();
    }

    msghdr = (struct msg_hdr *) &mptr[index];
    mqhdr->mqh_head = msghdr->msg_next;     /* new head of list */
    len = msghdr->msg_len;
    memcpy(ptr, msghdr + 1, len);           /* copy the message itself */
    if (priop != NULL)
        *priop = msghdr->msg_prio;

    /* just-read message goes to front of free list */
    msghdr->msg_next = mqhdr->mqh_free;
    mqhdr->mqh_free = index;

    /* wake up anyone blocked in mq_send waiting for room */
    if (attr->mq_curmsgs == attr->mq_maxmsg)
        pthread_cond_signal(&mqhdr->mqh_wait);
    attr->mq_curmsgs--;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(len);

err:
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(-1);
}

int mq_send(mqd_t mqd, const char *ptr, size_t len, unsigned int prio)
{
    int              n;
    long             index, freeindex;
    char            *mptr;
    struct sigevent *sigev;
    struct mq_hdr   *mqhdr;
    struct mq_attr  *attr;
    struct msg_hdr  *msghdr, *nmsghdr, *pmsghdr;
    struct mq_info  *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;        /* struct pointer */
    mptr = (char *) mqhdr;          /* byte pointer */
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    if (len > (size_t)attr->mq_msgsize) {
        errno = EMSGSIZE;
        goto err;
    }
    if (attr->mq_curmsgs == 0) {
        if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) {
            sigev = &mqhdr->mqh_event;
#if !defined(WIN32)
            if (sigev->sigev_notify == SIGEV_SIGNAL) {
                sigqueue(mqhdr->mqh_pid, sigev->sigev_signo,
                                         sigev->sigev_value);
            }
#endif
            mqhdr->mqh_pid = 0;             /* unregister */
        }
    } else if (attr->mq_curmsgs >= attr->mq_maxmsg) {
        /* queue is full */
        if (mqinfo->mqi_flags & O_NONBLOCK) {
            errno = EAGAIN;
            goto err;
        }
        /* wait for room for one message on the queue */
        while (attr->mq_curmsgs >= attr->mq_maxmsg)
            pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);
    }
    /* nmsghdr will point to new message */
    if ( (freeindex = mqhdr->mqh_free) == 0) {
        fprintf(stderr, "mq_send: curmsgs = %ld; free = 0\n", attr->mq_curmsgs);
        abort();    /* free list is inconsistent; index 0 is not a message slot */
    }

    nmsghdr = (struct msg_hdr *) &mptr[freeindex];
    nmsghdr->msg_prio = prio;
    nmsghdr->msg_len = len;
    memcpy(nmsghdr + 1, ptr, len);          /* copy message from caller */
    mqhdr->mqh_free = nmsghdr->msg_next;    /* new freelist head */

    /* find right place for message in linked list */
    index = mqhdr->mqh_head;
    pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
    while (index != 0) {
        msghdr = (struct msg_hdr *) &mptr[index];
        if (prio > msghdr->msg_prio) {
            nmsghdr->msg_next = index;
            pmsghdr->msg_next = freeindex;
            break;
        }
        index = msghdr->msg_next;
        pmsghdr = msghdr;
    }
    if (index == 0) {
        /* queue was empty or new goes at end of list */
        pmsghdr->msg_next = freeindex;
        nmsghdr->msg_next = 0;
    }
    /* wake up anyone blocked in mq_receive waiting for a message */ 
    if (attr->mq_curmsgs == 0)
        pthread_cond_signal(&mqhdr->mqh_wait);
    attr->mq_curmsgs++;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);

err:
    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(-1);
}

int mq_setattr(mqd_t mqd, const struct mq_attr *mqstat, struct mq_attr *omqstat)
{
    int             n;
    struct mq_hdr  *mqhdr;
    struct mq_attr *attr;
    struct mq_info *mqinfo;

    mqinfo = mqd;
    if (mqinfo->mqi_magic != MQI_MAGIC) {
        errno = EBADF;
        return(-1);
    }
    mqhdr = mqinfo->mqi_hdr;
    attr = &mqhdr->mqh_attr;
    if ( (n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {
        errno = n;
        return(-1);
    }

    if (omqstat != NULL) {
        omqstat->mq_flags = mqinfo->mqi_flags;  /* previous attributes */
        omqstat->mq_maxmsg = attr->mq_maxmsg;
        omqstat->mq_msgsize = attr->mq_msgsize;
        omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
    }

    if (mqstat->mq_flags & O_NONBLOCK)
        mqinfo->mqi_flags |= O_NONBLOCK;
    else
        mqinfo->mqi_flags &= ~O_NONBLOCK;

    pthread_mutex_unlock(&mqhdr->mqh_lock);
    return(0);
}

int mq_unlink(const char *pathname)
{
    if (unlink(pathname) == -1)
        return(-1);
    return(0);
}
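
The linked lists in mq_open(), mq_send() and mq_receive() store indexes
rather than pointers, with 0 meaning "end of list". A self-contained
sketch of the same idea on a small in-memory array follows. All names
here (struct slot, struct queue, queue_init, queue_put, queue_get,
SLOT_COUNT) are hypothetical and are not part of mqueue.c. The sketch
walks the list with a pointer to the link field instead of the
struct msg_hdr cast used in mq_send(), and it does no locking.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SLOT_COUNT 4                /* hypothetical capacity for this sketch */

struct slot {
    long     next;                  /* index of next slot, 0 = end of list */
    unsigned prio;                  /* larger value = delivered earlier */
    int      payload;               /* stands in for the message bytes */
};

struct queue {
    long        head;               /* first queued slot, 0 = queue empty */
    long        free;               /* first free slot, 0 = queue full */
    struct slot slots[SLOT_COUNT + 1];  /* slot 0 unused: 0 means "none" */
};

/* Put every usable slot (1..SLOT_COUNT) on the free list. */
void queue_init(struct queue *q)
{
    long i;

    assert(q != NULL);
    memset(q, 0, sizeof *q);
    q->free = 1;
    for (i = 1; i < SLOT_COUNT; i++)
        q->slots[i].next = i + 1;
    q->slots[SLOT_COUNT].next = 0;  /* end of free list */
}

/* Insert before the first entry of strictly lower priority, so equal
 * priorities stay in arrival order. Returns -1 when no slot is free. */
int queue_put(struct queue *q, int payload, unsigned prio)
{
    long idx, *link;

    assert(q != NULL);
    if ((idx = q->free) == 0)
        return -1;
    q->free = q->slots[idx].next;   /* pop from free list */
    q->slots[idx].prio = prio;
    q->slots[idx].payload = payload;

    link = &q->head;
    while (*link != 0 && q->slots[*link].prio >= prio)
        link = &q->slots[*link].next;
    q->slots[idx].next = *link;
    *link = idx;
    return 0;
}

/* Remove the head entry and return its slot to the free list.
 * Returns -1 when the queue is empty. */
int queue_get(struct queue *q, int *payload)
{
    long idx;

    assert(q != NULL && payload != NULL);
    if ((idx = q->head) == 0)
        return -1;
    *payload = q->slots[idx].payload;
    q->head = q->slots[idx].next;
    q->slots[idx].next = q->free;   /* push onto free list */
    q->free = idx;
    return 0;
}
```

Indexes stay meaningful when the same region is mapped at a different
address in each process, which is presumably why the mapped file uses
them instead of pointers.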

[-- Attachment #3: mqueue.h --]
[-- Type: text/x-c, Size: 3120 bytes --]

/*****************************************************************************
 *
 * POSIX Message Queue library implemented using memory mapped files
 *
 *****************************************************************************/
#ifndef __mqueue_h
#define __mqueue_h
#include <pthread.h>

#if defined(WIN32)
#   include <fcntl.h>
#   define EMSGSIZE    4200
#   define O_NONBLOCK  0200000

    union sigval {
        int           sival_int;     /* integer value */
        void          *sival_ptr;    /* pointer value */
    };
    struct sigevent {
        int           sigev_notify;  /* notification type */
        int           sigev_signo;   /* signal number */
        union sigval  sigev_value;   /* signal value */
    };
    typedef int pid_t;
    typedef int ssize_t;
#endif

/*****************************************************************************/

typedef struct mq_info *mqd_t;       /* opaque datatype */

struct mq_attr {
    long mq_flags;     /* message queue flag: O_NONBLOCK */
    long mq_maxmsg;    /* max number of messages allowed on queue */
    long mq_msgsize;   /* max size of a message (in bytes) */
    long mq_curmsgs;   /* number of messages currently on queue */
};

/* one mq_hdr{} per queue, at beginning of mapped file */
struct mq_hdr {
    struct mq_attr    mqh_attr;  /* the queue's attributes */
    long              mqh_head;  /* index of first message */
    long              mqh_free;  /* index of first free message */
    long              mqh_nwait; /* #threads blocked in mq_receive() */
    pid_t             mqh_pid;   /* nonzero PID if mqh_event set */
    struct sigevent   mqh_event; /* for mq_notify() */
    pthread_mutex_t   mqh_lock;  /* mutex lock */
    pthread_cond_t    mqh_wait;  /* and condition variable */
};

/* one msg_hdr{} at the front of each message in the mapped file */
struct msg_hdr {
    long            msg_next;    /* index of next on linked list */
                                 /* msg_next must be first member in struct */
    ssize_t         msg_len;     /* actual length */
    unsigned int    msg_prio;    /* priority */
};

/* one mq_info{} malloc'ed per process per mq_open() */
struct mq_info {
#if defined(WIN32)
    HANDLE         mqi_fmap;     /* file mapping object */
#endif
    struct mq_hdr *mqi_hdr;      /* start of mmap'ed region */
    long           mqi_magic;    /* magic number if open */
    int            mqi_flags;    /* flags for this process */
};
#define MQI_MAGIC  0x98765432

/* size of message in file is rounded up for alignment */
#define MSGSIZE(i) ((((i) + sizeof(long)-1) / sizeof(long)) * sizeof(long))

/* message queue functions */
extern int     mq_close(mqd_t);
extern int     mq_getattr(mqd_t, struct mq_attr *);
extern int     mq_notify(mqd_t, const struct sigevent *);
extern mqd_t   mq_open(const char *, int, ...);
extern ssize_t mq_receive(mqd_t, char *, size_t, unsigned int *);
extern int     mq_send(mqd_t, const char *, size_t, unsigned int);
extern int     mq_setattr(mqd_t, const struct mq_attr *, struct mq_attr *);
extern int     mq_unlink(const char *name);

#endif
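
A small sketch of the size arithmetic behind MSGSIZE() and the filesize
calculation in mq_open() and mq_close(). The function names
round_up_to_long() and queue_file_size() are hypothetical; the header and
per-message header sizes are passed in as plain numbers so the example
does not depend on the struct definitions above.

```c
#include <assert.h>
#include <stddef.h>

/* Round n up to the next multiple of sizeof(long), as MSGSIZE() does. */
size_t round_up_to_long(size_t n)
{
    return ((n + sizeof(long) - 1) / sizeof(long)) * sizeof(long);
}

/* Bytes needed for one queue header followed by maxmsg slots, each made
 * of a per-message header and a payload area rounded up for alignment. */
size_t queue_file_size(size_t hdr_size, size_t msg_hdr_size,
                       long maxmsg, long msgsize)
{
    assert(maxmsg > 0 && msgsize > 0);
    return hdr_size +
           (size_t) maxmsg * (msg_hdr_size + round_up_to_long((size_t) msgsize));
}
```

With the real structures the call would look like
queue_file_size(sizeof(struct mq_hdr), sizeof(struct msg_hdr),
attr->mq_maxmsg, attr->mq_msgsize).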

^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2001-08-03  6:19 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2001-08-02  7:27 Last version of message queues Le Coent Yannick
2001-08-02  7:36 Aurelio Medina
2001-08-02 18:19 ` Ross Johnson
2001-08-02 21:48   ` Ross Johnson
2001-08-03  6:19 Aurelio Medina
