fs/fscache/operation.c
/* FS-Cache worker operation management routines
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 * See Documentation/filesystems/caching/operations.txt
 */

#define FSCACHE_DEBUG_LEVEL OPERATION
#include <linux/module.h>
#include <linux/seq_file.h>
#include "internal.h"

atomic_t fscache_op_debug_id;
EXPORT_SYMBOL(fscache_op_debug_id);

/**
 * fscache_enqueue_operation - Enqueue an operation for processing
 * @op: The operation to enqueue
 *
 * Enqueue an operation for processing by the FS-Cache thread pool.
 *
 * This will get its own ref on the operation.
 */
void fscache_enqueue_operation(struct fscache_operation *op)
{
        _enter("{OBJ%x OP%x,%u}",
               op->object->debug_id, op->debug_id, atomic_read(&op->usage));

        fscache_set_op_state(op, "EnQ");

        ASSERT(list_empty(&op->pend_link));
        ASSERT(op->processor != NULL);
        ASSERTCMP(op->object->state, >=, FSCACHE_OBJECT_AVAILABLE);
        ASSERTCMP(atomic_read(&op->usage), >, 0);

        fscache_stat(&fscache_n_op_enqueue);
        switch (op->flags & FSCACHE_OP_TYPE) {
        case FSCACHE_OP_FAST:
                _debug("queue fast");
                atomic_inc(&op->usage);
                /* schedule_work() returns 0 if the work item was already
                 * queued, in which case the ref we just took must be
                 * dropped again */
                if (!schedule_work(&op->fast_work))
                        fscache_put_operation(op);
                break;
        case FSCACHE_OP_SLOW:
                _debug("queue slow");
                slow_work_enqueue(&op->slow_work);
                break;
        case FSCACHE_OP_MYTHREAD:
                _debug("queue for caller's attention");
                break;
        default:
                printk(KERN_ERR "FS-Cache: Unexpected op type %lx\n",
                       op->flags);
                BUG();
                break;
        }
}
EXPORT_SYMBOL(fscache_enqueue_operation);
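
/*
 * Illustrative sketch (not part of the original file): how an operation is
 * typically prepared for the slow-work pool before being submitted and
 * eventually enqueued.  my_processor() and my_prepare() are hypothetical;
 * fscache_operation_init() and fscache_operation_init_slow() are the helpers
 * in include/linux/fscache-cache.h.
 */
#if 0
static void my_processor(struct fscache_operation *op)
{
        /* runs on a slow-work thread once the op has been enqueued */
}

static void my_prepare(struct fscache_operation *op)
{
        fscache_operation_init(op, NULL);               /* usage count = 1 */
        fscache_operation_init_slow(op, my_processor);  /* slow-work item */
        op->flags = FSCACHE_OP_SLOW;
}
#endif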

/*
 * start an op running
 */
static void fscache_run_op(struct fscache_object *object,
                           struct fscache_operation *op)
{
        fscache_set_op_state(op, "Run");

        object->n_in_progress++;
        if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
                wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
        /* an op with no processor is run by the issuing thread itself, so
         * there's nothing to enqueue; the waiter has just been woken */
        if (op->processor)
                fscache_enqueue_operation(op);
        fscache_stat(&fscache_n_op_run);
}

/*
 * submit an exclusive operation for an object
 * - other ops are excluded from running simultaneously with this one
 * - this gets any extra refs it needs on an op
 */
int fscache_submit_exclusive_op(struct fscache_object *object,
                                struct fscache_operation *op)
{
        int ret;

        _enter("{OBJ%x OP%x},", object->debug_id, op->debug_id);

        fscache_set_op_state(op, "SubmitX");

        spin_lock(&object->lock);
        ASSERTCMP(object->n_ops, >=, object->n_in_progress);
        ASSERTCMP(object->n_ops, >=, object->n_exclusive);
        ASSERT(list_empty(&op->pend_link));

        ret = -ENOBUFS;
        if (fscache_object_is_active(object)) {
                op->object = object;
                object->n_ops++;
                object->n_exclusive++;  /* reads and writes must wait */

                /* n_ops was just incremented for this op, so a value greater
                 * than 1 means other ops are outstanding and we must queue
                 * up behind them */
                if (object->n_ops > 1) {
                        atomic_inc(&op->usage);
                        list_add_tail(&op->pend_link, &object->pending_ops);
                        fscache_stat(&fscache_n_op_pend);
                } else if (!list_empty(&object->pending_ops)) {
                        atomic_inc(&op->usage);
                        list_add_tail(&op->pend_link, &object->pending_ops);
                        fscache_stat(&fscache_n_op_pend);
                        fscache_start_operations(object);
                } else {
                        ASSERTCMP(object->n_in_progress, ==, 0);
                        fscache_run_op(object, op);
                }

                /* need to issue a new write op after this */
                clear_bit(FSCACHE_OBJECT_PENDING_WRITE, &object->flags);
                ret = 0;
        } else if (object->state == FSCACHE_OBJECT_CREATING) {
                op->object = object;
                object->n_ops++;
                object->n_exclusive++;  /* reads and writes must wait */
                atomic_inc(&op->usage);
                list_add_tail(&op->pend_link, &object->pending_ops);
                fscache_stat(&fscache_n_op_pend);
                ret = 0;
        } else {
                /* not allowed to submit ops in any other state */
                BUG();
        }

        spin_unlock(&object->lock);
        return ret;
}

/*
 * report an unexpected submission
 */
static void fscache_report_unexpected_submission(struct fscache_object *object,
                                                 struct fscache_operation *op,
                                                 unsigned long ostate)
{
        static bool once_only;
        struct fscache_operation *p;
        unsigned n;

        if (once_only)
                return;
        once_only = true;

        kdebug("unexpected submission OP%x [OBJ%x %s]",
               op->debug_id, object->debug_id,
               fscache_object_states[object->state]);
        kdebug("objstate=%s [%s]",
               fscache_object_states[object->state],
               fscache_object_states[ostate]);
        kdebug("objflags=%lx", object->flags);
        kdebug("objevent=%lx [%lx]", object->events, object->event_mask);
        kdebug("ops=%u inp=%u exc=%u",
               object->n_ops, object->n_in_progress, object->n_exclusive);

        if (!list_empty(&object->pending_ops)) {
                n = 0;
                list_for_each_entry(p, &object->pending_ops, pend_link) {
                        ASSERTCMP(p->object, ==, object);
                        /* describe each pending op in turn, not the op
                         * that's being submitted */
                        kdebug("%p %p", p->processor, p->release);
                        n++;
                }

                kdebug("n=%u", n);
        }

        dump_stack();
}

/*
 * submit an operation for an object
 * - ops may be submitted only whilst the object is in one of the following
 *   states:
 *   - during object creation (write ops may be submitted)
 *   - whilst the object is active
 *   - after an I/O error incurred in one of the two above states (op rejected)
 * - this gets any extra refs it needs on an op
 */
int fscache_submit_op(struct fscache_object *object,
                      struct fscache_operation *op)
{
        unsigned long ostate;
        int ret;

        _enter("{OBJ%x OP%x},{%u}",
               object->debug_id, op->debug_id, atomic_read(&op->usage));

        ASSERTCMP(atomic_read(&op->usage), >, 0);

        fscache_set_op_state(op, "Submit");

        spin_lock(&object->lock);
        ASSERTCMP(object->n_ops, >=, object->n_in_progress);
        ASSERTCMP(object->n_ops, >=, object->n_exclusive);
        ASSERT(list_empty(&op->pend_link));

        ostate = object->state;
        smp_rmb();

        if (fscache_object_is_active(object)) {
                op->object = object;
                object->n_ops++;

                if (object->n_exclusive > 0) {
                        atomic_inc(&op->usage);
                        list_add_tail(&op->pend_link, &object->pending_ops);
                        fscache_stat(&fscache_n_op_pend);
                } else if (!list_empty(&object->pending_ops)) {
                        atomic_inc(&op->usage);
                        list_add_tail(&op->pend_link, &object->pending_ops);
                        fscache_stat(&fscache_n_op_pend);
                        fscache_start_operations(object);
                } else {
                        ASSERTCMP(object->n_exclusive, ==, 0);
                        fscache_run_op(object, op);
                }
                ret = 0;
        } else if (object->state == FSCACHE_OBJECT_CREATING) {
                op->object = object;
                object->n_ops++;
                atomic_inc(&op->usage);
                list_add_tail(&op->pend_link, &object->pending_ops);
                fscache_stat(&fscache_n_op_pend);
                ret = 0;
        } else if (object->state == FSCACHE_OBJECT_DYING ||
                   object->state == FSCACHE_OBJECT_LC_DYING ||
                   object->state == FSCACHE_OBJECT_WITHDRAWING) {
                fscache_stat(&fscache_n_op_rejected);
                ret = -ENOBUFS;
        } else if (!test_bit(FSCACHE_IOERROR, &object->cache->flags)) {
                fscache_report_unexpected_submission(object, op, ostate);
                ASSERT(!fscache_object_is_active(object));
                ret = -ENOBUFS;
        } else {
                ret = -ENOBUFS;
        }

        spin_unlock(&object->lock);
        return ret;
}
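
/*
 * Illustrative sketch (not part of the original file): a typical caller of
 * fscache_submit_op().  Note that on refusal the object never took a ref on
 * the op and op->object was never set, so the op cannot be put and must be
 * discarded directly, as the callers in fs/fscache/page.c do.
 */
#if 0
        ret = fscache_submit_op(object, op);
        if (ret < 0) {
                kfree(op);
                return ret;     /* -ENOBUFS: object dying or cache in error */
        }
#endif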

/*
 * queue an object for withdrawal on error, aborting all following asynchronous
 * operations
 */
void fscache_abort_object(struct fscache_object *object)
{
        _enter("{OBJ%x}", object->debug_id);

        fscache_raise_event(object, FSCACHE_OBJECT_EV_ERROR);
}

/*
 * jump start the operation processing on an object
 * - caller must hold object->lock
 */
void fscache_start_operations(struct fscache_object *object)
{
        struct fscache_operation *op;
        bool stop = false;

        while (!list_empty(&object->pending_ops) && !stop) {
                op = list_entry(object->pending_ops.next,
                                struct fscache_operation, pend_link);

                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
                        /* an exclusive op may only start once everything in
                         * progress has finished, and nothing further may
                         * start whilst it runs */
                        if (object->n_in_progress > 0)
                                break;
                        stop = true;
                }
                list_del_init(&op->pend_link);
                fscache_run_op(object, op);

                /* the pending queue was holding a ref on the operation */
                fscache_put_operation(op);
        }

        ASSERTCMP(object->n_in_progress, <=, object->n_ops);

        _debug("woke %d ops on OBJ%x",
               object->n_in_progress, object->debug_id);
}
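
/*
 * Illustrative sketch (not part of the original file): as noted above, the
 * caller must already hold the object lock when restarting the pending-op
 * queue, which is how fscache_put_operation() and the GC below invoke it.
 */
#if 0
        spin_lock(&object->lock);
        if (object->n_in_progress == 0)
                fscache_start_operations(object);
        spin_unlock(&object->lock);
#endif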

/*
 * cancel an operation that's pending on an object
 */
int fscache_cancel_op(struct fscache_operation *op)
{
        struct fscache_object *object = op->object;
        int ret;

        _enter("{OBJ%x OP%x}", op->object->debug_id, op->debug_id);

        spin_lock(&object->lock);

        ret = -EBUSY;
        if (!list_empty(&op->pend_link)) {
                fscache_stat(&fscache_n_op_cancelled);
                list_del_init(&op->pend_link);
                object->n_ops--;
                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
                        object->n_exclusive--;
                if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
                        wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
                fscache_put_operation(op);
                ret = 0;
        }

        spin_unlock(&object->lock);
        _leave(" = %d", ret);
        return ret;
}
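
/*
 * Illustrative sketch (not part of the original file): callers that wait for
 * an op typically attempt cancellation if the wait is interrupted, and fall
 * back to an uninterruptible wait if the op already left the pending queue
 * (-EBUSY).  fscache_wait_bit() and fscache_wait_bit_interruptible() are
 * assumed to be the bit-wait callbacks FS-Cache uses in fs/fscache/page.c.
 */
#if 0
        if (wait_on_bit(&op->flags, FSCACHE_OP_WAITING,
                        fscache_wait_bit_interruptible,
                        TASK_INTERRUPTIBLE) != 0) {
                if (fscache_cancel_op(op) == 0)
                        return -ERESTARTSYS;    /* cancelled before running */

                /* too late to cancel; it should get to run shortly */
                wait_on_bit(&op->flags, FSCACHE_OP_WAITING,
                            fscache_wait_bit, TASK_UNINTERRUPTIBLE);
        }
#endif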

/*
 * release an operation
 * - queues pending ops if this is the last in-progress op
 */
void fscache_put_operation(struct fscache_operation *op)
{
        struct fscache_object *object;
        struct fscache_cache *cache;

        _enter("{OBJ%x OP%x,%d}",
               op->object->debug_id, op->debug_id, atomic_read(&op->usage));

        ASSERTCMP(atomic_read(&op->usage), >, 0);

        if (!atomic_dec_and_test(&op->usage))
                return;

        fscache_set_op_state(op, "Put");

        _debug("PUT OP");
        if (test_and_set_bit(FSCACHE_OP_DEAD, &op->flags))
                BUG();

        fscache_stat(&fscache_n_op_release);

        if (op->release) {
                op->release(op);
                op->release = NULL;
        }

        object = op->object;

        if (test_bit(FSCACHE_OP_DEC_READ_CNT, &op->flags))
                atomic_dec(&object->n_reads);

        /* now... we may get called with the object spinlock held, so we
         * complete the cleanup here only if we can immediately acquire the
         * lock, and defer it otherwise */
        if (!spin_trylock(&object->lock)) {
                _debug("defer put");
                fscache_stat(&fscache_n_op_deferred_release);

                /* pend_link is no longer needed for queueing, so it can be
                 * reused to park the op on the GC list */
                cache = object->cache;
                spin_lock(&cache->op_gc_list_lock);
                list_add_tail(&op->pend_link, &cache->op_gc_list);
                spin_unlock(&cache->op_gc_list_lock);
                schedule_work(&cache->op_gc);
                _leave(" [defer]");
                return;
        }

        if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
                ASSERTCMP(object->n_exclusive, >, 0);
                object->n_exclusive--;
        }

        ASSERTCMP(object->n_in_progress, >, 0);
        object->n_in_progress--;
        if (object->n_in_progress == 0)
                fscache_start_operations(object);

        ASSERTCMP(object->n_ops, >, 0);
        object->n_ops--;
        if (object->n_ops == 0)
                fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);

        spin_unlock(&object->lock);

        kfree(op);
        _leave(" [done]");
}
EXPORT_SYMBOL(fscache_put_operation);
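
/*
 * Illustrative sketch (not part of the original file): each queue an op sits
 * on owns a ref on it, which is why the submission paths above take an extra
 * ref before adding to pending_ops and why the queue-removal paths put it.
 */
#if 0
        atomic_inc(&op->usage);         /* ref held by the pending queue */
        list_add_tail(&op->pend_link, &object->pending_ops);

        /* ... later, when the op leaves the queue ... */
        list_del_init(&op->pend_link);
        fscache_put_operation(op);      /* the queue's ref is dropped */
#endif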

/*
 * garbage collect operations that have had their release deferred
 */
void fscache_operation_gc(struct work_struct *work)
{
        struct fscache_operation *op;
        struct fscache_object *object;
        struct fscache_cache *cache =
                container_of(work, struct fscache_cache, op_gc);
        int count = 0;

        _enter("");

        do {
                spin_lock(&cache->op_gc_list_lock);
                if (list_empty(&cache->op_gc_list)) {
                        spin_unlock(&cache->op_gc_list_lock);
                        break;
                }

                op = list_entry(cache->op_gc_list.next,
                                struct fscache_operation, pend_link);
                list_del(&op->pend_link);
                spin_unlock(&cache->op_gc_list_lock);

                object = op->object;

                _debug("GC DEFERRED REL OBJ%x OP%x",
                       object->debug_id, op->debug_id);
                fscache_stat(&fscache_n_op_gc);

                ASSERTCMP(atomic_read(&op->usage), ==, 0);

                spin_lock(&object->lock);
                if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
                        ASSERTCMP(object->n_exclusive, >, 0);
                        object->n_exclusive--;
                }

                ASSERTCMP(object->n_in_progress, >, 0);
                object->n_in_progress--;
                if (object->n_in_progress == 0)
                        fscache_start_operations(object);

                ASSERTCMP(object->n_ops, >, 0);
                object->n_ops--;
                if (object->n_ops == 0)
                        fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);

                spin_unlock(&object->lock);

                /* fscache_put_operation() deferred freeing the op to us, so
                 * free it now that the counters have been fixed up */
                kfree(op);

        } while (count++ < 20);         /* only do a batch per invocation */

        if (!list_empty(&cache->op_gc_list))
                schedule_work(&cache->op_gc);

        _leave("");
}
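
/*
 * Illustrative sketch (not part of the original file): the GC list, its lock
 * and the work item are assumed to be initialised when the cache is set up,
 * along the lines of fscache_init_cache() in fs/fscache/cache.c.
 */
#if 0
        INIT_WORK(&cache->op_gc, fscache_operation_gc);
        INIT_LIST_HEAD(&cache->op_gc_list);
        spin_lock_init(&cache->op_gc_list_lock);
#endif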

/*
 * allow the slow work item processor to get a ref on an operation
 */
static int fscache_op_get_ref(struct slow_work *work)
{
        struct fscache_operation *op =
                container_of(work, struct fscache_operation, slow_work);

        atomic_inc(&op->usage);
        return 0;
}

/*
 * allow the slow work item processor to discard a ref on an operation
 */
static void fscache_op_put_ref(struct slow_work *work)
{
        struct fscache_operation *op =
                container_of(work, struct fscache_operation, slow_work);

        fscache_put_operation(op);
}

/*
 * execute an operation using the slow thread pool to provide processing context
 * - the caller holds a ref to this object, so we don't need to hold one
 */
static void fscache_op_execute(struct slow_work *work)
{
        struct fscache_operation *op =
                container_of(work, struct fscache_operation, slow_work);
        unsigned long start;

        _enter("{OBJ%x OP%x,%d}",
               op->object->debug_id, op->debug_id, atomic_read(&op->usage));

        ASSERT(op->processor != NULL);
        start = jiffies;
        op->processor(op);
        fscache_hist(fscache_ops_histogram, start);

        _leave("");
}

/*
 * describe an operation for slow-work debugging
 */
#ifdef CONFIG_SLOW_WORK_PROC
static void fscache_op_desc(struct slow_work *work, struct seq_file *m)
{
        struct fscache_operation *op =
                container_of(work, struct fscache_operation, slow_work);

        seq_printf(m, "FSC: OBJ%x OP%x: %s/%s fl=%lx",
                   op->object->debug_id, op->debug_id,
                   op->name, op->state, op->flags);
}
#endif

const struct slow_work_ops fscache_op_slow_work_ops = {
        .owner          = THIS_MODULE,
        .get_ref        = fscache_op_get_ref,
        .put_ref        = fscache_op_put_ref,
        .execute        = fscache_op_execute,
#ifdef CONFIG_SLOW_WORK_PROC
        .desc           = fscache_op_desc,
#endif
};
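
/*
 * Illustrative sketch (not part of the original file): an operation is bound
 * to this ops table when it is initialised for the slow-work pool, roughly
 * as the fscache_operation_init_slow() helper in
 * include/linux/fscache-cache.h does.
 */
#if 0
static inline void my_init_slow(struct fscache_operation *op,
                                fscache_operation_processor_t processor)
{
        op->processor = processor;
        slow_work_init(&op->slow_work, &fscache_op_slow_work_ops);
}
#endif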