/*
* Copyright (C) 2002 Sistina Software (UK) Limited.
+ * Copyright (C) 2006 Red Hat GmbH
*
* This file is released under the GPL.
*
#include <asm/atomic.h>
#include <linux/blkdev.h>
-#include <linux/config.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
+#include <linux/mutex.h>
#include "kcopyd.h"
static struct workqueue_struct *_kcopyd_wq;
static struct work_struct _kcopyd_work;
-static inline void wake(void)
+static void wake(void)
{
queue_work(_kcopyd_wq, &_kcopyd_work);
}
struct page_list *pages;
unsigned int nr_pages;
unsigned int nr_free_pages;
+
+ struct dm_io_client *io_client;
+
+ wait_queue_head_t destroyq;
+ atomic_t nr_jobs;
};
static struct page_list *alloc_pl(void)
/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512
-static kmem_cache_t *_job_cache;
+static struct kmem_cache *_job_cache;
static mempool_t *_job_pool;
/*
static int jobs_init(void)
{
- _job_cache = kmem_cache_create("kcopyd-jobs",
- sizeof(struct kcopyd_job),
- __alignof__(struct kcopyd_job),
- 0, NULL, NULL);
+ _job_cache = KMEM_CACHE(kcopyd_job, 0);
if (!_job_cache)
return -ENOMEM;
* Functions to push and pop a job onto the head of a given job
* list.
*/
-static inline struct kcopyd_job *pop(struct list_head *jobs)
+static struct kcopyd_job *pop(struct list_head *jobs)
{
struct kcopyd_job *job = NULL;
unsigned long flags;
return job;
}
-static inline void push(struct list_head *jobs, struct kcopyd_job *job)
+static void push(struct list_head *jobs, struct kcopyd_job *job)
{
unsigned long flags;
int read_err = job->read_err;
unsigned int write_err = job->write_err;
kcopyd_notify_fn fn = job->fn;
+ struct kcopyd_client *kc = job->kc;
- kcopyd_put_pages(job->kc, job->pages);
+ kcopyd_put_pages(kc, job->pages);
mempool_free(job, _job_pool);
fn(read_err, write_err, context);
+
+ if (atomic_dec_and_test(&kc->nr_jobs))
+ wake_up(&kc->destroyq);
+
return 0;
}
if (error) {
if (job->rw == WRITE)
- job->write_err &= error;
+ job->write_err |= error;
else
job->read_err = 1;
static int run_io_job(struct kcopyd_job *job)
{
int r;
+ struct dm_io_request io_req = {
+ .bi_rw = job->rw,
+ .mem.type = DM_IO_PAGE_LIST,
+ .mem.ptr.pl = job->pages,
+ .mem.offset = job->offset,
+ .notify.fn = complete_io,
+ .notify.context = job,
+ .client = job->kc->io_client,
+ };
if (job->rw == READ)
- r = dm_io_async(1, &job->source, job->rw,
- job->pages,
- job->offset, complete_io, job);
-
+ r = dm_io(&io_req, 1, &job->source, NULL);
else
- r = dm_io_async(job->num_dests, job->dests, job->rw,
- job->pages,
- job->offset, complete_io, job);
+ r = dm_io(&io_req, job->num_dests, job->dests, NULL);
return r;
}
/*
* kcopyd does this every time it's woken up.
*/
-static void do_work(void *ignored)
+static void do_work(struct work_struct *ignored)
{
/*
* The order that these are called is *very* important.
*/
static void dispatch_job(struct kcopyd_job *job)
{
+ atomic_inc(&job->kc->nr_jobs);
push(&_pages_jobs, job);
wake();
}
job->read_err = 1;
if (write_err)
- job->write_err &= write_err;
+ job->write_err |= write_err;
/*
* Only dispatch more work if there hasn't been an error.
/*-----------------------------------------------------------------
* Unit setup
*---------------------------------------------------------------*/
-static DECLARE_MUTEX(_client_lock);
+static DEFINE_MUTEX(_client_lock);
static LIST_HEAD(_clients);
static void client_add(struct kcopyd_client *kc)
{
- down(&_client_lock);
+ mutex_lock(&_client_lock);
list_add(&kc->list, &_clients);
- up(&_client_lock);
+ mutex_unlock(&_client_lock);
}
static void client_del(struct kcopyd_client *kc)
{
- down(&_client_lock);
+ mutex_lock(&_client_lock);
list_del(&kc->list);
- up(&_client_lock);
+ mutex_unlock(&_client_lock);
}
static DEFINE_MUTEX(kcopyd_init_lock);
}
kcopyd_clients++;
- INIT_WORK(&_kcopyd_work, do_work, NULL);
+ INIT_WORK(&_kcopyd_work, do_work);
mutex_unlock(&kcopyd_init_lock);
return 0;
}
return r;
}
- r = dm_io_get(nr_pages);
- if (r) {
+ kc->io_client = dm_io_client_create(nr_pages);
+ if (IS_ERR(kc->io_client)) {
+ r = PTR_ERR(kc->io_client);
client_free_pages(kc);
kfree(kc);
kcopyd_exit();
return r;
}
+ init_waitqueue_head(&kc->destroyq);
+ atomic_set(&kc->nr_jobs, 0);
+
client_add(kc);
*result = kc;
return 0;
void kcopyd_client_destroy(struct kcopyd_client *kc)
{
- dm_io_put(kc->nr_pages);
+ /* Wait for completion of all jobs submitted by this client. */
+ wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
+
+ dm_io_client_destroy(kc->io_client);
client_free_pages(kc);
client_del(kc);
kfree(kc);