[PATCH] pipe: introduce ->pin() buffer operation
fs/splice.c
/*
 * "splice": joining two ropes together by interweaving their strands.
 *
 * This is the "extended pipe" functionality, where a pipe is used as
 * an arbitrary in-memory buffer. Think of a pipe as a small kernel
 * buffer that you can use to transfer data from one end to the other.
 *
 * The traditional unix read/write is extended with a "splice()" operation
 * that transfers data buffers to or from a pipe buffer.
 *
 * Named by Larry McVoy, original implementation from Linus, extended by
 * Jens to support splicing to files, network, direct splicing, etc and
 * fixing lots of bugs.
 *
 * Copyright (C) 2005-2006 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2005-2006 Linus Torvalds <torvalds@osdl.org>
 * Copyright (C) 2006 Ingo Molnar <mingo@elte.hu>
 *
 */
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/pipe_fs_i.h>
#include <linux/mm_inline.h>
#include <linux/swap.h>
#include <linux/writeback.h>
#include <linux/buffer_head.h>
#include <linux/module.h>
#include <linux/syscalls.h>
#include <linux/uio.h>

struct partial_page {
	unsigned int offset;
	unsigned int len;
};

/*
 * Passed to splice_to_pipe
 */
struct splice_pipe_desc {
	struct page **pages;		/* page map */
	struct partial_page *partial;	/* pages[] may not be contig */
	int nr_pages;			/* number of pages in map */
	unsigned int flags;		/* splice flags */
	struct pipe_buf_operations *ops;/* ops associated with output pipe */
};

/*
 * Attempt to steal a page from a pipe buffer. This should perhaps go into
 * a vm helper function; it's already been simplified quite a bit by the
 * addition of remove_mapping(). If success is returned, the caller may
 * attempt to reuse this page for another destination.
 */
static int page_cache_pipe_buf_steal(struct pipe_inode_info *info,
				     struct pipe_buffer *buf)
{
	struct page *page = buf->page;
	struct address_space *mapping = page_mapping(page);

	lock_page(page);

	WARN_ON(!PageUptodate(page));

	/*
	 * At least for ext2 with nobh option, we need to wait on writeback
	 * completing on this page, since we'll remove it from the pagecache.
	 * Otherwise truncate won't wait on the page, allowing the disk
	 * blocks to be reused by someone else before we actually wrote our
	 * data to them. Filesystem corruption ensues.
	 */
	wait_on_page_writeback(page);

	if (PagePrivate(page))
		try_to_release_page(page, mapping_gfp_mask(mapping));

	if (!remove_mapping(mapping, page)) {
		unlock_page(page);
		return 1;
	}

	buf->flags |= PIPE_BUF_FLAG_LRU;
	return 0;
}

static void page_cache_pipe_buf_release(struct pipe_inode_info *info,
					struct pipe_buffer *buf)
{
	page_cache_release(buf->page);
	buf->page = NULL;
	buf->flags &= ~PIPE_BUF_FLAG_LRU;
}

static int page_cache_pipe_buf_pin(struct pipe_inode_info *info,
				   struct pipe_buffer *buf)
{
	struct page *page = buf->page;
	int err;

	if (!PageUptodate(page)) {
		lock_page(page);

		/*
		 * Page got truncated/unhashed. This will cause a 0-byte
		 * splice, if this is the first page.
		 */
		if (!page->mapping) {
			err = -ENODATA;
			goto error;
		}

		/*
		 * Uh oh, read-error from disk.
		 */
		if (!PageUptodate(page)) {
			err = -EIO;
			goto error;
		}

		/*
		 * Page is ok after all, we are done.
		 */
		unlock_page(page);
	}

	return 0;
error:
	unlock_page(page);
	return err;
}

static struct pipe_buf_operations page_cache_pipe_buf_ops = {
	.can_merge = 0,
	.map = generic_pipe_buf_map,
	.unmap = generic_pipe_buf_unmap,
	.pin = page_cache_pipe_buf_pin,
	.release = page_cache_pipe_buf_release,
	.steal = page_cache_pipe_buf_steal,
	.get = generic_pipe_buf_get,
};

static int user_page_pipe_buf_steal(struct pipe_inode_info *pipe,
				    struct pipe_buffer *buf)
{
	return 1;
}

static struct pipe_buf_operations user_page_pipe_buf_ops = {
	.can_merge = 0,
	.map = generic_pipe_buf_map,
	.unmap = generic_pipe_buf_unmap,
	.pin = generic_pipe_buf_pin,
	.release = page_cache_pipe_buf_release,
	.steal = user_page_pipe_buf_steal,
	.get = generic_pipe_buf_get,
};
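
/*
 * For reference: the generic_pipe_buf_*() helpers used above live outside
 * this file (in fs/pipe.c). A minimal sketch of what the new ->pin()
 * operation amounts to for a buffer whose page is always resident and
 * uptodate -- an assumed implementation, shown here only to illustrate
 * the hook:
 *
 *	int generic_pipe_buf_pin(struct pipe_inode_info *info,
 *				 struct pipe_buffer *buf)
 *	{
 *		return 0;
 *	}
 *
 * page_cache_pipe_buf_pin() above is the interesting case: its page comes
 * from the page cache, so it may still be under IO or may get truncated,
 * and ->pin() must validate the page before the buffer contents are used.
 */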

/*
 * Pipe output worker. This sets up our pipe format with the page cache
 * pipe buffer operations. Otherwise very similar to the regular pipe_writev().
 */
static ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
			      struct splice_pipe_desc *spd)
{
	int ret, do_wakeup, page_nr, spd_pages;

	ret = 0;
	do_wakeup = 0;
	page_nr = 0;
	spd_pages = spd->nr_pages;

	if (pipe->inode)
		mutex_lock(&pipe->inode->i_mutex);

	for (;;) {
		if (!pipe->readers) {
			send_sig(SIGPIPE, current, 0);
			if (!ret)
				ret = -EPIPE;
			break;
		}

		if (pipe->nrbufs < PIPE_BUFFERS) {
			int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
			struct pipe_buffer *buf = pipe->bufs + newbuf;

			buf->page = spd->pages[page_nr];
			buf->offset = spd->partial[page_nr].offset;
			buf->len = spd->partial[page_nr].len;
			buf->ops = spd->ops;
			pipe->nrbufs++;
			page_nr++;
			ret += buf->len;

			if (pipe->inode)
				do_wakeup = 1;

			if (!--spd->nr_pages)
				break;
			if (pipe->nrbufs < PIPE_BUFFERS)
				continue;

			break;
		}

		if (spd->flags & SPLICE_F_NONBLOCK) {
			if (!ret)
				ret = -EAGAIN;
			break;
		}

		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}

		if (do_wakeup) {
			smp_mb();
			if (waitqueue_active(&pipe->wait))
				wake_up_interruptible_sync(&pipe->wait);
			kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
			do_wakeup = 0;
		}

		pipe->waiting_writers++;
		pipe_wait(pipe);
		pipe->waiting_writers--;
	}

	if (pipe->inode)
		mutex_unlock(&pipe->inode->i_mutex);

	if (do_wakeup) {
		smp_mb();
		if (waitqueue_active(&pipe->wait))
			wake_up_interruptible(&pipe->wait);
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	}

	/*
	 * Release any pages we didn't hand off to the pipe. Compare against
	 * the number of pages we started with, since spd->nr_pages is
	 * decremented as pages are consumed above.
	 */
	while (page_nr < spd_pages)
		page_cache_release(spd->pages[page_nr++]);

	return ret;
}

static int
__generic_file_splice_read(struct file *in, loff_t *ppos,
			   struct pipe_inode_info *pipe, size_t len,
			   unsigned int flags)
{
	struct address_space *mapping = in->f_mapping;
	unsigned int loff, nr_pages;
	struct page *pages[PIPE_BUFFERS];
	struct partial_page partial[PIPE_BUFFERS];
	struct page *page;
	pgoff_t index, end_index;
	loff_t isize;
	size_t total_len;
	int error, page_nr;
	struct splice_pipe_desc spd = {
		.pages = pages,
		.partial = partial,
		.flags = flags,
		.ops = &page_cache_pipe_buf_ops,
	};

	index = *ppos >> PAGE_CACHE_SHIFT;
	loff = *ppos & ~PAGE_CACHE_MASK;
	nr_pages = (len + loff + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

	if (nr_pages > PIPE_BUFFERS)
		nr_pages = PIPE_BUFFERS;

	/*
	 * Initiate read-ahead on this page range. However, don't call into
	 * read-ahead for a single page at a non-zero offset: we are likely
	 * doing a small-chunk splice and the page is already there.
	 */
	if (!loff || nr_pages > 1)
		page_cache_readahead(mapping, &in->f_ra, in, index, nr_pages);

	/*
	 * Now fill in the holes:
	 */
	error = 0;
	total_len = 0;

	/*
	 * Lookup the (hopefully) full range of pages we need.
	 */
	spd.nr_pages = find_get_pages_contig(mapping, index, nr_pages, pages);

	/*
	 * If find_get_pages_contig() returned fewer pages than we needed,
	 * allocate the rest.
	 */
	index += spd.nr_pages;
	while (spd.nr_pages < nr_pages) {
		/*
		 * Page could be there, find_get_pages_contig() breaks on
		 * the first hole.
		 */
		page = find_get_page(mapping, index);
		if (!page) {
			/*
			 * page didn't exist, allocate one.
			 */
			page = page_cache_alloc_cold(mapping);
			if (!page)
				break;

			error = add_to_page_cache_lru(page, mapping, index,
					      mapping_gfp_mask(mapping));
			if (unlikely(error)) {
				page_cache_release(page);
				break;
			}
			/*
			 * add_to_page_cache() locks the page, unlock it
			 * to avoid convoluting the logic below even more.
			 */
			unlock_page(page);
		}

		pages[spd.nr_pages++] = page;
		index++;
	}

	/*
	 * Now loop over the map and see if we need to start IO on any
	 * pages, fill in the partial map, etc.
	 */
	index = *ppos >> PAGE_CACHE_SHIFT;
	nr_pages = spd.nr_pages;
	spd.nr_pages = 0;
	for (page_nr = 0; page_nr < nr_pages; page_nr++) {
		unsigned int this_len;

		if (!len)
			break;

		/*
		 * this_len is the max we'll use from this page
		 */
		this_len = min_t(unsigned long, len, PAGE_CACHE_SIZE - loff);
		page = pages[page_nr];

		/*
		 * If the page isn't uptodate, we may need to start io on it
		 */
		if (!PageUptodate(page)) {
			/*
			 * If in nonblock mode then don't block on waiting
			 * for an in-flight io page
			 */
			if (flags & SPLICE_F_NONBLOCK)
				break;

			lock_page(page);

			/*
			 * Page was truncated, stop here. If this isn't the
			 * first page, we'll just complete what we already
			 * added.
			 */
			if (!page->mapping) {
				unlock_page(page);
				break;
			}
			/*
			 * page was already under io and is now done, great
			 */
			if (PageUptodate(page)) {
				unlock_page(page);
				goto fill_it;
			}

			/*
			 * need to read in the page
			 */
			error = mapping->a_ops->readpage(in, page);
			if (unlikely(error)) {
				/*
				 * We really should re-lookup the page here,
				 * but it complicates things a lot. Instead
				 * let's just do what we already stored, and
				 * we'll get it the next time we are called.
				 */
				if (error == AOP_TRUNCATED_PAGE)
					error = 0;

				break;
			}

			/*
			 * i_size must be checked after ->readpage().
			 */
			isize = i_size_read(mapping->host);
			end_index = (isize - 1) >> PAGE_CACHE_SHIFT;
			if (unlikely(!isize || index > end_index))
				break;

			/*
			 * if this is the last page, see if we need to shrink
			 * the length and stop
			 */
			if (end_index == index) {
				loff = PAGE_CACHE_SIZE - (isize & ~PAGE_CACHE_MASK);
				if (total_len + loff > isize)
					break;
				/*
				 * force quit after adding this page
				 */
				len = this_len;
				this_len = min(this_len, loff);
				loff = 0;
			}
		}
fill_it:
		partial[page_nr].offset = loff;
		partial[page_nr].len = this_len;
		len -= this_len;
		total_len += this_len;
		loff = 0;
		spd.nr_pages++;
		index++;
	}

	/*
	 * Release any pages at the end, if we quit early. 'page_nr' is how
	 * far we got, 'nr_pages' is how many pages are in the map.
	 */
	while (page_nr < nr_pages)
		page_cache_release(pages[page_nr++]);

	if (spd.nr_pages)
		return splice_to_pipe(pipe, &spd);

	return error;
}

/**
 * generic_file_splice_read - splice data from file to a pipe
 * @in:		file to splice from
 * @ppos:	position in @in to start reading from
 * @pipe:	pipe to splice to
 * @len:	number of bytes to splice
 * @flags:	splice modifier flags
 *
 * Will read pages from the given file and fill them into a pipe.
 */
ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
				 struct pipe_inode_info *pipe, size_t len,
				 unsigned int flags)
{
	ssize_t spliced;
	int ret;

	ret = 0;
	spliced = 0;

	while (len) {
		ret = __generic_file_splice_read(in, ppos, pipe, len, flags);

		if (ret < 0)
			break;
		else if (!ret) {
			if (spliced)
				break;
			if (flags & SPLICE_F_NONBLOCK) {
				ret = -EAGAIN;
				break;
			}
		}

		*ppos += ret;
		len -= ret;
		spliced += ret;
	}

	if (spliced)
		return spliced;

	return ret;
}

EXPORT_SYMBOL(generic_file_splice_read);

/*
 * Send 'sd->len' bytes from the pipe buffer to the socket 'sd->file',
 * starting at position 'sd->pos', using sendpage(). Return the number
 * of bytes sent.
 */
static int pipe_to_sendpage(struct pipe_inode_info *info,
			    struct pipe_buffer *buf, struct splice_desc *sd)
{
	struct file *file = sd->file;
	loff_t pos = sd->pos;
	int ret, more;

	ret = buf->ops->pin(info, buf);
	if (!ret) {
		more = (sd->flags & SPLICE_F_MORE) || sd->len < sd->total_len;

		ret = file->f_op->sendpage(file, buf->page, buf->offset,
					   sd->len, &pos, more);
	}

	return ret;
}

/*
 * This is a little more tricky than the file -> pipe splicing. There are
 * basically three cases:
 *
 *	- Destination page already exists in the address space and there
 *	  are users of it. For that case we have no other option than
 *	  copying the data. Tough luck.
 *	- Destination page already exists in the address space, but there
 *	  are no users of it. Make sure it's uptodate, then drop it. Fall
 *	  through to last case.
 *	- Destination page does not exist, we can add the pipe page to
 *	  the page cache and avoid the copy.
 *
 * If asked to move pages to the output file (SPLICE_F_MOVE is set in
 * sd->flags), we attempt to migrate pages from the pipe to the output
 * file address space page cache. This is possible if no one else has
 * the pipe page referenced outside of the pipe and page cache. If
 * SPLICE_F_MOVE isn't set, or we cannot move the page, we simply create
 * a new page in the output file page cache and fill/dirty that.
 */
static int pipe_to_file(struct pipe_inode_info *info, struct pipe_buffer *buf,
			struct splice_desc *sd)
{
	struct file *file = sd->file;
	struct address_space *mapping = file->f_mapping;
	gfp_t gfp_mask = mapping_gfp_mask(mapping);
	unsigned int offset, this_len;
	struct page *page;
	pgoff_t index;
	int ret;

	/*
	 * make sure the data in this buffer is uptodate
	 */
	ret = buf->ops->pin(info, buf);
	if (unlikely(ret))
		return ret;

	index = sd->pos >> PAGE_CACHE_SHIFT;
	offset = sd->pos & ~PAGE_CACHE_MASK;

	this_len = sd->len;
	if (this_len + offset > PAGE_CACHE_SIZE)
		this_len = PAGE_CACHE_SIZE - offset;

	/*
	 * Reuse buf page, if SPLICE_F_MOVE is set and we are doing a full
	 * page.
	 */
	if ((sd->flags & SPLICE_F_MOVE) && this_len == PAGE_CACHE_SIZE) {
		/*
		 * If steal succeeds, buf->page is now pruned from the vm
		 * side (LRU and page cache) and we can reuse it. The page
		 * will also be locked on successful return.
		 */
		if (buf->ops->steal(info, buf))
			goto find_page;

		page = buf->page;
		if (add_to_page_cache(page, mapping, index, gfp_mask)) {
			unlock_page(page);
			goto find_page;
		}

		page_cache_get(page);

		if (!(buf->flags & PIPE_BUF_FLAG_LRU))
			lru_cache_add(page);
	} else {
find_page:
		page = find_lock_page(mapping, index);
		if (!page) {
			ret = -ENOMEM;
			page = page_cache_alloc_cold(mapping);
			if (unlikely(!page))
				goto out_nomem;

			/*
			 * This will also lock the page
			 */
			ret = add_to_page_cache_lru(page, mapping, index,
						    gfp_mask);
			if (unlikely(ret))
				goto out;
		}

		/*
		 * We get here with the page locked. If the page is also
		 * uptodate, we don't need to do more. If it isn't, we
		 * may need to bring it in if we are not going to overwrite
		 * the full page.
		 */
		if (!PageUptodate(page)) {
			if (this_len < PAGE_CACHE_SIZE) {
				ret = mapping->a_ops->readpage(file, page);
				if (unlikely(ret))
					goto out;

				lock_page(page);

				if (!PageUptodate(page)) {
					/*
					 * Page got invalidated, repeat.
					 */
					if (!page->mapping) {
						unlock_page(page);
						page_cache_release(page);
						goto find_page;
					}
					ret = -EIO;
					goto out;
				}
			} else
				SetPageUptodate(page);
		}
	}

	ret = mapping->a_ops->prepare_write(file, page, offset, offset+this_len);
	if (ret == AOP_TRUNCATED_PAGE) {
		page_cache_release(page);
		goto find_page;
	} else if (ret)
		goto out;

	if (buf->page != page) {
		/*
		 * Careful, ->map() uses KM_USER0!
		 */
		char *src = buf->ops->map(info, buf);
		char *dst = kmap_atomic(page, KM_USER1);

		memcpy(dst + offset, src + buf->offset, this_len);
		flush_dcache_page(page);
		kunmap_atomic(dst, KM_USER1);
		buf->ops->unmap(info, buf);
	}

	ret = mapping->a_ops->commit_write(file, page, offset, offset+this_len);
	if (!ret) {
		/*
		 * Return the number of bytes written and mark page as
		 * accessed, we are now done!
		 */
		ret = this_len;
		mark_page_accessed(page);
		balance_dirty_pages_ratelimited(mapping);
	} else if (ret == AOP_TRUNCATED_PAGE) {
		page_cache_release(page);
		goto find_page;
	}
out:
	/*
	 * Unlock before dropping our reference, so we don't unlock a page
	 * we may no longer own.
	 */
	unlock_page(page);
	page_cache_release(page);
out_nomem:
	return ret;
}

/*
 * Pipe input worker. Most of this logic works like a regular pipe; the
 * key here is the 'actor' worker passed in that actually moves the data
 * to the wanted destination. See pipe_to_file/pipe_to_sendpage above.
 */
ssize_t splice_from_pipe(struct pipe_inode_info *pipe, struct file *out,
			 loff_t *ppos, size_t len, unsigned int flags,
			 splice_actor *actor)
{
	int ret, do_wakeup, err;
	struct splice_desc sd;

	ret = 0;
	do_wakeup = 0;

	sd.total_len = len;
	sd.flags = flags;
	sd.file = out;
	sd.pos = *ppos;

	if (pipe->inode)
		mutex_lock(&pipe->inode->i_mutex);

	for (;;) {
		if (pipe->nrbufs) {
			struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
			struct pipe_buf_operations *ops = buf->ops;

			sd.len = buf->len;
			if (sd.len > sd.total_len)
				sd.len = sd.total_len;

			err = actor(pipe, buf, &sd);
			if (err <= 0) {
				if (!ret && err != -ENODATA)
					ret = err;

				break;
			}

			ret += err;
			buf->offset += err;
			buf->len -= err;

			sd.len -= err;
			sd.pos += err;
			sd.total_len -= err;
			if (sd.len)
				continue;

			if (!buf->len) {
				buf->ops = NULL;
				ops->release(pipe, buf);
				pipe->curbuf = (pipe->curbuf + 1) & (PIPE_BUFFERS - 1);
				pipe->nrbufs--;
				if (pipe->inode)
					do_wakeup = 1;
			}

			if (!sd.total_len)
				break;
		}

		if (pipe->nrbufs)
			continue;
		if (!pipe->writers)
			break;
		if (!pipe->waiting_writers) {
			if (ret)
				break;
		}

		if (flags & SPLICE_F_NONBLOCK) {
			if (!ret)
				ret = -EAGAIN;
			break;
		}

		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}

		if (do_wakeup) {
			smp_mb();
			if (waitqueue_active(&pipe->wait))
				wake_up_interruptible_sync(&pipe->wait);
			kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
			do_wakeup = 0;
		}

		pipe_wait(pipe);
	}

	if (pipe->inode)
		mutex_unlock(&pipe->inode->i_mutex);

	if (do_wakeup) {
		smp_mb();
		if (waitqueue_active(&pipe->wait))
			wake_up_interruptible(&pipe->wait);
		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
	}

	return ret;
}

/**
 * generic_file_splice_write - splice data from a pipe to a file
 * @pipe:	pipe to splice from
 * @out:	file to write to
 * @ppos:	position in @out to start writing at
 * @len:	number of bytes to splice
 * @flags:	splice modifier flags
 *
 * Will either move or copy pages (determined by @flags options) from
 * the given pipe inode to the given file.
 *
 */
ssize_t
generic_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
			  loff_t *ppos, size_t len, unsigned int flags)
{
	struct address_space *mapping = out->f_mapping;
	ssize_t ret;

	ret = splice_from_pipe(pipe, out, ppos, len, flags, pipe_to_file);
	if (ret > 0) {
		struct inode *inode = mapping->host;

		*ppos += ret;

		/*
		 * If file or inode is SYNC and we actually wrote some data,
		 * sync it.
		 */
		if (unlikely((out->f_flags & O_SYNC) || IS_SYNC(inode))) {
			int err;

			mutex_lock(&inode->i_mutex);
			err = generic_osync_inode(inode, mapping,
						  OSYNC_METADATA|OSYNC_DATA);
			mutex_unlock(&inode->i_mutex);

			if (err)
				ret = err;
		}
	}

	return ret;
}

EXPORT_SYMBOL(generic_file_splice_write);

/**
 * generic_splice_sendpage - splice data from a pipe to a socket
 * @pipe:	pipe to splice from
 * @out:	socket to write to
 * @ppos:	position in @out
 * @len:	number of bytes to splice
 * @flags:	splice modifier flags
 *
 * Will send @len bytes from the pipe to a network socket. No data copying
 * is involved.
 *
 */
ssize_t generic_splice_sendpage(struct pipe_inode_info *pipe, struct file *out,
				loff_t *ppos, size_t len, unsigned int flags)
{
	return splice_from_pipe(pipe, out, ppos, len, flags, pipe_to_sendpage);
}

EXPORT_SYMBOL(generic_splice_sendpage);

/*
 * Attempt to initiate a splice from pipe to file.
 */
static long do_splice_from(struct pipe_inode_info *pipe, struct file *out,
			   loff_t *ppos, size_t len, unsigned int flags)
{
	int ret;

	if (unlikely(!out->f_op || !out->f_op->splice_write))
		return -EINVAL;

	if (unlikely(!(out->f_mode & FMODE_WRITE)))
		return -EBADF;

	ret = rw_verify_area(WRITE, out, ppos, len);
	if (unlikely(ret < 0))
		return ret;

	return out->f_op->splice_write(pipe, out, ppos, len, flags);
}

/*
 * Attempt to initiate a splice from a file to a pipe.
 */
static long do_splice_to(struct file *in, loff_t *ppos,
			 struct pipe_inode_info *pipe, size_t len,
			 unsigned int flags)
{
	loff_t isize, left;
	int ret;

	if (unlikely(!in->f_op || !in->f_op->splice_read))
		return -EINVAL;

	if (unlikely(!(in->f_mode & FMODE_READ)))
		return -EBADF;

	ret = rw_verify_area(READ, in, ppos, len);
	if (unlikely(ret < 0))
		return ret;

	isize = i_size_read(in->f_mapping->host);
	if (unlikely(*ppos >= isize))
		return 0;

	left = isize - *ppos;
	if (unlikely(left < len))
		len = left;

	return in->f_op->splice_read(in, ppos, pipe, len, flags);
}

long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,
		      size_t len, unsigned int flags)
{
	struct pipe_inode_info *pipe;
	long ret, bytes;
	loff_t out_off;
	umode_t i_mode;
	int i;

	/*
	 * We require the input to be a regular file, as we don't want to
	 * randomly drop data for eg socket -> socket splicing. Use the
	 * piped splicing for that!
	 */
	i_mode = in->f_dentry->d_inode->i_mode;
	if (unlikely(!S_ISREG(i_mode) && !S_ISBLK(i_mode)))
		return -EINVAL;

	/*
	 * neither in nor out is a pipe, setup an internal pipe attached to
	 * 'out' and transfer the wanted data from 'in' to 'out' through that
	 */
	pipe = current->splice_pipe;
	if (unlikely(!pipe)) {
		pipe = alloc_pipe_info(NULL);
		if (!pipe)
			return -ENOMEM;

		/*
		 * We don't have an immediate reader, but we'll read the stuff
		 * out of the pipe right after the splice_to_pipe(). So set
		 * PIPE_READERS appropriately.
		 */
		pipe->readers = 1;

		current->splice_pipe = pipe;
	}

	/*
	 * Do the splice.
	 */
	ret = 0;
	bytes = 0;
	out_off = 0;

	while (len) {
		size_t read_len, max_read_len;

		/*
		 * Do at most PIPE_BUFFERS pages worth of transfer:
		 */
		max_read_len = min(len, (size_t)(PIPE_BUFFERS*PAGE_SIZE));

		ret = do_splice_to(in, ppos, pipe, max_read_len, flags);
		if (unlikely(ret < 0))
			goto out_release;

		read_len = ret;

		/*
		 * NOTE: nonblocking mode only applies to the input. We
		 * must not do the output in nonblocking mode as then we
		 * could get stuck data in the internal pipe:
		 */
		ret = do_splice_from(pipe, out, &out_off, read_len,
				     flags & ~SPLICE_F_NONBLOCK);
		if (unlikely(ret < 0))
			goto out_release;

		bytes += ret;
		len -= ret;

		/*
		 * In nonblocking mode, if we got back a short read then
		 * that was due either to an IO error or to the pagecache
		 * entry not being there. In the IO error case the _next_
		 * splice attempt will produce a clean IO error return value
		 * (not a short read), so in both cases it's correct to
		 * break out of the loop here:
		 */
		if ((flags & SPLICE_F_NONBLOCK) && (read_len < max_read_len))
			break;
	}

	pipe->nrbufs = pipe->curbuf = 0;

	return bytes;

out_release:
	/*
	 * If we did an incomplete transfer we must release
	 * the pipe buffers in question:
	 */
	for (i = 0; i < PIPE_BUFFERS; i++) {
		struct pipe_buffer *buf = pipe->bufs + i;

		if (buf->ops) {
			buf->ops->release(pipe, buf);
			buf->ops = NULL;
		}
	}
	pipe->nrbufs = pipe->curbuf = 0;

	/*
	 * If we transferred some data, return the number of bytes:
	 */
	if (bytes > 0)
		return bytes;

	return ret;
}

EXPORT_SYMBOL(do_splice_direct);
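
/*
 * Usage sketch (not part of this patch): an in-kernel caller wanting a
 * zero-copy file-to-file transfer could drive do_splice_direct() along
 * these lines. The helper name and error handling are illustrative
 * assumptions only; the transfer goes through current->splice_pipe and
 * the return value is bytes moved or a negative errno:
 *
 *	static long example_copy_range(struct file *in, struct file *out,
 *				       loff_t pos, size_t count)
 *	{
 *		return do_splice_direct(in, &pos, out, count, 0);
 *	}
 */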

/*
 * Determine where to splice to/from.
 */
static long do_splice(struct file *in, loff_t __user *off_in,
		      struct file *out, loff_t __user *off_out,
		      size_t len, unsigned int flags)
{
	struct pipe_inode_info *pipe;
	loff_t offset, *off;
	long ret;

	pipe = in->f_dentry->d_inode->i_pipe;
	if (pipe) {
		if (off_in)
			return -ESPIPE;
		if (off_out) {
			if (out->f_op->llseek == no_llseek)
				return -EINVAL;
			if (copy_from_user(&offset, off_out, sizeof(loff_t)))
				return -EFAULT;
			off = &offset;
		} else
			off = &out->f_pos;

		ret = do_splice_from(pipe, out, off, len, flags);

		if (off_out && copy_to_user(off_out, off, sizeof(loff_t)))
			ret = -EFAULT;

		return ret;
	}

	pipe = out->f_dentry->d_inode->i_pipe;
	if (pipe) {
		if (off_out)
			return -ESPIPE;
		if (off_in) {
			if (in->f_op->llseek == no_llseek)
				return -EINVAL;
			if (copy_from_user(&offset, off_in, sizeof(loff_t)))
				return -EFAULT;
			off = &offset;
		} else
			off = &in->f_pos;

		ret = do_splice_to(in, off, pipe, len, flags);

		if (off_in && copy_to_user(off_in, off, sizeof(loff_t)))
			ret = -EFAULT;

		return ret;
	}

	return -EINVAL;
}

/*
 * Map an iov into an array of pages and offset/length tuples. With the
 * partial_page structure, we can map several non-contiguous ranges into
 * our one pages[] map instead of splitting that operation into pieces.
 * Could easily be exported as a generic helper for other users, in which
 * case one would probably want to add a 'max_nr_pages' parameter as well.
 */
static int get_iovec_page_array(const struct iovec __user *iov,
				unsigned int nr_vecs, struct page **pages,
				struct partial_page *partial)
{
	int buffers = 0, error = 0;

	/*
	 * It's ok to take the mmap_sem for reading, even
	 * across a "get_user()".
	 */
	down_read(&current->mm->mmap_sem);

	while (nr_vecs) {
		unsigned long off, npages;
		void __user *base;
		size_t len;
		int i;

		/*
		 * Get user address base and length for this iovec.
		 */
		error = get_user(base, &iov->iov_base);
		if (unlikely(error))
			break;
		error = get_user(len, &iov->iov_len);
		if (unlikely(error))
			break;

		/*
		 * Sanity check this iovec. 0 read succeeds.
		 */
		if (unlikely(!len))
			break;
		error = -EFAULT;
		if (unlikely(!base))
			break;

		/*
		 * Get this base offset and number of pages, then map
		 * in the user pages.
		 */
		off = (unsigned long) base & ~PAGE_MASK;
		npages = (off + len + PAGE_SIZE - 1) >> PAGE_SHIFT;
		if (npages > PIPE_BUFFERS - buffers)
			npages = PIPE_BUFFERS - buffers;

		error = get_user_pages(current, current->mm,
				       (unsigned long) base, npages, 0, 0,
				       &pages[buffers], NULL);

		if (unlikely(error <= 0))
			break;

		/*
		 * Fill this contiguous range into the partial page map.
		 * The first page may start at an offset, so never use
		 * more than the bytes remaining in the page after 'off'.
		 */
		for (i = 0; i < error; i++) {
			const int plen = min_t(size_t, len, PAGE_SIZE - off);

			partial[buffers].offset = off;
			partial[buffers].len = plen;

			off = 0;
			len -= plen;
			buffers++;
		}

		/*
		 * We didn't complete this iov, stop here since it probably
		 * means we have to move some of this into a pipe to
		 * be able to continue.
		 */
		if (len)
			break;

		/*
		 * Don't continue if we mapped fewer pages than we asked for,
		 * or if we mapped the max number of pages that we have
		 * room for.
		 */
		if (error < npages || buffers == PIPE_BUFFERS)
			break;

		nr_vecs--;
		iov++;
	}

	up_read(&current->mm->mmap_sem);

	if (buffers)
		return buffers;

	return error;
}

/*
 * vmsplice splices a user address range into a pipe. It can be thought of
 * as splice-from-memory, where the regular splice is splice-from-file (or
 * to file). In both cases the output is a pipe, naturally.
 *
 * Note that vmsplice only supports splicing _from_ user memory to a pipe,
 * not the other way around. Splicing from user memory is a simple operation
 * that can be supported without any funky alignment restrictions or nasty
 * vm tricks. We simply map in the user pages and fill them into a pipe.
 * The reverse isn't quite as easy, though. There are two possible solutions
 * for that:
 *
 *	- memcpy() the data internally, at which point we might as well just
 *	  do a regular read() on the buffer anyway.
 *	- Lots of nasty vm tricks, that are neither fast nor flexible (it
 *	  imposes restrictions on both ends of the pipe).
 *
 * Alas, it isn't here.
 *
 */
static long do_vmsplice(struct file *file, const struct iovec __user *iov,
			unsigned long nr_segs, unsigned int flags)
{
	struct pipe_inode_info *pipe = file->f_dentry->d_inode->i_pipe;
	struct page *pages[PIPE_BUFFERS];
	struct partial_page partial[PIPE_BUFFERS];
	struct splice_pipe_desc spd = {
		.pages = pages,
		.partial = partial,
		.flags = flags,
		.ops = &user_page_pipe_buf_ops,
	};

	if (unlikely(!pipe))
		return -EBADF;
	if (unlikely(nr_segs > UIO_MAXIOV))
		return -EINVAL;
	else if (unlikely(!nr_segs))
		return 0;

	spd.nr_pages = get_iovec_page_array(iov, nr_segs, pages, partial);
	if (spd.nr_pages <= 0)
		return spd.nr_pages;

	return splice_to_pipe(pipe, &spd);
}

asmlinkage long sys_vmsplice(int fd, const struct iovec __user *iov,
			     unsigned long nr_segs, unsigned int flags)
{
	struct file *file;
	long error;
	int fput;

	error = -EBADF;
	file = fget_light(fd, &fput);
	if (file) {
		if (file->f_mode & FMODE_WRITE)
			error = do_vmsplice(file, iov, nr_segs, flags);

		fput_light(file, fput);
	}

	return error;
}
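
/*
 * Userspace usage sketch (not part of this patch). Assuming a syscall
 * wrapper named vmsplice() matching the prototype above, handing a
 * buffer's contents to a pipe without copying looks like:
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 *	ssize_t n = vmsplice(pipe_write_fd, &iov, 1, 0);
 *
 * On success the pipe references up to 'n' bytes of the caller's memory
 * directly, so the caller must not reuse that memory until the data has
 * been consumed from the read side.
 */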

asmlinkage long sys_splice(int fd_in, loff_t __user *off_in,
			   int fd_out, loff_t __user *off_out,
			   size_t len, unsigned int flags)
{
	long error;
	struct file *in, *out;
	int fput_in, fput_out;

	if (unlikely(!len))
		return 0;

	error = -EBADF;
	in = fget_light(fd_in, &fput_in);
	if (in) {
		if (in->f_mode & FMODE_READ) {
			out = fget_light(fd_out, &fput_out);
			if (out) {
				if (out->f_mode & FMODE_WRITE)
					error = do_splice(in, off_in,
							  out, off_out,
							  len, flags);
				fput_light(out, fput_out);
			}
		}

		fput_light(in, fput_in);
	}

	return error;
}
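
/*
 * Userspace usage sketch (not part of this patch). Assuming a splice()
 * wrapper matching the syscall above, moving data from a file to a
 * socket through a pipe, with no copy to user space, looks roughly
 * like this (descriptors and error handling are illustrative only):
 *
 *	int pfd[2];
 *	loff_t off = 0;
 *	ssize_t n;
 *
 *	pipe(pfd);
 *	n = splice(file_fd, &off, pfd[1], NULL, 65536, SPLICE_F_MOVE);
 *	if (n > 0)
 *		splice(pfd[0], NULL, sock_fd, NULL, n, SPLICE_F_MOVE);
 */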

/*
 * Link contents of ipipe to opipe.
 */
static int link_pipe(struct pipe_inode_info *ipipe,
		     struct pipe_inode_info *opipe,
		     size_t len, unsigned int flags)
{
	struct pipe_buffer *ibuf, *obuf;
	int ret, do_wakeup, i, ipipe_first;

	ret = do_wakeup = ipipe_first = 0;

	/*
	 * Potential ABBA deadlock, work around it by ordering lock
	 * grabbing by inode address. Otherwise two different processes
	 * could deadlock (one doing tee from A -> B, the other from B -> A).
	 */
	if (ipipe->inode < opipe->inode) {
		ipipe_first = 1;
		mutex_lock(&ipipe->inode->i_mutex);
		mutex_lock(&opipe->inode->i_mutex);
	} else {
		mutex_lock(&opipe->inode->i_mutex);
		mutex_lock(&ipipe->inode->i_mutex);
	}

	for (i = 0;; i++) {
		if (!opipe->readers) {
			send_sig(SIGPIPE, current, 0);
			if (!ret)
				ret = -EPIPE;
			break;
		}
		if (ipipe->nrbufs - i) {
			ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (PIPE_BUFFERS - 1));

			/*
			 * If we have room, fill this buffer
			 */
			if (opipe->nrbufs < PIPE_BUFFERS) {
				int nbuf = (opipe->curbuf + opipe->nrbufs) & (PIPE_BUFFERS - 1);

				/*
				 * Get a reference to this pipe buffer,
				 * so we can copy the contents over.
				 */
				ibuf->ops->get(ipipe, ibuf);

				obuf = opipe->bufs + nbuf;
				*obuf = *ibuf;

				if (obuf->len > len)
					obuf->len = len;

				opipe->nrbufs++;
				do_wakeup = 1;
				ret += obuf->len;
				len -= obuf->len;

				if (!len)
					break;
				if (opipe->nrbufs < PIPE_BUFFERS)
					continue;
			}

			/*
			 * We have input available, but no output room.
			 * If we already copied data, return that. If we
			 * need to drop the opipe lock, it must be ordered
			 * last to avoid deadlocks.
			 */
			if ((flags & SPLICE_F_NONBLOCK) || !ipipe_first) {
				if (!ret)
					ret = -EAGAIN;
				break;
			}
			if (signal_pending(current)) {
				if (!ret)
					ret = -ERESTARTSYS;
				break;
			}
			if (do_wakeup) {
				smp_mb();
				if (waitqueue_active(&opipe->wait))
					wake_up_interruptible(&opipe->wait);
				kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
				do_wakeup = 0;
			}

			opipe->waiting_writers++;
			pipe_wait(opipe);
			opipe->waiting_writers--;
			continue;
		}

		/*
		 * No input buffers, do the usual checks for available
		 * writers and blocking and wait if necessary
		 */
		if (!ipipe->writers)
			break;
		if (!ipipe->waiting_writers) {
			if (ret)
				break;
		}
		/*
		 * pipe_wait() drops the ipipe mutex. To avoid deadlocks
		 * with another process, we can only safely do that if
		 * the ipipe lock is ordered last.
		 */
		if ((flags & SPLICE_F_NONBLOCK) || ipipe_first) {
			if (!ret)
				ret = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}

		if (waitqueue_active(&ipipe->wait))
			wake_up_interruptible_sync(&ipipe->wait);
		kill_fasync(&ipipe->fasync_writers, SIGIO, POLL_OUT);

		pipe_wait(ipipe);
	}

	mutex_unlock(&ipipe->inode->i_mutex);
	mutex_unlock(&opipe->inode->i_mutex);

	if (do_wakeup) {
		smp_mb();
		if (waitqueue_active(&opipe->wait))
			wake_up_interruptible(&opipe->wait);
		kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
	}

	return ret;
}

/*
 * This is a tee(1) implementation that works on pipes. It doesn't copy
 * any data; it simply references the 'in' pages on the 'out' pipe.
 * The 'flags' used are the SPLICE_F_* variants; currently the only
 * applicable one is SPLICE_F_NONBLOCK.
 */
static long do_tee(struct file *in, struct file *out, size_t len,
		   unsigned int flags)
{
	struct pipe_inode_info *ipipe = in->f_dentry->d_inode->i_pipe;
	struct pipe_inode_info *opipe = out->f_dentry->d_inode->i_pipe;

	/*
	 * Link ipipe to opipe. The input is not consumed; the data
	 * remains available for a later read or splice from ipipe.
	 */
	if (ipipe && opipe)
		return link_pipe(ipipe, opipe, len, flags);

	return -EINVAL;
}

asmlinkage long sys_tee(int fdin, int fdout, size_t len, unsigned int flags)
{
	struct file *in;
	int error, fput_in;

	if (unlikely(!len))
		return 0;

	error = -EBADF;
	in = fget_light(fdin, &fput_in);
	if (in) {
		if (in->f_mode & FMODE_READ) {
			int fput_out;
			struct file *out = fget_light(fdout, &fput_out);

			if (out) {
				if (out->f_mode & FMODE_WRITE)
					error = do_tee(in, out, len, flags);
				fput_light(out, fput_out);
			}
		}
		fput_light(in, fput_in);
	}

	return error;
}
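
/*
 * Userspace usage sketch (not part of this patch). Assuming a tee()
 * wrapper matching the syscall above, duplicating pipe contents into a
 * second pipe without consuming them, then draining the original with
 * splice(), looks like:
 *
 *	ssize_t n = tee(pipe_in_fd, pipe_dup_fd, 65536, SPLICE_F_NONBLOCK);
 *	if (n > 0)
 *		splice(pipe_in_fd, NULL, log_fd, NULL, n, 0);
 */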