>From 563903cbc136b1b9c5b5f2b52d8acb0bbb6a8732 Mon Sep 17 00:00:00 2001
From: Manfred Spraul
Date: Sat, 15 Aug 2009 11:48:21 +0200
Subject: [PATCH] [PATCH 4a/4] ipc: sem optimise simple operations, v2

An alternative patch to Nick's proposal:
http://lkml.org/lkml/2009/8/11/11

The patch is based on Nick's patch and applies on top of the first
three patches from Nick.

I've added two additional optimizations for two common cases:
- do not restart scanning after every "alter" operation, only when
  necessary.
- stop scanning decrements if the semaphore value is already 0.

Identical with Nick's patch:
- per-semaphore list to optimize single-sop semop calls:
  This avoids having to scan through all waiting tasks, even tasks
  that are waiting on a different semaphore of the same array.
  (A small userspace sketch of the simple vs. complex case is
  appended after the patch.)

Differences:
- same code for the per-semaphore and the global list.
- both simple and complex operations are improved.
- one list for both wait-for-zero and decrement instead of two lists.
- unlink_queue helper function.
- do not use likely/unlikely in exit_sem(): complex_count depends on
  the application, the kernel doesn't know anything about the running
  app. And: it's a very rare path.

Open points:
- further testing

Signed-off-by: Manfred Spraul
---
 include/linux/sem.h |    5 ++-
 ipc/sem.c           |  148 ++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 122 insertions(+), 31 deletions(-)

diff --git a/include/linux/sem.h b/include/linux/sem.h
index 1b191c1..ea03058 100644
--- a/include/linux/sem.h
+++ b/include/linux/sem.h
@@ -86,6 +86,7 @@ struct task_struct;
 struct sem {
 	int	semval;		/* current value */
 	int	sempid;		/* pid of last operation */
+	struct list_head sem_pending; /* pending operations to be processed */
 };
 
 /* One sem_array data structure for each set of semaphores in the system. */
@@ -96,11 +97,13 @@ struct sem_array {
 	struct sem		*sem_base;	/* ptr to first semaphore in array */
 	struct list_head	sem_pending;	/* pending operations to be processed */
 	struct list_head	list_id;	/* undo requests on this array */
-	unsigned long		sem_nsems;	/* no. of semaphores in array */
+	int			sem_nsems;	/* no. of semaphores in array */
+	int			complex_count;	/* pending complex operations */
 };
 
 /* One queue for each sleeping process in the system. */
 struct sem_queue {
+	struct list_head	simple_list; /* queue of pending operations */
 	struct list_head	list;	 /* queue of pending operations */
 	struct task_struct	*sleeper; /* this process */
 	struct sem_undo		*undo;	 /* undo structure */
diff --git a/ipc/sem.c b/ipc/sem.c
index 3629ef8..ee49b38 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -240,6 +240,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	key_t key = params->key;
 	int nsems = params->u.nsems;
 	int semflg = params->flg;
+	int i;
 
 	if (!nsems)
 		return -EINVAL;
@@ -272,6 +273,14 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
 	ns->used_sems += nsems;
 
 	sma->sem_base = (struct sem *) &sma[1];
+
+	for (i = 0; i < nsems; i++) {
+		struct sem *s = &sma->sem_base[i];
+
+		INIT_LIST_HEAD(&s->sem_pending);
+	}
+
+	sma->complex_count = 0;
 	INIT_LIST_HEAD(&sma->sem_pending);
 	INIT_LIST_HEAD(&sma->list_id);
 	sma->sem_nsems = nsems;
@@ -335,13 +344,16 @@ SYSCALL_DEFINE3(semget, key_t, key, int, nsems, int, semflg)
  * all at once. Return 0 if yes, 1 if need to sleep, else return error code.
  */
-static int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
-			     int nsops, struct sem_undo *un, int pid)
+static int try_atomic_semop(struct sem_array * sma, struct sembuf * sops,
+			    int nsops, struct sem_undo *un, int pid,
+			    int *phelps)
 {
-	int result, sem_op;
+	int result, sem_op, helps;
 	struct sembuf *sop;
 	struct sem * curr;
 
+	helps = 0;
+
 	for (sop = sops; sop < sops + nsops; sop++) {
 		curr = sma->sem_base + sop->sem_num;
 		sem_op = sop->sem_op;
@@ -363,6 +375,15 @@ static int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
 			if (undo < (-SEMAEM - 1) || undo > SEMAEM)
 				goto out_of_range;
 		}
+		/* If we incremented a semaphore, or decremented
+		 * a semaphore to 0, then this might allow other operations to
+		 * proceed. Remember that.
+		 * Note: helps is only an optimization, it doesn't matter that
+		 * it is a bit conservative (e.g. decrement by 3, increment
+		 * that sem by 2 doesn't help, but the code returns "helps=1").
+		 */
+		if (sem_op > 0 || (sem_op < 0 && result == 0))
+			helps = 1;
 		curr->semval = result;
 	}
 
@@ -375,6 +396,7 @@ static int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
 	}
 
 	sma->sem_otime = get_seconds();
+	*phelps = helps;
 	return 0;
 
 out_of_range:
@@ -394,6 +416,8 @@ undo:
 		sop--;
 	}
 
+	/* a failed operation can never help other operations. */
+	*phelps = 0;
 	return result;
 }
 
@@ -418,40 +442,85 @@ static void wake_up_sem_queue(struct sem_queue *q, int error)
 	preempt_enable();
 }
 
-/* Go through the pending queue for the indicated semaphore
- * looking for tasks that can be completed.
+static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
+{
+	list_del(&q->list);
+	if (q->nsops == 1) {
+		BUG_ON(list_empty(&q->simple_list));
+		list_del(&q->simple_list);
+	} else {
+		BUG_ON(!list_empty(&q->simple_list));
+		sma->complex_count--;
+	}
+}
+
+
+/**
+ * update_queue(sma, semnum): Look for tasks that can be completed.
+ * @sma: semaphore array.
+ * @semnum: semaphore that was modified.
+ *
+ * update_queue must be called after a semaphore in a semaphore array
+ * was modified. If multiple semaphores were modified, then @semnum
+ * must be set to -1.
  */
-static void update_queue (struct sem_array * sma)
+static void update_queue(struct sem_array *sma, int semnum)
 {
 	struct sem_queue *q, *tq;
+	struct list_head *pending_list;
+
+	/* if there are complex operations around, then knowing the semaphore
+	 * that was modified doesn't help us. Assume that multiple semaphores
+	 * were modified.
+	 */
+	if (sma->complex_count)
+		semnum = -1;
+
+	if (semnum == -1)
+		pending_list = &sma->sem_pending;
+	else
+		pending_list = &sma->sem_base[semnum].sem_pending;
 
 again:
-	list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
+	list_for_each_entry_safe(q, tq, pending_list, list) {
 		int error;
-		int alter;
+		int could_help;
+
+		/* If we are looking at only one semaphore and that semaphore
		 * is 0, then it does not make sense to scan the "alter"
		 * entries: simple increments that affect only one entry
		 * succeed immediately and cannot be in the pending queue,
		 * and decrements cannot succeed if the value is already 0.
+		 */
+		if (semnum != -1 && sma->sem_base[semnum].semval == 0 &&
+				q->alter)
+			break;
 
 		error = try_atomic_semop(sma, q->sops, q->nsops,
-					 q->undo, q->pid);
+					 q->undo, q->pid, &could_help);
 
-		/* Does q->sleeper still need to sleep? */
+		/* q still needs to sleep, nothing happened */
 		if (error > 0)
 			continue;
 
-		list_del(&q->list);
+		/* q must be woken up:
+		 * The semaphore operation was processed, either successfully
+		 * (error=0) or it failed (e.g. UNDO overflow).
+		 */
+		unlink_queue(sma, q);
+		wake_up_sem_queue(q, error);
 
 		/*
 		 * The next operation that must be checked depends on the type
 		 * of the completed operation:
-		 * - if the operation modified the array, then restart from the
-		 *   head of the queue and check for threads that might be
-		 *   waiting for semaphore values to become 0.
-		 * - if the operation didn't modify the array, then just
-		 *   continue.
+		 * - if we incremented something or if a semaphore reached 0,
+		 *   then restart from the head of the pending list: there
+		 *   could be tasks that are waiting for this event.
 		 */
-		alter = q->alter;
-		wake_up_sem_queue(q, error);
-		if (alter)
+		if (could_help)
 			goto again;
+		/* - Otherwise: just continue scanning.
+		 */
 	}
 }
@@ -531,8 +600,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 
 	/* Wake up all pending processes and let them fail with EIDRM. */
 	list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
-		list_del(&q->list);
-
+		unlink_queue(sma, q);
 		wake_up_sem_queue(q, -EIDRM);
 	}
 
@@ -754,7 +822,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		}
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		update_queue(sma);
+		update_queue(sma, -1);
 		err = 0;
 		goto out_unlock;
 	}
@@ -796,7 +864,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
 		curr->sempid = task_tgid_vnr(current);
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		update_queue(sma);
+		update_queue(sma, semnum);
 		err = 0;
 		goto out_unlock;
 	}
@@ -1072,7 +1140,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	struct sembuf fast_sops[SEMOPM_FAST];
 	struct sembuf* sops = fast_sops, *sop;
 	struct sem_undo *un;
-	int undos = 0, alter = 0, max;
+	int undos = 0, alter = 0, max, could_help;
 	struct sem_queue queue;
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
@@ -1169,10 +1237,15 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	if (error)
 		goto out_unlock_free;
 
-	error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current));
+	error = try_atomic_semop(sma, sops, nsops, un,
+					task_tgid_vnr(current), &could_help);
 	if (error <= 0) {
-		if (alter && error == 0)
-			update_queue (sma);
+		if (could_help) {
+			if (nsops == 1)
+				update_queue(sma, sops->sem_num);
+			else
+				update_queue(sma, -1);
+		}
 		goto out_unlock_free;
 	}
@@ -1189,6 +1262,18 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		list_add_tail(&queue.list, &sma->sem_pending);
 	else
 		list_add(&queue.list, &sma->sem_pending);
+	if (nsops == 1) {
+		struct sem *curr;
+		curr = &sma->sem_base[sops->sem_num];
+
+		if (alter)
+			list_add_tail(&queue.simple_list, &curr->sem_pending);
+		else
+			list_add(&queue.simple_list, &curr->sem_pending);
+	} else {
+		INIT_LIST_HEAD(&queue.simple_list);
+		sma->complex_count++;
+	}
 
 	queue.status = -EINTR;
 	queue.sleeper = current;
@@ -1231,7 +1316,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 	 */
 	if (timeout && jiffies_left == 0)
 		error = -EAGAIN;
-	list_del(&queue.list);
+	unlink_queue(sma, &queue);
 
 out_unlock_free:
 	sem_unlock(sma);
@@ -1356,11 +1441,14 @@ void exit_sem(struct task_struct *tsk)
 				if (semaphore->semval > SEMVMX)
 					semaphore->semval = SEMVMX;
 				semaphore->sempid = task_tgid_vnr(current);
+				if (!sma->complex_count)
+					update_queue(sma, i);
 			}
 		}
 		sma->sem_otime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		update_queue(sma);
+		if (sma->complex_count)
+			update_queue(sma, -1);
 		sem_unlock(sma);
 
 		call_rcu(&un->rcu, free_un);
@@ -1374,7 +1462,7 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it)
 	struct sem_array *sma = it;
 
 	return seq_printf(s,
-			  "%10d %10d %4o %10lu %5u %5u %5u %5u %10lu %10lu\n",
+			  "%10d %10d %4o %10u %5u %5u %5u %5u %10lu %10lu\n",
 			  sma->sem_perm.key,
 			  sma->sem_perm.id,
 			  sma->sem_perm.mode,
-- 
1.6.2.5
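
Appendix (not part of the patch): for reviewers who want to exercise both
code paths from userspace, here is a minimal, purely illustrative sketch
(the variable names are made up). It issues a single-sop semop, the
"simple" case that the new per-semaphore pending list is meant to
optimize, and a two-sop semop, which the patch treats as a complex
operation:

/* Illustrative only: exercises the single-sop path and the multi-sop
 * (complex) path of semtimedop().
 */
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

/* On Linux, the caller must define union semun itself. */
union semun {
	int val;
	struct semid_ds *buf;
	unsigned short *array;
};

int main(void)
{
	struct sembuf simple = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };
	struct sembuf two_ops[2] = {
		{ .sem_num = 0, .sem_op = 1, .sem_flg = 0 },
		{ .sem_num = 1, .sem_op = 1, .sem_flg = 0 },
	};
	union semun arg = { .val = 1 };
	int semid = semget(IPC_PRIVATE, 2, IPC_CREAT | 0600);

	if (semid < 0 || semctl(semid, 0, SETVAL, arg) < 0) {
		perror("semget/semctl");
		return 1;
	}

	/* nsops == 1: a "simple" operation; if it had to sleep, it would be
	 * queued on the per-semaphore sem_pending list added by this patch.
	 */
	if (semop(semid, &simple, 1) < 0)
		perror("simple semop");

	/* nsops == 2: a "complex" operation; a sleeping waiter of this kind
	 * is what sma->complex_count accounts for.
	 */
	if (semop(semid, two_ops, 2) < 0)
		perror("complex semop");

	semctl(semid, 0, IPC_RMID, arg);
	return 0;
}

With the patch, a waiter queued by the single-sop call sits only on the
pending list of semaphore 0, so a later semop that modifies semaphore 1
does not have to scan it at all.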