[PATCH] rcu batch tuning
authorDipankar Sarma <dipankar@in.ibm.com>
Wed, 8 Mar 2006 05:55:33 +0000 (21:55 -0800)
committerLinus Torvalds <torvalds@g5.osdl.org>
Wed, 8 Mar 2006 22:14:01 +0000 (14:14 -0800)
This patch adds new tunables for RCU queue and finished batches.  There are
two types of controls - number of completed RCU updates invoked in a batch
(blimit) and monitoring for high rate of incoming RCUs on a cpu (qhimark,
qlowmark).

By default, the per-cpu batch limit is set to a small value.  If the input
RCU rate exceeds the high watermark, we do two things - force quiescent
state on all cpus and set the batch limit of the CPU to INTMAX.  Setting
batch limit to INTMAX forces all finished RCUs to be processed in one shot.
 If we have more than INTMAX RCUs queued up, then we have bigger problems
anyway.  Once the incoming queued RCUs fall below the low watermark, the
batch limit is set to the default.

Signed-off-by: Dipankar Sarma <dipankar@in.ibm.com>
Cc: "Paul E. McKenney" <paulmck@us.ibm.com>
Cc: "David S. Miller" <davem@davemloft.net>
Signed-off-by: Andrew Morton <akpm@osdl.org>
Signed-off-by: Linus Torvalds <torvalds@osdl.org>
Documentation/kernel-parameters.txt
include/linux/rcupdate.h
kernel/rcupdate.c

index 7520539..bad5987 100644 (file)
@@ -1284,6 +1284,19 @@ running once the system is up.
                        New name for the ramdisk parameter.
                        See Documentation/ramdisk.txt.
 
+       rcu.blimit=     [KNL,BOOT] Set maximum number of finished
+                       RCU callbacks to process in one batch.
+
+       rcu.qhimark=    [KNL,BOOT] Set threshold of queued
+                       RCU callbacks over which batch limiting is disabled.
+
+       rcu.qlowmark=   [KNL,BOOT] Set threshold of queued
+                       RCU callbacks below which batch limiting is re-enabled.
+
+       rcu.rsinterval= [KNL,BOOT,SMP] Set the number of additional
+                       RCU callbacks to queued before forcing reschedule
+                       on all cpus.
+
        rdinit=         [KNL]
                        Format: <full_path>
                        Run specified binary instead of /init from the ramdisk,
index b87aefa..c2ec6c7 100644 (file)
@@ -98,13 +98,17 @@ struct rcu_data {
        long            batch;           /* Batch # for current RCU batch */
        struct rcu_head *nxtlist;
        struct rcu_head **nxttail;
-       long            count; /* # of queued items */
+       long            qlen;            /* # of queued callbacks */
        struct rcu_head *curlist;
        struct rcu_head **curtail;
        struct rcu_head *donelist;
        struct rcu_head **donetail;
+       long            blimit;          /* Upper limit on a processed batch */
        int cpu;
        struct rcu_head barrier;
+#ifdef CONFIG_SMP
+       long            last_rs_qlen;    /* qlen during the last resched */
+#endif
 };
 
 DECLARE_PER_CPU(struct rcu_data, rcu_data);
index 0cf8146..8cf15a5 100644 (file)
@@ -67,7 +67,43 @@ DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
 
 /* Fake initialization required by compiler */
 static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
-static int maxbatch = 10000;
+static int blimit = 10;
+static int qhimark = 10000;
+static int qlowmark = 100;
+#ifdef CONFIG_SMP
+static int rsinterval = 1000;
+#endif
+
+static atomic_t rcu_barrier_cpu_count;
+static struct semaphore rcu_barrier_sema;
+static struct completion rcu_barrier_completion;
+
+#ifdef CONFIG_SMP
+static void force_quiescent_state(struct rcu_data *rdp,
+                       struct rcu_ctrlblk *rcp)
+{
+       int cpu;
+       cpumask_t cpumask;
+       set_need_resched();
+       if (unlikely(rdp->qlen - rdp->last_rs_qlen > rsinterval)) {
+               rdp->last_rs_qlen = rdp->qlen;
+               /*
+                * Don't send IPI to itself. With irqs disabled,
+                * rdp->cpu is the current cpu.
+                */
+               cpumask = rcp->cpumask;
+               cpu_clear(rdp->cpu, cpumask);
+               for_each_cpu_mask(cpu, cpumask)
+                       smp_send_reschedule(cpu);
+       }
+}
+#else
+static inline void force_quiescent_state(struct rcu_data *rdp,
+                       struct rcu_ctrlblk *rcp)
+{
+       set_need_resched();
+}
+#endif
 
 /**
  * call_rcu - Queue an RCU callback for invocation after a grace period.
@@ -92,17 +128,13 @@ void fastcall call_rcu(struct rcu_head *head,
        rdp = &__get_cpu_var(rcu_data);
        *rdp->nxttail = head;
        rdp->nxttail = &head->next;
-
-       if (unlikely(++rdp->count > 10000))
-               set_need_resched();
-
+       if (unlikely(++rdp->qlen > qhimark)) {
+               rdp->blimit = INT_MAX;
+               force_quiescent_state(rdp, &rcu_ctrlblk);
+       }
        local_irq_restore(flags);
 }
 
-static atomic_t rcu_barrier_cpu_count;
-static struct semaphore rcu_barrier_sema;
-static struct completion rcu_barrier_completion;
-
 /**
  * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
  * @head: structure to be used for queueing the RCU updates.
@@ -131,12 +163,12 @@ void fastcall call_rcu_bh(struct rcu_head *head,
        rdp = &__get_cpu_var(rcu_bh_data);
        *rdp->nxttail = head;
        rdp->nxttail = &head->next;
-       rdp->count++;
-/*
- *  Should we directly call rcu_do_batch() here ?
- *  if (unlikely(rdp->count > 10000))
- *      rcu_do_batch(rdp);
- */
+
+       if (unlikely(++rdp->qlen > qhimark)) {
+               rdp->blimit = INT_MAX;
+               force_quiescent_state(rdp, &rcu_bh_ctrlblk);
+       }
+
        local_irq_restore(flags);
 }
 
@@ -199,10 +231,12 @@ static void rcu_do_batch(struct rcu_data *rdp)
                next = rdp->donelist = list->next;
                list->func(list);
                list = next;
-               rdp->count--;
-               if (++count >= maxbatch)
+               rdp->qlen--;
+               if (++count >= rdp->blimit)
                        break;
        }
+       if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
+               rdp->blimit = blimit;
        if (!rdp->donelist)
                rdp->donetail = &rdp->donelist;
        else
@@ -473,6 +507,7 @@ static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
        rdp->quiescbatch = rcp->completed;
        rdp->qs_pending = 0;
        rdp->cpu = cpu;
+       rdp->blimit = blimit;
 }
 
 static void __devinit rcu_online_cpu(int cpu)
@@ -567,7 +602,12 @@ void synchronize_kernel(void)
        synchronize_rcu();
 }
 
-module_param(maxbatch, int, 0);
+module_param(blimit, int, 0);
+module_param(qhimark, int, 0);
+module_param(qlowmark, int, 0);
+#ifdef CONFIG_SMP
+module_param(rsinterval, int, 0);
+#endif
 EXPORT_SYMBOL_GPL(rcu_batches_completed);
 EXPORT_SYMBOL(call_rcu);  /* WARNING: GPL-only in April 2006. */
 EXPORT_SYMBOL(call_rcu_bh);  /* WARNING: GPL-only in April 2006. */