/* * pmsg.cpp, parallel sysv msg pingpong * * Copyright (C) 1999, 2001, 2005, 2008 by Manfred Spraul. * All rights reserved except the rights granted by the GPL. * * Redistribution of this file is permitted under the terms of the GNU * General Public License (GPL) version 2 or later. * $Header$ */ #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////////// static enum { WAITING, RUNNING, STOPPED, } volatile g_state = WAITING; unsigned long long *g_results; int *g_svmsg_ids; pthread_t *g_threads; struct taskinfo { int svmsg_id; int threadid; int sender; }; #define DATASIZE 8 void* worker_thread(void *arg) { struct taskinfo *ti = (struct taskinfo*)arg; unsigned long long rounds; int ret; struct { long mtype; char buffer[DATASIZE]; } mbuf; { cpu_set_t cpus; CPU_ZERO(&cpus); CPU_SET(ti->threadid/2, &cpus); printf("ti: %d %lxh\n", ti->threadid/2, cpus.__bits[0]); ret = pthread_setaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus); if (ret < 0) { printf("pthread_setaffinity_np failed for thread %d with errno %d.\n", ti->threadid, errno); } ret = pthread_getaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus); if (ret < 0) { printf("pthread_getaffinity_np() failed for thread %d with errno %d.\n", ti->threadid, errno); fflush(stdout); } else { printf("thread %d: sysvmsg %8d type %d bound to %lxh\n",ti->threadid, ti->svmsg_id, ti->sender, cpus.__bits[0]); } fflush(stdout); } rounds = 0; while(g_state == WAITING) { #ifdef __i386__ __asm__ __volatile__("pause": : :"memory"); #endif } if (ti->sender) { mbuf.mtype = ti->sender+1; ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0); if (ret != 0) { printf("Initial send failed, errno %d.\n", errno); exit(1); } } while(g_state == RUNNING) { int target = 1+!ti->sender; ret = msgrcv(ti->svmsg_id, &mbuf, DATASIZE, target, 0); if (ret != DATASIZE) { if (errno == EIDRM) break; printf("Error on msgrcv, got %d, errno %d.\n", ret, errno); exit(1); } mbuf.mtype = ti->sender+1; ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0); if (ret != 0) { if (errno == EIDRM) break; printf("send failed, errno %d.\n", errno); exit(1); } rounds++; } /* store result */ g_results[ti->threadid] = rounds; pthread_exit(0); return NULL; } void init_thread(int thread1, int thread2) { int ret; struct taskinfo *ti1, *ti2; ti1 = new (struct taskinfo); ti2 = new (struct taskinfo); if (!ti1 || !ti2) { printf("Could not allocate task info\n"); exit(1); } g_svmsg_ids[thread1] = msgget(IPC_PRIVATE,0777|IPC_CREAT); if(g_svmsg_ids[thread1] == -1) { printf(" message queue create failed.\n"); exit(1); } ti1->svmsg_id = g_svmsg_ids[thread1]; ti2->svmsg_id = ti1->svmsg_id; ti1->threadid = thread1; ti2->threadid = thread2; ti1->sender = 1; ti2->sender = 0; ret = pthread_create(&g_threads[thread1], NULL, worker_thread, ti1); if (ret) { printf(" pthread_create failed with error code %d\n", ret); exit(1); } ret = pthread_create(&g_threads[thread2], NULL, worker_thread, ti2); if (ret) { printf(" pthread_create failed with error code %d\n", ret); exit(1); } } ////////////////////////////////////////////////////////////////////////////// int main(int argc, char **argv) { int queues, timeout; unsigned long long totals; int i; printf("pmsg [nr queues] [timeout]\n"); if (argc != 3) { printf(" Invalid parameters.\n"); return 0; } queues = atoi(argv[1]); timeout = atoi(argv[2]); printf("Using %d queues (%d threads) for %d seconds.\n", queues, 2*queues, timeout); g_results = new unsigned long long[2*queues]; g_svmsg_ids = new int[queues]; g_threads = new pthread_t[2*queues]; for (i=0;i