Inter-process Communication (IPC): messaging system to publish/consume one million messages per second using just one cheap machine.

People nowadays talk about using technologies like Kafka to publish two million messages per second, like in this post, or Cassandra to execute over a million writes per second, like in this other post. The idea for this post is similar: I would like to show how to publish and consume one million messages per second, but using just one cheap machine instead of a cluster, and without any third-party technology. Instead, we will see how to implement our own solution.

The objective is to create a program able to publish messages to be processed by one or more external processes, using C++ as the programming language and shared memory as the mechanism to exchange messages among processes. Shared memory was chosen because it is versatile (unlike pipes, which allow communication between just two processes) and fast (faster than other IPC mechanisms).

Implementation overview

The source code for the work described in this post is available on GitHub, in two different repositories.

  1. https://github.com/poncos/c-commons. This repository contains the core libraries, implemented in C++, that make it easier to use lower-level APIs such as shared memory, the one used for this post.
  2. https://github.com/poncos/messaging. This is an example command-line application that uses the c-commons library from the previous repository and implements the main code able to publish and consume one million messages per second.

The most interesting source files are the ones implementing the functions to add and consume messages in FIFO style (ShmFifo.cpp, ShmFifo.hpp), and the main files that use this class to publish and consume messages (MessageConsumer.cpp, MessageProducer.cpp).

Shared Memory structure as a log file

The most important part of the source code is optimizing the structure of the shared memory area used to exchange messages between the producer and consumer processes. A shared memory segment is like a file: using the POSIX API it can be created, opened, closed, written and read. For this post, the shared memory is treated as a sequential file storing a sequence of equal-size records. Logically, the shared memory section looks like the picture below.

logfile

With the structure described above, the writer process just appends a record of equal size each time a new message needs to be published. Files of this type are usually called “log files”.

The reader processes have an easy job as well: because every record has the same size, all they have to do is read consecutive segments of bytes, without an extra read to learn the size of the next record before reading its content.
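The POSIX calls mentioned above (create, open, write, read) can be sketched as follows. This is a minimal illustration and not the code from c-commons: the helper names map_segment and round_trip, the 4096-byte segment size and the segment name chosen by the caller are assumptions made for the example.

```cpp
#include <fcntl.h>      // O_CREAT, O_RDWR
#include <sys/mman.h>   // shm_open, shm_unlink, mmap, munmap
#include <sys/stat.h>   // mode constants
#include <unistd.h>     // ftruncate, close
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical helper: creates (or opens) a named segment, sets its size
// and maps it read/write. Returns nullptr on failure.
void* map_segment(const char* name, std::size_t size)
{
    int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
    if (fd == -1) return nullptr;

    if (ftruncate(fd, static_cast<off_t>(size)) == -1)
    {
        close(fd);
        return nullptr;
    }

    void* addr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);  // the mapping stays valid after the descriptor is closed
    return addr == MAP_FAILED ? nullptr : addr;
}

// Hypothetical helper: writes a string through one mapping and reads it back
// through a second mapping of the same name, as a separate process would.
std::string round_trip(const char* name, const std::string& text)
{
    const std::size_t size = 4096;  // assumed size, enough for the example
    if (text.size() >= size) return "";

    char* writer = static_cast<char*>(map_segment(name, size));
    if (writer == nullptr) return "";

    char* reader = static_cast<char*>(map_segment(name, size));
    if (reader == nullptr)
    {
        munmap(writer, size);
        shm_unlink(name);
        return "";
    }

    std::memcpy(writer, text.c_str(), text.size() + 1);
    std::string result(reader);

    munmap(reader, size);
    munmap(writer, size);
    shm_unlink(name);  // remove the segment name once it is no longer needed
    return result;
}
```

Calling round_trip("/ipc-sketch", "hello") should return the same text it was given. On some older Linux systems the program may need to be linked with -lrt.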

The listing below shows the C++ source code defining the record written into the shared memory for each message.

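#include <cstdint>

#define MESSAGE_CONTENT_SIZE 1024

// Fixed-size record stored in shared memory for every message.
struct shm_fifo_message
{
    int64_t value_length   = 0;   // number of bytes used in content
    int64_t message_number = 0;   // sequence number assigned by the writer
    int8_t  status         = 0;   // status flag
    char    content[MESSAGE_CONTENT_SIZE] = {0};
};

#define SIZE_MESSAGE sizeof(shm_fifo_message)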
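Because every record has the same size, locating a record is plain arithmetic. The sketch below illustrates this with a struct that mirrors the field layout of shm_fifo_message; the names record, record_offset and capacity are hypothetical and are not part of ShmFifo.

```cpp
#include <cstddef>
#include <cstdint>

constexpr std::size_t kContentSize = 1024;  // same value as MESSAGE_CONTENT_SIZE

// Hypothetical mirror of shm_fifo_message, used only to compute sizes.
struct record
{
    std::int64_t value_length;
    std::int64_t message_number;
    std::int8_t  status;
    char         content[kContentSize];
};

// Byte offset of the n-th record (0-based) in a region of equal-size records.
constexpr std::size_t record_offset(std::size_t n)
{
    return n * sizeof(record);
}

// Number of whole records that fit in a region of the given size.
constexpr std::size_t capacity(std::size_t region_bytes)
{
    return region_bytes / sizeof(record);
}
```

On a typical 64-bit platform sizeof(record) is 1048 (1041 bytes of fields rounded up to a multiple of 8). In that case the segment size 1048000016 used in the producer listings later in this post equals one million records plus 16 bytes; what those extra bytes hold is not stated in the post.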

Producer and Consumers

As said before, the writer process just appends messages to the shared memory region, without having to move the position indicator between write operations. As a control mechanism, the code does not allow adding more messages once the maximum size of the shared memory area is reached.
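The control mechanism described above can be illustrated with a small append-only log that refuses new records once its region is full. This is only a sketch over an in-process buffer, not the ShmFifo implementation; the class name BoundedLog and its methods are hypothetical.

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical append-only log over a fixed-size byte region.
class BoundedLog
{
public:
    BoundedLog(std::size_t region_bytes, std::size_t record_bytes)
        : region_(region_bytes), record_bytes_(record_bytes), write_pos_(0) {}

    // Appends one record of record_bytes bytes.
    // Returns false, leaving the log untouched, when the record does not fit.
    bool append(const void* record)
    {
        if (region_.size() - write_pos_ < record_bytes_) return false;
        std::memcpy(region_.data() + write_pos_, record, record_bytes_);
        write_pos_ += record_bytes_;  // the position only ever moves forward
        return true;
    }

    // Number of records appended so far.
    std::size_t count() const
    {
        return record_bytes_ == 0 ? 0 : write_pos_ / record_bytes_;
    }

private:
    std::vector<char> region_;
    std::size_t       record_bytes_;
    std::size_t       write_pos_;
};
```

With a 10-byte region and 4-byte records, the first two calls to append succeed and the third returns false.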

The code below shows the write method of the ShmFifo class. As can be seen, the code wraps the message to be published in a fixed-size “shm_fifo_message” structure, together with some extra information such as the message number and status. This information could be useful for the consumer processes, or for a potential management process that marks messages to be deleted (by changing the status value, for example).


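void ShmFifo::write(const char* message, int length)
{
    // Reject lengths that would overflow the fixed-size content buffer.
    if (length < 0 || length > MESSAGE_CONTENT_SIZE)
    {
        throw std::length_error("message does not fit in a record");  // needs <stdexcept>
    }

    // Wrap the payload in a fixed-size record.
    shm_fifo_message struct_message;
    std::memcpy(struct_message.content, message, length);
    struct_message.value_length = length;
    struct_message.status = 0;

    // Number the message with the running count of written messages.
    this->written_messages++;
    struct_message.message_number = this->written_messages;

    // Append the whole record to the shared memory region.
    this->shm->write(reinterpret_cast<const char*>(&struct_message), SIZE_MESSAGE);
}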

The reader side is equally simple: it reads one fixed-size portion of the shared memory into a record. ShmFifo offers two variants, one that places the message in a buffer passed as a parameter and one, shown below, that does not receive a buffer. The latter is considerably slower because the read method itself needs to allocate memory every time it is invoked.


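shm_fifo_message& ShmFifo::read()
{
    // Allocates a new record on every call. The caller is responsible for
    // releasing it (delete &message), otherwise the memory is leaked.
    shm_fifo_message* struct_message = new shm_fifo_message();

    // Read one fixed-size record from the current position.
    this->shm->read(reinterpret_cast<char*>(struct_message), SIZE_MESSAGE);

    return *struct_message;
}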
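The faster variant, which fills a buffer supplied by the caller, is not listed in this post. A rough sketch of the idea, under the assumption that records are read sequentially from a plain byte region, could look like this. RecordReader and read_into are hypothetical names, not the ShmFifo API.

```cpp
#include <cstddef>
#include <cstring>

// Hypothetical sequential reader over a region of equal-size records.
class RecordReader
{
public:
    RecordReader(const char* region, std::size_t region_bytes, std::size_t record_bytes)
        : region_(region), region_bytes_(region_bytes),
          record_bytes_(record_bytes), read_pos_(0) {}

    // Copies the next record into out, which must hold record_bytes bytes.
    // No memory is allocated here; the caller reuses the same buffer.
    // Returns false when no complete record is left.
    bool read_into(void* out)
    {
        if (region_bytes_ - read_pos_ < record_bytes_) return false;
        std::memcpy(out, region_ + read_pos_, record_bytes_);
        read_pos_ += record_bytes_;
        return true;
    }

private:
    const char* region_;
    std::size_t region_bytes_;
    std::size_t record_bytes_;
    std::size_t read_pos_;
};
```

A consumer would declare one buffer outside its loop and call read_into repeatedly until it returns false, which avoids one allocation per message.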

Results

The code has been tested on a machine with the following characteristics.

  1. Processor: AMD Bulldozer FX-6100 Socket AM3+ 6 Core Processor – 3.30GHz, 3.90GHz Turbo
  2. Memory: 8GB 1600MHz CL9 DDR3 Vengeance Memory Two Module.

The results are shown in the following table. Five executions were run for both the writer and the reader (message producer and message consumer), and the table shows the time spent publishing or consuming one million messages, in milliseconds. As can be seen, the writer took less than a second, about 900 milliseconds on each execution, and the reader stayed well below that, at about 590 milliseconds.

For the writer, two implementations have been tested to publish the messages.

  1. Text Message: publish one million string messages.
  2. Structured Message: sending string messages is not very realistic because, in the real world, the messages to be published do not magically appear as strings. They usually start out as objects with properties, or as structures, and need to be transformed or serialized before being published to the queue. This is why this second test was done, using a structure holding the content of the messages to be published.

 

Time in milliseconds to publish/consume one million messages, per execution:

Process  Message type         1     2     3     4     5
Writer   Text Message       908   917   911   926   918
Writer   Structured Message 900   895   908   907   905
Reader   Text Message       591   585   593   584   589
Reader   Structured Message 585   588   601   599   597
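To relate these timings to the one-million-messages-per-second target, the elapsed time can be converted into a rate. The helper below is a small illustration; the function name messages_per_second is hypothetical.

```cpp
// Messages per second for a batch of messages processed in elapsed_ms milliseconds.
constexpr double messages_per_second(double messages, double elapsed_ms)
{
    return messages * 1000.0 / elapsed_ms;
}
```

For the slowest writer run (926 ms) this gives roughly 1.08 million messages per second, and for the slowest reader run (601 ms) roughly 1.66 million messages per second.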

 

producer

The code used to publish both types of messages is shown below. The first listing sends string messages and the second sends structured messages, serializing the content of a structure that holds the message data.


int sendTextMessage()
{
    ccommons::ShmFifo shmFifo("/messaging-text-fifo_shm", 1048000016, true);
    string message_content = "holaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaholaadios";
    const char* message_content_char = message_content.c_str();
    long message_content_size = strlen(message_content_char);

    // Timestamp in milliseconds before publishing.
    struct timeval tp;
    gettimeofday(&tp, NULL);
    long int ms1 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    // Publish one million copies of the same text message.
    for (int i = 0; i < 1000000; i++)
    {
        shmFifo.write(message_content_char, message_content_size);
    }

    // Timestamp in milliseconds after publishing.
    gettimeofday(&tp, NULL);
    long int ms2 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    cout << "Text Message Delay: " << (ms2 - ms1) << endl;

    return 0;
}
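The listing above measures time with gettimeofday, which reports wall-clock time and can jump if the system clock is adjusted. As an alternative, and not what the post's code uses, the same measurement can be sketched with the monotonic std::chrono::steady_clock. The helper name elapsed_ms is hypothetical.

```cpp
#include <chrono>

// Runs work() once and returns the elapsed time in whole milliseconds,
// measured with a monotonic clock.
template <typename Work>
long long elapsed_ms(Work&& work)
{
    const auto start = std::chrono::steady_clock::now();
    work();
    const auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration_cast<std::chrono::milliseconds>(stop - start).count();
}
```

The publishing loop would be passed as a lambda, for example elapsed_ms([&] { /* one million write calls */ }).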

 


int sendTextObjectMessage()
{
    ccommons::ShmFifo shmFifo("/messaging-textobj-fifo_shm", 1048000016, true);
    messagingmodel::CheckImeiReq checkImei_req;
    char imei[16] = "254784512658411";
    char imsi[16] = "214021245145781";
    char msisdn[19] = "123456987412547896";
    long checkimei_req_size = sizeof(messagingmodel::CheckImeiReq);

    // Fill the request structure that will be published as raw bytes.
    strcpy(checkImei_req.imei, imei);
    strcpy(checkImei_req.imsi, imsi);
    strcpy(checkImei_req.msisdn, msisdn);

    // Timestamp in milliseconds before publishing.
    struct timeval tp;
    gettimeofday(&tp, NULL);
    long int ms1 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    // Publish one million copies of the same structured message.
    for (int i = 0; i < 1000000; i++)
    {
        shmFifo.write(reinterpret_cast<const char*>(&checkImei_req), checkimei_req_size);
    }

    // Timestamp in milliseconds after publishing.
    gettimeofday(&tp, NULL);
    long int ms2 = tp.tv_sec * 1000 + tp.tv_usec / 1000;

    cout << "Text Object Delay: " << (ms2 - ms1) << endl;

    return 0;
}
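The structured-message test publishes the raw bytes of a structure, which is only safe for trivially copyable types. The sketch below shows that round trip under this assumption. CheckImeiReqSketch is a hypothetical stand-in whose field sizes are taken from the local buffers in the listing above; the real messagingmodel::CheckImeiReq may differ. The helpers to_bytes and from_bytes are hypothetical as well.

```cpp
#include <cstring>
#include <type_traits>

// Hypothetical stand-in for messagingmodel::CheckImeiReq.
struct CheckImeiReqSketch
{
    char imei[16];
    char imsi[16];
    char msisdn[19];
};

static_assert(std::is_trivially_copyable<CheckImeiReqSketch>::value,
              "raw byte copies are only valid for trivially copyable types");

// Copies the bytes of value into out, which must hold at least sizeof(T) bytes.
template <typename T>
void to_bytes(const T& value, char* out)
{
    std::memcpy(out, &value, sizeof(T));
}

// Rebuilds a T from sizeof(T) bytes previously produced by to_bytes.
template <typename T>
T from_bytes(const char* in)
{
    T value;
    std::memcpy(&value, in, sizeof(T));
    return value;
}
```

This only works when producer and consumer are built with the same structure layout, which is a reasonable assumption for processes on one machine compiled from the same headers.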

Mobile applications with Apache Cordova: access to native resources like accelerometer, GPS location, bluetooth, etc.

For the last week I have been reading about developing mobile applications with Apache Cordova, because at work they wanted to develop applications for Apple and Android devices without having to duplicate the code. That is how I ended up looking at Cordova: it lets you develop mobile applications using web technologies like HTML, JavaScript and CSS, and later package them as native applications for each platform we want to deploy our application on.

The topics I was most interested in were how to create cross-platform mobile applications able to interact with native resources like the accelerometer, the location framework (to collect the GPS location) and Bluetooth, and how to access functionality such as running the application in background mode, so that it can keep communicating with these devices even if the user opens other applications and ours is sent to the background. I was interested in these capabilities because making a purely web application, without any requirement to interact with the native APIs of the platform it runs on, would be straightforward; for that type of application we do not need things like Cordova.

As a result, I decided to write this post to share what I have learnt with anyone interested, including a sample application I wrote to test the Cordova capabilities mentioned above.

 

Pre-requisites

Because Cordova builds a native application for each target platform, it needs the native SDK environment for each of them. For example, if the application needs to support iOS and Android devices, as is the case for the sample application developed for this post, both Xcode (for iOS) and Android Studio will have to be installed.

Xcode is the official environment for developing applications for Apple devices, and can be installed from the App Store or downloaded from “https://developer.apple.com/xcode/”. This tool is also a good option for managing the developer certificates required to run applications on real devices.

For Android, the prerequisites to install are:

  1. Android Studio ( https://developer.android.com/studio/index.html ).
  2. Android File Transfer ( https://www.android.com/filetransfer/ ): when using macOS as the development environment, this tool is necessary for installing the application on Android devices, because macOS does not automatically detect these devices when they are plugged in unless the Android File Transfer application is running.
  3. The Google APIs, installed using the Android Studio SDK Manager (Tools->Android->SDK Manager).

AndroidStudio.png

Cordova concepts and installation

To understand the structure of the Cordova platform, the diagram available in the official documentation is very descriptive. As the picture shows, Cordova applications are written using web technologies like HTML, JavaScript and CSS, and if they need access to low-level resources of the device, or to any functionality exposed by the device's native API, this is achieved using the Cordova APIs.

cordovaapparchitecture

For a deeper overview of Cordova it is highly recommended to go through the official documentation. Here we move directly to the steps to install the Cordova command-line tool and create a project.

  1. The Cordova command-line tool is a Node.js application and can be installed using the npm command.

                   npm install -g cordova

  2. Install ios-deploy.

                   npm install -g ios-deploy

  3. Once the Cordova command-line tool is installed, projects can be created.

                   cordova create samplecordovaapp com.ecollado.training.cordova SampleCordovaApp

  4. The platforms the application has to support can be added now.

                   cordova platform add ios --save
                   cordova platform add android --save

  5. To run the application in the emulators:

                   cordova emulate [ ios | android ]

  6. And to run the application on real devices:

                   cordova run [ ios | android ]

 

The create command in step 3 creates the application within the folder “samplecordovaapp”, which contains a sub-directory “www” where the web application is located. The native project for each platform is available as well under the folder “platforms”; this project can be opened in the native environment, like Xcode for iOS.

Plugins

 

The Cordova APIs that give access to low-level resources are implemented as plugins. Therefore, each time the application needs a new functionality from the OS, the developer should look for the proper plugin, add it to the project and then use the JavaScript API exposed by the plugin. The Cordova documentation describes a list of plugins (battery status, camera, contacts, etc.), but more are available through the plugin search site (https://cordova.apache.org/plugins/).

One essential plugin is the console one, which allows us to write messages to the console using the JavaScript function “console.log()”. To add this plugin to our newly created project we can type in the command line:

cordova plugin add https://git-wip-us.apache.org/repos/asf/cordova-plugin-console.git

Once the plugin has been added to the project, the JavaScript function shown above can be used to print traces to the console. For example, if the application is running on an iOS device attached to Xcode, the messages will appear in the Xcode console.

Device ready javascript event

Most of the Cordova APIs (or maybe all of them) need to wait until the Cordova core libraries have been loaded and are ready to be invoked. This is notified to the Cordova application by a specific event named “deviceready”, so it could be said that this event is the main method of a Cordova application. Taking this into account, the minimum JavaScript code every Cordova application should have could be similar to the example below.

var app = {
    // Application Constructor
    initialize: function()
    {
        document.addEventListener('deviceready', this.onDeviceReady.bind(this), false);
    },

    // deviceready Event Handler
    //
    // Bind any cordova events here. Common events are:
    // 'pause', 'resume', etc.
    onDeviceReady: function()
    {
        console.log("cordova core libraries have been initialised");
    }
};

app.initialize();

Sample application

As said before, there is a sample application published on GitHub that shows how to use the Cordova APIs to access device features such as the following (source JavaScript file index.js):

  1. Bluetooth LE: using the plugin “cordova-plugin-bluetoothle”.
  2. Accelerometer: using the plugin “cordova-plugin-device-motion”.
  3. Location: using the plugin “cordova-plugin-geolocation”.

Screen orientation

The screen orientation can be detected on Android devices using the JavaScript event “orientationchange”, as can be seen in the source file indicated above. For iOS devices it is more complex because that listener does not work (it seems the UIWebView component used by Cordova to embed the application in a web container does not support it). For this reason the plugin “cordova-plugin-device-orientation” is needed: it uses the compass to detect changes in the device rotation, at which point the screen status can be checked, as in the code below.


...
if (platform === "iOS")
{
    // Initialize the view with the current screen dimensions
    onOrientationChange();
    var watchID = navigator.compass.watchHeading(onOrientationChange, compassError, { filter: 10 });
}
...

function onOrientationChange(result)
{

    var physicalScreenWidth;
    var physicalScreenHeight;

    if (screen.orientation === 'portrait-primary')
    {
        if (window.screen.width > window.screen.height)
        {
            physicalScreenWidth = window.screen.height;
            physicalScreenHeight = window.screen.width;
        }
        else
        {
            physicalScreenWidth = window.screen.width;
            physicalScreenHeight = window.screen.height;
        }
    }
    else if (screen.orientation === 'landscape-primary')
    {
        if (window.screen.width < window.screen.height)
        {
            physicalScreenWidth = window.screen.height;
            physicalScreenHeight = window.screen.width;
        }
        else
        {
            physicalScreenWidth = window.screen.width;
            physicalScreenHeight = window.screen.height;
        }
    }

    physicalScreenHeight *= window.devicePixelRatio;
    physicalScreenWidth *= window.devicePixelRatio;

    var widthElement = document.getElementById('screen_width');
    widthElement.innerHTML = physicalScreenWidth;

    var heightElement = document.getElementById('screen_height');
    heightElement.innerHTML = physicalScreenHeight;
}