ext3: symlink must be handled via filesystem specific operation
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index bf53581..aa3cd7c 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -5,6 +5,7 @@
 #include <linux/mm.h>
 #include <linux/pagemap.h>
 #include <linux/writeback.h>   /* generic_writepages */
+#include <linux/slab.h>
 #include <linux/pagevec.h>
 #include <linux/task_io_accounting_ops.h>
 
  * accounting is preserved.
  */
 
+#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
+#define CONGESTION_OFF_THRESH(congestion_kb)                           \
+       (CONGESTION_ON_THRESH(congestion_kb) -                          \
+        (CONGESTION_ON_THRESH(congestion_kb) >> 2))
+
+
 
 /*
  * Dirty a page.  Optimistically adjust accounting, on the assumption
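
The CONGESTION_ON/OFF_THRESH macros added above convert the congestion_kb mount option (given in KiB) into page counts: shifting right by PAGE_SHIFT-10 divides KiB by the page size in KiB, and the off-threshold is 75% of the on-threshold so the congested state is only cleared after the backlog has drained somewhat (simple hysteresis). A minimal userspace sketch of the arithmetic, assuming 4 KiB pages (PAGE_SHIFT == 12):

	/* Threshold arithmetic only; mirrors the two macros above. */
	#include <stdio.h>

	#define PAGE_SHIFT 12
	#define CONGESTION_ON_THRESH(kb)  ((kb) >> (PAGE_SHIFT - 10))
	#define CONGESTION_OFF_THRESH(kb) (CONGESTION_ON_THRESH(kb) - (CONGESTION_ON_THRESH(kb) >> 2))

	int main(void)
	{
		long congestion_kb = 8192;	/* hypothetical mount option value, in KiB */

		/* 8192 KiB / 4 KiB per page = 2048 pages before the bdi is marked congested */
		printf("on  threshold: %ld pages\n", CONGESTION_ON_THRESH(congestion_kb));
		/* 2048 - 2048/4 = 1536 pages before the congested flag is cleared again */
		printf("off threshold: %ld pages\n", CONGESTION_OFF_THRESH(congestion_kb));
		return 0;
	}
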
@@ -138,7 +145,7 @@ static int ceph_set_page_dirty(struct page *page)
  */
 static void ceph_invalidatepage(struct page *page, unsigned long offset)
 {
-       struct inode *inode = page->mapping->host;
+       struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_snap_context *snapc = (void *)page->private;
 
@@ -147,6 +154,8 @@ static void ceph_invalidatepage(struct page *page, unsigned long offset)
        BUG_ON(!PagePrivate(page));
        BUG_ON(!page->mapping);
 
+       inode = page->mapping->host;
+
        /*
         * We can get non-dirty pages here due to races between
         * set_page_dirty and truncate_complete_page; just spit out a
@@ -377,6 +386,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 {
        struct inode *inode;
        struct ceph_inode_info *ci;
+       struct ceph_client *client;
        struct ceph_osd_client *osdc;
        loff_t page_off = page->index << PAGE_CACHE_SHIFT;
        int len = PAGE_CACHE_SIZE;
@@ -384,6 +394,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        int err = 0;
        struct ceph_snap_context *snapc;
        u64 snap_size = 0;
+       long writeback_stat;
 
        dout("writepage %p idx %lu\n", page, page->index);
 
@@ -393,7 +404,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        }
        inode = page->mapping->host;
        ci = ceph_inode(inode);
-       osdc = &ceph_inode_to_client(inode)->osdc;
+       client = ceph_inode_to_client(inode);
+       osdc = &client->osdc;
 
        /* verify this is a writeable snap context */
        snapc = (void *)page->private;
@@ -420,6 +432,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        dout("writepage %p page %p index %lu on %llu~%u\n",
             inode, page, page->index, page_off, len);
 
+       writeback_stat = atomic_long_inc_return(&client->writeback_count);
+       if (writeback_stat >
+           CONGESTION_ON_THRESH(client->mount_args->congestion_kb))
+               set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
+
        set_page_writeback(page);
        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
                                   &ci->i_layout, snapc,
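
The increment above is one half of the writeback accounting: every page submitted for writeback bumps client->writeback_count, and once it crosses the on-threshold the bdi is flagged congested; the matching per-page decrement and clear_bdi_congested() call happen in writepages_finish() below. A rough userspace model of that hysteresis, with hypothetical threshold values and a plain flag standing in for the bdi state:

	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdio.h>

	static atomic_long writeback_count;
	static bool congested;			/* stands in for the bdi congested bit */

	static const long on_thresh  = 2048;	/* hypothetical, in pages */
	static const long off_thresh = 1536;	/* 75% of on_thresh */

	static void page_submitted(void)	/* cf. writepage_nounlock() */
	{
		if (atomic_fetch_add(&writeback_count, 1) + 1 > on_thresh)
			congested = true;	/* set_bdi_congested() */
	}

	static void page_completed(void)	/* cf. writepages_finish() */
	{
		if (atomic_fetch_sub(&writeback_count, 1) - 1 < off_thresh)
			congested = false;	/* clear_bdi_congested() */
	}

	int main(void)
	{
		for (int i = 0; i < 3000; i++)
			page_submitted();
		printf("after 3000 submissions: congested=%d\n", congested);	/* 1 */
		for (int i = 0; i < 2000; i++)
			page_completed();
		printf("after 2000 completions: congested=%d\n", congested);	/* 0 */
		return 0;
	}
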
@@ -448,8 +465,13 @@ out:
 
 static int ceph_writepage(struct page *page, struct writeback_control *wbc)
 {
-       int err = writepage_nounlock(page, wbc);
+       int err;
+       struct inode *inode = page->mapping->host;
+       BUG_ON(!inode);
+       igrab(inode);
+       err = writepage_nounlock(page, wbc);
        unlock_page(page);
+       iput(inode);
        return err;
 }
 
@@ -486,7 +508,6 @@ static void writepages_finish(struct ceph_osd_request *req,
        struct ceph_osd_op *op;
        struct ceph_inode_info *ci = ceph_inode(inode);
        unsigned wrote;
-       loff_t offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
        struct page *page;
        int i;
        struct ceph_snap_context *snapc = req->r_snapc;
@@ -494,6 +515,9 @@ static void writepages_finish(struct ceph_osd_request *req,
        struct writeback_control *wbc = req->r_wbc;
        __s32 rc = -EIO;
        u64 bytes = 0;
+       struct ceph_client *client = ceph_inode_to_client(inode);
+       long writeback_stat;
+       unsigned issued = __ceph_caps_issued(ci, NULL);
 
        /* parse reply */
        replyhead = msg->front.iov_base;
@@ -503,9 +527,13 @@ static void writepages_finish(struct ceph_osd_request *req,
        bytes = le64_to_cpu(op->extent.length);
 
        if (rc >= 0) {
-               wrote = (bytes + (offset & ~PAGE_CACHE_MASK) + ~PAGE_CACHE_MASK)
-                       >> PAGE_CACHE_SHIFT;
-               WARN_ON(wrote != req->r_num_pages);
+               /*
+                * Assume we wrote the pages we originally sent.  The
+                * osd might reply with fewer pages if our writeback
+                * raced with a truncation and was adjusted at the osd,
+                * so don't believe the reply.
+                */
+               wrote = req->r_num_pages;
        } else {
                wrote = 0;
                mapping_set_error(mapping, rc);
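
For reference, the deleted expression derived a page count from the osd reply: bytes written, plus the offset into the first page, rounded up to whole pages. The commit trusts req->r_num_pages instead, because a truncation racing with the writeback can make the osd report fewer bytes than were sent. A small userspace sketch of what the old formula computed, assuming 4 KiB pages:

	/* Old page-count derivation, modelled in userspace with 4 KiB pages. */
	#include <stdio.h>

	#define PAGE_CACHE_SHIFT 12
	#define PAGE_CACHE_SIZE  (1UL << PAGE_CACHE_SHIFT)
	#define PAGE_CACHE_MASK  (~(PAGE_CACHE_SIZE - 1))

	int main(void)
	{
		unsigned long offset = 5 * PAGE_CACHE_SIZE + 100;	/* hypothetical start of the write */
		unsigned long bytes  = 3 * PAGE_CACHE_SIZE;		/* bytes the osd reports written */

		/* bytes written + offset within the first page, rounded up to pages */
		unsigned long wrote = (bytes + (offset & ~PAGE_CACHE_MASK) + ~PAGE_CACHE_MASK)
				      >> PAGE_CACHE_SHIFT;

		printf("%lu pages\n", wrote);	/* 4: a 12 KiB write starting 100 bytes into a page */
		return 0;
	}
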
@@ -519,6 +547,13 @@ static void writepages_finish(struct ceph_osd_request *req,
                BUG_ON(!page);
                WARN_ON(!PageUptodate(page));
 
+               writeback_stat =
+                       atomic_long_dec_return(&client->writeback_count);
+               if (writeback_stat <
+                   CONGESTION_OFF_THRESH(client->mount_args->congestion_kb))
+                       clear_bdi_congested(&client->backing_dev_info,
+                                           BLK_RW_ASYNC);
+
                if (i >= wrote) {
                        dout("inode %p skipping page %p\n", inode, page);
                        wbc->pages_skipped++;
@@ -528,6 +563,16 @@ static void writepages_finish(struct ceph_osd_request *req,
                ceph_put_snap_context(snapc);
                dout("unlocking %d %p\n", i, page);
                end_page_writeback(page);
+
+               /*
+                * We lost the cache cap, need to truncate the page before
+                * it is unlocked, otherwise we'd truncate it later in the
+                * page truncation thread, possibly losing some data that
+                * raced its way in
+                */
+               if ((issued & CEPH_CAP_FILE_CACHE) == 0)
+                       generic_error_remove_page(inode->i_mapping, page);
+
                unlock_page(page);
        }
        dout("%p wrote+cleaned %d pages\n", inode, wrote);
@@ -568,7 +613,7 @@ static int ceph_writepages_start(struct address_space *mapping,
        struct inode *inode = mapping->host;
        struct backing_dev_info *bdi = mapping->backing_dev_info;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_client *client = ceph_inode_to_client(inode);
+       struct ceph_client *client;
        pgoff_t index, start, end;
        int range_whole = 0;
        int should_loop = 1;
@@ -661,6 +706,7 @@ retry:
                u64 offset, len;
                struct ceph_osd_request_head *reqhead;
                struct ceph_osd_op *op;
+               long writeback_stat;
 
                next = 0;
                locked_pages = 0;
@@ -768,6 +814,12 @@ get_more_pages:
                                first = i;
                        dout("%p will write page %p idx %lu\n",
                             inode, page, page->index);
+
+                       writeback_stat = atomic_long_inc_return(&client->writeback_count);
+                       if (writeback_stat > CONGESTION_ON_THRESH(client->mount_args->congestion_kb)) {
+                               set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
+                       }
+
                        set_page_writeback(page);
                        req->r_pages[locked_pages] = page;
                        locked_pages++;
@@ -868,16 +920,18 @@ static int context_is_writeable_or_written(struct inode *inode,
 /*
  * We are only allowed to write into/dirty the page if the page is
  * clean, or already dirty within the same snap context.
+ *
+ * called with page locked.
+ * return success with page locked,
+ * or any failure (incl -EAGAIN) with page unlocked.
  */
-static int ceph_write_begin(struct file *file, struct address_space *mapping,
-                           loff_t pos, unsigned len, unsigned flags,
-                           struct page **pagep, void **fsdata)
+static int ceph_update_writeable_page(struct file *file,
+                           loff_t pos, unsigned len,
+                           struct page *page)
 {
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
-       struct page *page;
-       pgoff_t index = pos >> PAGE_CACHE_SHIFT;
        loff_t page_off = pos & PAGE_CACHE_MASK;
        int pos_in_page = pos & ~PAGE_CACHE_MASK;
        int end_in_page = pos_in_page + len;
@@ -885,16 +939,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
        struct ceph_snap_context *snapc;
        int r;
 
-       /* get a page*/
-retry:
-       page = grab_cache_page_write_begin(mapping, index, 0);
-       if (!page)
-               return -ENOMEM;
-       *pagep = page;
-
-       dout("write_begin file %p inode %p page %p %d~%d\n", file,
-            inode, page, (int)pos, (int)len);
-
 retry_locked:
        /* writepages currently holds page lock, but if we change that later, */
        wait_on_page_writeback(page);
@@ -921,12 +965,13 @@ retry_locked:
                         */
                        snapc = ceph_get_snap_context((void *)page->private);
                        unlock_page(page);
-                       if (ceph_queue_writeback(inode))
-                               igrab(inode);
-                       wait_event_interruptible(ci->i_cap_wq,
+                       ceph_queue_writeback(inode);
+                       r = wait_event_interruptible(ci->i_cap_wq,
                               context_is_writeable_or_written(inode, snapc));
                        ceph_put_snap_context(snapc);
-                       goto retry;
+                       if (r == -ERESTARTSYS)
+                               return r;
+                       return -EAGAIN;
                }
 
                /* yay, writeable, do it now (without dropping page lock) */
@@ -984,6 +1029,35 @@ fail_nosnap:
 }
 
 /*
+ * We are only allowed to write into/dirty the page if the page is
+ * clean, or already dirty within the same snap context.
+ */
+static int ceph_write_begin(struct file *file, struct address_space *mapping,
+                           loff_t pos, unsigned len, unsigned flags,
+                           struct page **pagep, void **fsdata)
+{
+       struct inode *inode = file->f_dentry->d_inode;
+       struct page *page;
+       pgoff_t index = pos >> PAGE_CACHE_SHIFT;
+       int r;
+
+       do {
+               /* get a page */
+               page = grab_cache_page_write_begin(mapping, index, 0);
+               if (!page)
+                       return -ENOMEM;
+               *pagep = page;
+
+               dout("write_begin file %p inode %p page %p %d~%d\n", file,
+               inode, page, (int)pos, (int)len);
+
+               r = ceph_update_writeable_page(file, pos, len, page);
+       } while (r == -EAGAIN);
+
+       return r;
+}
+
+/*
  * we don't do anything in here that simple_write_end doesn't do
  * except adjust dirty page accounting and drop read lock on
  * mdsc->snap_rwsem.
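
With the helper split out, the new ceph_write_begin above reduces to a loop: grab and lock a page, hand it to ceph_update_writeable_page(), and if that returns -EAGAIN (it had to drop the page lock, e.g. to wait for an older snap context to flush) start over with a freshly grabbed page. The same contract lets ceph_page_mkwrite reuse the helper on the page it was handed. A toy userspace model of that caller-side contract, with purely hypothetical names:

	/* Toy model: the helper is entered with a "locked" page and unlocks it on
	 * any failure, so -EAGAIN means the caller must grab a fresh page. */
	#include <errno.h>
	#include <stdio.h>
	#include <stdlib.h>

	struct toy_page { int locked; };

	static int attempts;

	static int update_writeable_page(struct toy_page *p)	/* hypothetical stand-in */
	{
		if (++attempts <= 2) {
			p->locked = 0;			/* contract: unlocked on failure */
			return -EAGAIN;
		}
		return 0;				/* contract: still locked on success */
	}

	int main(void)
	{
		struct toy_page *page;
		int r;

		do {
			page = calloc(1, sizeof(*page));	/* grab_cache_page_write_begin() */
			if (!page)
				return 1;
			page->locked = 1;
			r = update_writeable_page(page);
			if (r)
				free(page);		/* drop the unlocked page and retry */
		} while (r == -EAGAIN);

		printf("done after %d attempts, locked=%d\n", attempts, page->locked);
		free(page);
		return 0;
	}
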
@@ -993,7 +1067,8 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
                          struct page *page, void *fsdata)
 {
        struct inode *inode = file->f_dentry->d_inode;
-       struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+       struct ceph_client *client = ceph_inode_to_client(inode);
+       struct ceph_mds_client *mdsc = &client->mdsc;
        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
        int check_cap = 0;
 
@@ -1065,8 +1140,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
        struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
        loff_t off = page->index << PAGE_CACHE_SHIFT;
        loff_t size, len;
-       struct page *locked_page = NULL;
-       void *fsdata = NULL;
        int ret;
 
        size = i_size_read(inode);
@@ -1077,23 +1150,30 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 
        dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode,
             off, len, page, page->index);
-       ret = ceph_write_begin(vma->vm_file, inode->i_mapping, off, len, 0,
-                              &locked_page, &fsdata);
-       WARN_ON(page != locked_page);
-       if (!ret) {
-               /*
-                * doing the following, instead of calling
-                * ceph_write_end. Note that we keep the
-                * page locked
-                */
+
+       lock_page(page);
+
+       ret = VM_FAULT_NOPAGE;
+       if ((off > size) ||
+           (page->mapping != inode->i_mapping))
+               goto out;
+
+       ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
+       if (ret == 0) {
+               /* success.  we'll keep the page locked. */
                set_page_dirty(page);
                up_read(&mdsc->snap_rwsem);
-               page_cache_release(page);
                ret = VM_FAULT_LOCKED;
        } else {
-               ret = VM_FAULT_SIGBUS;
+               if (ret == -ENOMEM)
+                       ret = VM_FAULT_OOM;
+               else
+                       ret = VM_FAULT_SIGBUS;
        }
+out:
        dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret);
+       if (ret != VM_FAULT_LOCKED)
+               unlock_page(page);
        return ret;
 }