ceph: message pools
fs/ceph/msgpool.c
#include "ceph_debug.h"

#include <linux/err.h>
#include <linux/sched.h>
#include <linux/types.h>
#include <linux/vmalloc.h>

#include "msgpool.h"

/*
 * We use msg pools to preallocate memory for messages we expect to
 * receive over the wire, to avoid getting ourselves into OOM
 * conditions at unexpected times.  We use a few different
 * strategies:
 *
 *  - for request/response type interactions, we preallocate the
 *    memory needed for the response when we generate the request.
 *
 *  - for messages we can receive at any time from the MDS, we
 *    preallocate a pool of messages we can re-use.
 *
 *  - for writeback, we preallocate some number of messages to use for
 *    requests and their replies, so that we always make forward
 *    progress.
 *
 * The msgpool behaves like a mempool_t, but keeps preallocated
 * ceph_msgs strung together on a list_head instead of using a pointer
 * vector.  This avoids vector reallocation when we adjust the number
 * of preallocated items (which happens frequently).
 */

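/*
 * Note (not from this file): struct ceph_msgpool itself is declared in
 * msgpool.h.  Judging only from how it is used below, it looks roughly
 * like the sketch here; the real declaration may differ in detail.
 *
 *	struct ceph_msgpool {
 *		spinlock_t lock;
 *		int front_len;            front (payload) size to preallocate
 *		struct list_head msgs;    preallocated, currently unused msgs
 *		int num, min;             current count / target minimum
 *		bool blocking;            wait in msgpool_get, or fail?
 *		wait_queue_head_t wait;   waiters for a free msg
 *	};
 */
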
/*
 * Allocate or release as necessary to meet our target pool size.
 */
static int __fill_msgpool(struct ceph_msgpool *pool)
{
        struct ceph_msg *msg;

        while (pool->num < pool->min) {
                dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
                     pool->min);
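                /*
                 * Drop the lock across the allocation, which may sleep;
                 * the loop condition is rechecked once we retake it.
                 */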
                spin_unlock(&pool->lock);
                msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
                spin_lock(&pool->lock);
                if (IS_ERR(msg))
                        return PTR_ERR(msg);
                msg->pool = pool;
                list_add(&msg->list_head, &pool->msgs);
                pool->num++;
        }
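        /*
         * If we are above target (e.g. after ceph_msgpool_resv() shrank
         * the reservation), free the surplus preallocated messages.
         */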
        while (pool->num > pool->min) {
                msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
                dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
                     pool->min, msg);
                list_del_init(&msg->list_head);
                pool->num--;
                ceph_msg_kfree(msg);
        }
        return 0;
}

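/*
 * Initialize the pool and preallocate 'min' messages with front_len-byte
 * fronts.  Returns 0, or the allocation error from __fill_msgpool().
 */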
int ceph_msgpool_init(struct ceph_msgpool *pool,
                      int front_len, int min, bool blocking)
{
        int ret;

        dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
        spin_lock_init(&pool->lock);
        pool->front_len = front_len;
        INIT_LIST_HEAD(&pool->msgs);
        pool->num = 0;
        pool->min = min;
        pool->blocking = blocking;
        init_waitqueue_head(&pool->wait);

        spin_lock(&pool->lock);
        ret = __fill_msgpool(pool);
        spin_unlock(&pool->lock);
        return ret;
}

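/*
 * Tear down the pool, freeing every preallocated message.
 */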
void ceph_msgpool_destroy(struct ceph_msgpool *pool)
{
        dout("msgpool_destroy %p\n", pool);
        spin_lock(&pool->lock);
        pool->min = 0;
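        /* with min forced to zero, __fill_msgpool() releases everything */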
        __fill_msgpool(pool);
        spin_unlock(&pool->lock);
}

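/*
 * Adjust the reservation by 'delta' messages (which may be negative) and
 * immediately grow or shrink the pool to match the new target.
 */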
int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta)
{
        int ret;

        spin_lock(&pool->lock);
        dout("msgpool_resv %p delta %d\n", pool, delta);
        pool->min += delta;
        ret = __fill_msgpool(pool);
        spin_unlock(&pool->lock);
        return ret;
}

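/*
 * Grab a message.  For a blocking pool, try a fresh allocation first and
 * fall back to the preallocated reserve, sleeping until a message is
 * returned if the reserve is empty.  For a non-blocking pool, return
 * ERR_PTR(-ENOMEM) if both the reserve and a last-ditch allocation fail.
 */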
struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool)
{
        wait_queue_t wait;
        struct ceph_msg *msg;

        if (pool->blocking) {
                /* mempool_t behavior; first try to alloc */
                msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
                if (!IS_ERR(msg))
                        return msg;
        }

        while (1) {
                spin_lock(&pool->lock);
                if (likely(pool->num)) {
                        msg = list_entry(pool->msgs.next, struct ceph_msg,
                                         list_head);
                        list_del_init(&msg->list_head);
                        pool->num--;
                        dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
                             pool->num, pool->min);
                        spin_unlock(&pool->lock);
                        return msg;
                }
                pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num,
                       pool->min, pool->blocking ? "waiting" : "failing");
                spin_unlock(&pool->lock);

                if (!pool->blocking) {
                        WARN_ON(1);

                        /* maybe we can allocate it now? */
                        msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
                        if (!IS_ERR(msg))
                                return msg;

                        return ERR_PTR(-ENOMEM);
                }

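                /* blocking: sleep until ceph_msgpool_put() returns a msg */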
                init_wait(&wait);
                prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
                schedule();
                finish_wait(&pool->wait, &wait);
        }
}

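/*
 * Return a message to the pool if we are below target, otherwise free it.
 */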
void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
{
        spin_lock(&pool->lock);
        if (pool->num < pool->min) {
                ceph_msg_get(msg);   /* retake a single ref */
                list_add(&msg->list_head, &pool->msgs);
                pool->num++;
                dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
                     pool->num, pool->min);
                spin_unlock(&pool->lock);
                wake_up(&pool->wait);
        } else {
                dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
                     pool->num, pool->min);
                spin_unlock(&pool->lock);
                ceph_msg_kfree(msg);
        }
}
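
/*
 * Illustrative only (not part of the original file): a minimal sketch of
 * how a caller might use the pool, assuming the declarations above and in
 * msgpool.h.  The front length (512) and reservation sizes are arbitrary,
 * and the function name is hypothetical.
 */
#if 0
static int example_msgpool_usage(void)
{
        struct ceph_msgpool pool;
        struct ceph_msg *msg;
        int err;

        /* preallocate 8 messages with 512-byte fronts; block in get */
        err = ceph_msgpool_init(&pool, 512, 8, true);
        if (err < 0)
                return err;

        /* reserve room for 4 more expected replies */
        err = ceph_msgpool_resv(&pool, 4);
        if (err < 0)
                goto out;

        /* blocking pool: waits for a free msg instead of failing */
        msg = ceph_msgpool_get(&pool);

        /* ... fill in and send msg, or hand it to the reply path ... */

        /* refills the pool if below target, otherwise frees the msg */
        ceph_msgpool_put(&pool, msg);
        err = 0;
out:
        ceph_msgpool_destroy(&pool);
        return err;
}
#endif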