Skip to content

Instantly share code, notes, and snippets.

@davidlohr
Last active July 22, 2016 17:11
Show Gist options
  • Save davidlohr/7bfd521f198211c24e1bc6a58affd8f6 to your computer and use it in GitHub Desktop.
Save davidlohr/7bfd521f198211c24e1bc6a58affd8f6 to your computer and use it in GitHub Desktop.
parallel sysv msg pingpong
/*
* pmsg.cpp, parallel sysv msg pingpong
* compilation: g++ -Wall -O2 -pthread pmsg-shared.cpp -o pmsg-shared
*
* Copyright (C) 1999, 2001, 2005, 2008 by Manfred Spraul.
* All rights reserved except the rights granted by the GPL.
*
* Redistribution of this file is permitted under the terms of the GNU
* General Public License (GPL) version 2 or later.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <unistd.h>
#include <getopt.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <new>
//////////////////////////////////////////////////////////////////////////////
/*
 * Global run state, written by main() and polled by every worker thread:
 * workers spin while it is WAITING, exchange messages while it is RUNNING
 * and leave their message loop as soon as it holds any other value.
 *
 * NOTE(review): the variable is only volatile-qualified; volatile does not
 * provide inter-thread synchronisation guarantees - consider an atomic type.
 */
static enum {
WAITING,
RUNNING,
STOPPED,
} volatile g_state = WAITING;
/* Per-thread count of completed rounds, indexed by taskinfo.threadid. */
unsigned long long *g_results;
/* Id of the System V message queue that main() obtains from msgget(). */
int g_svmsg_id;
/* Thread handles filled in by pthread_create(), indexed by taskinfo.threadid. */
pthread_t *g_threads;
/*
 * Per-thread parameters handed to worker_thread() through its void* argument.
 * init_threads() fills in one instance for each of the two threads of a pair.
 */
struct taskinfo {
/* message queue id used for every msgsnd()/msgrcv() of this thread */
int svmsg_id;
/* index of this thread in g_threads[] and g_results[] */
int threadid;
/* CPU number the thread binds itself to */
int cpuid;
/* non-zero: this thread sends the initial message; also selects the message types used */
int sender;
/* base value for the message types of the pair; its low 16 bits tag the payload */
int offset;
};
/* Number of payload bytes carried by every message. */
#define DATASIZE 16
void* worker_thread(void *arg)
{
struct taskinfo *ti = (struct taskinfo*)arg;
unsigned long long rounds;
int ret;
struct {
long mtype;
unsigned char buffer[DATASIZE];
} mbuf;
{
cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(ti->cpuid, &cpus);
ret = pthread_setaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
if (ret < 0) {
printf("pthread_setaffinity_np failed for thread %d with errno %d.\n",
ti->threadid, errno);
}
ret = pthread_getaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
if (ret < 0) {
printf("pthread_getaffinity_np() failed for thread %d with errno %d.\n",
ti->threadid, errno);
fflush(stdout);
} else {
printf("thread %d: sysvmsg %8d, offset 0x%08x type %d bound to %04lxh\n",ti->threadid,
ti->svmsg_id, ti->offset, ti->sender, cpus.__bits[0]);
}
fflush(stdout);
}
rounds = 0;
while(g_state == WAITING) {
#ifdef __i386__
__asm__ __volatile__("pause": : :"memory");
#endif
}
if (ti->sender) {
mbuf.mtype = ti->offset+ti->sender+1;
mbuf.buffer[0] = ti->offset & 0xff;
mbuf.buffer[1] = (ti->offset >> 8) & 0xff;
ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
if (ret != 0) {
printf("Initial send failed, errno %d.\n", errno);
exit(1);
}
}
while(g_state == RUNNING) {
int target = ti->offset+1+!ti->sender;
ret = msgrcv(ti->svmsg_id, &mbuf, DATASIZE, target, 0);
if (ret != DATASIZE) {
if (errno == EIDRM)
break;
printf("Error on msgrcv, got %d, errno %d.\n", ret, errno);
exit(1);
}
if ((mbuf.buffer[0] != (unsigned)(ti->offset & 0xff)) ||
(mbuf.buffer[1] != (unsigned)((ti->offset >> 8) & 0xff))) {
printf("Error - incorrect message received.\n");
printf("cpu %d ti->offset 0x%08x ti->sender %d.\n",
ti->cpuid, ti->offset, ti->sender);
printf("got %02x%02x.\n",
(unsigned char)mbuf.buffer[0],
(unsigned char)mbuf.buffer[1]);
exit(1);
}
mbuf.mtype = ti->offset+ti->sender+1;
ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
if (ret != 0) {
if (errno == EIDRM)
break;
printf("send failed, errno %d.\n", errno);
exit(1);
}
rounds++;
}
/* store result */
g_results[ti->threadid] = rounds;
pthread_exit(0);
return NULL;
}
/*
 * init_threads - create the two ping-pong threads that share one CPU.
 *
 * @cpu:  index of the pair; used as the CPU number for both threads and as
 *        the g_threads[]/g_results[] slot of the sending thread.
 * @cpus: total number of pairs; the non-sending thread of the pair uses
 *        slot cpu+cpus.
 *
 * Both threads get the same message type base (3*cpu+5) and the global
 * queue id g_svmsg_id. The two taskinfo objects are passed to the threads
 * and are not released by this function, because the workers read them for
 * as long as they run.
 *
 * Terminates the process with exit(1) if an allocation or pthread_create()
 * fails.
 */
void init_threads(int cpu, int cpus)
{
    int ret;
    struct taskinfo *ti1, *ti2;

    /*
     * Plain new reports failure by throwing, which would make the NULL
     * check below dead code; the nothrow form returns NULL instead.
     */
    ti1 = new (std::nothrow) taskinfo;
    ti2 = new (std::nothrow) taskinfo;
    if (!ti1 || !ti2) {
        printf("Could not allocate task info\n");
        exit(1);
    }

    g_results[cpu] = 0;
    g_results[cpu+cpus] = 0;

    /* First thread of the pair: sends the initial message. */
    ti1->svmsg_id = g_svmsg_id;
    ti1->offset = 3*cpu+5;
    ti1->threadid = cpu;
    ti1->cpuid = cpu;
    ti1->sender = 1;

    /* Second thread: same CPU and type base, starts by receiving. */
    ti2->svmsg_id = g_svmsg_id;
    ti2->offset = ti1->offset;
    ti2->threadid = cpu+cpus;
    ti2->cpuid = cpu;
    ti2->sender = 0;

    ret = pthread_create(&g_threads[ti1->threadid], NULL, worker_thread, ti1);
    if (ret) {
        printf(" pthread_create failed with error code %d\n", ret);
        exit(1);
    }
    ret = pthread_create(&g_threads[ti2->threadid], NULL, worker_thread, ti2);
    if (ret) {
        printf(" pthread_create failed with error code %d\n", ret);
        exit(1);
    }
}
//////////////////////////////////////////////////////////////////////////////
int main(int argc, char **argv)
{
int queues, timeout, i, res;
unsigned long long totals = 0;
if (argc != 3) {
printf("pmsg [nr queues] [timeout]\n");
printf(" Invalid parameters.\n");
return 0;
}
queues = atoi(argv[1]);
timeout = atoi(argv[2]);
printf("Using %d queues/cpus (%d threads) for %d seconds.\n",
queues, 2*queues, timeout);
g_svmsg_id = msgget(IPC_PRIVATE, 0777|IPC_CREAT);
if(g_svmsg_id == -1) {
printf(" message queue create failed.\n");
exit(1);
}
g_results = new unsigned long long[2*queues];
g_threads = new pthread_t[2*queues];
for (i=0;i<queues;i++) {
init_threads(i, queues);
}
sleep(1);
g_state = RUNNING;
sleep(timeout);
g_state = STOPPED;
sleep(1);
res = msgctl(g_svmsg_id,IPC_RMID,NULL);
if (res < 0) {
printf("msgctl(IPC_RMID) failed for %d, errno%d.\n",
g_svmsg_id, errno);
}
for (i=0;i<2*queues;i++)
pthread_join(g_threads[i], NULL);
printf("Result matrix:\n");
for (i=0;i<queues;i++) {
printf(" Thread %3d: %8lld %3d: %8lld\n",
i, g_results[i], i+queues, g_results[i+queues]);
totals += g_results[i] + g_results[i+queues];
}
printf("Total: %lld\n", totals);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment