/* compile with 'gcc -g -o dma_thread dma_thread.c -lpthread' */ #define _GNU_SOURCE 1 #include #include #include #include #include #include #include #include #include #include #define FILESIZE (12*1024*1024) #define READSIZE (1024*1024) #define FILENAME "test_%.04d.tmp" #define FILECOUNT 100 #define MIN_WORKERS 2 #define MAX_WORKERS 256 #define PAGE_SIZE 4096 #define true 1 #define false 0 typedef int bool; bool done = false; int workers = 2; #define PATTERN (0xfa) static void usage (void) { fprintf(stderr, "\nUsage: dma_thread [-h | -a [ -w ]\n" "\nWith no arguments, generate test files and exit.\n" "-h Display this help and exit.\n" "-a align read buffer to offset .\n" "-w number of worker threads, 2 (default) to 256,\n" " defaults to number of cores.\n\n" "Run first with no arguments to generate files.\n" "Then run with -a = 512 or 0. \n"); } typedef struct { pthread_t tid; int worker_number; int fd; int offset; int length; int pattern; unsigned char *buffer; } worker_t; void *worker_thread(void * arg) { int bytes_read; int i,k; worker_t *worker = (worker_t *) arg; int offset = worker->offset; int fd = worker->fd; unsigned char *buffer = worker->buffer; int pattern = worker->pattern; int length = worker->length; if (lseek(fd, offset, SEEK_SET) < 0) { fprintf(stderr, "Failed to lseek to %d on fd %d: %s.\n", offset, fd, strerror(errno)); exit(1); } bytes_read = read(fd, buffer, length); if (bytes_read != length) { fprintf(stderr, "read failed on fd %d: bytes_read %d, %s\n", fd, bytes_read, strerror(errno)); exit(1); } /* Corruption check */ for (i = 0; i < length; i++) { if (buffer[i] != pattern) { printf("Bad data at 0x%.06x: %p, \n", i, buffer + i); printf("Data dump starting at 0x%.06x:\n", i - 8); printf("Expect 0x%x followed by 0x%x:\n", pattern, PATTERN); for (k = 0; k < 16; k++) { printf("%02x ", buffer[i - 8 + k]); if (k == 7) { printf("\n"); } } printf("\n"); abort(); } } return 0; } void *fork_thread (void *arg) { pid_t pid; while (!done) { pid = fork(); if (pid == 0) { exit(0); } else if (pid < 0) { fprintf(stderr, "Failed to fork child.\n"); exit(1); } waitpid(pid, NULL, 0 ); usleep(100); } return NULL; } int main(int argc, char *argv[]) { unsigned char *buffer = NULL; char filename[1024]; int fd; bool dowrite = true; pthread_t fork_tid; int c, n, j; worker_t *worker; int align = 0; int offset, rc; workers = sysconf(_SC_NPROCESSORS_ONLN); while ((c = getopt(argc, argv, "a:hw:")) != -1) { switch (c) { case 'a': align = atoi(optarg); if (align < 0 || align > PAGE_SIZE) { printf("Bad alignment %d.\n", align); exit(1); } dowrite = false; break; case 'h': usage(); exit(0); break; case 'w': workers = atoi(optarg); if (workers < MIN_WORKERS || workers > MAX_WORKERS) { fprintf(stderr, "Worker count %d not between " "%d and %d, inclusive.\n", workers, MIN_WORKERS, MAX_WORKERS); usage(); exit(1); } dowrite = false; break; default: usage(); exit(1); } } if (argc > 1 && (optind < argc)) { fprintf(stderr, "Bad command line.\n"); usage(); exit(1); } if (dowrite) { buffer = malloc(FILESIZE); if (buffer == NULL) { fprintf(stderr, "Failed to malloc write buffer.\n"); exit(1); } for (n = 1; n <= FILECOUNT; n++) { sprintf(filename, FILENAME, n); fd = open(filename, O_RDWR|O_CREAT|O_TRUNC, 0666); if (fd < 0) { printf("create failed(%s): %s.\n", filename, strerror(errno)); exit(1); } memset(buffer, n, FILESIZE); printf("Writing file %s.\n", filename); if (write(fd, buffer, FILESIZE) != FILESIZE) { printf("write failed (%s)\n", filename); } close(fd); fd = -1; } free(buffer); buffer = NULL; printf("done\n"); exit(0); } printf("Using %d workers.\n", workers); worker = malloc(workers * sizeof(worker_t)); if (worker == NULL) { fprintf(stderr, "Failed to malloc worker array.\n"); exit(1); } for (j = 0; j < workers; j++) { worker[j].worker_number = j; } printf("Using alignment %d.\n", align); posix_memalign((void *)&buffer, PAGE_SIZE, READSIZE+ align); printf("Read buffer: %p.\n", buffer); for (n = 1; n <= FILECOUNT; n++) { sprintf(filename, FILENAME, n); for (j = 0; j < workers; j++) { if ((worker[j].fd = open(filename, O_RDONLY|O_DIRECT)) < 0) { fprintf(stderr, "Failed to open %s: %s.\n", filename, strerror(errno)); exit(1); } worker[j].pattern = n; } printf("Reading file %d.\n", n); for (offset = 0; offset < FILESIZE; offset += READSIZE) { memset(buffer, PATTERN, READSIZE + align); for (j = 0; j < workers; j++) { worker[j].offset = offset + j * PAGE_SIZE; worker[j].buffer = buffer + align + j * PAGE_SIZE; worker[j].length = PAGE_SIZE; } /* The final worker reads whatever is left over. */ worker[workers - 1].length = READSIZE - PAGE_SIZE * (workers - 1); done = 0; rc = pthread_create(&fork_tid, NULL, fork_thread, NULL); if (rc != 0) { fprintf(stderr, "Can't create fork thread: %s.\n", strerror(rc)); exit(1); } for (j = 0; j < workers; j++) { rc = pthread_create(&worker[j].tid, NULL, worker_thread, worker + j); if (rc != 0) { fprintf(stderr, "Can't create worker thread %d: %s.\n", j, strerror(rc)); exit(1); } } for (j = 0; j < workers; j++) { rc = pthread_join(worker[j].tid, NULL); if (rc != 0) { fprintf(stderr, "Failed to join worker thread %d: %s.\n", j, strerror(rc)); exit(1); } } /* Let the fork thread know it's ok to exit */ done = 1; rc = pthread_join(fork_tid, NULL); if (rc != 0) { fprintf(stderr, "Failed to join fork thread: %s.\n", strerror(rc)); exit(1); } } /* Close the fd's for the next file. */ for (j = 0; j < workers; j++) { close(worker[j].fd); } } return 0; }