/* Includes */
#include < pthread.h >
#include < stdio.h >
#include < stdlib.h >
#include < unistd.h >
#include < mqueue.h >
#include < sys/stat.h >
#include < errno.h >
#include < string.h >
/* Threads */
static void *MainThread (void *);
static void *AnotherThread (void *);
/* Defines */
#define MAIN_QNAME "/MainQueue"
pthread_mutex_t wait_mutex;
int mq_test(void)
{
pthread_t mainThread, anotherThread;
printf ("Creating threads .. \n");
pthread_create (&mainThread, NULL, &MainThread, NULL);
pthread_create (&anotherThread, NULL, &AnotherThread, NULL);
pthread_mutex_init (&wait_mutex, NULL);
pthread_join (mainThread, NULL);
pthread_join (anotherThread, NULL);
return 1;
}
#define BUFFER_SIZE 10000
/* Main thread .. Waiting for messages */
static void *MainThread (void *args)
{
mqd_t queue_handle;
char buffer[BUFFER_SIZE];
int bytes_read;
struct mq_attr msgq_attr;
unsigned int sender;
printf ("[MainThread] Inside main thread \n");
// Let the other thread wait till I am ready!
pthread_mutex_lock (&wait_mutex);
// Clear the buffer
memset (buffer, 0, BUFFER_SIZE);
// Detach the thread
pthread_detach (pthread_self());
// unlink the queue if it exisits - debug
mq_unlink (MAIN_QNAME);
printf ("[MainThread]Opening MQ \n");
queue_handle= mq_open(MAIN_QNAME, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG, NULL);
if (queue_handle == -1)
{
perror ("[MainThread] Error Opening MQ: ");
return 0;
}
printf ("[MainThread] Waiting for messages ... \n");
pthread_mutex_unlock (&wait_mutex);
for (; ;)
{
bytes_read = mq_receive(queue_handle, buffer, BUFFER_SIZE, &sender);
if (bytes_read == -1)
{
perror("[MainThread] Failed to recieve:");
return 0;
}
else
{
printf ("[MainThread] Data: %s \n", buffer);
// Get the MQ attributes
mq_getattr(queue_handle, &msgq_attr);
printf("[MainThread] Queue \"%s\":\n"
"\t- stores at most %ld messages\n"
"\t- large at most %ld bytes each\n"
"\t- currently holds %ld messages\n",
MAIN_QNAME,
msgq_attr.mq_maxmsg,
msgq_attr.mq_msgsize,
msgq_attr.mq_curmsgs);
// Clear buffer and sleep to block for some time t and see
// if you get all the messages!
memset(buffer, 0, BUFFER_SIZE);
sleep(5);
}
}
mq_close (queue_handle);
}
#define MAX_SEND_BUFFER 70
static void *AnotherThread (void *args)
{
mqd_t queue_handle;
char buffer[MAX_SEND_BUFFER];
unsigned int msgprio = 1;
int count;
printf ("[AnotherThread] Inside Another thread \n");
pthread_mutex_lock (&wait_mutex);
queue_handle= mq_open(MAIN_QNAME, O_RDWR);
if (queue_handle == -1)
{
perror ("[AnotherThread] Error Opening MQ:");
return 0;
}
for (count = 0; count < 100; count++)
{
snprintf (buffer, MAX_SEND_BUFFER, "Message %d from Another thread", count);
if (0 != mq_send (queue_handle, buffer, strlen(buffer)+1, msgprio))
{
perror ("[AnotherThread] Sending:");
mq_close (queue_handle);
pthread_mutex_unlock (&wait_mutex);
return 0;
}
}
pthread_mutex_unlock (&wait_mutex);
mq_close (queue_handle);
return 0;
}
#ifdef _DEBUG_
int main (void)
{
return mq_test(void);
}
#endif
Using POSIX Message Queues
Code