#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
-#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/sunrpc/clnt.h>
+#include "sunrpc.h"
+
#ifdef RPC_DEBUG
#define RPCDBG_FACILITY RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID 0xf00baa
{
__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
+EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
rpc_clear_queued(task);
if (rpc_test_and_set_running(task))
return;
- /* We might have raced */
- if (RPC_IS_QUEUED(task)) {
- rpc_clear_running(task);
- return;
- }
if (RPC_IS_ASYNC(task)) {
int status;
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
- if (!RPC_IS_QUEUED(task) || task->tk_waitqueue != queue)
- return;
- if (rpc_start_wakeup(task)) {
- __rpc_do_wake_up_task(queue, task);
- rpc_finish_wakeup(task);
- }
+ if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
+ __rpc_do_wake_up_task(queue, task);
}
/*
+ * Tests whether rpc queue is empty
+ */
+int rpc_queue_empty(struct rpc_wait_queue *queue)
+{
+ int res;
+
+ spin_lock_bh(&queue->lock);
+ res = queue->qlen;
+ spin_unlock_bh(&queue->lock);
+ return (res == 0);
+}
+EXPORT_SYMBOL_GPL(rpc_queue_empty);
+
+/*
* Wake up a task on a specific queue
*/
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
- rcu_read_lock_bh();
- spin_lock(&queue->lock);
+ spin_lock_bh(&queue->lock);
rpc_wake_up_task_queue_locked(queue, task);
- spin_unlock(&queue->lock);
- rcu_read_unlock_bh();
+ spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
dprintk("RPC: wake_up_next(%p \"%s\")\n",
queue, rpc_qname(queue));
- rcu_read_lock_bh();
- spin_lock(&queue->lock);
+ spin_lock_bh(&queue->lock);
if (RPC_IS_PRIORITY(queue))
task = __rpc_wake_up_next_priority(queue);
else {
task_for_first(task, &queue->tasks[0])
rpc_wake_up_task_queue_locked(queue, task);
}
- spin_unlock(&queue->lock);
- rcu_read_unlock_bh();
+ spin_unlock_bh(&queue->lock);
return task;
}
struct rpc_task *task, *next;
struct list_head *head;
- rcu_read_lock_bh();
- spin_lock(&queue->lock);
+ spin_lock_bh(&queue->lock);
head = &queue->tasks[queue->maxpriority];
for (;;) {
list_for_each_entry_safe(task, next, head, u.tk_wait.list)
break;
head--;
}
- spin_unlock(&queue->lock);
- rcu_read_unlock_bh();
+ spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);
struct rpc_task *task, *next;
struct list_head *head;
- rcu_read_lock_bh();
- spin_lock(&queue->lock);
+ spin_lock_bh(&queue->lock);
head = &queue->tasks[queue->maxpriority];
for (;;) {
list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
break;
head--;
}
- spin_unlock(&queue->lock);
- rcu_read_unlock_bh();
+ spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);
/*
* Helper to call task->tk_ops->rpc_call_prepare
*/
-static void rpc_prepare_task(struct rpc_task *task)
+void rpc_prepare_task(struct rpc_task *task)
{
- lock_kernel();
task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
- unlock_kernel();
}
/*
{
task->tk_action = NULL;
if (task->tk_ops->rpc_call_done != NULL) {
- lock_kernel();
task->tk_ops->rpc_call_done(task, task->tk_calldata);
- unlock_kernel();
if (task->tk_action != NULL) {
WARN_ON(RPC_ASSASSINATED(task));
/* Always release the RPC slot and buffer memory */
void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
- if (ops->rpc_release != NULL) {
- lock_kernel();
+ if (ops->rpc_release != NULL)
ops->rpc_release(calldata);
- unlock_kernel();
- }
}
/*
*/
static void __rpc_execute(struct rpc_task *task)
{
- int status = 0;
+ struct rpc_wait_queue *queue;
+ int task_is_async = RPC_IS_ASYNC(task);
+ int status = 0;
dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
task->tk_pid, task->tk_flags);
/*
* Execute any pending callback.
*/
- if (RPC_DO_CALLBACK(task)) {
- /* Define a callback save pointer */
+ if (task->tk_callback) {
void (*save_callback)(struct rpc_task *);
/*
- * If a callback exists, save it, reset it,
- * call it.
- * The save is needed to stop from resetting
- * another callback set within the callback handler
- * - Dave
+ * We set tk_callback to NULL before calling it,
+ * in case it sets the tk_callback field itself:
*/
- save_callback=task->tk_callback;
- task->tk_callback=NULL;
+ save_callback = task->tk_callback;
+ task->tk_callback = NULL;
save_callback(task);
}
*/
if (!RPC_IS_QUEUED(task))
continue;
- rpc_clear_running(task);
- if (RPC_IS_ASYNC(task)) {
- /* Careful! we may have raced... */
- if (RPC_IS_QUEUED(task))
- return;
- if (rpc_test_and_set_running(task))
- return;
+ /*
+ * The queue->lock protects against races with
+ * rpc_make_runnable().
+ *
+ * Note that once we clear RPC_TASK_RUNNING on an asynchronous
+ * rpc_task, rpc_make_runnable() can assign it to a
+ * different workqueue. We therefore cannot assume that the
+ * rpc_task pointer may still be dereferenced.
+ */
+ queue = task->tk_waitqueue;
+ spin_lock_bh(&queue->lock);
+ if (!RPC_IS_QUEUED(task)) {
+ spin_unlock_bh(&queue->lock);
continue;
}
+ rpc_clear_running(task);
+ spin_unlock_bh(&queue->lock);
+ if (task_is_async)
+ return;
/* sync task: sleep here */
dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}
-struct rpc_buffer {
- size_t len;
- char data[];
-};
-
/**
* rpc_malloc - allocate an RPC buffer
* @task: RPC task that will use this buffer
task->tk_action = rpc_prepare_task;
if (task_setup_data->rpc_message != NULL) {
- memcpy(&task->tk_msg, task_setup_data->rpc_message, sizeof(task->tk_msg));
+ task->tk_msg.rpc_proc = task_setup_data->rpc_message->rpc_proc;
+ task->tk_msg.rpc_argp = task_setup_data->rpc_message->rpc_argp;
+ task->tk_msg.rpc_resp = task_setup_data->rpc_message->rpc_resp;
/* Bind the user cred */
- if (task->tk_msg.rpc_cred != NULL)
- rpcauth_holdcred(task);
- else
- rpcauth_bindcred(task);
+ rpcauth_bindcred(task, task_setup_data->rpc_message->rpc_cred, task_setup_data->flags);
if (task->tk_action == NULL)
rpc_call_start(task);
}
return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}
-static void rpc_free_task_rcu(struct rcu_head *rcu)
-{
- struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
- dprintk("RPC: %5u freeing task\n", task->tk_pid);
- mempool_free(task, rpc_task_mempool);
-}
-
/*
* Create a new task for the specified client.
*/
const struct rpc_call_ops *tk_ops = task->tk_ops;
void *calldata = task->tk_calldata;
- if (task->tk_flags & RPC_TASK_DYNAMIC)
- call_rcu_bh(&task->u.tk_rcu, rpc_free_task_rcu);
+ if (task->tk_flags & RPC_TASK_DYNAMIC) {
+ dprintk("RPC: %5u freeing task\n", task->tk_pid);
+ mempool_free(task, rpc_task_mempool);
+ }
rpc_release_calldata(tk_ops, calldata);
}