[PATCH] drivers/md/kcopyd.c: #if 0 kcopyd_cancel()
[safe/jmp/linux-2.6] / drivers / md / kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  *
4  * This file is released under the GPL.
5  *
6  * Kcopyd provides a simple interface for copying an area of one
7  * block-device to one or more other block-devices, with an asynchronous
8  * completion notification.
9  */
10
11 #include <asm/atomic.h>
12
13 #include <linux/blkdev.h>
14 #include <linux/config.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24
25 #include "kcopyd.h"
26
27 static struct workqueue_struct *_kcopyd_wq;
28 static struct work_struct _kcopyd_work;
29
30 static inline void wake(void)
31 {
32         queue_work(_kcopyd_wq, &_kcopyd_work);
33 }
34
35 /*-----------------------------------------------------------------
36  * Each kcopyd client has its own little pool of preallocated
37  * pages for kcopyd io.
38  *---------------------------------------------------------------*/
39 struct kcopyd_client {
40         struct list_head list;
41
42         spinlock_t lock;
43         struct page_list *pages;
44         unsigned int nr_pages;
45         unsigned int nr_free_pages;
46 };
47
48 static struct page_list *alloc_pl(void)
49 {
50         struct page_list *pl;
51
52         pl = kmalloc(sizeof(*pl), GFP_KERNEL);
53         if (!pl)
54                 return NULL;
55
56         pl->page = alloc_page(GFP_KERNEL);
57         if (!pl->page) {
58                 kfree(pl);
59                 return NULL;
60         }
61
62         return pl;
63 }
64
65 static void free_pl(struct page_list *pl)
66 {
67         __free_page(pl->page);
68         kfree(pl);
69 }
70
71 static int kcopyd_get_pages(struct kcopyd_client *kc,
72                             unsigned int nr, struct page_list **pages)
73 {
74         struct page_list *pl;
75
76         spin_lock(&kc->lock);
77         if (kc->nr_free_pages < nr) {
78                 spin_unlock(&kc->lock);
79                 return -ENOMEM;
80         }
81
82         kc->nr_free_pages -= nr;
83         for (*pages = pl = kc->pages; --nr; pl = pl->next)
84                 ;
85
86         kc->pages = pl->next;
87         pl->next = NULL;
88
89         spin_unlock(&kc->lock);
90
91         return 0;
92 }
93
94 static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
95 {
96         struct page_list *cursor;
97
98         spin_lock(&kc->lock);
99         for (cursor = pl; cursor->next; cursor = cursor->next)
100                 kc->nr_free_pages++;
101
102         kc->nr_free_pages++;
103         cursor->next = kc->pages;
104         kc->pages = pl;
105         spin_unlock(&kc->lock);
106 }
107
108 /*
109  * These three functions resize the page pool.
110  */
111 static void drop_pages(struct page_list *pl)
112 {
113         struct page_list *next;
114
115         while (pl) {
116                 next = pl->next;
117                 free_pl(pl);
118                 pl = next;
119         }
120 }
121
122 static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
123 {
124         unsigned int i;
125         struct page_list *pl = NULL, *next;
126
127         for (i = 0; i < nr; i++) {
128                 next = alloc_pl();
129                 if (!next) {
130                         if (pl)
131                                 drop_pages(pl);
132                         return -ENOMEM;
133                 }
134                 next->next = pl;
135                 pl = next;
136         }
137
138         kcopyd_put_pages(kc, pl);
139         kc->nr_pages += nr;
140         return 0;
141 }
142
143 static void client_free_pages(struct kcopyd_client *kc)
144 {
145         BUG_ON(kc->nr_free_pages != kc->nr_pages);
146         drop_pages(kc->pages);
147         kc->pages = NULL;
148         kc->nr_free_pages = kc->nr_pages = 0;
149 }
150
151 /*-----------------------------------------------------------------
152  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
153  * for this reason we use a mempool to prevent the client from
154  * ever having to do io (which could cause a deadlock).
155  *---------------------------------------------------------------*/
156 struct kcopyd_job {
157         struct kcopyd_client *kc;
158         struct list_head list;
159         unsigned long flags;
160
161         /*
162          * Error state of the job.
163          */
164         int read_err;
165         unsigned int write_err;
166
167         /*
168          * Either READ or WRITE
169          */
170         int rw;
171         struct io_region source;
172
173         /*
174          * The destinations for the transfer.
175          */
176         unsigned int num_dests;
177         struct io_region dests[KCOPYD_MAX_REGIONS];
178
179         sector_t offset;
180         unsigned int nr_pages;
181         struct page_list *pages;
182
183         /*
184          * Set this to ensure you are notified when the job has
185          * completed.  'context' is for callback to use.
186          */
187         kcopyd_notify_fn fn;
188         void *context;
189
190         /*
191          * These fields are only used if the job has been split
192          * into more manageable parts.
193          */
194         struct semaphore lock;
195         atomic_t sub_jobs;
196         sector_t progress;
197 };
198
199 /* FIXME: this should scale with the number of pages */
200 #define MIN_JOBS 512
201
202 static kmem_cache_t *_job_cache;
203 static mempool_t *_job_pool;
204
205 /*
206  * We maintain three lists of jobs:
207  *
208  * i)   jobs waiting for pages
209  * ii)  jobs that have pages, and are waiting for the io to be issued.
210  * iii) jobs that have completed.
211  *
212  * All three of these are protected by job_lock.
213  */
214 static DEFINE_SPINLOCK(_job_lock);
215
216 static LIST_HEAD(_complete_jobs);
217 static LIST_HEAD(_io_jobs);
218 static LIST_HEAD(_pages_jobs);
219
220 static int jobs_init(void)
221 {
222         _job_cache = kmem_cache_create("kcopyd-jobs",
223                                        sizeof(struct kcopyd_job),
224                                        __alignof__(struct kcopyd_job),
225                                        0, NULL, NULL);
226         if (!_job_cache)
227                 return -ENOMEM;
228
229         _job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
230                                    mempool_free_slab, _job_cache);
231         if (!_job_pool) {
232                 kmem_cache_destroy(_job_cache);
233                 return -ENOMEM;
234         }
235
236         return 0;
237 }
238
239 static void jobs_exit(void)
240 {
241         BUG_ON(!list_empty(&_complete_jobs));
242         BUG_ON(!list_empty(&_io_jobs));
243         BUG_ON(!list_empty(&_pages_jobs));
244
245         mempool_destroy(_job_pool);
246         kmem_cache_destroy(_job_cache);
247         _job_pool = NULL;
248         _job_cache = NULL;
249 }
250
251 /*
252  * Functions to push and pop a job onto the head of a given job
253  * list.
254  */
255 static inline struct kcopyd_job *pop(struct list_head *jobs)
256 {
257         struct kcopyd_job *job = NULL;
258         unsigned long flags;
259
260         spin_lock_irqsave(&_job_lock, flags);
261
262         if (!list_empty(jobs)) {
263                 job = list_entry(jobs->next, struct kcopyd_job, list);
264                 list_del(&job->list);
265         }
266         spin_unlock_irqrestore(&_job_lock, flags);
267
268         return job;
269 }
270
271 static inline void push(struct list_head *jobs, struct kcopyd_job *job)
272 {
273         unsigned long flags;
274
275         spin_lock_irqsave(&_job_lock, flags);
276         list_add_tail(&job->list, jobs);
277         spin_unlock_irqrestore(&_job_lock, flags);
278 }
279
280 /*
281  * These three functions process 1 item from the corresponding
282  * job list.
283  *
284  * They return:
285  * < 0: error
286  *   0: success
287  * > 0: can't process yet.
288  */
289 static int run_complete_job(struct kcopyd_job *job)
290 {
291         void *context = job->context;
292         int read_err = job->read_err;
293         unsigned int write_err = job->write_err;
294         kcopyd_notify_fn fn = job->fn;
295
296         kcopyd_put_pages(job->kc, job->pages);
297         mempool_free(job, _job_pool);
298         fn(read_err, write_err, context);
299         return 0;
300 }
301
302 static void complete_io(unsigned long error, void *context)
303 {
304         struct kcopyd_job *job = (struct kcopyd_job *) context;
305
306         if (error) {
307                 if (job->rw == WRITE)
308                         job->write_err &= error;
309                 else
310                         job->read_err = 1;
311
312                 if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
313                         push(&_complete_jobs, job);
314                         wake();
315                         return;
316                 }
317         }
318
319         if (job->rw == WRITE)
320                 push(&_complete_jobs, job);
321
322         else {
323                 job->rw = WRITE;
324                 push(&_io_jobs, job);
325         }
326
327         wake();
328 }
329
330 /*
331  * Request io on as many buffer heads as we can currently get for
332  * a particular job.
333  */
334 static int run_io_job(struct kcopyd_job *job)
335 {
336         int r;
337
338         if (job->rw == READ)
339                 r = dm_io_async(1, &job->source, job->rw,
340                                 job->pages,
341                                 job->offset, complete_io, job);
342
343         else
344                 r = dm_io_async(job->num_dests, job->dests, job->rw,
345                                 job->pages,
346                                 job->offset, complete_io, job);
347
348         return r;
349 }
350
351 static int run_pages_job(struct kcopyd_job *job)
352 {
353         int r;
354
355         job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
356                                   PAGE_SIZE >> 9);
357         r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
358         if (!r) {
359                 /* this job is ready for io */
360                 push(&_io_jobs, job);
361                 return 0;
362         }
363
364         if (r == -ENOMEM)
365                 /* can't complete now */
366                 return 1;
367
368         return r;
369 }
370
371 /*
372  * Run through a list for as long as possible.  Returns the count
373  * of successful jobs.
374  */
375 static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
376 {
377         struct kcopyd_job *job;
378         int r, count = 0;
379
380         while ((job = pop(jobs))) {
381
382                 r = fn(job);
383
384                 if (r < 0) {
385                         /* error this rogue job */
386                         if (job->rw == WRITE)
387                                 job->write_err = (unsigned int) -1;
388                         else
389                                 job->read_err = 1;
390                         push(&_complete_jobs, job);
391                         break;
392                 }
393
394                 if (r > 0) {
395                         /*
396                          * We couldn't service this job ATM, so
397                          * push this job back onto the list.
398                          */
399                         push(jobs, job);
400                         break;
401                 }
402
403                 count++;
404         }
405
406         return count;
407 }
408
409 /*
410  * kcopyd does this every time it's woken up.
411  */
412 static void do_work(void *ignored)
413 {
414         /*
415          * The order that these are called is *very* important.
416          * complete jobs can free some pages for pages jobs.
417          * Pages jobs when successful will jump onto the io jobs
418          * list.  io jobs call wake when they complete and it all
419          * starts again.
420          */
421         process_jobs(&_complete_jobs, run_complete_job);
422         process_jobs(&_pages_jobs, run_pages_job);
423         process_jobs(&_io_jobs, run_io_job);
424 }
425
426 /*
427  * If we are copying a small region we just dispatch a single job
428  * to do the copy, otherwise the io has to be split up into many
429  * jobs.
430  */
431 static void dispatch_job(struct kcopyd_job *job)
432 {
433         push(&_pages_jobs, job);
434         wake();
435 }
436
437 #define SUB_JOB_SIZE 128
438 static void segment_complete(int read_err,
439                              unsigned int write_err, void *context)
440 {
441         /* FIXME: tidy this function */
442         sector_t progress = 0;
443         sector_t count = 0;
444         struct kcopyd_job *job = (struct kcopyd_job *) context;
445
446         down(&job->lock);
447
448         /* update the error */
449         if (read_err)
450                 job->read_err = 1;
451
452         if (write_err)
453                 job->write_err &= write_err;
454
455         /*
456          * Only dispatch more work if there hasn't been an error.
457          */
458         if ((!job->read_err && !job->write_err) ||
459             test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
460                 /* get the next chunk of work */
461                 progress = job->progress;
462                 count = job->source.count - progress;
463                 if (count) {
464                         if (count > SUB_JOB_SIZE)
465                                 count = SUB_JOB_SIZE;
466
467                         job->progress += count;
468                 }
469         }
470         up(&job->lock);
471
472         if (count) {
473                 int i;
474                 struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);
475
476                 *sub_job = *job;
477                 sub_job->source.sector += progress;
478                 sub_job->source.count = count;
479
480                 for (i = 0; i < job->num_dests; i++) {
481                         sub_job->dests[i].sector += progress;
482                         sub_job->dests[i].count = count;
483                 }
484
485                 sub_job->fn = segment_complete;
486                 sub_job->context = job;
487                 dispatch_job(sub_job);
488
489         } else if (atomic_dec_and_test(&job->sub_jobs)) {
490
491                 /*
492                  * To avoid a race we must keep the job around
493                  * until after the notify function has completed.
494                  * Otherwise the client may try and stop the job
495                  * after we've completed.
496                  */
497                 job->fn(read_err, write_err, job->context);
498                 mempool_free(job, _job_pool);
499         }
500 }
501
502 /*
503  * Create some little jobs that will do the move between
504  * them.
505  */
506 #define SPLIT_COUNT 8
507 static void split_job(struct kcopyd_job *job)
508 {
509         int i;
510
511         atomic_set(&job->sub_jobs, SPLIT_COUNT);
512         for (i = 0; i < SPLIT_COUNT; i++)
513                 segment_complete(0, 0u, job);
514 }
515
516 int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
517                 unsigned int num_dests, struct io_region *dests,
518                 unsigned int flags, kcopyd_notify_fn fn, void *context)
519 {
520         struct kcopyd_job *job;
521
522         /*
523          * Allocate a new job.
524          */
525         job = mempool_alloc(_job_pool, GFP_NOIO);
526
527         /*
528          * set up for the read.
529          */
530         job->kc = kc;
531         job->flags = flags;
532         job->read_err = 0;
533         job->write_err = 0;
534         job->rw = READ;
535
536         job->source = *from;
537
538         job->num_dests = num_dests;
539         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
540
541         job->offset = 0;
542         job->nr_pages = 0;
543         job->pages = NULL;
544
545         job->fn = fn;
546         job->context = context;
547
548         if (job->source.count < SUB_JOB_SIZE)
549                 dispatch_job(job);
550
551         else {
552                 init_MUTEX(&job->lock);
553                 job->progress = 0;
554                 split_job(job);
555         }
556
557         return 0;
558 }
559
560 /*
561  * Cancels a kcopyd job, eg. someone might be deactivating a
562  * mirror.
563  */
564 #if 0
565 int kcopyd_cancel(struct kcopyd_job *job, int block)
566 {
567         /* FIXME: finish */
568         return -1;
569 }
570 #endif  /*  0  */
571
572 /*-----------------------------------------------------------------
573  * Unit setup
574  *---------------------------------------------------------------*/
575 static DECLARE_MUTEX(_client_lock);
576 static LIST_HEAD(_clients);
577
578 static void client_add(struct kcopyd_client *kc)
579 {
580         down(&_client_lock);
581         list_add(&kc->list, &_clients);
582         up(&_client_lock);
583 }
584
585 static void client_del(struct kcopyd_client *kc)
586 {
587         down(&_client_lock);
588         list_del(&kc->list);
589         up(&_client_lock);
590 }
591
592 static DECLARE_MUTEX(kcopyd_init_lock);
593 static int kcopyd_clients = 0;
594
595 static int kcopyd_init(void)
596 {
597         int r;
598
599         down(&kcopyd_init_lock);
600
601         if (kcopyd_clients) {
602                 /* Already initialized. */
603                 kcopyd_clients++;
604                 up(&kcopyd_init_lock);
605                 return 0;
606         }
607
608         r = jobs_init();
609         if (r) {
610                 up(&kcopyd_init_lock);
611                 return r;
612         }
613
614         _kcopyd_wq = create_singlethread_workqueue("kcopyd");
615         if (!_kcopyd_wq) {
616                 jobs_exit();
617                 up(&kcopyd_init_lock);
618                 return -ENOMEM;
619         }
620
621         kcopyd_clients++;
622         INIT_WORK(&_kcopyd_work, do_work, NULL);
623         up(&kcopyd_init_lock);
624         return 0;
625 }
626
627 static void kcopyd_exit(void)
628 {
629         down(&kcopyd_init_lock);
630         kcopyd_clients--;
631         if (!kcopyd_clients) {
632                 jobs_exit();
633                 destroy_workqueue(_kcopyd_wq);
634                 _kcopyd_wq = NULL;
635         }
636         up(&kcopyd_init_lock);
637 }
638
639 int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
640 {
641         int r = 0;
642         struct kcopyd_client *kc;
643
644         r = kcopyd_init();
645         if (r)
646                 return r;
647
648         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
649         if (!kc) {
650                 kcopyd_exit();
651                 return -ENOMEM;
652         }
653
654         spin_lock_init(&kc->lock);
655         kc->pages = NULL;
656         kc->nr_pages = kc->nr_free_pages = 0;
657         r = client_alloc_pages(kc, nr_pages);
658         if (r) {
659                 kfree(kc);
660                 kcopyd_exit();
661                 return r;
662         }
663
664         r = dm_io_get(nr_pages);
665         if (r) {
666                 client_free_pages(kc);
667                 kfree(kc);
668                 kcopyd_exit();
669                 return r;
670         }
671
672         client_add(kc);
673         *result = kc;
674         return 0;
675 }
676
677 void kcopyd_client_destroy(struct kcopyd_client *kc)
678 {
679         dm_io_put(kc->nr_pages);
680         client_free_pages(kc);
681         client_del(kc);
682         kfree(kc);
683         kcopyd_exit();
684 }
685
686 EXPORT_SYMBOL(kcopyd_client_create);
687 EXPORT_SYMBOL(kcopyd_client_destroy);
688 EXPORT_SYMBOL(kcopyd_copy);