Merge branch 'fix/hda' into for-linus
[safe/jmp/linux-2.6] / fs / ceph / msgpool.c
1 #include "ceph_debug.h"
2
3 #include <linux/err.h>
4 #include <linux/sched.h>
5 #include <linux/types.h>
6 #include <linux/vmalloc.h>
7
8 #include "msgpool.h"
9
10 /*
11  * We use msg pools to preallocate memory for messages we expect to
12  * receive over the wire, to avoid getting ourselves into OOM
13  * conditions at unexpected times.  We take use a few different
14  * strategies:
15  *
16  *  - for request/response type interactions, we preallocate the
17  * memory needed for the response when we generate the request.
18  *
19  *  - for messages we can receive at any time from the MDS, we preallocate
20  * a pool of messages we can re-use.
21  *
22  *  - for writeback, we preallocate some number of messages to use for
23  * requests and their replies, so that we always make forward
24  * progress.
25  *
26  * The msgpool behaves like a mempool_t, but keeps preallocated
27  * ceph_msgs strung together on a list_head instead of using a pointer
28  * vector.  This avoids vector reallocation when we adjust the number
29  * of preallocated items (which happens frequently).
30  */
31
32
33 /*
34  * Allocate or release as necessary to meet our target pool size.
35  */
36 static int __fill_msgpool(struct ceph_msgpool *pool)
37 {
38         struct ceph_msg *msg;
39
40         while (pool->num < pool->min) {
41                 dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
42                      pool->min);
43                 spin_unlock(&pool->lock);
44                 msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
45                 spin_lock(&pool->lock);
46                 if (IS_ERR(msg))
47                         return PTR_ERR(msg);
48                 msg->pool = pool;
49                 list_add(&msg->list_head, &pool->msgs);
50                 pool->num++;
51         }
52         while (pool->num > pool->min) {
53                 msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
54                 dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
55                      pool->min, msg);
56                 list_del_init(&msg->list_head);
57                 pool->num--;
58                 ceph_msg_kfree(msg);
59         }
60         return 0;
61 }
62
63 int ceph_msgpool_init(struct ceph_msgpool *pool,
64                       int front_len, int min, bool blocking)
65 {
66         int ret;
67
68         dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
69         spin_lock_init(&pool->lock);
70         pool->front_len = front_len;
71         INIT_LIST_HEAD(&pool->msgs);
72         pool->num = 0;
73         pool->min = min;
74         pool->blocking = blocking;
75         init_waitqueue_head(&pool->wait);
76
77         spin_lock(&pool->lock);
78         ret = __fill_msgpool(pool);
79         spin_unlock(&pool->lock);
80         return ret;
81 }
82
83 void ceph_msgpool_destroy(struct ceph_msgpool *pool)
84 {
85         dout("msgpool_destroy %p\n", pool);
86         spin_lock(&pool->lock);
87         pool->min = 0;
88         __fill_msgpool(pool);
89         spin_unlock(&pool->lock);
90 }
91
92 int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta)
93 {
94         int ret;
95
96         spin_lock(&pool->lock);
97         dout("msgpool_resv %p delta %d\n", pool, delta);
98         pool->min += delta;
99         ret = __fill_msgpool(pool);
100         spin_unlock(&pool->lock);
101         return ret;
102 }
103
104 struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len)
105 {
106         wait_queue_t wait;
107         struct ceph_msg *msg;
108
109         if (front_len && front_len > pool->front_len) {
110                 pr_err("msgpool_get pool %p need front %d, pool size is %d\n",
111                        pool, front_len, pool->front_len);
112                 WARN_ON(1);
113
114                 /* try to alloc a fresh message */
115                 msg = ceph_msg_new(0, front_len, 0, 0, NULL);
116                 if (!IS_ERR(msg))
117                         return msg;
118         }
119
120         if (!front_len)
121                 front_len = pool->front_len;
122
123         if (pool->blocking) {
124                 /* mempool_t behavior; first try to alloc */
125                 msg = ceph_msg_new(0, front_len, 0, 0, NULL);
126                 if (!IS_ERR(msg))
127                         return msg;
128         }
129
130         while (1) {
131                 spin_lock(&pool->lock);
132                 if (likely(pool->num)) {
133                         msg = list_entry(pool->msgs.next, struct ceph_msg,
134                                          list_head);
135                         list_del_init(&msg->list_head);
136                         pool->num--;
137                         dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
138                              pool->num, pool->min);
139                         spin_unlock(&pool->lock);
140                         return msg;
141                 }
142                 pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num,
143                        pool->min, pool->blocking ? "waiting" : "may fail");
144                 spin_unlock(&pool->lock);
145
146                 if (!pool->blocking) {
147                         WARN_ON(1);
148
149                         /* maybe we can allocate it now? */
150                         msg = ceph_msg_new(0, front_len, 0, 0, NULL);
151                         if (!IS_ERR(msg))
152                                 return msg;
153
154                         pr_err("msgpool_get %p empty + alloc failed\n", pool);
155                         return ERR_PTR(-ENOMEM);
156                 }
157
158                 init_wait(&wait);
159                 prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
160                 schedule();
161                 finish_wait(&pool->wait, &wait);
162         }
163 }
164
165 void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
166 {
167         spin_lock(&pool->lock);
168         if (pool->num < pool->min) {
169                 /* reset msg front_len; user may have changed it */
170                 msg->front.iov_len = pool->front_len;
171                 msg->hdr.front_len = cpu_to_le32(pool->front_len);
172
173                 kref_set(&msg->kref, 1);  /* retake a single ref */
174                 list_add(&msg->list_head, &pool->msgs);
175                 pool->num++;
176                 dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
177                      pool->num, pool->min);
178                 spin_unlock(&pool->lock);
179                 wake_up(&pool->wait);
180         } else {
181                 dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
182                      pool->num, pool->min);
183                 spin_unlock(&pool->lock);
184                 ceph_msg_kfree(msg);
185         }
186 }