#include #include #include #include #include #include #include int CTHREADPAIRCOUNT; int CBUFFER_SIZE; int CBUFFER_COUNT; int CMESSAGES_COUNT; int todo_messages; pthread_mutex_t producer_mutex; pthread_cond_t producer_cond; pthread_mutex_t consumer_mutex; pthread_cond_t consumer_cond; pthread_mutex_t result_mutex; pthread_mutex_t todo_mutex; int message_count = 0; double start; double end; int terminate; int terminate_producer; double* buffers; //[CBUFFER_COUNT][CBUFFER_SIZE]; int freebuffer_count; int freebuffer_pos; double** free_buffers; //[CBUFFER_COUNT]; int filledbuffer_count; int filledbuffer_pos; double** filled_buffers; //[CBUFFER_COUNT]; /** * Return system uptime in µs */ double getSystemTime() { struct timespec tv; clock_gettime(CLOCK_MONOTONIC, &tv); return (double) (tv.tv_sec) * 1000000.0 + (double) (tv.tv_nsec) / 1000.0; } /** * Get free buffer and block thread if not buffer is available. */ double* getFreeBuffer() { pthread_mutex_lock(&producer_mutex); // If there is no free buffer to be filled, wait while (freebuffer_count == 0) { //printf("wait for free buffer\n"); /** exit if all masages are finished **/ if (terminate || terminate_producer) { pthread_mutex_unlock(&producer_mutex); return NULL; } pthread_cond_wait(&producer_cond, &producer_mutex); } usleep(1); double* result = free_buffers[freebuffer_pos]; freebuffer_pos = (freebuffer_pos + 1) % CBUFFER_COUNT; freebuffer_count--; pthread_mutex_unlock(&producer_mutex); return result; } /** * Return free buffer and notify producer */ void returnFreeBuffer(double* buff) { pthread_mutex_lock(&producer_mutex); free_buffers[(freebuffer_pos + freebuffer_count) % CBUFFER_COUNT] = buff; freebuffer_count++; // Notify waiting producer //printf("added free buffer\n"); pthread_cond_signal(&producer_cond); pthread_mutex_unlock(&producer_mutex); } /** * Add filled buffer and notify consumer */ void putFilledBuffer(double* buff) { pthread_mutex_lock(&consumer_mutex); filled_buffers[(filledbuffer_pos + filledbuffer_count) % CBUFFER_COUNT] = buff; filledbuffer_count++; // Notify waiting consumers //printf("added filled buffer\n"); pthread_cond_signal(&consumer_cond); pthread_mutex_unlock(&consumer_mutex); } /** * Get filled buffer or wait until exists */ double* getFilledBuffer() { pthread_mutex_lock(&consumer_mutex); // If there is no filled buffer, wait until producer fills a new one while (filledbuffer_count == 0) { //printf("wait for filled buffer\n"); /** * exit if all massages are finished * This can cause the loosing of some * already produces data. **/ if (terminate || terminate_producer) { terminate = 1; pthread_mutex_unlock(&consumer_mutex); return NULL; } pthread_cond_wait(&consumer_cond, &consumer_mutex); } double* result = filled_buffers[filledbuffer_pos]; filledbuffer_pos = (filledbuffer_pos + 1) % CBUFFER_COUNT; filledbuffer_count--; pthread_mutex_unlock(&consumer_mutex); return result; } /** * Producer thread. Filled buffer with random numbers and add to consumer list. */ void *thread_producer(void *arg) { while (!terminate && !terminate_producer) { int i; pthread_mutex_lock(&todo_mutex); if (todo_messages <= 0) { terminate_producer = 1; pthread_mutex_unlock(&todo_mutex); break; } todo_messages--; pthread_mutex_unlock(&todo_mutex); double* cbuff = getFreeBuffer(); if (cbuff) { cbuff[0] = getSystemTime(); for (i = 2; i < CBUFFER_SIZE; i++) { // Fill the buffer with random character 0 - 255 cbuff[i] = // (double) random() / (double) RAND_MAX * // (double) random() / (double) RAND_MAX * // (double) random() / (double) RAND_MAX * // (double) random() / (double) RAND_MAX * // (double) random() / (double) RAND_MAX * // (double) random() / (double) RAND_MAX * // (double) random() / (double) RAND_MAX * // (double) random() / (double) RAND_MAX * (double) random() / (double) RAND_MAX; } cbuff[1] = getSystemTime(); putFilledBuffer(cbuff); } } pthread_exit(NULL); } /** * Consumer thread. Get filled buffer. Make something and return to producer list. */ void *thread_consumer(void *arg) { while (!terminate) { int i; double* cbuff = getFilledBuffer(); if (cbuff) { cbuff[2] = getSystemTime(); for (i = 4; i < CBUFFER_SIZE - 1; i++) { // Fill the buffer with random character 0 - 255 cbuff[i] *= (double) random() / (double) RAND_MAX; } cbuff[3] = getSystemTime(); pthread_mutex_lock(&result_mutex); if ((message_count == 0) || (start > cbuff[0])) { start = cbuff[0]; } if ((message_count == 0) || (end < cbuff[3])) { end = cbuff[3]; } message_count++; pthread_mutex_unlock(&result_mutex); // printf("Message runntime Calc:%1.3fms / Sendmessage: %1.3fms / Calc:%1.3fms\n", // (cbuff[1] - cbuff[0])/1000.0, // (cbuff[2] - cbuff[1])/1000.0, // (cbuff[3] - cbuff[2])/1000.0 // ); returnFreeBuffer(cbuff); } } pthread_exit(NULL); } /** * Set terminate flag on sig quit. */ void sig_quit(int a) { terminate = 1; printf("Terminate calculation\n"); /* * Notify producers, a they can wait for * free buffers. */ pthread_cond_broadcast(&producer_cond); } /** * For testing purposes only. */ int main(int argc, char *argv[]) { terminate = 0; terminate_producer = 0; if (signal(SIGINT, sig_quit) == SIG_ERR) { printf("Could not init quit signal\n"); return -1; } if (argc < 5) { printf( "Need tree parameters. Number of thread pairs - message size in doubles - buffer count - overall messages - (show intermediate data intervall)\n"); exit(-1); } int show_intermediate = 0; CTHREADPAIRCOUNT = atoi(argv[1]); CBUFFER_SIZE = atoi(argv[2]); CBUFFER_COUNT = atoi(argv[3]); CMESSAGES_COUNT = atoi(argv[4]); if (argc > 5) { show_intermediate = atoi(argv[5]); } if ((CTHREADPAIRCOUNT < 1) || (CTHREADPAIRCOUNT > 256)) { printf("Number of thread pairs is limited by 1-256\n"); exit(-1); } if ((CBUFFER_SIZE < 8) || (CBUFFER_SIZE > 1048576)) { printf("Buffer size is limited by 8-1,048,576\n"); exit(-1); } if ((CBUFFER_COUNT < 1) || (CBUFFER_COUNT > CTHREADPAIRCOUNT * 8)) { printf( "Number of buffers is limited by 1 and tread pairs * 8\n"); exit(-1); } if ((CMESSAGES_COUNT < CTHREADPAIRCOUNT * 2) || (CBUFFER_COUNT > CTHREADPAIRCOUNT * 100)) { printf( "Number of messages is limited by thread pairs * 2 and tread pairs * 100\n"); exit(-1); } if ((show_intermediate < 0) || (show_intermediate > 10)) { printf( "Intermediate data interval must be in [0-10]\n"); exit(-1); } buffers = malloc(CBUFFER_COUNT * CBUFFER_SIZE * sizeof(double)); free_buffers = malloc(CBUFFER_COUNT * sizeof(double*)); filled_buffers = malloc(CBUFFER_COUNT * sizeof(double*)); todo_messages = CMESSAGES_COUNT; pthread_mutex_init(&consumer_mutex, NULL); pthread_cond_init(&consumer_cond, NULL); pthread_mutex_init(&producer_mutex, NULL); pthread_cond_init(&producer_cond, NULL); pthread_mutex_init(&result_mutex, NULL); pthread_mutex_init(&todo_mutex, NULL); int i; for (i = 0; i < CBUFFER_COUNT; i++) { free_buffers[i] = &(buffers[i * CBUFFER_SIZE]); } freebuffer_count = CBUFFER_COUNT; freebuffer_pos = 0; filledbuffer_count = 0; filledbuffer_pos = 0; pthread_t threads[CTHREADPAIRCOUNT * 2]; for (i = 0; i < CTHREADPAIRCOUNT; i++) { if (pthread_create(&threads[i], NULL, thread_producer, NULL)) { printf("Could not create producer %d\n", i); } if (pthread_create(&threads[i + CTHREADPAIRCOUNT], NULL, thread_consumer, NULL)) { printf("Could not create consumer %d\n", i); } } double start_overall = -1; double end_overall = -1; int all_messages = 0; if (show_intermediate) { while (!terminate) { sleep(show_intermediate); pthread_mutex_lock(&result_mutex); printf("Messages %d - msg/s: %1.3f\n", message_count, ((double) message_count) / ((end - start) / 1000000.0)); if ((start_overall < 0) || (start_overall > start)) { start_overall = start; } if ((end_overall < 0) || (end_overall < end)) { end_overall = end; } //start = getSystemTime(); all_messages += message_count; message_count = 0; pthread_mutex_unlock(&result_mutex); } } for (i = 0; i < CTHREADPAIRCOUNT; i++) { //printf("Wait for thread %d\n", i); pthread_join(threads[i], NULL); } terminate = 1; /** * Notify consumers, as they can wait for * data. */ pthread_cond_broadcast(&consumer_cond); for (i = CTHREADPAIRCOUNT; i < CTHREADPAIRCOUNT*2; i++) { //printf("Wait for thread %d\n", i); pthread_join(threads[i], NULL); } if (!show_intermediate) { start_overall = start; end_overall = end; all_messages = message_count; } printf( "All threads finished: %d messages in %1.3f seconds / %1.3f msg/s\n", all_messages, (end_overall - start_overall) / 1000000.0, (double) all_messages / ((end_overall - start_overall) / 1000000.0)); pthread_mutex_destroy(&producer_mutex); pthread_cond_destroy(&producer_cond); pthread_mutex_destroy(&consumer_mutex); pthread_cond_destroy(&consumer_cond); pthread_mutex_destroy(&result_mutex); pthread_mutex_destroy(&todo_mutex); free(buffers); free(free_buffers); free(filled_buffers); return EXIT_SUCCESS; }