/*
 * Copyright (c) 2013 by Talari Networks
 *
 * Author: Todd Martin
 *
 * Description: This program creates a workload intended to test how a system handles a large number of threads
 * constantly doing small bits of work followed by small sleeps. This program creates a variety of worker threads of
 * varying priorities with workloads that vary in size. This also creates a thread with real time priority that
 * continually sleeps for 250ms and measures the amount of time that the sleep takes. The program appends a line to
 * /var/log/thread_test whenever a sleep takes longer than LATE_SLEEP_THRESHOLD_US (400 ms), and periodically
 * (about every 5 minutes) logs a heartbeat with the worst sleep time observed.
 *
 * Usage: thread_test [num_worker_threads] [use_fifo] [command]
 *   num_worker_threads  number of worker threads; a value <= 0 selects DEFAULT_NUM_WORKER_THREADS
 *   use_fifo            a value > 0 runs the workers under SCHED_FIFO, otherwise SCHED_OTHER
 *   command             optional shell command passed to system() each time a late sleep is detected
 */
#define _GNU_SOURCE /* pthread_setaffinity_np, CPU_SET, error(), prctl() */

#include <errno.h>
#include <error.h>
#include <inttypes.h>
#include <limits.h>
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/param.h>
#include <sys/prctl.h>
#include <time.h>
#include <unistd.h>

/* Longest monitor sleep seen in the current heartbeat window (reset after each heartbeat). Monitor thread only. */
uint64_t max_time_delta_us = 0;
/* Longest monitor sleep seen since start-up. Monitor thread only. */
uint64_t max_ever_time_delta_us = 0;
/* Log file for late-sleep reports and heartbeats. */
FILE *logout;

/*************************************/
/* Tunable parameters of the program */
/*************************************/

/* The priorities for the worker threads (used only under SCHED_FIFO). Worker n gets
 * BASE_WORKER_PRIORITY + (n % PRIORITY_RANGE), i.e. priorities range from BASE_WORKER_PRIORITY to
 * BASE_WORKER_PRIORITY + PRIORITY_RANGE - 1 */
#define BASE_WORKER_PRIORITY 66
#define PRIORITY_RANGE 20

/* The minimum amount of time that a worker thread will sleep in microseconds */
#define MIN_WORKER_THREAD_SLEEP_TIME_US 150

/* The default number of worker threads to create */
#define DEFAULT_NUM_WORKER_THREADS 45

/* The monitor thread's requested sleep, and the measured duration above which a sleep is logged as late */
#define MONITOR_SLEEP_US 250000
#define LATE_SLEEP_THRESHOLD_US 400000

/* SCHED_FIFO priority requested for the monitor thread */
#define MONITOR_PRIORITY 99

/* Optional command (argv[3]) run via system() whenever a late sleep is detected; NULL when not given. */
char *syscmd = NULL;

/*
 * Pin thread_id to the CPUs whose bits are set in cpu_set (bit n selects CPU n).
 * Returns 0 on success, or the error number from pthread_setaffinity_np() (which is also printed).
 */
int t2_set_processor_affinity(pthread_t thread_id, unsigned long cpu_set)
{
    cpu_set_t set;
    unsigned int cpu;

    /* pthread_setaffinity_np() takes a cpu_set_t, not a raw bitmask, so translate bit by bit. */
    CPU_ZERO(&set);
    for (cpu = 0; cpu < sizeof(cpu_set) * CHAR_BIT; cpu++) {
        if (cpu_set & (1UL << cpu))
            CPU_SET(cpu, &set);
    }

    int ret = pthread_setaffinity_np(thread_id, sizeof(set), &set);
    if (ret) {
        printf("pthread_setaffinity_np failed. Error: %d cpu_set: 0x%lX\n", ret, cpu_set);
    }
    return ret;
}

/* Naive recursive Fibonacci; deliberately exponential so it serves as a CPU burner. */
int fib(int n)
{
    if (n == 0 || n == 1)
        return n;
    else
        return fib(n - 1) + fib(n - 2);
}

#define MIN_SCALE 1
#define MAX_SCALE 5

/* Results of the dummy work are stored here. Because the object is volatile, the compiler must perform the
 * store, so it cannot discard the fib() calls as dead code. */
static volatile int dummy_work_sink;

/* Burn a small amount of CPU; scale is clamped to [MIN_SCALE, MAX_SCALE], larger means more work. */
void do_dummy_work(int scale)
{
    scale = MAX(MIN_SCALE, scale);
    scale = MIN(MAX_SCALE, scale);

    switch (scale) {
    case 1:
        dummy_work_sink = fib(5);
        break;
    case 2:
        dummy_work_sink = fib(10);
        break;
    case 3:
        dummy_work_sink = fib(10);
        break;
    case 4:
        dummy_work_sink = fib(12);
        break;
    case 5:
    default:
        dummy_work_sink = fib(12);
        break;
    }

    /* Historical timings on a T510 (before results were kept live, so the compiler may have elided some work):
     *   Time for do_dummy_work(1) = 1 us
     *   Time for do_dummy_work(2) = 4 us
     *   Time for do_dummy_work(3) = 4 us
     *   Time for do_dummy_work(4) = 10 us
     *   Time for do_dummy_work(5) = 10 us
     */
}

/* "cpu MHz" from /proc/cpuinfo; informational only (timing uses CLOCK_MONOTONIC). */
float CPU_MHZ;
int num_worker_threads;
int sched_policy = SCHED_OTHER;
int num_cpus = 0;

/* Return the number of online CPUs, never less than 1. */
int get_cpu_count(void)
{
    long cpu_count = sysconf(_SC_NPROCESSORS_ONLN);

    if (cpu_count < 1) {
        printf("Unable to determine the cpu count, assuming 1\n");
        cpu_count = 1;
    }
    printf("Cpu count= %ld\n", cpu_count);
    return (int) cpu_count;
}

/*
 * Record the "cpu MHz" value from /proc/cpuinfo in CPU_MHZ (the last such line wins) and print it.
 * Informational only: a missing file or field is reported but is not fatal.
 */
void init_timer(void)
{
    char *pos;
    size_t len;
    FILE *file;
    char buf[1024];

    if (!(file = fopen("/proc/cpuinfo", "r"))) {
        int err = errno;
        printf("Could not open /proc/cpuinfo: %s\n", strerror(err));
        return;
    }

    while (fgets(buf, sizeof(buf), file)) {
        if (!(pos = strchr(buf, ':')))
            continue;
        if (!(len = strspn(pos, ": \t")))
            continue;
        else if (!strncmp(buf, "cpu MHz", strlen("cpu MHz")))
            CPU_MHZ = atof(pos + len);
    }
    fclose(file);

    if (CPU_MHZ > 0)
        printf("cpu MHz = %.3f (informational; timing uses CLOCK_MONOTONIC)\n", CPU_MHZ);
}

/* get_rtc_uS
 * Returns a monotonic timestamp in microseconds.
 * CLOCK_MONOTONIC is kept by the kernel in seconds and nanoseconds, so unlike the TSC scaled by the
 * /proc/cpuinfo "cpu MHz" value it does not depend on the current CPU frequency, and it is not x86-specific.
 * Exits the program if the clock cannot be read.
 */
uint64_t get_rtc_uS(void)
{
    struct timespec now;

    if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
        int err = errno;
        error(EXIT_FAILURE, err, "clock_gettime(CLOCK_MONOTONIC) failed");
    }
    return (uint64_t) now.tv_sec * 1000000u + (uint64_t) now.tv_nsec / 1000u;
}

/* Return c1 - c0. Unsigned subtraction is defined modulo 2^64, so a wrap between c0 and c1 is handled. */
static inline uint64_t delta_calc_u64(uint64_t c1, uint64_t c0)
{
    return c1 - c0;
}

/*
 * Sleep for u_sec microseconds, resuming after signal interruptions (EINTR).
 * Exits the program on any other nanosleep() error. Always returns 1.
 */
int t2_usleep(uint64_t u_sec)
{
    /* tv_nsec must stay below 1e9, so whole seconds go into tv_sec. */
    struct timespec sleep_time = {
        .tv_sec = (time_t) (u_sec / 1000000u),
        .tv_nsec = (long) (u_sec % 1000000u) * 1000L,
    };
    struct timespec remaining_time = {0, 0};

    while (nanosleep(&sleep_time, &remaining_time) != 0) {
        int err = errno;
        if (err != EINTR) {
            printf("Unexpected error from nanosleep: %s\n", strerror(err));
            exit(-1);
        }
        sleep_time = remaining_time;
    }
    return 1;
}

/* Run do_dummy_work(work_scale) once and print how long it took, labelled with str. */
void time_work(int work_scale, const char *str)
{
    uint64_t start_time_us = get_rtc_uS();
    do_dummy_work(work_scale);
    uint64_t stop_time_us = get_rtc_uS();
    uint64_t time_delta_us = delta_calc_u64(stop_time_us, start_time_us);
    printf("Time for %s = %" PRIu64 " us\n", str, time_delta_us);
}

/* Worker loop, never returns: thread n does work of scale (n % MAX_SCALE) + 1, then sleeps
 * (MAX_SCALE - n % MAX_SCALE) * MIN_WORKER_THREAD_SLEEP_TIME_US, so heavier work gets shorter sleeps. */
void worker_thread_task(int thread_num)
{
    for (;;) {
        do_dummy_work((thread_num % MAX_SCALE) + 1);
        t2_usleep((uint64_t) (MAX_SCALE - (thread_num % MAX_SCALE)) * MIN_WORKER_THREAD_SLEEP_TIME_US);
    }
}

#define WORKER_PRIORITY(n) (BASE_WORKER_PRIORITY + ((n) % PRIORITY_RANGE))

/* pthread entry point for worker thread_num (passed as an intptr_t): names the thread, applies sched_policy
 * and then runs worker_thread_task() forever. */
void *dispatch_worker_thread_task(void *thread_num)
{
    int num = (int) (intptr_t) thread_num;
    char thread_name[100];
    struct sched_param param = { .sched_priority = 0 };
    int ret;

    snprintf(thread_name, sizeof(thread_name), "worker_%d", num);
    prctl(PR_SET_NAME, (unsigned long) thread_name);

    /* Static priorities only apply to the real-time policies; SCHED_OTHER requires priority 0
     * (any other value makes pthread_setschedparam() fail with EINVAL). */
    if (sched_policy == SCHED_FIFO || sched_policy == SCHED_RR)
        param.sched_priority = WORKER_PRIORITY(num);

    ret = pthread_setschedparam(pthread_self(), sched_policy, &param);
    if (ret)
        printf("worker_%d: pthread_setschedparam failed. Error: %d\n", num, ret);

    worker_thread_task(num);
    return NULL;
}

/* Create num_worker_threads workers, pinning worker i to CPU (i % usable CPUs). Exits if a thread cannot be
 * created, since the test would otherwise run with fewer workers than it reports. */
void start_worker_threads(void)
{
    /* Only as many CPUs as there are bits in an unsigned long can be expressed in the affinity mask. */
    int cpu_slots = MIN(num_cpus, (int) (sizeof(unsigned long) * CHAR_BIT));
    intptr_t i;

    if (cpu_slots < 1)
        cpu_slots = 1;

    for (i = 0; i < num_worker_threads; i++) {
        unsigned long cpu_set = 1UL << (i % cpu_slots);
        pthread_t new_thread_id;
        int ret = pthread_create(&new_thread_id, NULL, dispatch_worker_thread_task, (void *) i);

        if (ret)
            error(EXIT_FAILURE, ret, "pthread_create failed for worker %d", (int) i);
        t2_set_processor_affinity(new_thread_id, cpu_set);
    }
}

/* Format the current local time with strftime("%c") into buf (len bytes); falls back to "unknown time". */
static void format_current_time(char *buf, size_t len)
{
    time_t now = time(NULL);
    struct tm now_tm;

    if (localtime_r(&now, &now_tm) == NULL || strftime(buf, len, "%c", &now_tm) == 0)
        snprintf(buf, len, "unknown time");
}

/*
 * Monitor loop, never returns. Repeatedly sleeps MONITOR_SLEEP_US, measures the actual duration, tracks the
 * maxima, logs (and optionally runs syscmd) when a sleep exceeds LATE_SLEEP_THRESHOLD_US, and logs a
 * periodic heartbeat.
 */
void *monitor_thread_task(void *dummy __attribute__((unused)))
{
    char thread_name[100];
    struct sched_param param = { .sched_priority = MONITOR_PRIORITY };
    uint32_t loop_counter = 0;
    int ret;

    snprintf(thread_name, sizeof(thread_name), "monitor");
    prctl(PR_SET_NAME, (unsigned long) thread_name);

    ret = pthread_setschedparam(pthread_self(), SCHED_FIFO, &param);
    if (ret)
        printf("monitor: pthread_setschedparam(SCHED_FIFO, %d) failed. Error: %d\n", MONITOR_PRIORITY, ret);

    for (;;) {
        uint64_t start_time_us = get_rtc_uS();
        t2_usleep(MONITOR_SLEEP_US);
        uint64_t stop_time_us = get_rtc_uS();
        uint64_t time_delta_us = delta_calc_u64(stop_time_us, start_time_us);

        if (time_delta_us > max_time_delta_us)
            max_time_delta_us = time_delta_us;
        if (time_delta_us > max_ever_time_delta_us)
            max_ever_time_delta_us = time_delta_us;

        if (time_delta_us > LATE_SLEEP_THRESHOLD_US) {
            char time_str[128];

            format_current_time(time_str, sizeof(time_str));
            fprintf(logout, "%s: Error: 250 ms sleep took %" PRIu64 " us\n", time_str, time_delta_us);
            fflush(logout);
            if (syscmd)
                system(syscmd); /* best effort; exit status intentionally ignored */
        }

        /* 1200 iterations of >= 250 ms is roughly 5 minutes; the offset of 60 puts the first heartbeat
         * about 15 s after start-up. Log a message to indicate that this thread is still running. */
        if ((loop_counter % 1200) == 60) {
            char time_str[128];

            format_current_time(time_str, sizeof(time_str));
            fprintf(logout, "%s:running %d threads max delta %" PRIu64 " us max_ever %" PRIu64 "\n",
                    time_str, num_worker_threads, max_time_delta_us, max_ever_time_delta_us);
            fflush(logout);
            max_time_delta_us = 0;
        }
        loop_counter++;
    }
    /* not reached */
}

/* Start the monitor thread and pin it to CPU 0. Exits if the thread cannot be created. */
void start_monitor_thread(void)
{
    pthread_t new_thread_id;
    int ret = pthread_create(&new_thread_id, NULL, monitor_thread_task, NULL);

    if (ret)
        error(EXIT_FAILURE, ret, "pthread_create failed for the monitor thread");
    t2_set_processor_affinity(new_thread_id, 1UL); /* bit 0 = CPU 0 */
}

/* Human-readable name of a scheduling policy constant. */
const char *schedule_policy_to_str(int policy)
{
    switch (policy) {
    case SCHED_FIFO:
        return "SCHED_FIFO";
    case SCHED_RR:
        return "SCHED_RR";
    case SCHED_OTHER:
        return "SCHED_OTHER";
    default:
        return "unknown";
    }
}

/* Print the effective test configuration to stdout. */
void print_thread_test_parameters(void)
{
    printf("thread_test parameters:\n");
    printf("\tnumber of worker threads = %d\n", num_worker_threads);
    printf("\tworker threads scheduling policy = %s\n", schedule_policy_to_str(sched_policy));
    printf("\tbase worker thread priority = %d\n", BASE_WORKER_PRIORITY);
    /* WORKER_PRIORITY() uses n % PRIORITY_RANGE, so the highest priority is one less than base + range. */
    printf("\tmax worker thread priority = %d\n", BASE_WORKER_PRIORITY + PRIORITY_RANGE - 1);
    printf("\tmin worker thread sleep time = %d us\n", MIN_WORKER_THREAD_SLEEP_TIME_US);
    if (syscmd)
        printf("\tCommand used when delay detected (%s) \n", syscmd);
}

/* Parse a decimal integer argument like atoi(), but return 0 instead of invoking UB on out-of-range input. */
static int parse_int_arg(const char *arg)
{
    char *end;
    long value;

    errno = 0;
    value = strtol(arg, &end, 10);
    if (end == arg || errno == ERANGE || value > INT_MAX || value < INT_MIN)
        return 0;
    return (int) value;
}

int main(int argc, const char *argv[])
{
    int pnum = 0;

    logout = fopen("/var/log/thread_test", "w");
    if (!logout) {
        printf("unable to open log file (%s), quitting\n", "/var/log/thread_test");
        return EXIT_FAILURE;
    }

    num_worker_threads = (argc > 1) ? parse_int_arg(argv[1]) : DEFAULT_NUM_WORKER_THREADS;
    if (argc > 2)
        pnum = parse_int_arg(argv[2]);
    sched_policy = (pnum > 0) ? SCHED_FIFO : SCHED_OTHER;
    if (argc > 3)
        syscmd = (char *) argv[3];
    if (num_worker_threads <= 0)
        num_worker_threads = DEFAULT_NUM_WORKER_THREADS;

    print_thread_test_parameters();
    init_timer();

    time_work(1, "do_dummy_work(1)");
    time_work(2, "do_dummy_work(2)");
    time_work(3, "do_dummy_work(3)");
    time_work(4, "do_dummy_work(4)");
    time_work(5, "do_dummy_work(5)");

    num_cpus = get_cpu_count();

    printf("Starting clock monitoring thread\n");
    start_monitor_thread();

    printf("Starting %d worker threads\n", num_worker_threads);
    start_worker_threads();

    printf("All threads started\n");

    /* The worker and monitor threads run forever; the main thread just idles until the process is killed. */
    for (;;)
        sleep(1);

    return 0;
}