#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0])) #define MAX_LIST 1000 char *dir = "/tmp"; int buf_size = 8, files_nr = 4, thread_nr = 2, o_dir = 0; struct asyc_file { struct timeval *t_diff; int idx; }; void aio_complete(sigval_t sigev_value) { int i; struct timeval ts, te; struct asyc_file *asfp; struct timeval *t_diff; asfp = sigev_value.sival_ptr; t_diff = asfp->t_diff; i = asfp->idx; gettimeofday(&te, NULL); ts = t_diff[i]; timersub(&te, &ts, &t_diff[i]); free(asfp); /* printf("aio %i, started %f, ended %f, diff %f\n", i, ts.tv_sec + ts.tv_usec / 1000000.0, te.tv_sec + te.tv_usec / 1000000.0, t_diff[i].tv_sec + t_diff[i].tv_usec / 1000000.0 ); */ } /* * run in every thread: * MAX_LIST aio per file descriptor, * files_nr filedes per thread, */ void run_once(void) { struct aiocb my_aiocb[MAX_LIST] = {}; char buf[buf_size * 1024]; int i, fds[files_nr], ret; unsigned long int pid, tid; struct timeval t_diff[MAX_LIST]; float sum, avg; pid = getpid(); tid = pthread_self(); for (i = 0; i < files_nr; ++i) { snprintf(buf, sizeof buf, "%s/dir1%lu/file1%lu-%li", dir, tid, tid, random()); ret = fds[i] = open(buf, O_CREAT|O_WRONLY|O_TRUNC|O_NOATIME| o_dir, 0644); if (ret < 0) { fprintf(stderr, "open error: %i, %s, " "tid %lu of pid %lu exited\n", errno, strerror(errno), tid, pid); pthread_exit(NULL); } } int div = ARRAY_SIZE(t_diff) / files_nr; for (i = 0; i < ARRAY_SIZE(t_diff); ++i) { struct asyc_file *asfp; asfp = calloc(1, sizeof(*asfp)); asfp->t_diff = t_diff; asfp->idx = i; int fd_idx = i / div; int fd = fds[fd_idx]; if (isatty(fd) || fd <= 2) { fprintf(stderr, "size %i, idx %i, div %i, fd_idx %i, " "fd %i, %i, %s\n", ARRAY_SIZE(t_diff), i, div, fd_idx, fd, errno, strerror(errno)); pthread_exit(NULL); } my_aiocb[i].aio_fildes = fd; my_aiocb[i].aio_buf = buf; my_aiocb[i].aio_nbytes = sizeof buf; my_aiocb[i].aio_offset = (i % div) * sizeof buf; my_aiocb[i].aio_sigevent.sigev_notify = SIGEV_THREAD; my_aiocb[i].aio_sigevent.sigev_notify_function = aio_complete; my_aiocb[i].aio_sigevent.sigev_notify_attributes = NULL; my_aiocb[i].aio_sigevent.sigev_value.sival_ptr = asfp; gettimeofday(&t_diff[i], NULL); aio_write(&my_aiocb[i]); /* sched_yield(); */ } struct timeval tv; struct tm tm; /* gettimeofday(&tv, NULL); localtime_r(&tv.tv_sec, &tm); strftime(buf, sizeof buf, "%Y%m%d-%H%M%S", &tm); printf("%s.%06lu/%lu/%lu/%i aio_write started\n", buf, tv.tv_usec, pid, tid, ARRAY_SIZE(t_diff) ); */ for (i = 0; i < ARRAY_SIZE(t_diff); ++i) { const struct aiocb *my_aiocb_p = &my_aiocb[i]; ret = aio_suspend(&my_aiocb_p, 1, NULL); if (ret < 0) perror("aio_suspend"); /* else printf("aio_suspend %i returned with ret %i\n", i, ret); */ } /* gettimeofday(&tv, NULL); localtime_r(&tv.tv_sec, &tm); strftime(buf, sizeof buf, "%Y%m%d-%H%M%S", &tm); printf("%s.%06lu/%lu/%lu/%i aio_write completed\n", buf, tv.tv_usec, pid, tid, ARRAY_SIZE(t_diff) ); */ sleep(1); int errors = 0; for (i = 0; i < ARRAY_SIZE(t_diff); ++i) { ret = aio_return(&my_aiocb[i]); if (ret < 0) errors++; } for (i = 0; i < files_nr; ++i) close(fds[i]); sum = 0.0f; int valid = 0; for (i = 0; i < ARRAY_SIZE(t_diff); ++i) { float t = t_diff[i].tv_sec + t_diff[i].tv_usec / 1000000.0; if (t > 1199116800.0) /* time_t for 2008/01/01 */ fprintf(stderr, "timeout for %f\n", t); else { valid++; sum += t; } } avg = sum / valid; gettimeofday(&tv, NULL); localtime_r(&tv.tv_sec, &tm); strftime(buf, sizeof buf, "%Y%m%d-%H%M%S", &tm); printf("%s.%06lu: pid %lu, tid %lu, total valid %i aio, avg %f, errors %i%%\n", buf, tv.tv_usec, pid, tid, valid, avg, errors * 100 / ARRAY_SIZE(t_diff)); } void *per_thread_run(void *parm) { unsigned long int tid; char buf[BUFSIZ]; int ret; tid = pthread_self(); printf("thread %lu started\n", tid); snprintf(buf, BUFSIZ, "%s/dir1%lu", dir, tid); ret = mkdir(buf, 0755); if (ret < 0 && errno != EEXIST) { perror("mkdir"); pthread_exit(NULL); } while (1) run_once(); return NULL; } char *progname; char *help_message = "Usage: %s [-d dir] [-t thread_nr] [-b writebuf_size] [-f files_nr] [-h]\n" "\t-%c\n"; int main(int argc, char *argv[]) { int opt; int i, ret; pthread_t *thread_ids; progname = argv[0]; while ((opt = getopt(argc, argv, "d:t:b:f:oh")) != EOF) { switch (opt) { case 'd': dir = strdup(optarg); break; case 't': thread_nr = atoi(optarg); break; case 'b': buf_size = atoi(optarg); break; case 'f': files_nr = atoi(optarg); break; case 'o': o_dir = O_DIRECT; break; case '?': case 'h': default: fprintf(stderr, help_message, progname, opt); exit(1); } } setlinebuf(stdout); struct rlimit rl; getrlimit(RLIMIT_NOFILE, &rl); printf("curr %lu, max %lu\n", rl.rlim_cur, rl.rlim_max); rl.rlim_cur = rl.rlim_max = 10240; setrlimit(RLIMIT_NOFILE, &rl); getrlimit(RLIMIT_NOFILE, &rl); printf("curr %lu, max %lu\n", rl.rlim_cur, rl.rlim_max); /* total 5 processes */ for (i = 0; i < 4; ++i) { ret = fork(); if (ret < 0) { perror("fork"); exit(1); } if (ret == 0) /* break in child */ break; } srandom(time(NULL)); thread_ids = calloc(thread_nr, sizeof(pthread_t)); for (i = 0; i < thread_nr; ++i) pthread_create(&thread_ids[i], NULL, per_thread_run, NULL); for (i = 0; i < thread_nr; ++i) pthread_join(thread_ids[i], NULL); return 0; }