ipc/sem.c: use ERR_CAST
[safe/jmp/linux-2.6] / ipc / sem.c
index eac3f46..506c849 100644 (file)
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -3,56 +3,6 @@
  * Copyright (C) 1992 Krishna Balasubramanian
  * Copyright (C) 1995 Eric Schenk, Bruno Haible
  *
- * IMPLEMENTATION NOTES ON CODE REWRITE (Eric Schenk, January 1995):
- * This code underwent a massive rewrite in order to solve some problems
- * with the original code. In particular the original code failed to
- * wake up processes that were waiting for semval to go to 0 if the
- * value went to 0 and was then incremented rapidly enough. In solving
- * this problem I have also modified the implementation so that it
- * processes pending operations in a FIFO manner, thus give a guarantee
- * that processes waiting for a lock on the semaphore won't starve
- * unless another locking process fails to unlock.
- * In addition the following two changes in behavior have been introduced:
- * - The original implementation of semop returned the value
- *   last semaphore element examined on success. This does not
- *   match the manual page specifications, and effectively
- *   allows the user to read the semaphore even if they do not
- *   have read permissions. The implementation now returns 0
- *   on success as stated in the manual page.
- * - There is some confusion over whether the set of undo adjustments
- *   to be performed at exit should be done in an atomic manner.
- *   That is, if we are attempting to decrement the semval should we queue
- *   up and wait until we can do so legally?
- *   The original implementation attempted to do this.
- *   The current implementation does not do so. This is because I don't
- *   think it is the right thing (TM) to do, and because I couldn't
- *   see a clean way to get the old behavior with the new design.
- *   The POSIX standard and SVID should be consulted to determine
- *   what behavior is mandated.
- *
- * Further notes on refinement (Christoph Rohland, December 1998):
- * - The POSIX standard says, that the undo adjustments simply should
- *   redo. So the current implementation is o.K.
- * - The previous code had two flaws:
- *   1) It actively gave the semaphore to the next waiting process
- *      sleeping on the semaphore. Since this process did not have the
- *      cpu this led to many unnecessary context switches and bad
- *      performance. Now we only check which process should be able to
- *      get the semaphore and if this process wants to reduce some
- *      semaphore value we simply wake it up without doing the
- *      operation. So it has to try to get it later. Thus e.g. the
- *      running process may reacquire the semaphore during the current
- *      time slice. If it only waits for zero or increases the semaphore,
- *      we do the operation in advance and wake it up.
- *   2) It did not wake up all zero waiting processes. We try to do
- *      better but only get the semops right which only wait for zero or
- *      increase. If there are decrement operations in the operations
- *      array we do the same as before.
- *
- * With the incarnation of O(1) scheduler, it becomes unnecessary to perform
- * check/retry algorithm for waking up blocked processes as the new scheduler
- * is better at handling thread switch than the old one.
- *
  * /proc/sysvipc/sem support (c) 1999 Dragos Acostachioaie <dragos@iname.com>
  *
  * SMP-threaded, sysctl's added
@@ -61,6 +11,8 @@
  * (c) 2001 Red Hat Inc
  * Lockless wakeup
  * (c) 2003 Manfred Spraul <manfred@colorfullife.com>
+ * Further wakeup optimizations, documentation
+ * (c) 2010 Manfred Spraul <manfred@colorfullife.com>
  *
  * support for audit of ipc object properties and permission changes
  * Dustin Kirkland <dustin.kirkland@us.ibm.com>
  * namespaces support
  * OpenVZ, SWsoft Inc.
  * Pavel Emelianov <xemul@openvz.org>
+ *
+ * Implementation notes: (May 2010)
+ * This file implements System V semaphores.
+ *
+ * User space visible behavior:
+ * - FIFO ordering for semop() operations (just FIFO, not starvation
+ *   protection)
+ * - multiple semaphore operations that alter the same semaphore in
+ *   one semop() are handled.
+ * - sem_ctime (time of last semctl()) is updated in the IPC_SET, SETVAL and
+ *   SETALL calls.
+ * - two Linux specific semctl() commands: SEM_STAT, SEM_INFO.
+ * - undo adjustments at process exit are limited to 0..SEMVMX.
+ * - namespace are supported.
+ * - SEMMSL, SEMMNS, SEMOPM and SEMMNI can be configured at runtine by writing
+ *   to /proc/sys/kernel/sem.
+ * - statistics about the usage are reported in /proc/sysvipc/sem.
+ *
+ * Internals:
+ * - scalability:
+ *   - all global variables are read-mostly.
+ *   - semop() calls and semctl(RMID) are synchronized by RCU.
+ *   - most operations do write operations (actually: spin_lock calls) to
+ *     the per-semaphore array structure.
+ *   Thus: Perfect SMP scaling between independent semaphore arrays.
+ *         If multiple semaphores in one array are used, then cache line
+ *         trashing on the semaphore array spinlock will limit the scaling.
+ * - semncnt and semzcnt are calculated on demand in count_semncnt() and
+ *   count_semzcnt()
+ * - the task that performs a successful semop() scans the list of all
+ *   sleeping tasks and completes any pending operations that can be fulfilled.
+ *   Semaphores are actively given to waiting tasks (necessary for FIFO).
+ *   (see update_queue())
+ * - To improve the scalability, the actual wake-up calls are performed after
+ *   dropping all locks. (see wake_up_sem_queue_prepare(),
+ *   wake_up_sem_queue_do())
+ * - All work is done by the waker, the woken up task does not have to do
+ *   anything - not even acquiring a lock or dropping a refcount.
+ * - A woken up task may not even touch the semaphore array anymore, it may
+ *   have been destroyed already by a semctl(RMID).
+ * - The synchronizations between wake-ups due to a timeout/signal and a
+ *   wake-up due to a completed semaphore operation is achieved by using an
+ *   intermediate state (IN_WAKEUP).
+ * - UNDO values are stored in an array (one per process and per
+ *   semaphore array, lazily allocated). For backwards compatibility, multiple
+ *   modes for the UNDO variables are supported (per process, per thread)
+ *   (see copy_semundo, CLONE_SYSVSEM)
+ * - There are two lists of the pending operations: a per-array list
+ *   and per-semaphore list (stored in the array). This allows to achieve FIFO
+ *   ordering without always scanning all pending operations.
+ *   The worst-case behavior is nevertheless O(N^2) for N wakeups.
  */
 
 #include <linux/slab.h>
@@ -241,6 +244,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
        key_t key = params->key;
        int nsems = params->u.nsems;
        int semflg = params->flg;
+       int i;
 
        if (!nsems)
                return -EINVAL;
@@ -273,6 +277,11 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
        ns->used_sems += nsems;
 
        sma->sem_base = (struct sem *) &sma[1];
+
+       for (i = 0; i < nsems; i++)
+               INIT_LIST_HEAD(&sma->sem_base[i].sem_pending);
+
+       sma->complex_count = 0;
        INIT_LIST_HEAD(&sma->sem_pending);
        INIT_LIST_HEAD(&sma->list_id);
        sma->sem_nsems = nsems;
@@ -375,7 +384,6 @@ static int try_atomic_semop (struct sem_array * sma, struct sembuf * sops,
                sop--;
        }
        
-       sma->sem_otime = get_seconds();
        return 0;
 
 out_of_range:
@@ -398,38 +406,180 @@ undo:
        return result;
 }
 
-/*
- * Wake up a process waiting on the sem queue with a given error.
- * The queue is invalid (may not be accessed) after the function returns.
+/** wake_up_sem_queue_prepare(q, error): Prepare wake-up
+ * @q: queue entry that must be signaled
+ * @error: Error value for the signal
+ *
+ * Prepare the wake-up of the queue entry q.
+ */
+static void wake_up_sem_queue_prepare(struct list_head *pt,
+                               struct sem_queue *q, int error)
+{
+       if (list_empty(pt)) {
+               /*
+                * Hold preempt off so that we don't get preempted and have the
+                * wakee busy-wait until we're scheduled back on.
+                */
+               preempt_disable();
+       }
+       q->status = IN_WAKEUP;
+       q->pid = error;
+
+       list_add_tail(&q->simple_list, pt);
+}
+
+/**
+ * wake_up_sem_queue_do(pt) - do the actual wake-up
+ * @pt: list of tasks to be woken up
+ *
+ * Do the actual wake-up.
+ * The function is called without any locks held, thus the semaphore array
+ * could be destroyed already and the tasks can disappear as soon as the
+ * status is set to the actual return code.
  */
-static void wake_up_sem_queue(struct sem_queue *q, int error)
+static void wake_up_sem_queue_do(struct list_head *pt)
 {
+       struct sem_queue *q, *t;
+       int did_something;
+
+       did_something = !list_empty(pt);
+       list_for_each_entry_safe(q, t, pt, simple_list) {
+               wake_up_process(q->sleeper);
+               /* q can disappear immediately after writing q->status. */
+               smp_wmb();
+               q->status = q->pid;
+       }
+       if (did_something)
+               preempt_enable();
+}
+
+static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
+{
+       list_del(&q->list);
+       if (q->nsops == 1)
+               list_del(&q->simple_list);
+       else
+               sma->complex_count--;
+}
+
+/** check_restart(sma, q)
+ * @sma: semaphore array
+ * @q: the operation that just completed
+ *
+ * update_queue is O(N^2) when it restarts scanning the whole queue of
+ * waiting operations. Therefore this function checks if the restart is
+ * really necessary. It is called after a previously waiting operation
+ * was completed.
+ */
+static int check_restart(struct sem_array *sma, struct sem_queue *q)
+{
+       struct sem *curr;
+       struct sem_queue *h;
+
+       /* if the operation didn't modify the array, then no restart */
+       if (q->alter == 0)
+               return 0;
+
+       /* pending complex operations are too difficult to analyse */
+       if (sma->complex_count)
+               return 1;
+
+       /* we were a sleeping complex operation. Too difficult */
+       if (q->nsops > 1)
+               return 1;
+
+       curr = sma->sem_base + q->sops[0].sem_num;
+
+       /* No-one waits on this queue */
+       if (list_empty(&curr->sem_pending))
+               return 0;
+
+       /* the new semaphore value */
+       if (curr->semval) {
+               /* It is impossible that someone waits for the new value:
+                * - q is a previously sleeping simple operation that
+                *   altered the array. It must be a decrement, because
+                *   simple increments never sleep.
+                * - The value is not 0, thus wait-for-zero won't proceed.
+                * - If there are older (higher priority) decrements
+                *   in the queue, then they have observed the original
+                *   semval value and couldn't proceed. The operation
+                *   decremented to value - thus they won't proceed either.
+                */
+               BUG_ON(q->sops[0].sem_op >= 0);
+               return 0;
+       }
        /*
-        * Hold preempt off so that we don't get preempted and have the
-        * wakee busy-wait until we're scheduled back on. We're holding
-        * locks here so it may not strictly be needed, however if the
-        * locks become preemptible then this prevents such a problem.
+        * semval is 0. Check if there are wait-for-zero semops.
+        * They must be the first entries in the per-semaphore simple queue
         */
-       preempt_disable();
-       q->status = IN_WAKEUP;
-       wake_up_process(q->sleeper);
-       /* hands-off: q can disappear immediately after writing q->status. */
-       smp_wmb();
-       q->status = error;
-       preempt_enable();
+       h = list_first_entry(&curr->sem_pending, struct sem_queue, simple_list);
+       BUG_ON(h->nsops != 1);
+       BUG_ON(h->sops[0].sem_num != q->sops[0].sem_num);
+
+       /* Yes, there is a wait-for-zero semop. Restart */
+       if (h->sops[0].sem_op == 0)
+               return 1;
+
+       /* Again - no-one is waiting for the new value. */
+       return 0;
 }
 
-/* Go through the pending queue for the indicated semaphore
- * looking for tasks that can be completed.
+
+/**
+ * update_queue(sma, semnum): Look for tasks that can be completed.
+ * @sma: semaphore array.
+ * @semnum: semaphore that was modified.
+ * @pt: list head for the tasks that must be woken up.
+ *
+ * update_queue must be called after a semaphore in a semaphore array
+ * was modified. If multiple semaphore were modified, then @semnum
+ * must be set to -1.
+ * The tasks that must be woken up are added to @pt. The return code
+ * is stored in q->pid.
+ * The function return 1 if at least one semop was completed successfully.
  */
-static void update_queue (struct sem_array * sma)
+static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
 {
-       struct sem_queue *q, *tq;
+       struct sem_queue *q;
+       struct list_head *walk;
+       struct list_head *pending_list;
+       int offset;
+       int semop_completed = 0;
+
+       /* if there are complex operations around, then knowing the semaphore
+        * that was modified doesn't help us. Assume that multiple semaphores
+        * were modified.
+        */
+       if (sma->complex_count)
+               semnum = -1;
+
+       if (semnum == -1) {
+               pending_list = &sma->sem_pending;
+               offset = offsetof(struct sem_queue, list);
+       } else {
+               pending_list = &sma->sem_base[semnum].sem_pending;
+               offset = offsetof(struct sem_queue, simple_list);
+       }
 
 again:
-       list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
-               int error;
-               int alter;
+       walk = pending_list->next;
+       while (walk != pending_list) {
+               int error, restart;
+
+               q = (struct sem_queue *)((char *)walk - offset);
+               walk = walk->next;
+
+               /* If we are scanning the single sop, per-semaphore list of
+                * one semaphore and that semaphore is 0, then it is not
+                * necessary to scan the "alter" entries: simple increments
+                * that affect only one entry succeed immediately and cannot
+                * be in the  per semaphore pending queue, and decrements
+                * cannot be successful if the value is already 0.
+                */
+               if (semnum != -1 && sma->sem_base[semnum].semval == 0 &&
+                               q->alter)
+                       break;
 
                error = try_atomic_semop(sma, q->sops, q->nsops,
                                         q->undo, q->pid);
@@ -438,24 +588,60 @@ again:
                if (error > 0)
                        continue;
 
-               list_del(&q->list);
+               unlink_queue(sma, q);
 
-               /*
-                * The next operation that must be checked depends on the type
-                * of the completed operation:
-                * - if the operation modified the array, then restart from the
-                *   head of the queue and check for threads that might be
-                *   waiting for the new semaphore values.
-                * - if the operation didn't modify the array, then just
-                *   continue.
-                */
-               alter = q->alter;
-               wake_up_sem_queue(q, error);
-               if (alter && !error)
+               if (error) {
+                       restart = 0;
+               } else {
+                       semop_completed = 1;
+                       restart = check_restart(sma, q);
+               }
+
+               wake_up_sem_queue_prepare(pt, q, error);
+               if (restart)
                        goto again;
        }
+       return semop_completed;
 }
 
+/**
+ * do_smart_update(sma, sops, nsops, otime, pt) - optimized update_queue
+ * @sma: semaphore array
+ * @sops: operations that were performed
+ * @nsops: number of operations
+ * @otime: force setting otime
+ * @pt: list head of the tasks that must be woken up.
+ *
+ * do_smart_update() does the required called to update_queue, based on the
+ * actual changes that were performed on the semaphore array.
+ * Note that the function does not do the actual wake-up: the caller is
+ * responsible for calling wake_up_sem_queue_do(@pt).
+ * It is safe to perform this call after dropping all locks.
+ */
+static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
+                       int otime, struct list_head *pt)
+{
+       int i;
+
+       if (sma->complex_count || sops == NULL) {
+               if (update_queue(sma, -1, pt))
+                       otime = 1;
+               goto done;
+       }
+
+       for (i = 0; i < nsops; i++) {
+               if (sops[i].sem_op > 0 ||
+                       (sops[i].sem_op < 0 &&
+                               sma->sem_base[sops[i].sem_num].semval == 0))
+                       if (update_queue(sma, sops[i].sem_num, pt))
+                               otime = 1;
+       }
+done:
+       if (otime)
+               sma->sem_otime = get_seconds();
+}
+
+
 /* The following counts are associated to each semaphore:
  *   semncnt        number of tasks waiting on semval being nonzero
  *   semzcnt        number of tasks waiting on semval being zero
@@ -518,6 +704,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
        struct sem_undo *un, *tu;
        struct sem_queue *q, *tq;
        struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
+       struct list_head tasks;
 
        /* Free the existing undo structures for this semaphore set.  */
        assert_spin_locked(&sma->sem_perm.lock);
@@ -531,16 +718,17 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
        }
 
        /* Wake up all pending processes and let them fail with EIDRM. */
+       INIT_LIST_HEAD(&tasks);
        list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
-               list_del(&q->list);
-
-               wake_up_sem_queue(q, -EIDRM);
+               unlink_queue(sma, q);
+               wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
        }
 
        /* Remove the semaphore set from the IDR */
        sem_rmid(ns, sma);
        sem_unlock(sma);
 
+       wake_up_sem_queue_do(&tasks);
        ns->used_sems -= sma->sem_nsems;
        security_sem_free(sma);
        ipc_rcu_putref(sma);
@@ -571,7 +759,7 @@ static unsigned long copy_semid_to_user(void __user *buf, struct semid64_ds *in,
 static int semctl_nolock(struct ipc_namespace *ns, int semid,
                         int cmd, int version, union semun arg)
 {
-       int err = -EINVAL;
+       int err;
        struct sem_array *sma;
 
        switch(cmd) {
@@ -648,7 +836,6 @@ static int semctl_nolock(struct ipc_namespace *ns, int semid,
        default:
                return -EINVAL;
        }
-       return err;
 out_unlock:
        sem_unlock(sma);
        return err;
@@ -663,11 +850,13 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
        ushort fast_sem_io[SEMMSL_FAST];
        ushort* sem_io = fast_sem_io;
        int nsems;
+       struct list_head tasks;
 
        sma = sem_lock_check(ns, semid);
        if (IS_ERR(sma))
                return PTR_ERR(sma);
 
+       INIT_LIST_HEAD(&tasks);
        nsems = sma->sem_nsems;
 
        err = -EACCES;
@@ -755,7 +944,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
                }
                sma->sem_ctime = get_seconds();
                /* maybe some queued-up processes were waiting for this */
-               update_queue(sma);
+               do_smart_update(sma, NULL, 0, 0, &tasks);
                err = 0;
                goto out_unlock;
        }
@@ -797,13 +986,15 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
                curr->sempid = task_tgid_vnr(current);
                sma->sem_ctime = get_seconds();
                /* maybe some queued-up processes were waiting for this */
-               update_queue(sma);
+               do_smart_update(sma, NULL, 0, 0, &tasks);
                err = 0;
                goto out_unlock;
        }
        }
 out_unlock:
        sem_unlock(sma);
+       wake_up_sem_queue_do(&tasks);
+
 out_free:
        if(sem_io != fast_sem_io)
                ipc_free(sem_io, sizeof(ushort)*nsems);
@@ -1017,7 +1208,7 @@ static struct sem_undo *find_alloc_undo(struct ipc_namespace *ns, int semid)
        /* step 1: figure out the size of the semaphore array */
        sma = sem_lock_check(ns, semid);
        if (IS_ERR(sma))
-               return ERR_PTR(PTR_ERR(sma));
+               return ERR_CAST(sma);
 
        nsems = sma->sem_nsems;
        sem_getref_and_unlock(sma);
@@ -1077,6 +1268,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        struct sem_queue queue;
        unsigned long jiffies_left = 0;
        struct ipc_namespace *ns;
+       struct list_head tasks;
 
        ns = current->nsproxy->ipc_ns;
 
@@ -1125,6 +1317,8 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        } else
                un = NULL;
 
+       INIT_LIST_HEAD(&tasks);
+
        sma = sem_lock_check(ns, semid);
        if (IS_ERR(sma)) {
                if (un)
@@ -1173,7 +1367,8 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current));
        if (error <= 0) {
                if (alter && error == 0)
-                       update_queue (sma);
+                       do_smart_update(sma, sops, nsops, 1, &tasks);
+
                goto out_unlock_free;
        }
 
@@ -1191,6 +1386,19 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
        else
                list_add(&queue.list, &sma->sem_pending);
 
+       if (nsops == 1) {
+               struct sem *curr;
+               curr = &sma->sem_base[sops->sem_num];
+
+               if (alter)
+                       list_add_tail(&queue.simple_list, &curr->sem_pending);
+               else
+                       list_add(&queue.simple_list, &curr->sem_pending);
+       } else {
+               INIT_LIST_HEAD(&queue.simple_list);
+               sma->complex_count++;
+       }
+
        queue.status = -EINTR;
        queue.sleeper = current;
        current->state = TASK_INTERRUPTIBLE;
@@ -1232,10 +1440,12 @@ SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
         */
        if (timeout && jiffies_left == 0)
                error = -EAGAIN;
-       list_del(&queue.list);
+       unlink_queue(sma, &queue);
 
 out_unlock_free:
        sem_unlock(sma);
+
+       wake_up_sem_queue_do(&tasks);
 out_free:
        if(sops != fast_sops)
                kfree(sops);
@@ -1296,6 +1506,7 @@ void exit_sem(struct task_struct *tsk)
        for (;;) {
                struct sem_array *sma;
                struct sem_undo *un;
+               struct list_head tasks;
                int semid;
                int i;
 
@@ -1359,10 +1570,11 @@ void exit_sem(struct task_struct *tsk)
                                semaphore->sempid = task_tgid_vnr(current);
                        }
                }
-               sma->sem_otime = get_seconds();
                /* maybe some queued-up processes were waiting for this */
-               update_queue(sma);
+               INIT_LIST_HEAD(&tasks);
+               do_smart_update(sma, NULL, 0, 1, &tasks);
                sem_unlock(sma);
+               wake_up_sem_queue_do(&tasks);
 
                call_rcu(&un->rcu, free_un);
        }
@@ -1375,7 +1587,7 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it)
        struct sem_array *sma = it;
 
        return seq_printf(s,
-                         "%10d %10d  %4o %10lu %5u %5u %5u %5u %10lu %10lu\n",
+                         "%10d %10d  %4o %10u %5u %5u %5u %5u %10lu %10lu\n",
                          sma->sem_perm.key,
                          sma->sem_perm.id,
                          sma->sem_perm.mode,