/* * Usage: udpsink [ -p baseport] nbports */ #include #include #include #include #include #include #include #include #include #include struct worker_data { struct event *snk_ev; struct event_base *base; struct timeval t; unsigned long pack_count; unsigned long bytes_count; unsigned long tout; int fd; /* move to avoid hole on 64-bit */ int pad1; /*64B - let Eric figure the math;-> */ //unsigned long _padd[16 - 3]; /* alignment */ }; void usage(int code) { fprintf(stderr, "Usage: udpsink [-p baseport] nbports\n"); exit(code); } void process_recv(int fd, short ev, void *arg) { char buffer[4096]; struct sockaddr_in addr; socklen_t len = sizeof(addr); struct worker_data *wdata = (struct worker_data *)arg; int lu = 0; if ((event_add(wdata->snk_ev, &wdata->t)) < 0) { perror("cb event_add"); return; } if (ev == EV_TIMEOUT) { wdata->tout++; } else { lu = recvfrom(wdata->fd, buffer, sizeof(buffer), 0, (struct sockaddr *)&addr, &len); if (lu > 0) { wdata->pack_count++; wdata->bytes_count += lu; } } } int prep_thread(struct worker_data *wdata) { wdata->t.tv_sec = 1; wdata->t.tv_usec = random() % 50000L; wdata->base = event_init(); event_set(wdata->snk_ev, wdata->fd, EV_READ, process_recv, wdata); event_base_set(wdata->base, wdata->snk_ev); if ((event_add(wdata->snk_ev, &wdata->t)) < 0) { perror("event_add"); return -1; } return 0; } void *worker_func(void *arg) { struct worker_data *wdata = (struct worker_data *)arg; return (void *)event_base_loop(wdata->base, 0); } int main(int argc, char *argv[]) { int c; int baseport = 4000; int nbthreads; struct worker_data *wdata; unsigned long ototal = 0; int concurrent = 0; int verbose = 0; int i; while ((c = getopt(argc, argv, "cvp:")) != -1) { if (c == 'p') baseport = atoi(optarg); else if (c == 'c') concurrent = 1; else if (c == 'v') verbose++; else usage(1); } if (optind == argc) usage(1); nbthreads = atoi(argv[optind]); wdata = calloc(sizeof(struct worker_data), nbthreads); if (!wdata) { perror("calloc"); return 1; } for (i = 0; i < nbthreads; i++) { struct sockaddr_in addr; pthread_t tid; if (i && concurrent) { wdata[i].fd = wdata[0].fd; } else { wdata[i].snk_ev = malloc(sizeof(struct event)); if (!wdata[i].snk_ev) return 1; memset(wdata[i].snk_ev, 0, sizeof(struct event)); wdata[i].fd = socket(PF_INET, SOCK_DGRAM, 0); if (wdata[i].fd == -1) { free(wdata[i].snk_ev); perror("socket"); return 1; } memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; // addr.sin_addr.s_addr = inet_addr(argv[optind]); addr.sin_port = htons(baseport + i); if (bind (wdata[i].fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { free(wdata[i].snk_ev); perror("bind"); return 1; } // fcntl(wdata[i].fd, F_SETFL, O_NDELAY); } if (prep_thread(wdata + i)) { printf("failed to allocate thread %d, exit\n", i); exit(0); } pthread_create(&tid, NULL, worker_func, wdata + i); } for (;;) { unsigned long total; long delta; sleep(1); total = 0; for (i = 0; i < nbthreads; i++) { total += wdata[i].pack_count; } delta = total - ototal; if (delta) { printf("%lu pps (%lu", delta, total); if (verbose) { for (i = 0; i < nbthreads; i++) { if (wdata[i].pack_count) printf(" %d:%lu", i, wdata[i].pack_count); } } printf(")\n"); } ototal = total; } }