Hi Jeff, these patches implement generic way of handling O_SYNC AIO DIO. They work for all filesystems except for ext4 and xfs. Thus together with your patches, all filesystems should handle O_SYNC AIO DIO correctly. I've tested ext3, btrfs, and xfs (to check that I didn't break anything when the generic code is unused) and things seem to work fine. Will you add these patches to your series please? Thanks. Honza
Jan Kara
2012-Feb-10 16:04 UTC
[Ocfs2-devel] [PATCH 1/4] vfs: Handle O_SYNC AIO DIO in generic code properly
Provide VFS helpers for handling O_SYNC AIO DIO writes. Filesystem wanting to use the helpers has to pass DIO_SYNC_WRITES to __blockdev_direct_IO. Then if they don't use direct IO end_io handler, generic code takes care of everything else. Otherwise their end_io handler is passed struct dio_sync_io_work pointer as 'private' argument and they have to call generic_dio_end_io() to finish their AIO DIO. Generic code then takes care to call generic_write_sync() from a workqueue context when AIO DIO is completed. Since all filesystems using blockdev_direct_IO() need O_SYNC aio dio handling and the generic one is enough for them, make blockdev_direct_IO() pass DIO_SYNC_WRITES flag. Signed-off-by: Jan Kara <jack at suse.cz> --- fs/direct-io.c | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++-- fs/super.c | 2 + include/linux/fs.h | 13 +++++- 3 files changed, 138 insertions(+), 5 deletions(-) diff --git a/fs/direct-io.c b/fs/direct-io.c index 4a588db..79aa531 100644 --- a/fs/direct-io.c +++ b/fs/direct-io.c @@ -38,6 +38,8 @@ #include <linux/atomic.h> #include <linux/prefetch.h> +#include <asm/cmpxchg.h> + /* * How many user pages to map in one call to get_user_pages(). 
This determines * the size of a structure in the slab cache @@ -112,6 +114,15 @@ struct dio_submit { unsigned tail; /* last valid page + 1 */ }; +/* state needed for final sync and completion of O_SYNC AIO DIO */ +struct dio_sync_io_work { + struct kiocb *iocb; + loff_t offset; + ssize_t len; + int ret; + struct work_struct work; +}; + /* dio_state communicated between submission path and end_io */ struct dio { int flags; /* doesn't change */ @@ -134,6 +145,7 @@ struct dio { /* AIO related stuff */ struct kiocb *iocb; /* kiocb */ ssize_t result; /* IO result */ + struct dio_sync_io_work *sync_work; /* work used for O_SYNC AIO */ /* * pages[] (and any fields placed after it) are not zeroed out at @@ -261,6 +273,45 @@ static inline struct page *dio_get_page(struct dio *dio, } /** + * generic_dio_end_io() - generic dio ->end_io handler + * @iocb: iocb of finishing DIO + * @offset: the byte offset in the file of the completed operation + * @bytes: length of the completed operation + * @work: work to queue for O_SYNC AIO DIO, NULL otherwise + * @ret: error code if IO failed + * @is_async: is this AIO? + * + * This is generic callback to be called when direct IO is finished. It + * handles update of number of outstanding DIOs for an inode, completion + * of async iocb and queueing of work if we need to call fsync() because + * io was O_SYNC. 
+ */ +void generic_dio_end_io(struct kiocb *iocb, loff_t offset, ssize_t bytes, + struct dio_sync_io_work *work, int ret, bool is_async) +{ + struct inode *inode = iocb->ki_filp->f_dentry->d_inode; + + if (!is_async) { + inode_dio_done(inode); + return; + } + + /* + * If we need to sync file, we offload completion to workqueue + */ + if (work) { + work->ret = ret; + work->offset = offset; + work->len = bytes; + queue_work(inode->i_sb->s_dio_flush_wq, &work->work); + } else { + aio_complete(iocb, ret, 0); + inode_dio_done(inode); + } +} +EXPORT_SYMBOL(generic_dio_end_io); + +/** * dio_complete() - called when all DIO BIO I/O has been completed * @offset: the byte offset in the file of the completed operation * @@ -302,12 +353,22 @@ static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is ret = transferred; if (dio->end_io && dio->result) { + void *private; + + if (dio->sync_work) + private = dio->sync_work; + else + private = dio->private; dio->end_io(dio->iocb, offset, transferred, - dio->private, ret, is_async); + private, ret, is_async); } else { - if (is_async) - aio_complete(dio->iocb, ret, 0); - inode_dio_done(dio->inode); + /* No IO submitted? Skip syncing... */ + if (!dio->result && dio->sync_work) { + kfree(dio->sync_work); + dio->sync_work = NULL; + } + generic_dio_end_io(dio->iocb, offset, transferred, + dio->sync_work, ret, is_async); } return ret; @@ -1064,6 +1125,41 @@ static inline int drop_refcount(struct dio *dio) } /* + * Work performed from workqueue when AIO DIO is finished. 
+ */ +static void dio_aio_sync_work(struct work_struct *work) +{ + struct dio_sync_io_work *sync_work = + container_of(work, struct dio_sync_io_work, work); + struct kiocb *iocb = sync_work->iocb; + struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode; + int err, ret = sync_work->ret; + + err = generic_write_sync(iocb->ki_filp, sync_work->offset, + sync_work->len); + if (err < 0 && ret > 0) + ret = err; + aio_complete(iocb, ret, 0); + inode_dio_done(inode); +} + +static noinline int dio_create_flush_wq(struct super_block *sb) +{ + struct workqueue_struct *wq = + alloc_workqueue("dio-sync", WQ_UNBOUND, 1); + + if (!wq) + return -ENOMEM; + /* + * Atomically put workqueue in place. Release our one in case someone + * else won the race and attached workqueue to superblock. + */ + if (cmpxchg(&sb->s_dio_flush_wq, NULL, wq)) + destroy_workqueue(wq); + return 0; +} + +/* + * This is a library function for use by filesystem drivers. * * The locking rules are governed by the flags parameter: @@ -1155,6 +1251,26 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode, memset(dio, 0, offsetof(struct dio, pages)); dio->flags = flags; + if (flags & DIO_SYNC_WRITES && rw & WRITE && + ((iocb->ki_filp->f_flags & O_DSYNC) || IS_SYNC(inode))) { + /* The first O_SYNC AIO DIO for this FS? Create workqueue... 
*/ + if (!inode->i_sb->s_dio_flush_wq) { + retval = dio_create_flush_wq(inode->i_sb); + if (retval) { + kmem_cache_free(dio_cache, dio); + goto out; + } + } + dio->sync_work = kmalloc(sizeof(struct dio_sync_io_work), + GFP_KERNEL); + if (!dio->sync_work) { + retval = -ENOMEM; + kmem_cache_free(dio_cache, dio); + goto out; + } + INIT_WORK(&dio->sync_work->work, dio_aio_sync_work); + dio->sync_work->iocb = iocb; + } if (dio->flags & DIO_LOCKING) { if (rw == READ) { struct address_space *mapping @@ -1167,6 +1283,7 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode, end - 1); if (retval) { mutex_unlock(&inode->i_mutex); + kfree(dio->sync_work); kmem_cache_free(dio_cache, dio); goto out; } @@ -1310,6 +1427,9 @@ do_blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode, if (drop_refcount(dio) == 0) { retval = dio_complete(dio, offset, retval, false); + /* Test for !NULL to save a call for common case */ + if (dio->sync_work) + kfree(dio->sync_work); kmem_cache_free(dio_cache, dio); } else BUG_ON(retval != -EIOCBQUEUED); diff --git a/fs/super.c b/fs/super.c index 6015c02..741784d 100644 --- a/fs/super.c +++ b/fs/super.c @@ -200,6 +200,8 @@ static inline void destroy_super(struct super_block *s) #ifdef CONFIG_SMP free_percpu(s->s_files); #endif + if (s->s_dio_flush_wq) + destroy_workqueue(s->s_dio_flush_wq); security_sb_free(s); WARN_ON(!list_empty(&s->s_mounts)); kfree(s->s_subtype); diff --git a/include/linux/fs.h b/include/linux/fs.h index 386da09..68cd00a 100644 --- a/include/linux/fs.h +++ b/include/linux/fs.h @@ -412,6 +412,7 @@ struct kstatfs; struct vm_area_struct; struct vfsmount; struct cred; +struct workqueue_struct; extern void __init inode_init(void); extern void __init inode_init_early(void); @@ -1496,6 +1497,9 @@ struct super_block { /* Being remounted read-only */ int s_readonly_remount; + + /* Pending fsync calls for completed AIO DIO with O_SYNC */ + struct workqueue_struct *s_dio_flush_wq; }; /* superblock cache pruning 
functions */ @@ -2428,11 +2432,18 @@ enum { /* filesystem does not support filling holes */ DIO_SKIP_HOLES = 0x02, + + /* need generic handling of O_SYNC aio writes */ + DIO_SYNC_WRITES = 0x04 }; +struct dio_sync_io_work; + void dio_end_io(struct bio *bio, int error); void inode_dio_wait(struct inode *inode); void inode_dio_done(struct inode *inode); +void generic_dio_end_io(struct kiocb *iocb, loff_t offset, ssize_t bytes, + struct dio_sync_io_work *work, int ret, bool is_async); ssize_t __blockdev_direct_IO(int rw, struct kiocb *iocb, struct inode *inode, struct block_device *bdev, const struct iovec *iov, loff_t offset, @@ -2445,7 +2456,7 @@ static inline ssize_t blockdev_direct_IO(int rw, struct kiocb *iocb, { return __blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov, offset, nr_segs, get_block, NULL, NULL, - DIO_LOCKING | DIO_SKIP_HOLES); + DIO_LOCKING | DIO_SKIP_HOLES | DIO_SYNC_WRITES); } #else static inline void inode_dio_wait(struct inode *inode) -- 1.7.1
Jan Kara
2012-Feb-10 16:04 UTC
[Ocfs2-devel] [PATCH 4/4] btrfs: Use generic handlers of O_SYNC AIO DIO
Use generic handlers to queue fsync() when AIO DIO is completed for O_SYNC file. Although we use our own bio->end_io function, we call dio_end_io() from it and thus, because we don't set any specific dio->end_io function, generic code ends up calling generic_dio_end_io() which is all what we need for proper O_SYNC AIO DIO handling. Signed-off-by: Jan Kara <jack at suse.cz> --- fs/btrfs/inode.c | 2 +- 1 files changed, 1 insertions(+), 1 deletions(-) diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c index 32214fe..68add6e 100644 --- a/fs/btrfs/inode.c +++ b/fs/btrfs/inode.c @@ -6221,7 +6221,7 @@ static ssize_t btrfs_direct_IO(int rw, struct kiocb *iocb, ret = __blockdev_direct_IO(rw, iocb, inode, BTRFS_I(inode)->root->fs_info->fs_devices->latest_bdev, iov, offset, nr_segs, btrfs_get_blocks_direct, NULL, - btrfs_submit_direct, 0); + btrfs_submit_direct, DIO_SYNC_WRITES); if (ret < 0 && ret != -EIOCBQUEUED) { clear_extent_bit(&BTRFS_I(inode)->io_tree, offset, -- 1.7.1
Jan Kara
2012-Feb-10 16:04 UTC
[Ocfs2-devel] [PATCH 2/4] ocfs2: Use generic handlers of O_SYNC AIO DIO
Use generic handlers to queue fsync() when AIO DIO is completed for O_SYNC file. Signed-off-by: Jan Kara <jack at suse.cz> --- fs/ocfs2/aops.c | 6 ++---- 1 files changed, 2 insertions(+), 4 deletions(-) diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c index 78b68af..3d14c2b 100644 --- a/fs/ocfs2/aops.c +++ b/fs/ocfs2/aops.c @@ -593,9 +593,7 @@ static void ocfs2_dio_end_io(struct kiocb *iocb, level = ocfs2_iocb_rw_locked_level(iocb); ocfs2_rw_unlock(inode, level); - if (is_async) - aio_complete(iocb, ret, 0); - inode_dio_done(inode); + generic_dio_end_io(iocb, offset, bytes, private, ret, is_async); } /* @@ -642,7 +640,7 @@ static ssize_t ocfs2_direct_IO(int rw, return __blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov, offset, nr_segs, ocfs2_direct_IO_get_blocks, - ocfs2_dio_end_io, NULL, 0); + ocfs2_dio_end_io, NULL, DIO_SYNC_WRITES); } static void ocfs2_figure_cluster_boundaries(struct ocfs2_super *osb, -- 1.7.1
Jan Kara
2012-Feb-10 16:04 UTC
[Ocfs2-devel] [PATCH 3/4] gfs2: Use generic handlers of O_SYNC AIO DIO
Use generic handlers to queue fsync() when AIO DIO is completed for O_SYNC file. Signed-off-by: Jan Kara <jack at suse.cz> --- fs/gfs2/aops.c | 2 +- 1 files changed, 1 insertions(+), 1 deletions(-) diff --git a/fs/gfs2/aops.c b/fs/gfs2/aops.c index 501e5cb..9c381ff 100644 --- a/fs/gfs2/aops.c +++ b/fs/gfs2/aops.c @@ -1034,7 +1034,7 @@ static ssize_t gfs2_direct_IO(int rw, struct kiocb *iocb, rv = __blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov, offset, nr_segs, gfs2_get_block_direct, - NULL, NULL, 0); + NULL, NULL, DIO_SYNC_WRITES); out: gfs2_glock_dq_m(1, &gh); gfs2_holder_uninit(&gh); -- 1.7.1
Jeff Moyer
2012-Feb-10 16:09 UTC
[Ocfs2-devel] [PATCH 0/4] Generic O_SYNC AIO DIO handling
Jan Kara <jack at suse.cz> writes: > Hi Jeff, > > these patches implement generic way of handling O_SYNC AIO DIO. They work > for all filesystems except for ext4 and xfs. Thus together with your patches, > all filesystems should handle O_SYNC AIO DIO correctly. I've tested ext3, > btrfs, and xfs (to check that I didn't break anything when the generic code > is unused) and things seem to work fine. Will you add these patches to your > series please? Thanks. Thanks, Jan! I'll add them in and give them some testing. I should be ready to repost the series early next week. Cheers, Jeff
Steven Whitehouse
2012-Feb-13 09:42 UTC
[Ocfs2-devel] [Cluster-devel] [PATCH 3/4] gfs2: Use generic handlers of O_SYNC AIO DIO
Hi, Acked-by: Steven Whitehouse <swhiteho at redhat.com> That looks ok to me, Steve. On Fri, 2012-02-10 at 17:04 +0100, Jan Kara wrote: > Use generic handlers to queue fsync() when AIO DIO is completed for O_SYNC > file. > > Signed-off-by: Jan Kara <jack at suse.cz> > --- > fs/gfs2/aops.c | 2 +- > 1 files changed, 1 insertions(+), 1 deletions(-) > > diff --git a/fs/gfs2/aops.c b/fs/gfs2/aops.c > index 501e5cb..9c381ff 100644 > --- a/fs/gfs2/aops.c > +++ b/fs/gfs2/aops.c > @@ -1034,7 +1034,7 @@ static ssize_t gfs2_direct_IO(int rw, struct kiocb *iocb, > > rv = __blockdev_direct_IO(rw, iocb, inode, inode->i_sb->s_bdev, iov, > offset, nr_segs, gfs2_get_block_direct, > - NULL, NULL, 0); > + NULL, NULL, DIO_SYNC_WRITES); > out: > gfs2_glock_dq_m(1, &gh); > gfs2_holder_uninit(&gh);