Inter-process Communication (IPC): a messaging system to publish/consume one million messages per second using just one cheap machine.

Nowadays people talk about how to use technologies like Kafka to publish two million messages per second, as in this post, or Cassandra to execute over a million writes per second, as in this other post. The idea for this post is similar: I would like to show how to publish/consume one million messages per second, but in this case using just one cheap machine rather than a cluster, and without any third-party technology. Instead, we will see how to implement our own solution.

The objective is to create a program able to publish messages to be processed by one or more external processes, using C++ as the programming language and shared memory as the mechanism to exchange messages among processes. Shared memory was chosen because it is versatile (unlike pipes, which allow communication between just two processes) and fast (once the segment is mapped, processes exchange data without copying it through the kernel, which generally makes it faster than other IPC mechanisms).

Implementation overview

The source code for the work described in this post is available on GitHub, in two different repositories.

  1. https://github.com/poncos/c-commons. This repository contains the core libraries, implemented in C++, that make it easier to use lower-level APIs such as shared memory, the one used for this post.
  2. https://github.com/poncos/messaging. This is an example command-line application that uses the c-commons library from the previous repository. It contains the main code able to publish and consume one million messages per second.

The most interesting source files are the ones implementing the functions to add and consume messages in FIFO style (ShmFifo.cpp, ShmFifo.hpp) and the main files that use this class to publish and consume messages (MessageConsumer.cpp, MessageProducer.cpp).

Shared Memory structure as a log file

The most important part of the source code is optimizing the structure of the shared memory area used to exchange messages between the producer and consumer processes. A shared memory segment is like a file: using the POSIX API it can be created, opened, closed, written and read. For this post, the shared memory is treated as a sequential file that stores a sequence of equal-size records. Logically, the shared memory section looks like the picture below.

[Figure: logfile]
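To make the "shared memory is like a file" idea concrete, here is a minimal sketch that uses the POSIX calls directly (shm_open, ftruncate, mmap, munmap, shm_unlink). It is not the c-commons code: the helper names map_segment and round_trip, the 4096-byte size and the segment name are assumptions made only for this illustration. On older glibc versions the program may need to be linked with -lrt.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <fcntl.h>      // O_CREAT, O_RDWR
#include <sys/mman.h>   // shm_open, mmap, munmap, shm_unlink
#include <unistd.h>     // ftruncate, close

// Hypothetical helper: map a named POSIX shared memory segment of `size` bytes.
// When `create` is true the segment is created (if needed) and sized first.
// Returns nullptr on failure.
void* map_segment(const char* name, std::size_t size, bool create)
{
    const int flags = create ? (O_CREAT | O_RDWR) : O_RDWR;
    const int fd = shm_open(name, flags, 0600);
    if (fd == -1) return nullptr;

    if (create && ftruncate(fd, static_cast<off_t>(size)) == -1) {
        close(fd);
        return nullptr;
    }

    void* addr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);  // the mapping stays valid after the descriptor is closed
    return addr == MAP_FAILED ? nullptr : addr;
}

// Hypothetical demo: write a string through one mapping and read it back
// through a second, independent mapping of the same named segment.
bool round_trip(const char* name, const std::string& text)
{
    const std::size_t size = 4096;
    if (text.size() >= size) return false;

    void* writer_view = map_segment(name, size, true);
    void* reader_view = writer_view ? map_segment(name, size, false) : nullptr;

    bool ok = false;
    if (writer_view && reader_view) {
        std::memcpy(writer_view, text.c_str(), text.size() + 1);
        ok = std::strcmp(static_cast<const char*>(reader_view), text.c_str()) == 0;
    }

    if (reader_view) munmap(reader_view, size);
    if (writer_view) munmap(writer_view, size);
    shm_unlink(name);  // remove the name so the segment is released
    return ok;
}
```

In a real producer/consumer pair the two mappings would live in different processes that only share the segment name. Here both live in one process to keep the sketch short.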

Using the structure described above, the writer process just appends a record of equal size each time a new message needs to be published. Files of this type are usually called "log files".

The reader processes have an easy job as well: because each record has the same size, all they have to do is read consecutive segments of bytes, without having to perform an extra read to learn the size of the next record before reading its actual content.
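The benefit of equal-size records is that the position of any record is simple arithmetic: record i starts at byte i times the record size. The sketch below illustrates this with a std::vector standing in for the shared memory region. The record struct and the function names are hypothetical and much smaller than the real record, just to keep the example readable.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical fixed-size record, used only for this illustration.
struct record
{
    std::int64_t number;
    char         text[24];
};

// Byte offset of the record at position `index` in a log of equal-size records.
std::size_t record_offset(std::size_t index)
{
    return index * sizeof(record);
}

// Copy a record into its slot in the log.
void put_record(std::vector<char>& log, std::size_t index, const record& r)
{
    std::memcpy(log.data() + record_offset(index), &r, sizeof(record));
}

// Copy a record out of its slot; no length prefix has to be read first.
record get_record(const std::vector<char>& log, std::size_t index)
{
    record r;
    std::memcpy(&r, log.data() + record_offset(index), sizeof(record));
    return r;
}
```

With variable-size records a reader would need one read for the length and a second one for the content. Here a single fixed-size copy is enough.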

The listing below shows the C++ source code defining the record written into the shared memory for each message.

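#include <cstdint>

#define MESSAGE_CONTENT_SIZE 1024

// Fixed-size record stored in shared memory for every message.
struct shm_fifo_message
{
    int64_t value_length   = 0;   // number of bytes of content actually used
    int64_t message_number = 0;   // sequence number assigned by the writer
    int8_t  status         = 0;   // status flag for the record
    char    content[MESSAGE_CONTENT_SIZE] = {0};   // message payload
};

#define SIZE_MESSAGE sizeof(shm_fifo_message)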
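One detail worth checking is the real size of this record. The fields add up to 8 + 8 + 1 + 1024 = 1041 bytes, but the compiler pads the structure to a multiple of its alignment, so on a typical 64-bit Linux ABI I would expect sizeof(shm_fifo_message) to be 1048. That matches the segment size of 1048000016 bytes used by the producer code later in this post: one million records of 1048 bytes plus 16 extra bytes (what those 16 bytes hold is not shown here, so treat that part as an assumption). The sketch below repeats the structure so it compiles on its own; payload_bytes and segment_bytes are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

#define MESSAGE_CONTENT_SIZE 1024

struct shm_fifo_message
{
    std::int64_t value_length   = 0;
    std::int64_t message_number = 0;
    std::int8_t  status         = 0;
    char         content[MESSAGE_CONTENT_SIZE] = {0};
};

// Sum of the field sizes, ignoring any padding added by the compiler.
constexpr std::size_t payload_bytes =
    sizeof(std::int64_t) + sizeof(std::int64_t) + sizeof(std::int8_t) + MESSAGE_CONTENT_SIZE;

// Bytes needed for `count` records plus `extra` bytes of additional data.
constexpr std::size_t segment_bytes(std::size_t count, std::size_t extra)
{
    return count * sizeof(shm_fifo_message) + extra;
}

// The record can only be larger than its fields, never smaller.
static_assert(sizeof(shm_fifo_message) >= payload_bytes,
              "record must hold all of its fields");
// Records laid out back to back stay correctly aligned.
static_assert(sizeof(shm_fifo_message) % alignof(shm_fifo_message) == 0,
              "record size must be a multiple of its alignment");
```

Sizing the segment from sizeof rather than from a hand-computed number keeps the producer and the record definition in sync if a field is ever added.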

Producer and Consumers

As said before, the writer process just appends messages to the shared memory region, without having to move the position indicator between write operations. As a control mechanism, the code does not allow adding more messages once the maximum size of the shared memory area is reached.

The code below shows the write method of the ShmFifo class. As can be seen, the code wraps the actual message to be published in a fixed-size "shm_fifo_message" structure, together with some extra information about the message, such as the message number and the status. This information could be useful for the consumer processes, or for a potential management process that marks messages to be deleted (by changing the status value, for example).


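// Requires <cstring> for std::memcpy and <stdexcept> for std::length_error.
void ShmFifo::write(const char* message, int length)
{
    // Guard against overflowing the fixed-size content buffer.
    if (message == nullptr || length < 0 || length > MESSAGE_CONTENT_SIZE)
    {
        throw std::length_error("message does not fit in shm_fifo_message");
    }

    shm_fifo_message struct_message;
    std::memcpy(struct_message.content, message, length);
    struct_message.value_length = length;
    struct_message.status = 0;

    this->written_messages++;
    struct_message.message_number = this->written_messages;

    // Append the whole fixed-size record to the shared memory segment.
    this->shm->write(reinterpret_cast<const char*>(&struct_message), SIZE_MESSAGE);
}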
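The write method does not show the capacity check mentioned earlier, so it presumably lives in the lower-level shared memory class (that is an assumption on my part). A minimal sketch of how such a bounded, append-only writer could look is shown below. BoundedLog is a hypothetical class, and a std::vector stands in for the mapped shared memory region.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical append-only writer over a fixed-size region.
class BoundedLog
{
public:
    BoundedLog(std::size_t record_size, std::size_t max_records)
        : record_size_(record_size), region_(record_size * max_records) {}

    // Append one record of record_size bytes. Returns false, leaving the
    // region untouched, once the region has no room for another record.
    bool append(const char* record)
    {
        if (position_ + record_size_ > region_.size()) return false;
        std::memcpy(region_.data() + position_, record, record_size_);
        position_ += record_size_;  // the position only ever moves forward
        return true;
    }

    // Number of records appended so far.
    std::size_t records_written() const { return position_ / record_size_; }

private:
    std::size_t       record_size_;
    std::vector<char> region_;
    std::size_t       position_ = 0;
};
```

Because the writer never seeks backwards, the only state it needs between writes is the current position.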

The reader part is equally simple: it just reads a fixed-size portion of the shared memory. ShmFifo offers two variants. One places the message in a buffer passed as a parameter. The other, shown below, takes no buffer, and it is considerably slower because the read method itself has to allocate memory every time it is invoked.


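// Allocating variant: the caller owns the returned record and must release it
// with delete (through its address) to avoid leaking memory.
shm_fifo_message& ShmFifo::read()
{
    shm_fifo_message* struct_message = new shm_fifo_message();

    // Copy the next fixed-size record out of the shared memory segment.
    this->shm->read(reinterpret_cast<char*>(struct_message), SIZE_MESSAGE);

    return *struct_message;
}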
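The buffer-taking variant is not listed in this post, so here is a minimal sketch of the idea rather than the actual ShmFifo code. The message struct is a simplified stand-in for shm_fifo_message, read_into and sum_message_numbers are hypothetical names, and a std::vector stands in for the shared memory region. The point is that the consumer allocates one buffer and reuses it for every record.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Simplified stand-in for shm_fifo_message (hypothetical, smaller payload).
struct message
{
    std::int64_t message_number;
    char         content[32];
};

// Copy the record at `position` into the caller's buffer and advance the
// position. Returns false when no complete record is left in the region.
bool read_into(const std::vector<char>& region, std::size_t& position, message& out)
{
    if (position + sizeof(message) > region.size()) return false;
    std::memcpy(&out, region.data() + position, sizeof(message));
    position += sizeof(message);
    return true;
}

// Consume every record in the region, reusing a single buffer.
std::int64_t sum_message_numbers(const std::vector<char>& region)
{
    message buffer{};          // allocated once, on the stack
    std::size_t position = 0;
    std::int64_t sum = 0;
    while (read_into(region, position, buffer))
    {
        sum += buffer.message_number;
    }
    return sum;
}
```

Compared with the allocating variant, nothing is created with new inside the loop, so there is no per-message allocation cost and nothing for the caller to free.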

Results

The code has been tested on a machine with the following characteristics.

  1. Processor: AMD Bulldozer FX-6100 Socket AM3+ 6 Core Processor – 3.30GHz, 3.90GHz Turbo
  2. Memory: 8GB 1600MHz CL9 DDR3 Vengeance Memory Two Module.

The results are shown in the following table. Five executions were run for both the writer and the reader (message producer and message consumer), and the table shows the time, in milliseconds, spent publishing/consuming one million messages. As can be seen, the writer took less than a second, about 900 milliseconds on each execution, and the reader stayed well under a second, at about 590 milliseconds.

For the writer, two implementations have been tested to publish the messages.

  1. Text Message: publishes one million string messages.
  2. Structured Message: sending string messages is not very realistic, because in the real world the messages to be published do not magically appear as strings. They usually start out as classes with properties, or as structures, and need to be transformed, or serialized, before being published into the queue. This is why this second test was done, using a structure that holds the content of the messages to be published.

 

Process  Message type        Run 1 (ms)  Run 2 (ms)  Run 3 (ms)  Run 4 (ms)  Run 5 (ms)
Writer   Text Message        908         917         911         926         918
Writer   Structured Message  900         895         908         907         905
Reader   Text Message        591         585         593         584         589
Reader   Structured Message  585         588         601         599         597
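To turn these timings into a throughput figure, divide the number of messages by the elapsed time. The small sketch below does that for the table values. The function names are mine, not part of the repositories. For the text messages, the writer averages 916 ms, which is roughly 1.09 million messages per second, and the reader averages 588.4 ms, roughly 1.70 million messages per second.

```cpp
#include <array>
#include <cassert>
#include <numeric>

// Average of the five measured runs, in milliseconds.
double mean_ms(const std::array<int, 5>& runs)
{
    return std::accumulate(runs.begin(), runs.end(), 0) / 5.0;
}

// Messages handled per second, given the elapsed time in milliseconds.
double messages_per_second(double elapsed_ms, long messages)
{
    return messages * 1000.0 / elapsed_ms;
}
```

For example, messages_per_second(mean_ms({908, 917, 911, 926, 918}), 1000000) gives the writer figure for text messages.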

 

[Figure: producer]

Below is the code that publishes both types of messages: the first listing sends string messages and the second sends structured messages, serializing the content of a structure that holds the message content.


// Requires <cstring>, <iostream>, <string> and <sys/time.h>; assumes "using namespace std".
int sendTextMessage()
{
    ccommons::ShmFifo shmFifo("/messaging-text-fifo_shm", 1048000016, true);
    string message_content = "holaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaadios";
    const char* message_content_char = message_content.c_str();
    long message_content_size = strlen(message_content_char);

    // Timestamp, in milliseconds, taken just before publishing starts.
    struct timeval tp;
    gettimeofday(&tp, NULL);
    long int ms1 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    // Publish one million copies of the same text message.
    for (int i = 0; i < 1000000; i++)
    {
        shmFifo.write(message_content_char, message_content_size);
    }

    // Timestamp, in milliseconds, taken right after the last write.
    gettimeofday(&tp, NULL);
    long int ms2 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    cout << "Text Message Delay: " << (ms2 - ms1) << endl;

    return 0;
}
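The listing above measures time with gettimeofday, which reads the wall clock and can jump if the system time is adjusted during the run. As an alternative, and not what the repository does, the measurement could be wrapped in a small helper based on std::chrono::steady_clock, which is monotonic. The name elapsed_ms is hypothetical.

```cpp
#include <cassert>
#include <chrono>

// Run `work` once and return the elapsed time in milliseconds,
// measured with a monotonic clock.
template <typename Work>
long long elapsed_ms(Work work)
{
    const auto start = std::chrono::steady_clock::now();
    work();
    const auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration_cast<std::chrono::milliseconds>(stop - start).count();
}
```

The publishing loop would then be passed as a lambda, for example elapsed_ms([&] { for (int i = 0; i < 1000000; i++) shmFifo.write(message_content_char, message_content_size); }).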

 


// Requires <cstring>, <iostream> and <sys/time.h>; assumes "using namespace std".
int sendTextObjectMessage()
{
    ccommons::ShmFifo shmFifo("/messaging-textobj-fifo_shm", 1048000016, true);
    messagingmodel::CheckImeiReq checkImei_req;
    char imei[16] = "254784512658411";
    char imsi[16] = "214021245145781";
    char msisdn[19] = "123456987412547896";
    long checkimei_req_size = sizeof(messagingmodel::CheckImeiReq);

    // Fill the structure once; its raw bytes are what gets published.
    strcpy(checkImei_req.imei, imei);
    strcpy(checkImei_req.imsi, imsi);
    strcpy(checkImei_req.msisdn, msisdn);

    // Timestamp, in milliseconds, taken just before publishing starts.
    struct timeval tp;
    gettimeofday(&tp, NULL);
    long int ms1 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    // Publish one million copies of the serialized structure.
    for (int i = 0; i < 1000000; i++)
    {
        shmFifo.write(reinterpret_cast<const char*>(&checkImei_req), checkimei_req_size);
    }

    // Timestamp, in milliseconds, taken right after the last write.
    gettimeofday(&tp, NULL);
    long int ms2 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    cout << "Text Object Delay: " << (ms2 - ms1) << endl;

    return 0;
}
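On the consumer side, the structured message has to be rebuilt from the bytes stored in the record. The sketch below shows one way to do it. The check_imei_req struct is a hypothetical stand-in for messagingmodel::CheckImeiReq: its field sizes are taken from the local arrays in the listing above, and the real definition may differ. The decode function is also a name of my own.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>

// Hypothetical stand-in for messagingmodel::CheckImeiReq.
struct check_imei_req
{
    char imei[16];
    char imsi[16];
    char msisdn[19];
};

// Publishing raw bytes is only safe for trivially copyable types.
static_assert(std::is_trivially_copyable<check_imei_req>::value,
              "raw byte copies require a trivially copyable type");

// Rebuild the structure from the content bytes of a record.
// Returns false when the stored length does not match the structure size.
bool decode(const char* content, std::size_t length, check_imei_req& out)
{
    if (length != sizeof(check_imei_req)) return false;
    std::memcpy(&out, content, sizeof(check_imei_req));
    return true;
}
```

This kind of serialization is fast because it is a plain memory copy, but it only works when producer and consumer are built with the same structure layout (same compiler settings and architecture).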
