X-Git-Url: http://ftp.safe.ca/?p=safe%2Fjmp%2Flinux-2.6;a=blobdiff_plain;f=fs%2Fceph%2Fcaps.c;h=ae3e3a3064451f7b49fedaa3931120ff1af6d169;hp=d8132b6e770d0740515291276ad6112a1b6aa2ae;hb=7ea8085910ef3dd4f3cad6845aaa2b580d39b115;hpb=cdac830313fa6bf2831693af80fefe4aaac11b7d diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index d8132b6..ae3e3a3 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3,8 +3,10 @@ #include #include #include +#include #include #include +#include #include "super.h" #include "decode.h" @@ -128,6 +130,7 @@ static int caps_total_count; /* total caps allocated */ static int caps_use_count; /* in use */ static int caps_reserve_count; /* unused, reserved */ static int caps_avail_count; /* unused, unreserved */ +static int caps_min_count; /* keep at least this many (unreserved) */ void __init ceph_caps_init(void) { @@ -149,6 +152,15 @@ void ceph_caps_finalize(void) caps_avail_count = 0; caps_use_count = 0; caps_reserve_count = 0; + caps_min_count = 0; + spin_unlock(&caps_list_lock); +} + +void ceph_adjust_min_caps(int delta) +{ + spin_lock(&caps_list_lock); + caps_min_count += delta; + BUG_ON(caps_min_count < 0); spin_unlock(&caps_list_lock); } @@ -256,30 +268,22 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx) return cap; } -static void put_cap(struct ceph_cap *cap, - struct ceph_cap_reservation *ctx) +void ceph_put_cap(struct ceph_cap *cap) { spin_lock(&caps_list_lock); - dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", - ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count, + dout("put_cap %p %d = %d used + %d resv + %d avail\n", + cap, caps_total_count, caps_use_count, caps_reserve_count, caps_avail_count); caps_use_count--; /* - * Keep some preallocated caps around, at least enough to do a - * readdir (which needs to preallocate lots of them), to avoid - * lots of free/alloc churn. + * Keep some preallocated caps around (ceph_min_count), to + * avoid lots of free/alloc churn. */ - if (caps_avail_count >= caps_reserve_count + - ceph_client(cap->ci->vfs_inode.i_sb)->mount_args->max_readdir) { + if (caps_avail_count >= caps_reserve_count + caps_min_count) { caps_total_count--; kmem_cache_free(ceph_cap_cachep, cap); } else { - if (ctx) { - ctx->count++; - caps_reserve_count++; - } else { - caps_avail_count++; - } + caps_avail_count++; list_add(&cap->caps_item, &caps_list); } @@ -289,7 +293,8 @@ static void put_cap(struct ceph_cap *cap, } void ceph_reservation_status(struct ceph_client *client, - int *total, int *avail, int *used, int *reserved) + int *total, int *avail, int *used, int *reserved, + int *min) { if (total) *total = caps_total_count; @@ -299,6 +304,8 @@ void ceph_reservation_status(struct ceph_client *client, *used = caps_use_count; if (reserved) *reserved = caps_reserve_count; + if (min) + *min = caps_min_count; } /* @@ -650,7 +657,7 @@ static int __cap_is_valid(struct ceph_cap *cap) */ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) { - int have = ci->i_snap_caps; + int have = ci->i_snap_caps | ci->i_cap_exporting_issued; struct ceph_cap *cap; struct rb_node *p; @@ -697,10 +704,15 @@ static void __touch_cap(struct ceph_cap *cap) { struct ceph_mds_session *s = cap->session; - dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap, - s->s_mds); spin_lock(&s->s_cap_lock); - list_move_tail(&cap->session_caps, &s->s_caps); + if (s->s_cap_iterator == NULL) { + dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap, + s->s_mds); + list_move_tail(&cap->session_caps, &s->s_caps); + } else { + dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n", + &cap->ci->vfs_inode, cap, s->s_mds); + } spin_unlock(&s->s_cap_lock); } @@ -846,31 +858,44 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci) } /* - * caller should hold i_lock, and session s_mutex. - * returns true if this is the last cap. if so, caller should iput. + * Remove a cap. Take steps to deal with a racing iterate_session_caps. + * + * caller should hold i_lock. + * caller will not hold session s_mutex if called from destroy_inode. */ -void __ceph_remove_cap(struct ceph_cap *cap, - struct ceph_cap_reservation *ctx) +void __ceph_remove_cap(struct ceph_cap *cap) { struct ceph_mds_session *session = cap->session; struct ceph_inode_info *ci = cap->ci; - struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc; + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; + int removed = 0; dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); /* remove from session list */ spin_lock(&session->s_cap_lock); - list_del_init(&cap->session_caps); - session->s_nr_caps--; + if (session->s_cap_iterator == cap) { + /* not yet, we are iterating over this very cap */ + dout("__ceph_remove_cap delaying %p removal from session %p\n", + cap, cap->session); + } else { + list_del_init(&cap->session_caps); + session->s_nr_caps--; + cap->session = NULL; + removed = 1; + } + /* protect backpointer with s_cap_lock: see iterate_session_caps */ + cap->ci = NULL; spin_unlock(&session->s_cap_lock); /* remove from inode list */ rb_erase(&cap->ci_node, &ci->i_caps); - cap->session = NULL; if (ci->i_auth_cap == cap) ci->i_auth_cap = NULL; - put_cap(cap, ctx); + if (removed) + ceph_put_cap(cap); if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) { struct ceph_snap_realm *realm = ci->i_snap_realm; @@ -913,18 +938,18 @@ static int send_cap_msg(struct ceph_mds_session *session, seq, issue_seq, mseq, follows, size, max_size, xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL); - if (IS_ERR(msg)) - return PTR_ERR(msg); + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS); + if (!msg) + return -ENOMEM; - fc = msg->front.iov_base; + msg->hdr.tid = cpu_to_le64(flush_tid); + fc = msg->front.iov_base; memset(fc, 0, sizeof(*fc)); fc->cap_id = cpu_to_le64(cid); fc->op = cpu_to_le32(op); fc->seq = cpu_to_le32(seq); - fc->client_tid = cpu_to_le64(flush_tid); fc->issue_seq = cpu_to_le32(issue_seq); fc->migrate_seq = cpu_to_le32(mseq); fc->caps = cpu_to_le32(caps); @@ -957,15 +982,14 @@ static int send_cap_msg(struct ceph_mds_session *session, } /* - * Queue cap releases when an inode is dropped from our - * cache. + * Queue cap releases when an inode is dropped from our cache. Since + * inode is about to be destroyed, there is no need for i_lock. */ void ceph_queue_caps_release(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); struct rb_node *p; - spin_lock(&inode->i_lock); p = rb_first(&ci->i_caps); while (p) { struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); @@ -1006,10 +1030,8 @@ void ceph_queue_caps_release(struct inode *inode) } spin_unlock(&session->s_cap_lock); p = rb_next(p); - __ceph_remove_cap(cap, NULL); - + __ceph_remove_cap(cap); } - spin_unlock(&inode->i_lock); } /* @@ -1037,10 +1059,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->vfs_inode; u64 cap_id = cap->cap_id; - int held = cap->issued | cap->implemented; - int revoking = cap->implemented & ~cap->issued; - int dropping = cap->issued & ~retain; - int keep; + int held, revoking, dropping, keep; u64 seq, issue_seq, mseq, time_warp_seq, follows; u64 size, max_size; struct timespec mtime, atime; @@ -1055,6 +1074,11 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, int i; int ret; + held = cap->issued | cap->implemented; + revoking = cap->implemented & ~cap->issued; + retain &= ~revoking; + dropping = cap->issued & ~retain; + dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n", inode, cap, cap->session, ceph_cap_string(held), ceph_cap_string(held & retain), @@ -1130,12 +1154,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, spin_unlock(&inode->i_lock); - if (dropping & CEPH_CAP_FILE_CACHE) { - /* invalidate what we can */ - dout("invalidating pages on %p\n", inode); - invalidate_mapping_pages(&inode->i_data, 0, -1); - } - ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, size, max_size, &mtime, &atime, time_warp_seq, @@ -1193,6 +1211,12 @@ retry: if (capsnap->dirty_pages || capsnap->writing) continue; + /* + * if cap writeback already occurred, we should have dropped + * the capsnap in ceph_put_wrbuffer_cap_refs. + */ + BUG_ON(capsnap->dirty == 0); + /* pick mds, take s_mutex */ mds = __ceph_get_cap_mds(ci, &mseq); if (session && session->s_mds != mds) { @@ -1275,7 +1299,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) */ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) { - struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc; + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; struct inode *inode = &ci->vfs_inode; int was = ci->i_dirty_caps; int dirty = 0; @@ -1313,10 +1338,10 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) static int __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session) { - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); int flushing; - + BUG_ON(ci->i_dirty_caps == 0); BUG_ON(list_empty(&ci->i_dirty_item)); @@ -1349,6 +1374,41 @@ static int __mark_caps_flushing(struct inode *inode, } /* + * try to invalidate mapping pages without blocking. + */ +static int mapping_is_empty(struct address_space *mapping) +{ + struct page *page = find_get_page(mapping, 0); + + if (!page) + return 1; + + put_page(page); + return 0; +} + +static int try_nonblocking_invalidate(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + u32 invalidating_gen = ci->i_rdcache_gen; + + spin_unlock(&inode->i_lock); + invalidate_mapping_pages(&inode->i_data, 0, -1); + spin_lock(&inode->i_lock); + + if (mapping_is_empty(&inode->i_data) && + invalidating_gen == ci->i_rdcache_gen) { + /* success. */ + dout("try_nonblocking_invalidate %p success\n", inode); + ci->i_rdcache_gen = 0; + ci->i_rdcache_revoking = 0; + return 0; + } + dout("try_nonblocking_invalidate %p failed\n", inode); + return -1; +} + +/* * Swiss army knife function to examine currently used and wanted * versus held caps. Release, flush, ack revoked caps to mds as * appropriate. @@ -1361,6 +1421,7 @@ static int __mark_caps_flushing(struct inode *inode, */ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_session *session) + __releases(session->s_mutex) { struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode); struct ceph_mds_client *mdsc = &client->mdsc; @@ -1368,13 +1429,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_cap *cap; int file_wanted, used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ - int drop_session_lock = session ? 0 : 1; - int want, retain, revoking, flushing = 0; + int issued, implemented, want, retain, revoking, flushing = 0; int mds = -1; /* keep track of how far we've gone through i_caps list to avoid an infinite loop on retry */ struct rb_node *p; int tried_invalidate = 0; int delayed = 0, sent = 0, force_requeue = 0, num; + int queue_invalidate = 0; int is_delayed = flags & CHECK_CAPS_NODELAY; /* if we are unmounting, flush any unused caps immediately. */ @@ -1396,6 +1457,8 @@ retry_locked: file_wanted = __ceph_caps_file_wanted(ci); used = __ceph_caps_used(ci); want = file_wanted | used; + issued = __ceph_caps_issued(ci, &implemented); + revoking = implemented & ~issued; retain = want | CEPH_CAP_PIN; if (!mdsc->stopping && inode->i_nlink > 0) { @@ -1414,11 +1477,11 @@ retry_locked: } dout("check_caps %p file_want %s used %s dirty %s flushing %s" - " issued %s retain %s %s%s%s\n", inode, + " issued %s revoking %s retain %s %s%s%s\n", inode, ceph_cap_string(file_wanted), ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps), ceph_cap_string(ci->i_flushing_caps), - ceph_cap_string(__ceph_caps_issued(ci, NULL)), + ceph_cap_string(issued), ceph_cap_string(revoking), ceph_cap_string(retain), (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "", (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "", @@ -1432,26 +1495,22 @@ retry_locked: if ((!is_delayed || mdsc->stopping) && ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ ci->i_rdcache_gen && /* may have cached pages */ - file_wanted == 0 && /* no open files */ - !ci->i_truncate_pending && + (file_wanted == 0 || /* no open files */ + (revoking & CEPH_CAP_FILE_CACHE)) && /* or revoking cache */ !tried_invalidate) { - u32 invalidating_gen = ci->i_rdcache_gen; - int ret; - dout("check_caps trying to invalidate on %p\n", inode); - spin_unlock(&inode->i_lock); - ret = invalidate_inode_pages2(&inode->i_data); - spin_lock(&inode->i_lock); - if (ret == 0 && invalidating_gen == ci->i_rdcache_gen) { - /* success. */ - ci->i_rdcache_gen = 0; - ci->i_rdcache_revoking = 0; - } else { - dout("check_caps failed to invalidate pages\n"); - /* we failed to invalidate pages. check these - caps again later. */ - force_requeue = 1; - __cap_set_timeouts(mdsc, ci); + if (try_nonblocking_invalidate(inode) < 0) { + if (revoking & CEPH_CAP_FILE_CACHE) { + dout("check_caps queuing invalidate\n"); + queue_invalidate = 1; + ci->i_rdcache_revoking = ci->i_rdcache_gen; + } else { + dout("check_caps failed to invalidate pages\n"); + /* we failed to invalidate pages. check these + caps again later. */ + force_requeue = 1; + __cap_set_timeouts(mdsc, ci); + } } tried_invalidate = 1; goto retry_locked; @@ -1471,7 +1530,7 @@ retry_locked: revoking = cap->implemented & ~cap->issued; if (revoking) - dout("mds%d revoking %s\n", cap->mds, + dout(" mds%d revoking %s\n", cap->mds, ceph_cap_string(revoking)); if (cap == ci->i_auth_cap && @@ -1529,6 +1588,11 @@ retry_locked: } ack: + if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { + dout(" skipping %p I_NOFLUSH set\n", inode); + continue; + } + if (session && session != cap->session) { dout("oops, wrong session %p mutex\n", session); mutex_unlock(&session->s_mutex); @@ -1586,7 +1650,10 @@ ack: spin_unlock(&inode->i_lock); - if (session && drop_session_lock) + if (queue_invalidate) + ceph_queue_invalidate(inode); + + if (session) mutex_unlock(&session->s_mutex); if (took_snap_rwsem) up_read(&mdsc->snap_rwsem); @@ -1598,13 +1665,17 @@ ack: static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, unsigned *flush_tid) { - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); int unlock_session = session ? 0 : 1; int flushing = 0; retry: spin_lock(&inode->i_lock); + if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { + dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode); + goto out; + } if (ci->i_dirty_caps && ci->i_auth_cap) { struct ceph_cap *cap = ci->i_auth_cap; int used = __ceph_caps_used(ci); @@ -1647,10 +1718,9 @@ out_unlocked: static int caps_are_flushed(struct inode *inode, unsigned tid) { struct ceph_inode_info *ci = ceph_inode(inode); - int dirty, i, ret = 1; + int i, ret = 1; spin_lock(&inode->i_lock); - dirty = __ceph_caps_dirty(ci); for (i = 0; i < CEPH_CAP_BITS; i++) if ((ci->i_flushing_caps & (1 << i)) && ci->i_cap_flush_tid[i] <= tid) { @@ -1706,9 +1776,9 @@ out: spin_unlock(&ci->i_unsafe_lock); } -int ceph_fsync(struct file *file, struct dentry *dentry, int datasync) +int ceph_fsync(struct file *file, int datasync) { - struct inode *inode = dentry->d_inode; + struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); unsigned flush_tid; int ret; @@ -1745,12 +1815,13 @@ int ceph_fsync(struct file *file, struct dentry *dentry, int datasync) * get by with fewer MDS messages if we wait for data writeback to * complete first. */ -int ceph_write_inode(struct inode *inode, int wait) +int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) { struct ceph_inode_info *ci = ceph_inode(inode); unsigned flush_tid; int err = 0; int dirty; + int wait = wbc->sync_mode == WB_SYNC_ALL; dout("write_inode %p wait=%d\n", inode, wait); if (wait) { @@ -1759,7 +1830,8 @@ int ceph_write_inode(struct inode *inode, int wait) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); } else { - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = + &ceph_sb_to_client(inode->i_sb)->mdsc; spin_lock(&inode->i_lock); if (__ceph_caps_dirty(ci)) @@ -1796,8 +1868,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, } else { pr_err("%p auth cap %p not mds%d ???\n", inode, cap, session->s_mds); - spin_unlock(&inode->i_lock); } + spin_unlock(&inode->i_lock); } } @@ -1876,14 +1948,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, struct inode *inode = &ci->vfs_inode; int ret = 0; int have, implemented; + int file_wanted; dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); spin_lock(&inode->i_lock); - /* make sure we _have_ some caps! */ - if (!__ceph_is_any_caps(ci)) { - dout("get_cap_refs %p no real caps\n", inode); + /* make sure file is actually open */ + file_wanted = __ceph_caps_file_wanted(ci); + if ((file_wanted & need) == 0) { + dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", + ceph_cap_string(need), ceph_cap_string(file_wanted)); *err = -EBADF; ret = 1; goto out; @@ -2056,8 +2131,8 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) } spin_unlock(&inode->i_lock); - dout("put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had), - last ? "last" : ""); + dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), + last ? " last" : "", put ? " put" : ""); if (last && !flushsnaps) ceph_check_caps(ci, 0, NULL); @@ -2081,7 +2156,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, { struct inode *inode = &ci->vfs_inode; int last = 0; - int last_snap = 0; + int complete_capsnap = 0; + int drop_capsnap = 0; int found = 0; struct ceph_cap_snap *capsnap = NULL; @@ -2104,19 +2180,32 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { if (capsnap->context == snapc) { found = 1; - capsnap->dirty_pages -= nr; - last_snap = !capsnap->dirty_pages; break; } } BUG_ON(!found); + capsnap->dirty_pages -= nr; + if (capsnap->dirty_pages == 0) { + complete_capsnap = 1; + if (capsnap->dirty == 0) + /* cap writeback completed before we created + * the cap_snap; no FLUSHSNAP is needed */ + drop_capsnap = 1; + } dout("put_wrbuffer_cap_refs on %p cap_snap %p " - " snap %lld %d/%d -> %d/%d %s%s\n", + " snap %lld %d/%d -> %d/%d %s%s%s\n", inode, capsnap, capsnap->context->seq, ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, ci->i_wrbuffer_ref, capsnap->dirty_pages, last ? " (wrbuffer last)" : "", - last_snap ? " (capsnap last)" : ""); + complete_capsnap ? " (complete capsnap)" : "", + drop_capsnap ? " (drop capsnap)" : ""); + if (drop_capsnap) { + ceph_put_snap_context(capsnap->context); + list_del(&capsnap->ci_item); + list_del(&capsnap->flushing_item); + ceph_put_cap_snap(capsnap); + } } spin_unlock(&inode->i_lock); @@ -2124,28 +2213,31 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (last) { ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); iput(inode); - } else if (last_snap) { + } else if (complete_capsnap) { ceph_flush_snaps(ci); wake_up(&ci->i_cap_wq); } + if (drop_capsnap) + iput(inode); } /* * Handle a cap GRANT message from the MDS. (Note that a GRANT may * actually be a revocation if it specifies a smaller cap set.) * - * caller holds s_mutex. + * caller holds s_mutex and i_lock, we drop both. + * * return value: * 0 - ok * 1 - check_caps on auth cap only (writeback) * 2 - check_caps (ack revoke) */ -static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, - struct ceph_mds_session *session, - struct ceph_cap *cap, - struct ceph_buffer *xattr_buf) +static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, + struct ceph_mds_session *session, + struct ceph_cap *cap, + struct ceph_buffer *xattr_buf) __releases(inode->i_lock) - + __releases(session->s_mutex) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; @@ -2155,13 +2247,11 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, u64 size = le64_to_cpu(grant->size); u64 max_size = le64_to_cpu(grant->max_size); struct timespec mtime, atime, ctime; - int reply = 0; + int check_caps = 0; int wake = 0; int writeback = 0; int revoked_rdcache = 0; - int invalidate_async = 0; - int tried_invalidate = 0; - int ret; + int queue_invalidate = 0; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", inode, cap, mds, seq, ceph_cap_string(newcaps)); @@ -2173,29 +2263,18 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, * try to invalidate (once). (If there are dirty buffers, we * will invalidate _after_ writeback.) */ -restart: if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && - !ci->i_wrbuffer_ref && !tried_invalidate) { - dout("CACHE invalidation\n"); - spin_unlock(&inode->i_lock); - tried_invalidate = 1; - - ret = invalidate_inode_pages2(&inode->i_data); - spin_lock(&inode->i_lock); - if (ret < 0) { + !ci->i_wrbuffer_ref) { + if (try_nonblocking_invalidate(inode) == 0) { + revoked_rdcache = 1; + } else { /* there were locked pages.. invalidate later in a separate thread. */ if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { - invalidate_async = 1; + queue_invalidate = 1; ci->i_rdcache_revoking = ci->i_rdcache_gen; } - } else { - /* we successfully invalidated those pages */ - revoked_rdcache = 1; - ci->i_rdcache_gen = 0; - ci->i_rdcache_revoking = 0; } - goto restart; } /* side effects now are allowed */ @@ -2281,11 +2360,12 @@ restart: if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER) writeback = 1; /* will delay ack */ else if (dirty & ~newcaps) - reply = 1; /* initiate writeback in check_caps */ + check_caps = 1; /* initiate writeback in check_caps */ else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 || revoked_rdcache) - reply = 2; /* send revoke ack in check_caps */ + check_caps = 2; /* send revoke ack in check_caps */ cap->issued = newcaps; + cap->implemented |= newcaps; } else if (cap->issued == newcaps) { dout("caps unchanged: %s -> %s\n", ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); @@ -2298,44 +2378,45 @@ restart: * pending revocation */ wake = 1; } + BUG_ON(cap->issued & ~cap->implemented); spin_unlock(&inode->i_lock); - if (writeback) { + if (writeback) /* * queue inode for writeback: we can't actually call * filemap_write_and_wait, etc. from message handler * context. */ - dout("queueing %p for writeback\n", inode); - if (ceph_queue_writeback(inode)) - igrab(inode); - } - if (invalidate_async) { - dout("queueing %p for page invalidation\n", inode); - if (ceph_queue_page_invalidation(inode)) - igrab(inode); - } + ceph_queue_writeback(inode); + if (queue_invalidate) + ceph_queue_invalidate(inode); if (wake) wake_up(&ci->i_cap_wq); - return reply; + + if (check_caps == 1) + ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, + session); + else if (check_caps == 2) + ceph_check_caps(ci, CHECK_CAPS_NODELAY, session); + else + mutex_unlock(&session->s_mutex); } /* * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the * MDS has been safely committed. */ -static void handle_cap_flush_ack(struct inode *inode, +static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_caps *m, struct ceph_mds_session *session, struct ceph_cap *cap) __releases(inode->i_lock) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc; + struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; - u64 flush_tid = le64_to_cpu(m->client_tid); int drop = 0; int i; @@ -2391,13 +2472,12 @@ out: * * Caller hold s_mutex. */ -static void handle_cap_flushsnap_ack(struct inode *inode, +static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_caps *m, struct ceph_mds_session *session) { struct ceph_inode_info *ci = ceph_inode(inode); u64 follows = le64_to_cpu(m->snap_follows); - u64 flush_tid = le64_to_cpu(m->client_tid); struct ceph_cap_snap *capsnap; int drop = 0; @@ -2414,8 +2494,8 @@ static void handle_cap_flushsnap_ack(struct inode *inode, break; } WARN_ON(capsnap->dirty_pages || capsnap->writing); - dout(" removing cap_snap %p follows %lld\n", - capsnap, follows); + dout(" removing %p cap_snap %p follows %lld\n", + inode, capsnap, follows); ceph_put_snap_context(capsnap->context); list_del(&capsnap->ci_item); list_del(&capsnap->flushing_item); @@ -2462,9 +2542,7 @@ static void handle_cap_trunc(struct inode *inode, spin_unlock(&inode->i_lock); if (queue_trunc) - if (queue_work(ceph_client(inode->i_sb)->trunc_wq, - &ci->i_vmtruncate_work)) - igrab(inode); + ceph_queue_vmtruncate(inode); } /* @@ -2509,10 +2587,9 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ci->i_cap_exporting_mseq = mseq; ci->i_cap_exporting_issued = cap->issued; } - __ceph_remove_cap(cap, NULL); - } else { - WARN_ON(!cap); + __ceph_remove_cap(cap); } + /* else, we already released it */ spin_unlock(&inode->i_lock); } @@ -2576,21 +2653,23 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct inode *inode; struct ceph_cap *cap; struct ceph_mds_caps *h; - int mds = le64_to_cpu(msg->hdr.src.name.num); + int mds = session->s_mds; int op; u32 seq; struct ceph_vino vino; u64 cap_id; u64 size, max_size; - int check_caps = 0; - int r; + u64 tid; + void *snaptrace; dout("handle_caps from mds%d\n", mds); /* decode */ + tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*h)) goto bad; h = msg->front.iov_base; + snaptrace = h + 1; op = le32_to_cpu(h->op); vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; @@ -2616,7 +2695,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, /* these will work even if we don't have a cap yet */ switch (op) { case CEPH_CAP_OP_FLUSHSNAP_ACK: - handle_cap_flushsnap_ack(inode, h, session); + handle_cap_flushsnap_ack(inode, tid, h, session); goto done; case CEPH_CAP_OP_EXPORT: @@ -2625,10 +2704,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_IMPORT: handle_cap_import(mdsc, inode, h, session, - msg->middle, - le32_to_cpu(h->snap_trace_len)); - check_caps = 1; /* we may have sent a RELEASE to the old auth */ - goto done; + snaptrace, le32_to_cpu(h->snap_trace_len)); + ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, + session); + goto done_unlocked; } /* the rest require a cap */ @@ -2645,19 +2724,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, switch (op) { case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT: - r = handle_cap_grant(inode, h, session, cap, msg->middle); - if (r == 1) - ceph_check_caps(ceph_inode(inode), - CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, - session); - else if (r == 2) - ceph_check_caps(ceph_inode(inode), - CHECK_CAPS_NODELAY, - session); - break; + handle_cap_grant(inode, h, session, cap, msg->middle); + goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: - handle_cap_flush_ack(inode, h, session, cap); + handle_cap_flush_ack(inode, tid, h, session, cap); break; case CEPH_CAP_OP_TRUNC: @@ -2672,15 +2743,14 @@ void ceph_handle_caps(struct ceph_mds_session *session, done: mutex_unlock(&session->s_mutex); - - if (check_caps) - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, NULL); +done_unlocked: if (inode) iput(inode); return; bad: pr_err("ceph_handle_caps: corrupt message\n"); + ceph_msg_dump(msg); return; } @@ -2716,16 +2786,38 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) */ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) { - struct ceph_inode_info *ci; - struct inode *inode; + struct ceph_inode_info *ci, *nci = NULL; + struct inode *inode, *ninode = NULL; + struct list_head *p, *n; dout("flush_dirty_caps\n"); spin_lock(&mdsc->cap_dirty_lock); - while (!list_empty(&mdsc->cap_dirty)) { - ci = list_first_entry(&mdsc->cap_dirty, - struct ceph_inode_info, - i_dirty_item); - inode = igrab(&ci->vfs_inode); + list_for_each_safe(p, n, &mdsc->cap_dirty) { + if (nci) { + ci = nci; + inode = ninode; + ci->i_ceph_flags &= ~CEPH_I_NOFLUSH; + dout("flush_dirty_caps inode %p (was next inode)\n", + inode); + } else { + ci = list_entry(p, struct ceph_inode_info, + i_dirty_item); + inode = igrab(&ci->vfs_inode); + BUG_ON(!inode); + dout("flush_dirty_caps inode %p\n", inode); + } + if (n != &mdsc->cap_dirty) { + nci = list_entry(n, struct ceph_inode_info, + i_dirty_item); + ninode = igrab(&nci->vfs_inode); + BUG_ON(!ninode); + nci->i_ceph_flags |= CEPH_I_NOFLUSH; + dout("flush_dirty_caps next inode %p, noflush\n", + ninode); + } else { + nci = NULL; + ninode = NULL; + } spin_unlock(&mdsc->cap_dirty_lock); if (inode) { ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, @@ -2774,11 +2866,18 @@ int ceph_encode_inode_release(void **p, struct inode *inode, struct ceph_cap *cap; struct ceph_mds_request_release *rel = *p; int ret = 0; - - dout("encode_inode_release %p mds%d drop %s unless %s\n", inode, - mds, ceph_cap_string(drop), ceph_cap_string(unless)); + int used = 0; spin_lock(&inode->i_lock); + used = __ceph_caps_used(ci); + + dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode, + mds, ceph_cap_string(used), ceph_cap_string(drop), + ceph_cap_string(unless)); + + /* only drop unused caps */ + drop &= ~used; + cap = __get_cap_for_mds(ci, mds); if (cap && __cap_is_valid(cap)) { if (force ||