RDS: Use spinlock to protect 64b value update on 32b archs
authorAndy Grover <andy.grover@oracle.com>
Wed, 1 Apr 2009 08:20:20 +0000 (08:20 +0000)
committerDavid S. Miller <davem@davemloft.net>
Thu, 2 Apr 2009 07:52:22 +0000 (00:52 -0700)
We have a 64bit value that needs to be set atomically.
This is easy and quick on all 64bit archs, and can also be done
on x86/32 with set_64bit() (uses cmpxchg8b). However other
32b archs don't have this.

I actually changed this to the current state in preparation for
mainline because the old way (using a spinlock on 32b) resulted in
unsightly #ifdefs in the code. But obviously, being correct takes
precedence.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/rds/ib.h
net/rds/ib_cm.c
net/rds/ib_recv.c
net/rds/iw.h
net/rds/iw_cm.c
net/rds/iw_recv.c
net/rds/rds.h

index c08ffff..069206c 100644 (file)
@@ -108,7 +108,12 @@ struct rds_ib_connection {
 
        /* sending acks */
        unsigned long           i_ack_flags;
+#ifdef KERNEL_HAS_ATOMIC64
+       atomic64_t              i_ack_next;     /* next ACK to send */
+#else
+       spinlock_t              i_ack_lock;     /* protect i_ack_next */
        u64                     i_ack_next;     /* next ACK to send */
+#endif
        struct rds_header       *i_ack;
        struct ib_send_wr       i_ack_wr;
        struct ib_sge           i_ack_sge;
@@ -363,13 +368,4 @@ rds_ib_data_sge(struct rds_ib_connection *ic, struct ib_sge *sge)
        return &sge[1];
 }
 
-static inline void rds_ib_set_64bit(u64 *ptr, u64 val)
-{
-#if BITS_PER_LONG == 64
-       *ptr = val;
-#else
-       set_64bit(ptr, val);
-#endif
-}
-
 #endif
index 889ab04..f8e40e1 100644 (file)
@@ -636,7 +636,11 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 
        /* Clear the ACK state */
        clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
-       rds_ib_set_64bit(&ic->i_ack_next, 0);
+#ifdef KERNEL_HAS_ATOMIC64
+       atomic64_set(&ic->i_ack_next, 0);
+#else
+       ic->i_ack_next = 0;
+#endif
        ic->i_ack_recv = 0;
 
        /* Clear flow control state */
@@ -669,6 +673,9 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 
        INIT_LIST_HEAD(&ic->ib_node);
        mutex_init(&ic->i_recv_mutex);
+#ifndef KERNEL_HAS_ATOMIC64
+       spin_lock_init(&ic->i_ack_lock);
+#endif
 
        /*
         * rds_ib_conn_shutdown() waits for these to be emptied so they
index 5061b55..36d9315 100644 (file)
@@ -395,10 +395,37 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
  * room for it beyond the ring size.  Send completion notices its special
  * wr_id and avoids working with the ring in that case.
  */
+#ifndef KERNEL_HAS_ATOMIC64
 static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
                                int ack_required)
 {
-       rds_ib_set_64bit(&ic->i_ack_next, seq);
+       unsigned long flags;
+
+       spin_lock_irqsave(&ic->i_ack_lock, flags);
+       ic->i_ack_next = seq;
+       if (ack_required)
+               set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+       spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+}
+
+static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
+{
+       unsigned long flags;
+       u64 seq;
+
+       clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+
+       spin_lock_irqsave(&ic->i_ack_lock, flags);
+       seq = ic->i_ack_next;
+       spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+
+       return seq;
+}
+#else
+static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
+                               int ack_required)
+{
+       atomic64_set(&ic->i_ack_next, seq);
        if (ack_required) {
                smp_mb__before_clear_bit();
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
@@ -410,8 +437,10 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
        smp_mb__after_clear_bit();
 
-       return ic->i_ack_next;
+       return atomic64_read(&ic->i_ack_next);
 }
+#endif
+
 
 static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
 {
@@ -464,6 +493,10 @@ static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credi
  *  -  i_ack_next, which is the last sequence number we received
  *
  * Potentially, send queue and receive queue handlers can run concurrently.
+ * It would be nice to not have to use a spinlock to synchronize things,
+ * but the one problem that rules this out is that 64bit updates are
+ * not atomic on all platforms. Things would be a lot simpler if
+ * we had atomic64 or maybe cmpxchg64 everywhere.
  *
  * Reconnecting complicates this picture just slightly. When we
  * reconnect, we may be seeing duplicate packets. The peer
index 70eb948..b4fb272 100644 (file)
@@ -131,7 +131,12 @@ struct rds_iw_connection {
 
        /* sending acks */
        unsigned long           i_ack_flags;
+#ifdef KERNEL_HAS_ATOMIC64
+       atomic64_t              i_ack_next;     /* next ACK to send */
+#else
+       spinlock_t              i_ack_lock;     /* protect i_ack_next */
        u64                     i_ack_next;     /* next ACK to send */
+#endif
        struct rds_header       *i_ack;
        struct ib_send_wr       i_ack_wr;
        struct ib_sge           i_ack_sge;
@@ -391,13 +396,4 @@ rds_iw_data_sge(struct rds_iw_connection *ic, struct ib_sge *sge)
        return &sge[1];
 }
 
-static inline void rds_iw_set_64bit(u64 *ptr, u64 val)
-{
-#if BITS_PER_LONG == 64
-       *ptr = val;
-#else
-       set_64bit(ptr, val);
-#endif
-}
-
 #endif
index 0ffaa3e..a416b0d 100644 (file)
@@ -659,7 +659,11 @@ void rds_iw_conn_shutdown(struct rds_connection *conn)
 
        /* Clear the ACK state */
        clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
-       rds_iw_set_64bit(&ic->i_ack_next, 0);
+#ifdef KERNEL_HAS_ATOMIC64
+       atomic64_set(&ic->i_ack_next, 0);
+#else
+       ic->i_ack_next = 0;
+#endif
        ic->i_ack_recv = 0;
 
        /* Clear flow control state */
@@ -693,6 +697,9 @@ int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 
        INIT_LIST_HEAD(&ic->iw_node);
        mutex_init(&ic->i_recv_mutex);
+#ifndef KERNEL_HAS_ATOMIC64
+       spin_lock_init(&ic->i_ack_lock);
+#endif
 
        /*
         * rds_iw_conn_shutdown() waits for these to be emptied so they
index a1931f0..fde470f 100644 (file)
@@ -395,10 +395,37 @@ void rds_iw_recv_init_ack(struct rds_iw_connection *ic)
  * room for it beyond the ring size.  Send completion notices its special
  * wr_id and avoids working with the ring in that case.
  */
+#ifndef KERNEL_HAS_ATOMIC64
 static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
                                int ack_required)
 {
-       rds_iw_set_64bit(&ic->i_ack_next, seq);
+       unsigned long flags;
+
+       spin_lock_irqsave(&ic->i_ack_lock, flags);
+       ic->i_ack_next = seq;
+       if (ack_required)
+               set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+       spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+}
+
+static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
+{
+       unsigned long flags;
+       u64 seq;
+
+       clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+
+       spin_lock_irqsave(&ic->i_ack_lock, flags);
+       seq = ic->i_ack_next;
+       spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+
+       return seq;
+}
+#else
+static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
+                               int ack_required)
+{
+       atomic64_set(&ic->i_ack_next, seq);
        if (ack_required) {
                smp_mb__before_clear_bit();
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
@@ -410,8 +437,10 @@ static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
        smp_mb__after_clear_bit();
 
-       return ic->i_ack_next;
+       return atomic64_read(&ic->i_ack_next);
 }
+#endif
+
 
 static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits)
 {
@@ -464,6 +493,10 @@ static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credi
  *  -  i_ack_next, which is the last sequence number we received
  *
  * Potentially, send queue and receive queue handlers can run concurrently.
+ * It would be nice to not have to use a spinlock to synchronize things,
+ * but the one problem that rules this out is that 64bit updates are
+ * not atomic on all platforms. Things would be a lot simpler if
+ * we had atomic64 or maybe cmpxchg64 everywhere.
  *
  * Reconnecting complicates this picture just slightly. When we
  * reconnect, we may be seeing duplicate packets. The peer
index 0604007..619f0a3 100644 (file)
  */
 #define RDS_PORT       18634
 
+#ifdef ATOMIC64_INIT
+#define KERNEL_HAS_ATOMIC64
+#endif
+
 #ifdef DEBUG
 #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
 #else