nfs: new subdir Documentation/filesystems/nfs
[safe/jmp/linux-2.6] / fs / dlm / dir.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25
26
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29         spin_lock(&ls->ls_recover_list_lock);
30         list_add(&de->list, &ls->ls_recover_list);
31         spin_unlock(&ls->ls_recover_list_lock);
32 }
33
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36         int found = 0;
37         struct dlm_direntry *de;
38
39         spin_lock(&ls->ls_recover_list_lock);
40         list_for_each_entry(de, &ls->ls_recover_list, list) {
41                 if (de->length == len) {
42                         list_del(&de->list);
43                         de->master_nodeid = 0;
44                         memset(de->name, 0, len);
45                         found = 1;
46                         break;
47                 }
48         }
49         spin_unlock(&ls->ls_recover_list_lock);
50
51         if (!found)
52                 de = kzalloc(sizeof(struct dlm_direntry) + len,
53                              ls->ls_allocation);
54         return de;
55 }
56
57 void dlm_clear_free_entries(struct dlm_ls *ls)
58 {
59         struct dlm_direntry *de;
60
61         spin_lock(&ls->ls_recover_list_lock);
62         while (!list_empty(&ls->ls_recover_list)) {
63                 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
64                                 list);
65                 list_del(&de->list);
66                 kfree(de);
67         }
68         spin_unlock(&ls->ls_recover_list_lock);
69 }
70
71 /*
72  * We use the upper 16 bits of the hash value to select the directory node.
73  * Low bits are used for distribution of rsb's among hash buckets on each node.
74  *
75  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
76  * num_nodes to the hash value.  This value in the desired range is used as an
77  * offset into the sorted list of nodeid's to give the particular nodeid.
78  */
79
80 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
81 {
82         struct list_head *tmp;
83         struct dlm_member *memb = NULL;
84         uint32_t node, n = 0;
85         int nodeid;
86
87         if (ls->ls_num_nodes == 1) {
88                 nodeid = dlm_our_nodeid();
89                 goto out;
90         }
91
92         if (ls->ls_node_array) {
93                 node = (hash >> 16) % ls->ls_total_weight;
94                 nodeid = ls->ls_node_array[node];
95                 goto out;
96         }
97
98         /* make_member_array() failed to kmalloc ls_node_array... */
99
100         node = (hash >> 16) % ls->ls_num_nodes;
101
102         list_for_each(tmp, &ls->ls_nodes) {
103                 if (n++ != node)
104                         continue;
105                 memb = list_entry(tmp, struct dlm_member, list);
106                 break;
107         }
108
109         DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
110                                  ls->ls_num_nodes, n, node););
111         nodeid = memb->nodeid;
112  out:
113         return nodeid;
114 }
115
116 int dlm_dir_nodeid(struct dlm_rsb *r)
117 {
118         return dlm_hash2nodeid(r->res_ls, r->res_hash);
119 }
120
121 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
122 {
123         uint32_t val;
124
125         val = jhash(name, len, 0);
126         val &= (ls->ls_dirtbl_size - 1);
127
128         return val;
129 }
130
131 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
132 {
133         uint32_t bucket;
134
135         bucket = dir_hash(ls, de->name, de->length);
136         list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
137 }
138
139 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
140                                           int namelen, uint32_t bucket)
141 {
142         struct dlm_direntry *de;
143
144         list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
145                 if (de->length == namelen && !memcmp(name, de->name, namelen))
146                         goto out;
147         }
148         de = NULL;
149  out:
150         return de;
151 }
152
153 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
154 {
155         struct dlm_direntry *de;
156         uint32_t bucket;
157
158         bucket = dir_hash(ls, name, namelen);
159
160         spin_lock(&ls->ls_dirtbl[bucket].lock);
161
162         de = search_bucket(ls, name, namelen, bucket);
163
164         if (!de) {
165                 log_error(ls, "remove fr %u none", nodeid);
166                 goto out;
167         }
168
169         if (de->master_nodeid != nodeid) {
170                 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
171                 goto out;
172         }
173
174         list_del(&de->list);
175         kfree(de);
176  out:
177         spin_unlock(&ls->ls_dirtbl[bucket].lock);
178 }
179
180 void dlm_dir_clear(struct dlm_ls *ls)
181 {
182         struct list_head *head;
183         struct dlm_direntry *de;
184         int i;
185
186         DLM_ASSERT(list_empty(&ls->ls_recover_list), );
187
188         for (i = 0; i < ls->ls_dirtbl_size; i++) {
189                 spin_lock(&ls->ls_dirtbl[i].lock);
190                 head = &ls->ls_dirtbl[i].list;
191                 while (!list_empty(head)) {
192                         de = list_entry(head->next, struct dlm_direntry, list);
193                         list_del(&de->list);
194                         put_free_de(ls, de);
195                 }
196                 spin_unlock(&ls->ls_dirtbl[i].lock);
197         }
198 }
199
200 int dlm_recover_directory(struct dlm_ls *ls)
201 {
202         struct dlm_member *memb;
203         struct dlm_direntry *de;
204         char *b, *last_name = NULL;
205         int error = -ENOMEM, last_len, count = 0;
206         uint16_t namelen;
207
208         log_debug(ls, "dlm_recover_directory");
209
210         if (dlm_no_directory(ls))
211                 goto out_status;
212
213         dlm_dir_clear(ls);
214
215         last_name = kmalloc(DLM_RESNAME_MAXLEN, ls->ls_allocation);
216         if (!last_name)
217                 goto out;
218
219         list_for_each_entry(memb, &ls->ls_nodes, list) {
220                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
221                 last_len = 0;
222
223                 for (;;) {
224                         int left;
225                         error = dlm_recovery_stopped(ls);
226                         if (error)
227                                 goto out_free;
228
229                         error = dlm_rcom_names(ls, memb->nodeid,
230                                                last_name, last_len);
231                         if (error)
232                                 goto out_free;
233
234                         schedule();
235
236                         /*
237                          * pick namelen/name pairs out of received buffer
238                          */
239
240                         b = ls->ls_recover_buf->rc_buf;
241                         left = ls->ls_recover_buf->rc_header.h_length;
242                         left -= sizeof(struct dlm_rcom);
243
244                         for (;;) {
245                                 __be16 v;
246
247                                 error = -EINVAL;
248                                 if (left < sizeof(__be16))
249                                         goto out_free;
250
251                                 memcpy(&v, b, sizeof(__be16));
252                                 namelen = be16_to_cpu(v);
253                                 b += sizeof(__be16);
254                                 left -= sizeof(__be16);
255
256                                 /* namelen of 0xFFFFF marks end of names for
257                                    this node; namelen of 0 marks end of the
258                                    buffer */
259
260                                 if (namelen == 0xFFFF)
261                                         goto done;
262                                 if (!namelen)
263                                         break;
264
265                                 if (namelen > left)
266                                         goto out_free;
267
268                                 if (namelen > DLM_RESNAME_MAXLEN)
269                                         goto out_free;
270
271                                 error = -ENOMEM;
272                                 de = get_free_de(ls, namelen);
273                                 if (!de)
274                                         goto out_free;
275
276                                 de->master_nodeid = memb->nodeid;
277                                 de->length = namelen;
278                                 last_len = namelen;
279                                 memcpy(de->name, b, namelen);
280                                 memcpy(last_name, b, namelen);
281                                 b += namelen;
282                                 left -= namelen;
283
284                                 add_entry_to_hash(ls, de);
285                                 count++;
286                         }
287                 }
288          done:
289                 ;
290         }
291
292  out_status:
293         error = 0;
294         dlm_set_recover_status(ls, DLM_RS_DIR);
295         log_debug(ls, "dlm_recover_directory %d entries", count);
296  out_free:
297         kfree(last_name);
298  out:
299         dlm_clear_free_entries(ls);
300         return error;
301 }
302
303 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
304                      int namelen, int *r_nodeid)
305 {
306         struct dlm_direntry *de, *tmp;
307         uint32_t bucket;
308
309         bucket = dir_hash(ls, name, namelen);
310
311         spin_lock(&ls->ls_dirtbl[bucket].lock);
312         de = search_bucket(ls, name, namelen, bucket);
313         if (de) {
314                 *r_nodeid = de->master_nodeid;
315                 spin_unlock(&ls->ls_dirtbl[bucket].lock);
316                 if (*r_nodeid == nodeid)
317                         return -EEXIST;
318                 return 0;
319         }
320
321         spin_unlock(&ls->ls_dirtbl[bucket].lock);
322
323         if (namelen > DLM_RESNAME_MAXLEN)
324                 return -EINVAL;
325
326         de = kzalloc(sizeof(struct dlm_direntry) + namelen, ls->ls_allocation);
327         if (!de)
328                 return -ENOMEM;
329
330         de->master_nodeid = nodeid;
331         de->length = namelen;
332         memcpy(de->name, name, namelen);
333
334         spin_lock(&ls->ls_dirtbl[bucket].lock);
335         tmp = search_bucket(ls, name, namelen, bucket);
336         if (tmp) {
337                 kfree(de);
338                 de = tmp;
339         } else {
340                 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
341         }
342         *r_nodeid = de->master_nodeid;
343         spin_unlock(&ls->ls_dirtbl[bucket].lock);
344         return 0;
345 }
346
/*
 * Public entry point for a directory lookup; forwards all arguments to
 * get_entry() and returns its result unchanged.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int rv = get_entry(ls, nodeid, name, namelen, r_nodeid);

	return rv;
}
352
353 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
354 {
355         struct dlm_rsb *r;
356
357         down_read(&ls->ls_root_sem);
358         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
359                 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
360                         up_read(&ls->ls_root_sem);
361                         return r;
362                 }
363         }
364         up_read(&ls->ls_root_sem);
365         return NULL;
366 }
367
368 /* Find the rsb where we left off (or start again), then send rsb names
369    for rsb's we're master of and whose directory node matches the requesting
370    node.  inbuf is the rsb name last sent, inlen is the name's length */
371
372 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
373                            char *outbuf, int outlen, int nodeid)
374 {
375         struct list_head *list;
376         struct dlm_rsb *r;
377         int offset = 0, dir_nodeid;
378         __be16 be_namelen;
379
380         down_read(&ls->ls_root_sem);
381
382         if (inlen > 1) {
383                 r = find_rsb_root(ls, inbuf, inlen);
384                 if (!r) {
385                         inbuf[inlen - 1] = '\0';
386                         log_error(ls, "copy_master_names from %d start %d %s",
387                                   nodeid, inlen, inbuf);
388                         goto out;
389                 }
390                 list = r->res_root_list.next;
391         } else {
392                 list = ls->ls_root_list.next;
393         }
394
395         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
396                 r = list_entry(list, struct dlm_rsb, res_root_list);
397                 if (r->res_nodeid)
398                         continue;
399
400                 dir_nodeid = dlm_dir_nodeid(r);
401                 if (dir_nodeid != nodeid)
402                         continue;
403
404                 /*
405                  * The block ends when we can't fit the following in the
406                  * remaining buffer space:
407                  * namelen (uint16_t) +
408                  * name (r->res_length) +
409                  * end-of-block record 0x0000 (uint16_t)
410                  */
411
412                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
413                         /* Write end-of-block record */
414                         be_namelen = cpu_to_be16(0);
415                         memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
416                         offset += sizeof(__be16);
417                         goto out;
418                 }
419
420                 be_namelen = cpu_to_be16(r->res_length);
421                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
422                 offset += sizeof(__be16);
423                 memcpy(outbuf + offset, r->res_name, r->res_length);
424                 offset += r->res_length;
425         }
426
427         /*
428          * If we've reached the end of the list (and there's room) write a
429          * terminating record.
430          */
431
432         if ((list == &ls->ls_root_list) &&
433             (offset + sizeof(uint16_t) <= outlen)) {
434                 be_namelen = cpu_to_be16(0xFFFF);
435                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
436                 offset += sizeof(__be16);
437         }
438
439  out:
440         up_read(&ls->ls_root_sem);
441 }
442