Introduction
Suppose you have a process and want to send a message from it to another process, either on the same computer or on another computer on the network. The approach that usually comes to mind is sockets. With sockets you can send a message to another process on the same or a different computer, but you need to know socket programming as well as related topics such as multithreaded programming.
With OpenClovis Message, you do not need to know socket programming, the address of the destination computer, or which port to bind, and you can still send messages from one computer to another. The details follow below.
Overview
OpenClovis Message comprises a library that applications use to communicate efficiently within a single node or across nodes. With OpenClovis Message, communication can be performed across mixed-endian machines and in mixed-mode (32-bit and 64-bit architecture) setups; you do not need to handle this yourself, because the library does it for you. The library is implemented according to the SAF specification: https://help.openclovis.com/saf/SAI-AIS-MSG-B.03.01.AL/. Read the specification if you want to understand the library in more depth.
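To make concrete what the library handles for you, here is a minimal sketch of the manual byte-order handling that an application exchanging raw bytes would otherwise need, so that a 32-bit value reads the same on big-endian and little-endian machines. The names put_u32_be and get_u32_be are hypothetical helpers written for this illustration; they are not part of the OpenClovis or SAF API.

```c
#include <stdint.h>

/* Hypothetical helper (not an OpenClovis API): store a 32-bit value in a
 * buffer in big-endian order, independent of the host byte order. */
void put_u32_be(uint8_t out[4], uint32_t v)
{
    out[0] = (uint8_t)(v >> 24);
    out[1] = (uint8_t)(v >> 16);
    out[2] = (uint8_t)(v >> 8);
    out[3] = (uint8_t)v;
}

/* Hypothetical helper: rebuild the value from the buffer on any host. */
uint32_t get_u32_be(const uint8_t in[4])
{
    return ((uint32_t)in[0] << 24) | ((uint32_t)in[1] << 16) |
           ((uint32_t)in[2] << 8)  |  (uint32_t)in[3];
}
```

Because the helpers use shifts rather than copying memory, the bytes in the buffer are the same whichever machine wrote them.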
Message Usage
If you want to use the Message library, you need to:
- Install the OpenClovis SAFplus SDK 6.0
- Create a model using SAFplus-IDE. A model can contain one or more applications. After generating code, open clCompAppMain.c for editing (for API reference guides, refer to https://help.openclovis.com/saf/SAI-AIS-MSG-B.03.01.AL/). Note:
○ Message queue: send to or receive from a single queue
○ Message queue group: send to or receive from a group of queues
○ Both sender and receiver need to know the “queue name” or “queue group name”
■ Sender: to send a message, call saMsgMessageSend(), passing the “queue name” or “queue group name”. For a queue group, first create the group with saMsgQueueGroupCreate() and then add each queue to it with saMsgQueueGroupInsert(). With the SA_MSG_QUEUE_GROUP_BROADCAST policy used in the example in this document, a message sent to the queue group is then delivered to all members of the group
■ Receiver: to receive a message, call saMsgMessageGet(), passing the queueHandle. This handle is the output argument of the saMsgQueueOpen() call
The APIs are declared in the header saMsg.h:
#include <saMsg.h>
Both sender and receiver: saMsgInitialize(), saMsgQueueOpen()
Sender: saMsgMessageSend()
Receiver: saMsgMessageGet()
If you want to use a message queue group, you can use these APIs:
saMsgQueueOpen(), saMsgQueueGroupCreate(), saMsgQueueGroupInsert(), saMsgMessageSend(), saMsgMessageGet()
Cleaning up: saMsgQueueClose(), saMsgQueueUnlink(), saMsgQueueGroupDelete(), saMsgFinalize()
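Queue and queue group names are passed to these calls as a structure holding a length and a character buffer (SaNameT in the listings in this document). As a minimal sketch of filling such a structure safely, the helper below builds a name of the form "<prefix>_<index>" and refuses names that would not fit. DemoNameT, DEMO_MAX_NAME_LENGTH and demo_make_queue_name are stand-ins invented for this illustration so that the sketch compiles without the SDK; the buffer size of 256 is an assumption about SA_MAX_NAME_LENGTH, and real code would use SaNameT from saMsg.h.

```c
#include <stdio.h>
#include <string.h>

#define DEMO_MAX_NAME_LENGTH 256   /* assumed to match SA_MAX_NAME_LENGTH */

/* Local stand-in for SaNameT (assumption: a length plus a fixed buffer). */
typedef struct {
    unsigned short length;
    unsigned char  value[DEMO_MAX_NAME_LENGTH];
} DemoNameT;

/* Hypothetical helper: write "<prefix>_<index>" into name.
 * Returns 0 on success, -1 if the result would not fit in the buffer. */
int demo_make_queue_name(DemoNameT *name, const char *prefix, int index)
{
    int n = snprintf((char *)name->value, sizeof(name->value),
                     "%s_%d", prefix, index);
    if (n < 0 || (size_t)n >= sizeof(name->value)) {
        name->length = 0;
        return -1;
    }
    name->length = (unsigned short)n;
    return 0;
}
```

For example, demo_make_queue_name(&name, "queue", 7) stores "queue_7" with a length of 7, while a prefix longer than the buffer is rejected instead of overflowing it.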
Message APIs usage examples
Message queue:
Please refer to the example in src/examples/eval/src/app/csa104Comp
Message queue group:
Common functions:
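#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <saMsg.h>

/* APP_TIMEOUT, DEMO_Q_NAME_RECV and DEMO_Q_NAME_SENDER are used below
 * and must be defined by the application. */
#define QG_NUM 100
#define RECEIVER_NUM 100
#define QG_NAME1 "safMqg=QGNAME1"
#define QG_NAME2 "safMqg=QGNAME2"

/* Initialize the Message service and return the library handle in *msgHandle. */
SaAisErrorT initSaMsg(SaMsgHandleT* msgHandle)
{
    SaVersionT version;
    SaAisErrorT rc;

    version.releaseCode = 'B';
    version.majorVersion = 1;
    version.minorVersion = 1;
    rc = saMsgInitialize(msgHandle, NULL, &version);
    return rc;
}

/* Release the library handle obtained from initSaMsg(). */
SaAisErrorT finalizeSaMsg(SaMsgHandleT msgHandle)
{
    SaAisErrorT rc = saMsgFinalize(msgHandle);
    return rc;
}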
/* Open (creating if needed) QG_NUM queues named "<queueName>_<i>" and
 * store their handles in queueHandle[0..QG_NUM-1]. */
SaAisErrorT initSaQueueQG(SaMsgHandleT msgHandle, char* queueName, SaTimeT retentionTime,
                          int msgSize, SaMsgQueueHandleT* queueHandle)
{
    SaAisErrorT rc = SA_AIS_OK;
    SaMsgQueueCreationAttributesT creation_attributes;
    SaMsgQueueOpenFlagsT open_flags;
    SaTimeT timeout = APP_TIMEOUT;
    SaNameT q_name;
    int i;

    (void)msgSize; /* currently unused: the queue size is fixed below */

    if (queueHandle[0] != 0)
    {
        printf("initQueue: nothing to do, valid queueHandle provided\n");
        return SA_AIS_OK;
    }

    creation_attributes.creationFlags = 0;
    creation_attributes.size[0] = 100000000;
    creation_attributes.size[1] = 0;
    creation_attributes.size[2] = 0;
    creation_attributes.size[3] = 0;
    creation_attributes.retentionTime = retentionTime;

    /* Receiver creates the message queue */
    open_flags = SA_MSG_QUEUE_CREATE;

    for (i = 0; i < QG_NUM; i++)
    {
        snprintf((char*)q_name.value, sizeof(q_name.value), "%s_%d", queueName, i);
        q_name.length = strlen((char*)q_name.value);
        printf("Opening queue name = %s\n", (char*)q_name.value);
        rc = saMsgQueueOpen(msgHandle, &q_name, &creation_attributes,
                            open_flags, timeout, &queueHandle[i]);
        if (rc == SA_AIS_OK)
        {
            printf("\nOpened message queue with params:-\nQueue Name - %s\nQCreation Flags: %u\nQSize - %llu\nQRetention time - %lld success\n",
                   (char*)q_name.value,
                   (unsigned int)creation_attributes.creationFlags,
                   (unsigned long long)creation_attributes.size[0],
                   (long long)creation_attributes.retentionTime);
        }
        else
        {
            printf("\nFailed to open message queue with params:-\nQueue Name - %s\nQCreation Flags: %u\nQSize - %llu\nQRetention time - %lld fail rc - %d\n",
                   (char*)q_name.value,
                   (unsigned int)creation_attributes.creationFlags,
                   (unsigned long long)creation_attributes.size[0],
                   (long long)creation_attributes.retentionTime,
                   (int)rc);
            return rc; /* stop at the first queue that cannot be opened */
        }
    }
    return rc;
}
/* Insert the QG_NUM queues "<queueName>_<i>" into the queue group QGName
 * (creating the group if it does not exist yet), then send msg to the group. */
SaAisErrorT sendSaMsgQG(SaMsgHandleT msgHandle, char* QGName, char* queueName,
                        char* msg, int msgSize)
{
    SaAisErrorT rc = SA_AIS_OK;
    SaTimeT timeout = APP_TIMEOUT;
    SaMsgMessageT saMsg;
    SaNameT g_name;
    SaNameT q_name;
    int i;

    g_name.length = strlen(QGName);
    strcpy((char*)g_name.value, QGName);

    /* Describe the payload; all other message fields are left zeroed. */
    memset(&saMsg, 0, sizeof(saMsg));
    saMsg.data = msg;
    saMsg.size = msgSize;

    for (i = 0; i < QG_NUM; i++)
    {
        snprintf((char*)q_name.value, sizeof(q_name.value), "%s_%d", queueName, i);
        q_name.length = strlen((char*)q_name.value);
        printf("Inserting queueName=%s to queueGroupName=%s\n",
               (char*)q_name.value, (char*)g_name.value);
insert_qg:
        rc = saMsgQueueGroupInsert(msgHandle, &g_name, &q_name);
        if (rc == SA_AIS_OK)
        {
            printf("saMsgQueueGroupInsert success\n");
        }
        else if (rc == SA_AIS_ERR_NOT_EXIST)
        {
            /* If the queue group does not exist, create it first. */
            rc = saMsgQueueGroupCreate(msgHandle, &g_name,
                                       SA_MSG_QUEUE_GROUP_BROADCAST);
            if (rc != SA_AIS_OK)
            {
                printf("saMsgQueueGroupCreate return rc - %d\n", (int)rc);
                return rc;
            }
            else
            {
                goto insert_qg;
            }
        }
        else if (rc == SA_AIS_ERR_EXIST)
        {
            printf("queueName=%s is already a member of queueGroupName=%s\n",
                   (char*)q_name.value, (char*)g_name.value);
        }
        else
        {
            printf("saMsgQueueGroupInsert return rc - %d\n", (int)rc);
            return rc;
        }
    }

    rc = saMsgMessageSend(msgHandle, &g_name, &saMsg, timeout);
    if (rc != SA_AIS_OK)
    {
        printf("saMsgMessageSend failed with rc - %d\n", (int)rc);
    }
    else
    {
        printf("saMsgMessageSend success\n");
    }
    return rc;
}
/* Read one message from each of the QG_NUM queues in queueHandle[]. */
SaAisErrorT recvSaMsgQG(SaMsgHandleT msgHandle, SaMsgQueueHandleT* queueHandle,
                        int msgSize, char* QGName, char* queueName)
{
    SaTimeT sendTime;
    SaMsgSenderIdT senderId;
    SaTimeT timeout = APP_TIMEOUT;
    SaMsgMessageT recvMsg;
    SaAisErrorT rc = SA_AIS_OK;
    int i;
    char* data = (char*)malloc(msgSize);

    if (!data)
    {
        return SA_AIS_ERR_NO_MEMORY;
    }
    memset(&recvMsg, '\0', sizeof(recvMsg));
    recvMsg.data = data;
    printf("Reading the message using saMsgMessageGet()\n");
    memset(&sendTime, '\0', sizeof(sendTime));
    memset(&senderId, '\0', sizeof(senderId));

    for (i = 0; i < QG_NUM; i++)
    {
        /* Reset the buffer size before each call, since the call may update it. */
        recvMsg.size = msgSize;
        rc = saMsgMessageGet(queueHandle[i], &recvMsg, &sendTime, &senderId,
                             timeout);
        if (rc != SA_AIS_OK)
        {
            printf("saMsgMessageGet failed with rc - %d\n", (int)rc);
            goto done; /* free the buffer before returning */
        }
        printf("\nReceived message size=%u\n", (unsigned int)recvMsg.size);
    }
done:
    free(data);
    return rc;
}
/* Close all queue handles, delete the queue group gname and, if that
 * succeeds, unlink the queues "<queueName>_<i>". */
SaAisErrorT closeSaQueueQG(SaMsgHandleT msgHandle, char* gname, char* queueName,
                           SaMsgQueueHandleT *queueHandle)
{
    int i;
    SaAisErrorT rc = SA_AIS_OK;
    SaNameT g_name;
    SaNameT q_name;

    for (i = 0; i < QG_NUM; i++)
    {
        rc = saMsgQueueClose(queueHandle[i]);
        if (rc != SA_AIS_OK)
        {
            printf("saMsgQueueClose return rc - %d\n", (int)rc);
        }
        else
        {
            queueHandle[i] = 0;
        }
    }

    g_name.length = strlen(gname);
    strcpy((char*)g_name.value, gname);
    rc = saMsgQueueGroupDelete(msgHandle, &g_name);
    if (rc != SA_AIS_OK)
    {
        printf("saMsgQueueGroupDelete return rc - %d\n", (int)rc);
    }

    if (rc == SA_AIS_OK)
    {
        for (i = 0; i < QG_NUM; i++)
        {
            snprintf((char*)q_name.value, sizeof(q_name.value), "%s_%d", queueName, i);
            q_name.length = strlen((char*)q_name.value);
            printf("unlink queueName: %s\n", (char*)q_name.value);
            rc = saMsgQueueUnlink(msgHandle, &q_name);
            if (rc != SA_AIS_OK)
            {
                printf("saMsgQueueUnlink failed rc - %d\n", (int)rc);
            }
        }
    }
    return rc;
}
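The "insert into the group, create the group if it is missing, then insert again" flow in sendSaMsgQG() can also be written without a backward goto. The sketch below shows only that control flow, with the retry limited to a single attempt. Everything in it is a stand-in invented for illustration: DemoRcT replaces SaAisErrorT, the function pointers replace the saMsgQueueGroupInsert() and saMsgQueueGroupCreate() calls, and the in-memory fake exists only so that the flow can be exercised without the SDK.

```c
/* Stand-in result codes; real code would use SaAisErrorT values. */
typedef enum { DEMO_OK, DEMO_ERR_NOT_EXIST, DEMO_ERR_EXIST, DEMO_ERR_OTHER } DemoRcT;

typedef DemoRcT (*DemoStepFn)(void *ctx);

/* Hypothetical helper: try the insert; if the group is missing, create it
 * and retry the insert exactly once. "Already a member" counts as success. */
DemoRcT demo_insert_or_create(DemoStepFn insert, DemoStepFn create, void *ctx)
{
    DemoRcT rc = insert(ctx);
    if (rc == DEMO_ERR_NOT_EXIST) {
        rc = create(ctx);
        if (rc != DEMO_OK)
            return rc;
        rc = insert(ctx);
    }
    return (rc == DEMO_ERR_EXIST) ? DEMO_OK : rc;
}

/* In-memory fake of a queue group, used only to exercise the helper. */
typedef struct { int group_exists; int is_member; } DemoStateT;

DemoRcT demo_fake_insert(void *ctx)
{
    DemoStateT *s = (DemoStateT *)ctx;
    if (!s->group_exists) return DEMO_ERR_NOT_EXIST;
    if (s->is_member)     return DEMO_ERR_EXIST;
    s->is_member = 1;
    return DEMO_OK;
}

DemoRcT demo_fake_create(void *ctx)
{
    DemoStateT *s = (DemoStateT *)ctx;
    if (s->group_exists) return DEMO_ERR_EXIST;
    s->group_exists = 1;
    return DEMO_OK;
}
```

Calling demo_insert_or_create(demo_fake_insert, demo_fake_create, &state) on an empty state creates the group and inserts the member; calling it a second time reports success because the member is already present.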
Sender:
    /* Body of the sender routine (a function returning void). */
    SaAisErrorT rc;
    SaMsgHandleT msgHandle = 0;
    SaMsgQueueHandleT queueHandles[QG_NUM] = {0};
    SaTimeT retentiontime = 200000000000ll;
    int msgSize;

    rc = initSaMsg(&msgHandle);
    if (rc != SA_AIS_OK)
    {
        printf("initSaMsg failed rc - %d\n", (int)rc);
        goto done;
    }
    for (msgSize = 10; msgSize < 1000000000; msgSize *= 10)
    {
        rc = initSaQueueQG(msgHandle, DEMO_Q_NAME_RECV, retentiontime,
                           msgSize, queueHandles);
        if (rc != SA_AIS_OK)
        {
            printf("initQueue %s failed rc - %d\n", DEMO_Q_NAME_RECV, (int)rc);
            goto finalize_msghandle;
        }
        char* msg = (char*)malloc(msgSize);
        if (!msg)
        {
            /* Clean up the queues and the handle instead of returning directly. */
            rc = SA_AIS_ERR_NO_MEMORY;
            goto finalize_queue;
        }
        memset(msg, 'x', msgSize);
        rc = sendSaMsgQG(msgHandle, QG_NAME1, DEMO_Q_NAME_SENDER,
                         msg, msgSize);
        free(msg);
        if (rc != SA_AIS_OK)
        {
            printf("echotest failed rc - %d\n", (int)rc);
            goto finalize_queue;
        }
    }
finalize_queue:
    closeSaQueueQG(msgHandle, QG_NAME2, DEMO_Q_NAME_SENDER,
                   queueHandles);
finalize_msghandle:
    finalizeSaMsg(msgHandle);
done:
    return;
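The retention time above is written as the raw value 200000000000ll. Assuming SaTimeT is a signed 64-bit count of nanoseconds, as in the SAF AIS specification, that value is 200 seconds. The sketch below makes the conversion explicit. DemoTimeT, DEMO_NS_PER_SECOND and the two functions are stand-ins invented for this illustration so that the sketch compiles without the SDK.

```c
#include <stdint.h>

/* Stand-in for SaTimeT (assumption: signed 64-bit nanosecond count). */
typedef int64_t DemoTimeT;

#define DEMO_NS_PER_SECOND 1000000000LL

/* Hypothetical helper: convert whole seconds to a nanosecond time value. */
DemoTimeT demo_seconds_to_time(int64_t seconds)
{
    return (DemoTimeT)(seconds * DEMO_NS_PER_SECOND);
}

/* Hypothetical helper: convert a nanosecond time value to whole seconds. */
int64_t demo_time_to_seconds(DemoTimeT t)
{
    return t / DEMO_NS_PER_SECOND;
}
```

Under that assumption, demo_seconds_to_time(200) yields 200000000000, the value used for retentiontime in the sender and receiver.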
Receiver:
    /* Body of the receiver routine (a function returning void). */
    SaAisErrorT rc;
    SaMsgHandleT msgHandle = 0;
    SaMsgQueueHandleT queueHandles[QG_NUM] = {0};
    SaTimeT retentiontime = 200000000000ll;
    int msgSize;

    rc = initSaMsg(&msgHandle);
    if (rc != SA_AIS_OK)
    {
        printf("initSaMsg failed rc - %d\n", (int)rc);
        goto done;
    }
    for (msgSize = 10; msgSize < 1000000000; msgSize *= 10)
    {
        rc = initSaQueueQG(msgHandle, DEMO_Q_NAME_SENDER, retentiontime,
                           msgSize, queueHandles);
        if (rc != SA_AIS_OK)
        {
            printf("initQueue %s failed rc - %d\n", DEMO_Q_NAME_SENDER, (int)rc);
            goto finalize_msghandle;
        }
        rc = recvSaMsgQG(msgHandle, queueHandles, msgSize, NULL, NULL);
        if (rc != SA_AIS_OK)
        {
            goto finalize_queue;
        }
    }
finalize_queue:
    closeSaQueueQG(msgHandle, QG_NAME1, DEMO_Q_NAME_RECV, queueHandles);
finalize_msghandle:
    finalizeSaMsg(msgHandle);
done:
    return;
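Both the sender and the receiver sweep the message size with a loop that starts at 10 and multiplies by 10 while the size stays below 1000000000. As a small worked check of what that loop covers, the sketch below counts the sizes visited and records the largest one. The function demo_count_sweep is a hypothetical helper written for this illustration.

```c
/* Hypothetical helper: count the values visited by
 *   for (s = first; s < limit; s *= factor)
 * and store the last (largest) visited value in *largest. */
int demo_count_sweep(int first, int limit, int factor, int *largest)
{
    int count = 0;
    int s;

    *largest = 0;
    for (s = first; s < limit; s *= factor) {
        *largest = s;
        count++;
    }
    return count;
}
```

With the bounds used in the listings, demo_count_sweep(10, 1000000000, 10, &largest) visits 8 sizes and the largest is 100000000 bytes, which equals the value assigned to creation_attributes.size[0] in initSaQueueQG().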
Conclusion
OpenClovis Message is an easy way to send and receive messages among processes across computers in the network without needing socket programming skills, and without worrying about big-endian or little-endian issues, the IP addresses of the computers, or which port to bind. In this way, you can communicate across nodes in the OpenClovis SAFplus middleware environment.
For further support, please send an email to support@openclovis.org.
