#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/vmalloc.h>
+#include <linux/pagevec.h>
#include "super.h"
#include "decode.h"
static const struct inode_operations ceph_symlink_iops;
-static void ceph_inode_invalidate_pages(struct work_struct *work);
+static void ceph_invalidate_work(struct work_struct *work);
+static void ceph_writeback_work(struct work_struct *work);
+static void ceph_vmtruncate_work(struct work_struct *work);
/*
* find or create an inode, given the ceph ino number
INIT_LIST_HEAD(&ci->i_snap_realm_item);
INIT_LIST_HEAD(&ci->i_snap_flush_item);
- INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
- INIT_WORK(&ci->i_pg_inv_work, ceph_inode_invalidate_pages);
+ INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
+ INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
ceph_queue_caps_release(inode);
+ /*
+ * we may still have a snap_realm reference if there are stray
+ * caps in i_cap_exporting_issued or i_snap_caps.
+ */
+ if (ci->i_snap_realm) {
+ struct ceph_mds_client *mdsc =
+ &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+ struct ceph_snap_realm *realm = ci->i_snap_realm;
+
+ dout(" dropping residual ref to snap realm %p\n", realm);
+ spin_lock(&realm->inodes_with_caps_lock);
+ list_del_init(&ci->i_snap_realm_item);
+ spin_unlock(&realm->inodes_with_caps_lock);
+ ceph_put_snap_realm(mdsc, realm);
+ }
+
kfree(ci->i_symlink);
while ((n = rb_first(&ci->i_fragtree)) != NULL) {
frag = rb_entry(n, struct ceph_inode_frag, node);
}
__ceph_destroy_xattrs(ci);
- ceph_buffer_put(ci->i_xattrs.blob);
- ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+ if (ci->i_xattrs.blob)
+ ceph_buffer_put(ci->i_xattrs.blob);
+ if (ci->i_xattrs.prealloc_blob)
+ ceph_buffer_put(ci->i_xattrs.prealloc_blob);
kmem_cache_free(ceph_inode_cachep, ci);
}
dout("truncate_seq %u -> %u\n",
ci->i_truncate_seq, truncate_seq);
ci->i_truncate_seq = truncate_seq;
- if (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
+ /*
+ * If we hold relevant caps, or in the case where we're
+ * not the only client referencing this file and we
+ * don't hold those caps, then we need to check whether
+ * the file is either opened or mmaped
+ */
+ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
- CEPH_CAP_FILE_EXCL)) {
+ CEPH_CAP_FILE_EXCL)) ||
+ mapping_mapped(inode->i_mapping) ||
+ __ceph_caps_file_wanted(ci)) {
ci->i_truncate_pending++;
queue_trunc = 1;
}
* bytes are the xattr count).
*/
if (iinfo->xattr_len > 4) {
- xattr_blob = ceph_buffer_new_alloc(iinfo->xattr_len, GFP_NOFS);
+ xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
if (!xattr_blob)
pr_err("fill_inode ENOMEM xattr blob %d bytes\n",
iinfo->xattr_len);
/* queue truncate if we saw i_size decrease */
if (queue_trunc)
- if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
- &ci->i_vmtruncate_work))
- igrab(inode);
+ ceph_queue_vmtruncate(inode);
/* populate frag tree */
/* FIXME: move me up, if/when version reflects fragtree changes */
__ceph_get_fmode(ci, cap_fmode);
spin_unlock(&inode->i_lock);
}
+ } else if (cap_fmode >= 0) {
+ pr_warning("mds issued no caps on %llx.%llx\n",
+ ceph_vinop(inode));
+ __ceph_get_fmode(ci, cap_fmode);
}
/* update delegation info? */
err = 0;
out:
- ceph_buffer_put(xattr_blob);
+ if (xattr_blob)
+ ceph_buffer_put(xattr_blob);
return err;
}
}
/*
+ * Set dentry's directory position based on the current dir's max, and
+ * order it in d_subdirs, so that dcache_readdir behaves.
+ */
+static void ceph_set_dentry_offset(struct dentry *dn)
+{
+ struct dentry *dir = dn->d_parent;
+ struct inode *inode = dn->d_parent->d_inode;
+ struct ceph_dentry_info *di;
+
+ BUG_ON(!inode);
+
+ di = ceph_dentry(dn);
+
+ spin_lock(&inode->i_lock);
+ di->offset = ceph_inode(inode)->i_max_offset++;
+ spin_unlock(&inode->i_lock);
+
+ spin_lock(&dcache_lock);
+ spin_lock(&dn->d_lock);
+ list_move_tail(&dir->d_subdirs, &dn->d_u.d_child);
+ dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
+ dn->d_u.d_child.prev, dn->d_u.d_child.next);
+ spin_unlock(&dn->d_lock);
+ spin_unlock(&dcache_lock);
+}
+
+/*
* Incorporate results into the local cache. This is either just
* one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
* after a lookup).
struct inode *in = NULL;
struct ceph_mds_reply_inode *ininfo;
struct ceph_vino vino;
+ struct ceph_client *client = ceph_sb_to_client(sb);
int i = 0;
int err = 0;
}
if (rinfo->head->is_dentry) {
+ struct inode *dir = req->r_locked_dir;
+
+ err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
+ session, req->r_request_started, -1,
+ &req->r_caps_reservation);
+ if (err < 0)
+ return err;
+ }
+
+ /*
+ * ignore null lease/binding on snapdir ENOENT, or else we
+ * will have trouble splicing in the virtual snapdir later
+ */
+ if (rinfo->head->is_dentry && !req->r_aborted &&
+ (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
+ client->mount_args->snapdir_name,
+ req->r_dentry->d_name.len))) {
/*
* lookup link rename : null -> possibly existing inode
* mknod symlink mkdir : null -> new inode
BUG_ON(ceph_snap(dir) !=
le64_to_cpu(rinfo->diri.in->snapid));
- err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
- session, req->r_request_started, -1,
- &req->r_caps_reservation);
- if (err < 0)
- return err;
-
/* do we have a lease on the whole dir? */
have_dir_cap =
(le32_to_cpu(rinfo->diri.in->cap.caps) &
dn, dn->d_name.len, dn->d_name.name);
dout("fill_trace doing d_move %p -> %p\n",
req->r_old_dentry, dn);
+
+ /* d_move screws up d_subdirs order */
+ ceph_i_clear(dir, CEPH_I_COMPLETE);
+
d_move(req->r_old_dentry, dn);
dout(" src %p '%.*s' dst %p '%.*s'\n",
req->r_old_dentry,
req->r_old_dentry->d_name.len,
req->r_old_dentry->d_name.name,
dn, dn->d_name.len, dn->d_name.name);
+ /* ensure target dentry is invalidated, despite
+ rehashing bug in vfs_rename_dir */
+ dn->d_time = jiffies;
+ ceph_dentry(dn)->lease_shared_gen = 0;
/* take overwritten dentry's readdir offset */
ceph_dentry(req->r_old_dentry)->offset =
ceph_dentry(dn)->offset;
goto done;
}
req->r_dentry = dn; /* may have spliced */
+ ceph_set_dentry_offset(dn);
igrab(in);
} else if (ceph_ino(in) == vino.ino &&
ceph_snap(in) == vino.snap) {
err = PTR_ERR(dn);
goto done;
}
+ ceph_set_dentry_offset(dn);
req->r_dentry = dn; /* may have spliced */
igrab(in);
rinfo->head->is_dentry = 1; /* fool notrace handlers */
* Write back inode data in a worker thread. (This can't be done
* in the message handler context.)
*/
-void ceph_inode_writeback(struct work_struct *work)
+void ceph_queue_writeback(struct inode *inode)
+{
+ if (queue_work(ceph_inode_to_client(inode)->wb_wq,
+ &ceph_inode(inode)->i_wb_work)) {
+ dout("ceph_queue_writeback %p\n", inode);
+ igrab(inode);
+ } else {
+ dout("ceph_queue_writeback %p failed\n", inode);
+ }
+}
+
+static void ceph_writeback_work(struct work_struct *work)
{
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_wb_work);
}
/*
+ * queue an async invalidation
+ */
+void ceph_queue_invalidate(struct inode *inode)
+{
+ if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
+ &ceph_inode(inode)->i_pg_inv_work)) {
+ dout("ceph_queue_invalidate %p\n", inode);
+ igrab(inode);
+ } else {
+ dout("ceph_queue_invalidate %p failed\n", inode);
+ }
+}
+
+/*
+ * invalidate any pages that are not dirty or under writeback. this
+ * includes pages that are clean and mapped.
+ */
+static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
+{
+ struct pagevec pvec;
+ pgoff_t next = 0;
+ int i;
+
+ pagevec_init(&pvec, 0);
+ while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
+ for (i = 0; i < pagevec_count(&pvec); i++) {
+ struct page *page = pvec.pages[i];
+ pgoff_t index;
+ int skip_page =
+ (PageDirty(page) || PageWriteback(page));
+
+ if (!skip_page)
+ skip_page = !trylock_page(page);
+
+ /*
+ * We really shouldn't be looking at the ->index of an
+ * unlocked page. But we're not allowed to lock these
+ * pages. So we rely upon nobody altering the ->index
+ * of this (pinned-by-us) page.
+ */
+ index = page->index;
+ if (index > next)
+ next = index;
+ next++;
+
+ if (skip_page)
+ continue;
+
+ generic_error_remove_page(mapping, page);
+ unlock_page(page);
+ }
+ pagevec_release(&pvec);
+ cond_resched();
+ }
+}
+
+/*
* Invalidate inode pages in a worker thread. (This can't be done
* in the message handler context.)
*/
-static void ceph_inode_invalidate_pages(struct work_struct *work)
+static void ceph_invalidate_work(struct work_struct *work)
{
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_pg_inv_work);
orig_gen = ci->i_rdcache_gen;
spin_unlock(&inode->i_lock);
- truncate_inode_pages(&inode->i_data, 0);
+ ceph_invalidate_nondirty_pages(inode->i_mapping);
spin_lock(&inode->i_lock);
if (orig_gen == ci->i_rdcache_gen) {
*
* We also truncate in a separate thread as well.
*/
-void ceph_vmtruncate_work(struct work_struct *work)
+static void ceph_vmtruncate_work(struct work_struct *work)
{
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_vmtruncate_work);
}
/*
+ * Queue an async vmtruncate. If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
+ */
+void ceph_queue_vmtruncate(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
+ &ci->i_vmtruncate_work)) {
+ dout("ceph_queue_vmtruncate %p\n", inode);
+ igrab(inode);
+ } else {
+ dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
+ inode, ci->i_truncate_pending);
+ }
+}
+
+/*
* called with i_mutex held.
*
* Make sure any pending truncation is applied before doing anything
int release = 0, dirtied = 0;
int mask = 0;
int err = 0;
- int queue_trunc = 0;
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;
if ((issued & CEPH_CAP_FILE_EXCL) &&
attr->ia_size > inode->i_size) {
inode->i_size = attr->ia_size;
- if (attr->ia_size < inode->i_size) {
- ci->i_truncate_size = attr->ia_size;
- ci->i_truncate_pending++;
- queue_trunc = 1;
- }
inode->i_blocks =
(attr->ia_size + (1 << 9) - 1) >> 9;
inode->i_ctime = attr->ia_ctime;
release &= issued;
spin_unlock(&inode->i_lock);
- if (queue_trunc)
- __ceph_do_pending_vmtruncate(inode);
-
if (mask) {
req->r_inode = igrab(inode);
req->r_inode_drop = release;