net/rds/ib_recv.c
index 5061b55..04dc0d3 100644
@@ -143,15 +143,16 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
        int ret = -ENOMEM;
 
        if (recv->r_ibinc == NULL) {
-               if (atomic_read(&rds_ib_allocation) >= rds_ib_sysctl_max_recv_allocation) {
+               if (!atomic_add_unless(&rds_ib_allocation, 1, rds_ib_sysctl_max_recv_allocation)) {
                        rds_ib_stats_inc(s_ib_rx_alloc_limit);
                        goto out;
                }
                recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab,
                                                 kptr_gfp);
-               if (recv->r_ibinc == NULL)
+               if (recv->r_ibinc == NULL) {
+                       atomic_dec(&rds_ib_allocation);
                        goto out;
-               atomic_inc(&rds_ib_allocation);
+               }
                INIT_LIST_HEAD(&recv->r_ibinc->ii_frags);
                rds_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
        }
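
The hunk above closes a small race: with the old check-then-increment, two CPUs could both read rds_ib_allocation just under the limit, both pass the test, and overshoot rds_ib_sysctl_max_recv_allocation. atomic_add_unless() folds the check and the reservation into one atomic step, which in turn means a failed kmem_cache_alloc() must hand the reservation back, hence the new atomic_dec(). A minimal userspace sketch of the same reserve/rollback pattern using C11 atomics; add_unless(), alloc_accounted() and the limit of 4 are illustrative names, not RDS code:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

static atomic_int allocation;           /* analog of rds_ib_allocation */
static int max_allocation = 4;          /* analog of the sysctl limit */

/* Atomically "add 1 unless the counter already hit the limit": the CAS
 * loop retries until the add succeeds against an unchanged value, or
 * gives up (without modifying the counter) once the value >= limit. */
static bool add_unless(atomic_int *v, int limit)
{
	int old = atomic_load(v);

	do {
		if (old >= limit)
			return false;
	} while (!atomic_compare_exchange_weak(v, &old, old + 1));
	return true;
}

static void *alloc_accounted(size_t sz)
{
	void *p;

	if (!add_unless(&allocation, max_allocation))
		return NULL;                       /* over the limit */

	p = malloc(sz);
	if (p == NULL)
		atomic_fetch_sub(&allocation, 1);  /* roll back the reservation */
	return p;
}

int main(void)
{
	for (int i = 0; i < 6; i++)
		printf("alloc %d -> %p\n", i, alloc_accounted(64));
	return 0;
}

The kernel's atomic_add_unless() is the same idea: a compare-and-swap loop that reports whether it actually performed the add.
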
@@ -229,8 +230,8 @@ int rds_ib_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
        int ret = 0;
        u32 pos;
 
-       while ((prefill || rds_conn_up(conn))
-                       && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
+       while ((prefill || rds_conn_up(conn)) &&
+              rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
                if (pos >= ic->i_recv_ring.w_nr) {
                        printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
                                        pos);
@@ -395,10 +396,37 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
  * room for it beyond the ring size.  Send completion notices its special
  * wr_id and avoids working with the ring in that case.
  */
+#ifndef KERNEL_HAS_ATOMIC64
+static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
+                               int ack_required)
+{
+       unsigned long flags;
+
+       spin_lock_irqsave(&ic->i_ack_lock, flags);
+       ic->i_ack_next = seq;
+       if (ack_required)
+               set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+       spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+}
+
+static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
+{
+       unsigned long flags;
+       u64 seq;
+
+       clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
+
+       spin_lock_irqsave(&ic->i_ack_lock, flags);
+       seq = ic->i_ack_next;
+       spin_unlock_irqrestore(&ic->i_ack_lock, flags);
+
+       return seq;
+}
+#else
 static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
                                int ack_required)
 {
-       rds_ib_set_64bit(&ic->i_ack_next, seq);
+       atomic64_set(&ic->i_ack_next, seq);
        if (ack_required) {
                smp_mb__before_clear_bit();
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
@@ -410,8 +438,10 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
        clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
        smp_mb__after_clear_bit();
 
-       return ic->i_ack_next;
+       return atomic64_read(&ic->i_ack_next);
 }
+#endif
+
 
 static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
 {
@@ -464,6 +494,10 @@ static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credi
  *  -  i_ack_next, which is the last sequence number we received
  *
  * Potentially, send queue and receive queue handlers can run concurrently.
+ * It would be nice to not have to use a spinlock to synchronize things,
+ * but the one problem that rules this out is that 64bit updates are
+ * not atomic on all platforms. Things would be a lot simpler if
+ * we had atomic64 or maybe cmpxchg64 everywhere.
  *
  * Reconnecting complicates this picture just slightly. When we
  * reconnect, we may be seeing duplicate packets. The peer
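
The comment above is what the KERNEL_HAS_ATOMIC64 split earlier in this file implements: on a 32-bit CPU a plain 64-bit store compiles to two 32-bit stores, so a concurrent reader of i_ack_next could see half of the old sequence number and half of the new one. The fallback therefore routes every access through i_ack_lock. A userspace sketch of that lock-protected pattern, with a pthread spinlock standing in for i_ack_lock; ack_state, ack_set() and ack_get() are illustrative names:

#include <pthread.h>
#include <stdint.h>

/* Every access to the 64-bit sequence goes through one lock, so a
 * reader can never observe a torn (half-written) value. */
struct ack_state {
	pthread_spinlock_t lock;    /* stands in for i_ack_lock */
	uint64_t ack_next;          /* stands in for i_ack_next */
	int ack_requested;          /* stands in for IB_ACK_REQUESTED */
};

static void ack_set(struct ack_state *a, uint64_t seq, int ack_required)
{
	pthread_spin_lock(&a->lock);
	a->ack_next = seq;
	if (ack_required)
		a->ack_requested = 1;
	pthread_spin_unlock(&a->lock);
}

static uint64_t ack_get(struct ack_state *a)
{
	uint64_t seq;

	pthread_spin_lock(&a->lock);
	a->ack_requested = 0;
	seq = a->ack_next;
	pthread_spin_unlock(&a->lock);
	return seq;
}

int main(void)
{
	struct ack_state a = { .ack_next = 0, .ack_requested = 0 };

	pthread_spin_init(&a.lock, PTHREAD_PROCESS_PRIVATE);
	ack_set(&a, 42, 1);
	return ack_get(&a) == 42 ? 0 : 1;
}

One deliberate difference: the kernel fallback clears IB_ACK_REQUESTED before taking the lock, mirroring the barrier pairing of the atomic64 variant; the sketch folds it under the lock for simplicity.
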
@@ -491,7 +525,7 @@ void rds_ib_attempt_ack(struct rds_ib_connection *ic)
        }
 
        /* Can we get a send credit? */
-       if (!rds_ib_send_grab_credits(ic, 1, &adv_credits, 0)) {
+       if (!rds_ib_send_grab_credits(ic, 1, &adv_credits, 0, RDS_MAX_ADV_CREDIT)) {
                rds_ib_stats_inc(s_ib_tx_throttle);
                clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
                return;
@@ -522,6 +556,47 @@ u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic)
        return rds_ib_get_ack(ic);
 }
 
+static struct rds_header *rds_ib_get_header(struct rds_connection *conn,
+                                           struct rds_ib_recv_work *recv,
+                                           u32 data_len)
+{
+       struct rds_ib_connection *ic = conn->c_transport_data;
+       void *hdr_buff = &ic->i_recv_hdrs[recv - ic->i_recvs];
+       void *addr;
+       u32 misplaced_hdr_bytes;
+
+       /*
+        * Support header at the front (RDS 3.1+) as well as header-at-end.
+        *
+        * Cases:
+        * 1) header all in header buff (great!)
+        * 2) header all in data page (copy all to header buff)
+        * 3) header split across hdr buf + data page
+        *    (move bit in hdr buff to end before copying other bit from data page)
+        */
+       if (conn->c_version > RDS_PROTOCOL_3_0 || data_len == RDS_FRAG_SIZE)
+               return hdr_buff;
+
+       if (data_len <= (RDS_FRAG_SIZE - sizeof(struct rds_header))) {
+               addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0);
+               memcpy(hdr_buff,
+                      addr + recv->r_frag->f_offset + data_len,
+                      sizeof(struct rds_header));
+               kunmap_atomic(addr, KM_SOFTIRQ0);
+               return hdr_buff;
+       }
+
+       misplaced_hdr_bytes = (sizeof(struct rds_header) - (RDS_FRAG_SIZE - data_len));
+
+       memmove(hdr_buff + (RDS_FRAG_SIZE - data_len), hdr_buff, misplaced_hdr_bytes);
+
+       addr = kmap_atomic(recv->r_frag->f_page, KM_SOFTIRQ0);
+       memcpy(hdr_buff, addr + recv->r_frag->f_offset + data_len,
+              sizeof(struct rds_header) - misplaced_hdr_bytes);
+       kunmap_atomic(addr, KM_SOFTIRQ0);
+       return hdr_buff;
+}
+
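
The three cases in the comment above are easiest to see with concrete buffers. Below is a scaled-down, self-contained simulation: FRAG_SIZE and HDR_SIZE are shrunk for readability, and deliver() stands in for the HCA scattering a wire message (payload followed by a trailing RDS 3.0 header) across the posted data page and header buffer:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define FRAG_SIZE 64    /* stands in for RDS_FRAG_SIZE */
#define HDR_SIZE  16    /* stands in for sizeof(struct rds_header) */

/* The data page is scattered first, then the header buffer, so a
 * trailing header can land wholly in hdr, wholly in the data page,
 * or split across both. */
static void deliver(const uint8_t *msg, size_t len,
		    uint8_t *data, uint8_t *hdr)
{
	size_t in_data = len < FRAG_SIZE ? len : FRAG_SIZE;

	memcpy(data, msg, in_data);
	memcpy(hdr, msg + in_data, len - in_data);
}

static void get_header(uint8_t *hdr, const uint8_t *data, size_t data_len)
{
	size_t misplaced;

	if (data_len == FRAG_SIZE)              /* case 1: all in hdr buf */
		return;

	if (data_len <= FRAG_SIZE - HDR_SIZE) { /* case 2: all in data page */
		memcpy(hdr, data + data_len, HDR_SIZE);
		return;
	}

	/* case 3: the tail of the header sits at the start of hdr; move
	 * it to its final offset, then pull the head from the data page */
	misplaced = HDR_SIZE - (FRAG_SIZE - data_len);
	memmove(hdr + (FRAG_SIZE - data_len), hdr, misplaced);
	memcpy(hdr, data + data_len, HDR_SIZE - misplaced);
}

int main(void)
{
	uint8_t wire[FRAG_SIZE + HDR_SIZE], data[FRAG_SIZE], hdr[HDR_SIZE];
	uint8_t want[HDR_SIZE];
	size_t cases[] = { FRAG_SIZE, 10, FRAG_SIZE - 4 }; /* cases 1, 2, 3 */

	for (int i = 0; i < HDR_SIZE; i++)
		want[i] = 0xA0 + i;

	for (size_t c = 0; c < 3; c++) {
		size_t data_len = cases[c];

		memset(wire, 0x11, data_len);            /* payload */
		memcpy(wire + data_len, want, HDR_SIZE); /* trailing header */
		memset(hdr, 0, HDR_SIZE);
		deliver(wire, data_len + HDR_SIZE, data, hdr);
		get_header(hdr, data, data_len);
		assert(memcmp(hdr, want, HDR_SIZE) == 0);
		printf("case %zu ok (data_len=%zu)\n", c + 1, data_len);
	}
	return 0;
}

Case 3 is the delicate one: the header's tail is already in the header buffer, so it is moved to its final offset before the head is copied in from the data page, and it must be a memmove() because source and destination can overlap.
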
 /*
  * It's kind of lame that we're copying from the posted receive pages into
  * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
@@ -612,7 +687,7 @@ struct rds_ib_ack_state {
 };
 
 static void rds_ib_process_recv(struct rds_connection *conn,
-                               struct rds_ib_recv_work *recv, u32 byte_len,
+                               struct rds_ib_recv_work *recv, u32 data_len,
                                struct rds_ib_ack_state *state)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
@@ -622,9 +697,9 @@ static void rds_ib_process_recv(struct rds_connection *conn,
        /* XXX shut down the connection if port 0,0 are seen? */
 
        rdsdebug("ic %p ibinc %p recv %p byte len %u\n", ic, ibinc, recv,
-                byte_len);
+                data_len);
 
-       if (byte_len < sizeof(struct rds_header)) {
+       if (data_len < sizeof(struct rds_header)) {
                rds_ib_conn_error(conn, "incoming message "
                       "from %pI4 didn't inclue a "
                       "header, disconnecting and "
@@ -632,9 +707,9 @@ static void rds_ib_process_recv(struct rds_connection *conn,
                       &conn->c_faddr);
                return;
        }
-       byte_len -= sizeof(struct rds_header);
+       data_len -= sizeof(struct rds_header);
 
-       ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
+       ihdr = rds_ib_get_header(conn, recv, data_len);
 
        /* Validate the checksum. */
        if (!rds_message_verify_checksum(ihdr)) {
@@ -654,7 +729,7 @@ static void rds_ib_process_recv(struct rds_connection *conn,
        if (ihdr->h_credit)
                rds_ib_send_add_credits(conn, ihdr->h_credit);
 
-       if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
+       if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
                /* This is an ACK-only packet. The fact that it gets
                 * special treatment here is that historically, ACKs
                 * were rather special beasts.
@@ -696,10 +771,10 @@ static void rds_ib_process_recv(struct rds_connection *conn,
                hdr = &ibinc->ii_inc.i_hdr;
                /* We can't just use memcmp here; fragments of a
                 * single message may carry different ACKs */
-               if (hdr->h_sequence != ihdr->h_sequence
-                || hdr->h_len != ihdr->h_len
-                || hdr->h_sport != ihdr->h_sport
-                || hdr->h_dport != ihdr->h_dport) {
+               if (hdr->h_sequence != ihdr->h_sequence ||
+                   hdr->h_len != ihdr->h_len ||
+                   hdr->h_sport != ihdr->h_sport ||
+                   hdr->h_dport != ihdr->h_dport) {
                        rds_ib_conn_error(conn,
                                "fragment header mismatch; forcing reconnect\n");
                        return;
@@ -750,17 +825,22 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 {
        struct rds_connection *conn = context;
        struct rds_ib_connection *ic = conn->c_transport_data;
-       struct ib_wc wc;
-       struct rds_ib_ack_state state = { 0, };
-       struct rds_ib_recv_work *recv;
 
        rdsdebug("conn %p cq %p\n", conn, cq);
 
        rds_ib_stats_inc(s_ib_rx_cq_call);
 
-       ib_req_notify_cq(cq, IB_CQ_SOLICITED);
+       tasklet_schedule(&ic->i_recv_tasklet);
+}
+
+static inline void rds_poll_cq(struct rds_ib_connection *ic,
+                              struct rds_ib_ack_state *state)
+{
+       struct rds_connection *conn = ic->conn;
+       struct ib_wc wc;
+       struct rds_ib_recv_work *recv;
 
-       while (ib_poll_cq(cq, 1, &wc) > 0) {
+       while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
                rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                         (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
                         be32_to_cpu(wc.ex.imm_data));
@@ -778,7 +858,7 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
                if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
                        /* We expect errors as the qp is drained during shutdown */
                        if (wc.status == IB_WC_SUCCESS) {
-                               rds_ib_process_recv(conn, recv, wc.byte_len, &state);
+                               rds_ib_process_recv(conn, recv, wc.byte_len, state);
                        } else {
                                rds_ib_conn_error(conn, "recv completion on "
                                       "%pI4 had status %u, disconnecting and "
@@ -789,6 +869,17 @@ void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
 
                rds_ib_ring_free(&ic->i_recv_ring, 1);
        }
+}
+
+void rds_ib_recv_tasklet_fn(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_ack_state state = { 0, };
+
+       rds_poll_cq(ic, &state);
+       ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+       rds_poll_cq(ic, &state);
 
        if (state.ack_next_valid)
                rds_ib_set_ack(ic, state.ack_next, state.ack_required);
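
The shape of rds_ib_recv_tasklet_fn() is the classic drain/re-arm/drain pattern: the hard-IRQ completion handler above now only schedules the tasklet, and the tasklet polls the CQ dry, re-arms notification with ib_req_notify_cq(), then polls once more. A completion that arrives after the first drain but before the re-arm raises no event, so without the second rds_poll_cq() call it would sit unprocessed until the next interrupt. A toy model of the pattern; struct cq, cq_poll() and cq_arm() are stand-ins, not the verbs API:

#include <stdio.h>

/* Toy completion queue: 'pending' counts unreaped completions and
 * 'armed' says whether the next arrival would raise an event. */
struct cq {
	int pending;
	int armed;
};

static int cq_poll(struct cq *cq)       /* reap one entry, if any */
{
	if (cq->pending == 0)
		return 0;
	cq->pending--;
	return 1;
}

static void cq_arm(struct cq *cq)
{
	cq->armed = 1;
}

static void cq_tasklet(struct cq *cq)
{
	int n = 0;

	while (cq_poll(cq))     /* first drain */
		n++;
	cq->pending++;          /* arrives after the drain, before the re-arm: no event */
	cq_arm(cq);
	while (cq_poll(cq))     /* second drain catches the racer */
		n++;
	printf("processed %d completions\n", n);
}

int main(void)
{
	struct cq cq = { .pending = 3, .armed = 0 };

	cq_tasklet(&cq);
	return 0;
}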