Skip to main content

Linux Message Queues: A Guide to Interprocess Communication (IPC)

As a software engineer, interprocess communication (IPC) is essential when multiple processes need to exchange data or coordinate actions. Linux provides several IPC mechanisms, including message queues, which enable asynchronous communication between processes running on the same machine. Linux message queues can be implemented using either POSIX message queues or System V message queues. Each has its features, and the choice depends on your application's requirements.

This guide focuses on Linux message queues, their use cases, and how to implement them in a real-world example.


What Are Linux Message Queues?

A message queue is a data structure that allows processes to send and receive messages asynchronously. The operating system manages the queue, ensuring messages are delivered reliably and in order.

Types of Linux Message Queues

  1. POSIX Message Queues: A modern implementation with priority-based message ordering and easier API usage. These queues are identified by names (e.g., /queue_name).
  2. System V Message Queues: An older implementation identified by integer keys. They allow fine-grained control over message permissions and resource limits.

Use Case: Real-Time Task Dispatching

Imagine a task scheduler that dispatches tasks to multiple worker processes:

  • The scheduler sends task descriptions to a message queue.
  • Worker processes read tasks from the queue and execute them asynchronously.
  • The queue ensures tasks are delivered even if workers are temporarily unavailable.

Linux message queues are ideal for such a system, offering durability, ordering, and asynchronous communication.


POSIX Message Queues: Modern and Simple IPC

Step 1: Create and Open a Message Queue

Include the necessary headers:

#include <fcntl.h>    // O_* constants
#include <sys/stat.h> // Mode constants
#include <mqueue.h> // Message queue functions
#include <stdio.h>
#include <stdlib.h>

Example Code:

#define QUEUE_NAME "/task_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSG_COUNT 10
#define QUEUE_PERMISSIONS 0666

int main() {
// Define queue attributes
struct mq_attr attr;
attr.mq_flags = 0; // Blocking mode
attr.mq_maxmsg = MAX_MSG_COUNT; // Maximum number of messages
attr.mq_msgsize = MAX_MSG_SIZE; // Maximum message size
attr.mq_curmsgs = 0; // Number of messages currently in the queue

// Create the message queue
mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, QUEUE_PERMISSIONS, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}

printf("Message queue created: %s\n", QUEUE_NAME);
mq_close(mq);
return 0;
}

Step 2: Send Messages to the Queue

Example Code (Producer):

#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main() {
mqd_t mq = mq_open(QUEUE_NAME, O_WRONLY);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}

// Message to send
char message[MAX_MSG_SIZE] = "Task 1: Process file";
unsigned int priority = 5; // Higher priority = processed first

if (mq_send(mq, message, strlen(message) + 1, priority) == -1) {
perror("mq_send");
exit(EXIT_FAILURE);
}

printf("Message sent: %s\n", message);
mq_close(mq);
return 0;
}

Step 3: Receive Messages from the Queue

Example Code (Consumer):

#include <mqueue.h>
#include <stdio.h>
#include <stdlib.h>

int main() {
mqd_t mq = mq_open(QUEUE_NAME, O_RDONLY);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}

char buffer[MAX_MSG_SIZE];
unsigned int priority;

// Receive a message
if (mq_receive(mq, buffer, MAX_MSG_SIZE, &priority) == -1) {
perror("mq_receive");
exit(EXIT_FAILURE);
}

printf("Received message: %s (priority: %u)\n", buffer, priority);
mq_close(mq);
return 0;
}

Step 4: Clean Up Resources

Remove the message queue when it is no longer needed:

int main() {
if (mq_unlink(QUEUE_NAME) == -1) {
perror("mq_unlink");
exit(EXIT_FAILURE);
}

printf("Message queue deleted: %s\n", QUEUE_NAME);
return 0;
}

System V Message Queues: Legacy IPC

Step 1: Create a Message Queue

System V message queues are identified by keys and managed with an ID. Include the headers:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>

Example Code:

#define QUEUE_KEY 1234
#define PERMISSIONS 0666

int main() {
// Create a message queue
int msgid = msgget(QUEUE_KEY, IPC_CREAT | PERMISSIONS);
if (msgid == -1) {
perror("msgget");
exit(EXIT_FAILURE);
}

printf("Message queue created with ID: %d\n", msgid);
return 0;
}

Step 2: Send Messages to the Queue

System V messages include a type field for categorizing messages.

Example Code (Producer):

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MSG_SIZE 256

struct message {
long type; // Message type
char text[MSG_SIZE]; // Message content
};

int main() {
int msgid = msgget(QUEUE_KEY, 0);
if (msgid == -1) {
perror("msgget");
exit(EXIT_FAILURE);
}

struct message msg;
msg.type = 1; // Type 1 message
strcpy(msg.text, "Task 1: Process file");

if (msgsnd(msgid, &msg, sizeof(msg.text), 0) == -1) {
perror("msgsnd");
exit(EXIT_FAILURE);
}

printf("Message sent: %s\n", msg.text);
return 0;
}

Step 3: Receive Messages from the Queue

Example Code (Consumer):

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>

#define MSG_SIZE 256

struct message {
long type; // Message type
char text[MSG_SIZE]; // Message content
};

int main() {
int msgid = msgget(QUEUE_KEY, 0);
if (msgid == -1) {
perror("msgget");
exit(EXIT_FAILURE);
}

struct message msg;

// Receive a message of type 1
if (msgrcv(msgid, &msg, sizeof(msg.text), 1, 0) == -1) {
perror("msgrcv");
exit(EXIT_FAILURE);
}

printf("Received message: %s\n", msg.text);
return 0;
}

Step 4: Clean Up Resources

Remove the queue using msgctl:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>

int main() {
int msgid = msgget(QUEUE_KEY, 0);
if (msgid == -1) {
perror("msgget");
exit(EXIT_FAILURE);
}

// Remove the queue
if (msgctl(msgid, IPC_RMID, NULL) == -1) {
perror("msgctl");
exit(EXIT_FAILURE);
}

printf("Message queue deleted.\n");
return 0;
}

Comparison: POSIX vs. System V Message Queues

FeaturePOSIX Message QueuesSystem V Message Queues
NamingNamed with a stringIdentified by key
Priority SupportYesNo
Ease of UseSimple APIMore complex API
ScalabilityHigherLower
StandardizationPOSIX standardLegacy System V

Best Practices for Using Linux Message Queues

  1. Clean Up Resources: Always unlink or remove queues after use to avoid resource leaks.
  2. Error Handling: Check return values of system calls for proper error handling.
  3. Priority Handling: Use message priorities to process critical tasks first (POSIX).
  4. Concurrency: Use locks or message attributes to prevent race conditions in high-concurrency scenarios.

Conclusion

Linux message queues are a powerful tool for interprocess communication, offering asynchronous, ordered, and reliable messaging. POSIX message queues provide a modern, user-friendly API with advanced features like message priority, while System V message queues offer fine-grained control but are less commonly used in modern systems.

By following this guide, you can implement Linux message queues to build efficient IPC mechanisms tailored to your application’s needs. Let me know if you'd like further examples or details on other Linux IPC mechanisms like shared memory or semaphores!