afs: add missing up_write() on return
[safe/jmp/linux-2.6] / fs / afs / rxrpc.c
index b927742..bde3f19 100644 (file)
@@ -17,6 +17,8 @@
 
 static struct socket *afs_socket; /* my RxRPC socket */
 static struct workqueue_struct *afs_async_calls;
+static atomic_t afs_outstanding_calls;
+static atomic_t afs_outstanding_skbs;
 
 static void afs_wake_up_call_waiter(struct afs_call *);
 static int afs_wait_for_call_to_complete(struct afs_call *);
@@ -45,6 +47,7 @@ static const struct afs_wait_mode afs_async_incoming_call = {
 
 /* asynchronous incoming call initial processing */
 static const struct afs_call_type afs_RXCMxxxx = {
+       .name           = "CB.xxxx",
        .deliver        = afs_deliver_cm_op_id,
        .abort_to_error = afs_abort_to_error,
 };
@@ -118,10 +121,67 @@ void afs_close_socket(void)
 
        _debug("dework");
        destroy_workqueue(afs_async_calls);
+
+       ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
+       ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0);
        _leave("");
 }
 
 /*
+ * note that the data in a socket buffer is now delivered and that the buffer
+ * should be freed
+ */
+static void afs_data_delivered(struct sk_buff *skb)
+{
+       if (!skb) {
+               _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
+               dump_stack();
+       } else {
+               _debug("DLVR %p{%u} [%d]",
+                      skb, skb->mark, atomic_read(&afs_outstanding_skbs));
+               if (atomic_dec_return(&afs_outstanding_skbs) == -1)
+                       BUG();
+               rxrpc_kernel_data_delivered(skb);
+       }
+}
+
+/*
+ * free a socket buffer
+ */
+static void afs_free_skb(struct sk_buff *skb)
+{
+       if (!skb) {
+               _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
+               dump_stack();
+       } else {
+               _debug("FREE %p{%u} [%d]",
+                      skb, skb->mark, atomic_read(&afs_outstanding_skbs));
+               if (atomic_dec_return(&afs_outstanding_skbs) == -1)
+                       BUG();
+               rxrpc_kernel_free_skb(skb);
+       }
+}
+
+/*
+ * free a call
+ */
+static void afs_free_call(struct afs_call *call)
+{
+       _debug("DONE %p{%s} [%d]",
+              call, call->type->name, atomic_read(&afs_outstanding_calls));
+       if (atomic_dec_return(&afs_outstanding_calls) == -1)
+               BUG();
+
+       ASSERTCMP(call->rxcall, ==, NULL);
+       ASSERT(!work_pending(&call->async_work));
+       ASSERT(skb_queue_empty(&call->rx_queue));
+       ASSERT(call->type->name != NULL);
+
+       kfree(call->request);
+       kfree(call);
+}
+
+/*
  * allocate a call with flat request and reply buffers
  */
 struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
@@ -133,30 +193,32 @@ struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
        if (!call)
                goto nomem_call;
 
+       _debug("CALL %p{%s} [%d]",
+              call, type->name, atomic_read(&afs_outstanding_calls));
+       atomic_inc(&afs_outstanding_calls);
+
+       call->type = type;
+       call->request_size = request_size;
+       call->reply_max = reply_size;
+
        if (request_size) {
                call->request = kmalloc(request_size, GFP_NOFS);
                if (!call->request)
-                       goto nomem_request;
+                       goto nomem_free;
        }
 
        if (reply_size) {
                call->buffer = kmalloc(reply_size, GFP_NOFS);
                if (!call->buffer)
-                       goto nomem_buffer;
+                       goto nomem_free;
        }
 
-       call->type = type;
-       call->request_size = request_size;
-       call->reply_max = reply_size;
-
        init_waitqueue_head(&call->waitq);
        skb_queue_head_init(&call->rx_queue);
        return call;
 
-nomem_buffer:
-       kfree(call->request);
-nomem_request:
-       kfree(call);
+nomem_free:
+       afs_free_call(call);
 nomem_call:
        return NULL;
 }
@@ -175,6 +237,71 @@ void afs_flat_call_destructor(struct afs_call *call)
 }
 
 /*
+ * attach the data from a bunch of pages on an inode to a call
+ */
+static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
+                         struct kvec *iov)
+{
+       struct page *pages[8];
+       unsigned count, n, loop, offset, to;
+       pgoff_t first = call->first, last = call->last;
+       int ret;
+
+       _enter("");
+
+       offset = call->first_offset;
+       call->first_offset = 0;
+
+       do {
+               _debug("attach %lx-%lx", first, last);
+
+               count = last - first + 1;
+               if (count > ARRAY_SIZE(pages))
+                       count = ARRAY_SIZE(pages);
+               n = find_get_pages_contig(call->mapping, first, count, pages);
+               ASSERTCMP(n, ==, count);
+
+               loop = 0;
+               do {
+                       msg->msg_flags = 0;
+                       to = PAGE_SIZE;
+                       if (first + loop >= last)
+                               to = call->last_to;
+                       else
+                               msg->msg_flags = MSG_MORE;
+                       iov->iov_base = kmap(pages[loop]) + offset;
+                       iov->iov_len = to - offset;
+                       offset = 0;
+
+                       _debug("- range %u-%u%s",
+                              offset, to, msg->msg_flags ? " [more]" : "");
+                       msg->msg_iov = (struct iovec *) iov;
+                       msg->msg_iovlen = 1;
+
+                       /* have to change the state *before* sending the last
+                        * packet as RxRPC might give us the reply before it
+                        * returns from sending the request */
+                       if (first + loop >= last)
+                               call->state = AFS_CALL_AWAIT_REPLY;
+                       ret = rxrpc_kernel_send_data(call->rxcall, msg,
+                                                    to - offset);
+                       kunmap(pages[loop]);
+                       if (ret < 0)
+                               break;
+               } while (++loop < count);
+               first += count;
+
+               for (loop = 0; loop < count; loop++)
+                       put_page(pages[loop]);
+               if (ret < 0)
+                       break;
+       } while (first <= last);
+
+       _leave(" = %d", ret);
+       return ret;
+}
+
+/*
  * initiate a call
  */
 int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
@@ -188,6 +315,13 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
 
        _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
 
+       ASSERT(call->type != NULL);
+       ASSERT(call->type->name != NULL);
+
+       _debug("____MAKE %p{%s,%x} [%d]____",
+              call, call->type->name, key_serial(call->key),
+              atomic_read(&afs_outstanding_calls));
+
        call->wait_mode = wait_mode;
        INIT_WORK(&call->async_work, afs_process_async_call);
 
@@ -203,6 +337,7 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
        /* create a call */
        rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
                                         (unsigned long) call, gfp);
+       call->key = NULL;
        if (IS_ERR(rxcall)) {
                ret = PTR_ERR(rxcall);
                goto error_kill_call;
@@ -220,16 +355,23 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
        msg.msg_iovlen          = 1;
        msg.msg_control         = NULL;
        msg.msg_controllen      = 0;
-       msg.msg_flags           = 0;
+       msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);
 
        /* have to change the state *before* sending the last packet as RxRPC
         * might give us the reply before it returns from sending the
         * request */
-       call->state = AFS_CALL_AWAIT_REPLY;
+       if (!call->send_pages)
+               call->state = AFS_CALL_AWAIT_REPLY;
        ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
        if (ret < 0)
                goto error_do_abort;
 
+       if (call->send_pages) {
+               ret = afs_send_pages(call, &msg, iov);
+               if (ret < 0)
+                       goto error_do_abort;
+       }
+
        /* at this point, an async call may no longer exist as it may have
         * already completed */
        return wait_mode->wait(call);
@@ -237,10 +379,10 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
 error_do_abort:
        rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
        rxrpc_kernel_end_call(rxcall);
+       call->rxcall = NULL;
 error_kill_call:
        call->type->destructor(call);
-       ASSERT(skb_queue_empty(&call->rx_queue));
-       kfree(call);
+       afs_free_call(call);
        _leave(" = %d", ret);
        return ret;
 }
@@ -257,15 +399,19 @@ static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
 
        _enter("%p,,%u", call, skb->mark);
 
+       _debug("ICPT %p{%u} [%d]",
+              skb, skb->mark, atomic_read(&afs_outstanding_skbs));
+
        ASSERTCMP(sk, ==, afs_socket->sk);
+       atomic_inc(&afs_outstanding_skbs);
 
        if (!call) {
                /* its an incoming call for our callback service */
-               __skb_queue_tail(&afs_incoming_calls, skb);
+               skb_queue_tail(&afs_incoming_calls, skb);
                schedule_work(&afs_collect_incoming_call_work);
        } else {
                /* route the messages directly to the appropriate call */
-               __skb_queue_tail(&call->rx_queue, skb);
+               skb_queue_tail(&call->rx_queue, skb);
                call->wait_mode->rx_wakeup(call);
        }
 
@@ -317,9 +463,9 @@ static void afs_deliver_to_call(struct afs_call *call)
                                call->state = AFS_CALL_ERROR;
                                break;
                        }
-                       rxrpc_kernel_data_delivered(skb);
+                       afs_data_delivered(skb);
                        skb = NULL;
-                       break;
+                       continue;
                case RXRPC_SKB_MARK_FINAL_ACK:
                        _debug("Rcv ACK");
                        call->state = AFS_CALL_COMPLETE;
@@ -350,19 +496,19 @@ static void afs_deliver_to_call(struct afs_call *call)
                        break;
                }
 
-               rxrpc_kernel_free_skb(skb);
+               afs_free_skb(skb);
        }
 
        /* make sure the queue is empty if the call is done with (we might have
         * aborted the call early because of an unmarshalling error) */
        if (call->state >= AFS_CALL_COMPLETE) {
                while ((skb = skb_dequeue(&call->rx_queue)))
-                       rxrpc_kernel_free_skb(skb);
+                       afs_free_skb(skb);
                if (call->incoming) {
                        rxrpc_kernel_end_call(call->rxcall);
+                       call->rxcall = NULL;
                        call->type->destructor(call);
-                       ASSERT(skb_queue_empty(&call->rx_queue));
-                       kfree(call);
+                       afs_free_call(call);
                }
        }
 
@@ -409,14 +555,14 @@ static int afs_wait_for_call_to_complete(struct afs_call *call)
                _debug("call incomplete");
                rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
                while ((skb = skb_dequeue(&call->rx_queue)))
-                       rxrpc_kernel_free_skb(skb);
+                       afs_free_skb(skb);
        }
 
        _debug("call complete");
        rxrpc_kernel_end_call(call->rxcall);
+       call->rxcall = NULL;
        call->type->destructor(call);
-       ASSERT(skb_queue_empty(&call->rx_queue));
-       kfree(call);
+       afs_free_call(call);
        _leave(" = %d", ret);
        return ret;
 }
@@ -459,9 +605,7 @@ static void afs_delete_async_call(struct work_struct *work)
 
        _enter("");
 
-       ASSERT(skb_queue_empty(&call->rx_queue));
-       ASSERT(!work_pending(&call->async_work));
-       kfree(call);
+       afs_free_call(call);
 
        _leave("");
 }
@@ -489,6 +633,7 @@ static void afs_process_async_call(struct work_struct *work)
 
                /* kill the call */
                rxrpc_kernel_end_call(call->rxcall);
+               call->rxcall = NULL;
                if (call->type->destructor)
                        call->type->destructor(call);
 
@@ -526,7 +671,7 @@ static void afs_collect_incoming_call(struct work_struct *work)
                _debug("new call");
 
                /* don't need the notification */
-               rxrpc_kernel_free_skb(skb);
+               afs_free_skb(skb);
 
                if (!call) {
                        call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
@@ -541,6 +686,11 @@ static void afs_collect_incoming_call(struct work_struct *work)
                        init_waitqueue_head(&call->waitq);
                        skb_queue_head_init(&call->rx_queue);
                        call->state = AFS_CALL_AWAIT_OP_ID;
+
+                       _debug("CALL %p{%s} [%d]",
+                              call, call->type->name,
+                              atomic_read(&afs_outstanding_calls));
+                       atomic_inc(&afs_outstanding_calls);
                }
 
                rxcall = rxrpc_kernel_accept_call(afs_socket,
@@ -551,7 +701,8 @@ static void afs_collect_incoming_call(struct work_struct *work)
                }
        }
 
-       kfree(call);
+       if (call)
+               afs_free_call(call);
 }
 
 /*
@@ -629,14 +780,51 @@ void afs_send_empty_reply(struct afs_call *call)
                rxrpc_kernel_end_call(call->rxcall);
                call->rxcall = NULL;
                call->type->destructor(call);
-               ASSERT(skb_queue_empty(&call->rx_queue));
-               kfree(call);
+               afs_free_call(call);
                _leave(" [error]");
                return;
        }
 }
 
 /*
+ * send a simple reply
+ */
+void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
+{
+       struct msghdr msg;
+       struct iovec iov[1];
+       int n;
+
+       _enter("");
+
+       iov[0].iov_base         = (void *) buf;
+       iov[0].iov_len          = len;
+       msg.msg_name            = NULL;
+       msg.msg_namelen         = 0;
+       msg.msg_iov             = iov;
+       msg.msg_iovlen          = 1;
+       msg.msg_control         = NULL;
+       msg.msg_controllen      = 0;
+       msg.msg_flags           = 0;
+
+       call->state = AFS_CALL_AWAIT_ACK;
+       n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
+       if (n >= 0) {
+               _leave(" [replied]");
+               return;
+       }
+       if (n == -ENOMEM) {
+               _debug("oom");
+               rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
+       }
+       rxrpc_kernel_end_call(call->rxcall);
+       call->rxcall = NULL;
+       call->type->destructor(call);
+       afs_free_call(call);
+       _leave(" [error]");
+}
+
+/*
  * extract a piece of data from the received data socket buffers
  */
 int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
@@ -656,7 +844,7 @@ int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
 
        if (call->offset < count) {
                if (last) {
-                       _leave(" = -EBADMSG [%d < %lu]", call->offset, count);
+                       _leave(" = -EBADMSG [%d < %zu]", call->offset, count);
                        return -EBADMSG;
                }
                _leave(" = -EAGAIN");