/*
 * Portions Copyright (c) 2014, PostgreSQL Global Development Group
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph appear in all copies.
 *
 * Test program roughly simulating postgres' IO.
 *
 * Parameters will need to be changed to reproduce the problem on
 * individual systems.
 *
 * Author: Andres Freund, andres@2ndquadrant.com, andres@anarazel.de
 */
#define _POSIX_C_SOURCE 200809L
#define _XOPEN_SOURCE 800

#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

/* CHANGE: number of reading processes */
#define NUM_RANDOM_READERS 16

/* size in bytes of every individual read()/write() issued by this program */
enum { BLOCK_SIZE = 8192 };

/* nanoseconds per second / per millisecond, used for timespec arithmetic */
#define NSEC_PER_SEC  INT64_C(1000000000)
#define NSEC_PER_MSEC 1000000.0

/* largest value random() can return according to POSIX (2^31 - 1) */
#define RANDOM_MAX 2147483647.0

/*
 * CHANGE: set to memory size * 2 or so.
 *
 * Remove the 'data', 'wal' files after changing.
 */
static const size_t data_size = 1024L * 1024 * 1024 * 48;

/* probably ok this way */
static const size_t wal_size = 1024L * 1024 * 1024 * 1;

/* after how many iterations should stuff get reported */
static const uint64_t read_report_interval = 10000;
static const uint64_t wal_report_interval = 1000;
static const uint64_t commit_report_interval = 500;

/* internal data */

/* zero-filled buffer used as the payload of every write */
static const char initdata[BLOCK_SIZE] = {0};

/* pids of the forked reader processes, filled in by main() */
static pid_t readers[NUM_RANDOM_READERS];

/* Latency bookkeeping for one kind of action (read, wal write, commit). */
struct timing
{
    uint64_t iter;              /* number of actions measured so far */
    uint64_t period_total;      /* summed duration (ns) in the current period */
    uint64_t total;             /* summed duration (ns) of finished periods */
    uint64_t period_max;        /* longest duration (ns) in the current period */
    struct timespec t_before;   /* timestamp taken before the action */
    struct timespec t_after;    /* timestamp taken after the action */
};

/*
 * Report the error number 'e' on stderr and terminate the calling process
 * immediately with a failure exit status.
 */
static void
fatal_error(int e)
{
    fprintf(stderr, "frak me: %d: %s\n", e, strerror(e));
    _exit(1);
}

/*
 * Sleep for 's' nanoseconds.
 *
 * The value is split into seconds and nanoseconds so that values of one
 * second or more are valid, and the sleep is resumed with the remaining time
 * if it is interrupted by a signal.  Any other failure is fatal.
 */
static void
nsleep(int64_t s)
{
    struct timespec request;
    struct timespec remaining;

    if (s <= 0)
        return;

    request.tv_sec = (time_t) (s / NSEC_PER_SEC);
    request.tv_nsec = (long) (s % NSEC_PER_SEC);

    while (nanosleep(&request, &remaining) < 0)
    {
        if (errno != EINTR)
            fatal_error(errno);
        request = remaining;
    }
}

/*
 * Return a pseudo-random byte offset in the range [0, end - 1].
 *
 * The offset is not rounded to a multiple of BLOCK_SIZE.  random() is scaled
 * by its own documented maximum rather than RAND_MAX, which describes rand().
 */
static off_t
random_block(size_t end)
{
    double fraction;

    /* avoid wrapping 'end - 1' around for degenerate sizes */
    if (end < 2)
        return 0;

    fraction = (double) random() / RANDOM_MAX;
    return (off_t) (fraction * (double) (end - 1));
}

/* Return a - b in nanoseconds. */
static int64_t
nsec_diff(const struct timespec *a, const struct timespec *b)
{
    int64_t sec = (int64_t) (a->tv_sec - b->tv_sec);
    int64_t nsec = (int64_t) (a->tv_nsec - b->tv_nsec);

    return sec * NSEC_PER_SEC + nsec;
}

/* Reset all counters of 't'; the timestamps are left untouched. */
static void
timing_init(struct timing *t)
{
    t->iter = 0;
    t->total = 0;
    t->period_total = 0;
    t->period_max = 0;
}

/* Record the start time of an action. */
static void
timing_before_action(struct timing *t)
{
    clock_gettime(CLOCK_MONOTONIC, &t->t_before);
}

/*
 * Record the end time of an action and account for its duration.
 *
 * Every 'report_interval' actions the average and maximum duration of the
 * period are printed to stdout, prefixed by 'ctx' and the pid, and the
 * per-period counters are reset.  The average is computed over
 * 'report_interval' actions, i.e. the length of the period being reported.
 */
static void
timing_after_action(struct timing *t, const char *ctx, int64_t report_interval)
{
    uint64_t dur;

    clock_gettime(CLOCK_MONOTONIC, &t->t_after);
    dur = (uint64_t) nsec_diff(&t->t_after, &t->t_before);

    t->iter++;
    t->period_total += dur;
    if (dur > t->period_max)
        t->period_max = dur;

    /* a non-positive interval disables reporting instead of dividing by 0 */
    if (report_interval <= 0)
        return;

    if ((t->iter % (uint64_t) report_interval) == 0)
    {
        double avg_msec =
            (double) t->period_total / (double) report_interval / NSEC_PER_MSEC;
        double max_msec = (double) t->period_max / NSEC_PER_MSEC;

        fprintf(stdout, "%s[%d]: avg: %.1f msec; max: %.1f msec\n",
                ctx, (int) getpid(), avg_msec, max_msec);

        t->total += t->period_total;
        t->period_total = 0;
        t->period_max = 0;
    }
}

/*
 * Endlessly write BLOCK_SIZE blocks sequentially into the 'wal' file,
 * wrapping around to the start before reaching wal_size.
 *
 * Every fifth write is treated as a commit and followed by fdatasync().
 * The plain write latency is reported as "wal"; the latency of write plus
 * fdatasync() is reported as "commit".  Never returns.
 */
static void
do_wal_writes(void)
{
    int fd;
    off_t pos = 0;
    int64_t iter = 0;
    struct timing wal_timing;
    struct timing commit_timing;

    timing_init(&wal_timing);
    timing_init(&commit_timing);

    fd = open("wal", O_RDWR, S_IRUSR | S_IWUSR);
    if (fd < 0)
        fatal_error(errno);

    while (true)
    {
        bool is_commit = (iter++ % 5) == 0;

        if (lseek(fd, pos, SEEK_SET) < 0)
            fatal_error(errno);

        timing_before_action(&wal_timing);
        if (is_commit)
            timing_before_action(&commit_timing);

        if (write(fd, initdata, BLOCK_SIZE) < 0)
            fatal_error(errno);

        timing_after_action(&wal_timing, "wal", wal_report_interval);

        if (is_commit)
        {
            if (fdatasync(fd) < 0)
                fatal_error(errno);
            timing_after_action(&commit_timing, "commit",
                                commit_report_interval);
        }

        pos += BLOCK_SIZE;
        if (pos + BLOCK_SIZE >= (off_t) wal_size)
            pos = 0;

        nsleep(1000000);
    }
}

/*
 * Endlessly write BLOCK_SIZE blocks at random offsets of the 'data' file,
 * sleeping briefly between writes and calling fsync() after every 100000
 * writes.  Never returns.
 */
static void
do_checkpointer_writes(void)
{
    int fd;
    int64_t writes = 0;

    fd = open("data", O_RDWR, S_IRUSR | S_IWUSR);
    if (fd < 0)
        fatal_error(errno);

    while (true)
    {
        off_t pos = random_block(data_size);

        if (lseek(fd, pos, SEEK_SET) < 0)
            fatal_error(errno);

        if (write(fd, initdata, BLOCK_SIZE) < 0)
            fatal_error(errno);

        if ((++writes % 100000) == 0)
        {
            fprintf(stdout, "starting fsync() of files\n");

            if (fsync(fd) < 0)
                fatal_error(errno);

            fprintf(stdout, "finished fsync() of files\n");
        }

        nsleep(200000);
    }
}

/*
 * Endlessly read BLOCK_SIZE bytes from random offsets of the 'data' file and
 * report the read latency as "read".  Never returns.
 */
static void
do_random_reads(void)
{
    int fd;
    struct timing timing;

    timing_init(&timing);

    fd = open("data", O_RDWR, S_IRUSR | S_IWUSR);
    if (fd < 0)
        fatal_error(errno);

    while (true)
    {
        char data[BLOCK_SIZE];
        off_t pos = random_block(data_size);

        if (lseek(fd, pos, SEEK_SET) < 0)
            fatal_error(errno);

        timing_before_action(&timing);

        if (read(fd, data, sizeof(data)) < 0)
            fatal_error(errno);

        timing_after_action(&timing, "read", read_report_interval);
    }
}

/*
 * Create 'path' and fill it with zeroes, then flush it to disk.
 *
 * Nothing is done if the file already exists.  Blocks are written while the
 * number of bytes written is <= 'size', so the file ends up slightly larger
 * than 'size'; a BLOCK_SIZE access starting at any offset below 'size'
 * therefore stays inside the file.
 */
static void
initialize_file(const char *path, size_t size)
{
    size_t written = 0;
    int fd;

    fd = open(path, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (fd < 0)
    {
        /* an existing file is reused as is */
        if (errno == EEXIST)
            return;
        fatal_error(errno);
    }

    while (written <= size)
    {
        ssize_t ret = write(fd, initdata, sizeof(initdata));

        if (ret == -1)
            fatal_error(errno);

        /* short writes are fine, only the bytes really written count */
        written += (size_t) ret;
    }

    if (fsync(fd) < 0)
        fatal_error(errno);

    if (close(fd) < 0)
        fatal_error(errno);
}

/* Create the 'data' and 'wal' files unless they already exist. */
static void
initialize_files(void)
{
    /* initialize data file */
    initialize_file("data", data_size);

    /* initialize wal file */
    initialize_file("wal", wal_size);
}

/*
 * Fork a child process that runs 'sub' and return the child's pid to the
 * parent.
 *
 * The child reseeds the random number generator with a value that includes
 * its own pid; otherwise every child would inherit the parent's generator
 * state and produce the same sequence of offsets.  Should 'sub' ever return,
 * the child exits instead of continuing in the caller's code.
 */
static pid_t
start_subprocess(void (*sub)(void))
{
    pid_t pid = fork();

    if (pid == -1)
        fatal_error(errno);

    if (pid == 0)
    {
        srandom((unsigned int) time(NULL) ^ (unsigned int) getpid());
        sub();
        _exit(0);
    }

    return pid;
}

/*
 * Set up the files, start one checkpointer, one wal writer and
 * NUM_RANDOM_READERS readers, then wait for all of them to exit.
 */
int
main(int argc, char **argv)
{
    int status;
    pid_t checkpointer_pid;
    pid_t wal_pid;

    (void) argc;
    (void) argv;

    /*
     * Don't want to hit the same, already cached, pages after restarting.
     */
    srandom((unsigned int) time(NULL));

    initialize_files();

    checkpointer_pid = start_subprocess(do_checkpointer_writes);
    wal_pid = start_subprocess(do_wal_writes);

    /* start all reader processes */
    for (int i = 0; i < NUM_RANDOM_READERS; i++)
        readers[i] = start_subprocess(do_random_reads);

    /* return if all subprocesses decided to die */
    for (int i = 0; i < NUM_RANDOM_READERS; i++)
        waitpid(readers[i], &status, 0);

    waitpid(checkpointer_pid, &status, 0);
    waitpid(wal_pid, &status, 0);

    return 0;
}