diff -purN 00__v2.6.13-rc3-git8/arch/i386/Kconfig 78_fs_aio_write/arch/i386/Kconfig --- 00__v2.6.13-rc3-git8/arch/i386/Kconfig 2005-07-13 11:25:25.000000000 -0400 +++ 78_fs_aio_write/arch/i386/Kconfig 2005-07-27 16:19:35.000000000 -0400 @@ -14,6 +14,10 @@ config X86 486, 586, Pentiums, and various instruction-set-compatible chips by AMD, Cyrix, and others. +config SEMAPHORE_SLEEPERS + bool + default y + config MMU bool default y diff -purN 00__v2.6.13-rc3-git8/arch/i386/kernel/semaphore.c 78_fs_aio_write/arch/i386/kernel/semaphore.c --- 00__v2.6.13-rc3-git8/arch/i386/kernel/semaphore.c 2005-06-20 13:33:10.000000000 -0400 +++ 78_fs_aio_write/arch/i386/kernel/semaphore.c 2005-07-27 16:20:15.000000000 -0400 @@ -13,171 +13,10 @@ * rw semaphores implemented November 1999 by Benjamin LaHaise */ #include -#include -#include -#include +#include #include /* - * Semaphores are implemented using a two-way counter: - * The "count" variable is decremented for each process - * that tries to acquire the semaphore, while the "sleeping" - * variable is a count of such acquires. - * - * Notably, the inline "up()" and "down()" functions can - * efficiently test if they need to do any extra work (up - * needs to do something only if count was negative before - * the increment operation. - * - * "sleeping" and the contention routine ordering is protected - * by the spinlock in the semaphore's waitqueue head. - * - * Note that these functions are only called when there is - * contention on the lock, and as such all this is the - * "non-critical" part of the whole semaphore business. The - * critical part is the inline stuff in - * where we want to avoid any extra jumps and calls. - */ - -/* - * Logic: - * - only on a boundary condition do we need to care. When we go - * from a negative count to a non-negative, we wake people up. - * - when we go from a non-negative count to a negative do we - * (a) synchronize with the "sleeper" count and (b) make sure - * that we're on the wakeup list before we synchronize so that - * we cannot lose wakeup events. - */ - -static fastcall void __attribute_used__ __up(struct semaphore *sem) -{ - wake_up(&sem->wait); -} - -static fastcall void __attribute_used__ __sched __down(struct semaphore * sem) -{ - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_UNINTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * the wait_queue_head. 
- */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_UNINTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - tsk->state = TASK_RUNNING; -} - -static fastcall int __attribute_used__ __sched __down_interruptible(struct semaphore * sem) -{ - int retval = 0; - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_INTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * With signals pending, this turns into - * the trylock failure case - we won't be - * sleeping, and we* can't get the lock as - * it has contention. Just correct the count - * and exit. - */ - if (signal_pending(current)) { - retval = -EINTR; - sem->sleepers = 0; - atomic_add(sleepers, &sem->count); - break; - } - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * wait_queue_head. The "-1" is because we're - * still hoping to get the semaphore. - */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_INTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - - tsk->state = TASK_RUNNING; - return retval; -} - -/* - * Trylock failed - make sure we correct for - * having decremented the count. - * - * We could have done the trylock with a - * single "cmpxchg" without failure cases, - * but then it wouldn't work on a 386. - */ -static fastcall int __attribute_used__ __down_trylock(struct semaphore * sem) -{ - int sleepers; - unsigned long flags; - - spin_lock_irqsave(&sem->wait.lock, flags); - sleepers = sem->sleepers + 1; - sem->sleepers = 0; - - /* - * Add "everybody else" and us into it. They aren't - * playing, because we own the spinlock in the - * wait_queue_head. - */ - if (!atomic_add_negative(sleepers, &sem->count)) { - wake_up_locked(&sem->wait); - } - - spin_unlock_irqrestore(&sem->wait.lock, flags); - return 1; -} - - -/* * The semaphore operations have a special calling sequence that * allow us to do a simpler in-line version of them. 
These routines * need to convert that sequence back into the C sequence when @@ -211,6 +50,28 @@ asm( asm( ".section .sched.text\n" ".align 4\n" +".globl __aio_down_failed\n" +"__aio_down_failed:\n\t" +#if defined(CONFIG_FRAME_POINTER) + "pushl %ebp\n\t" + "movl %esp,%ebp\n\t" +#endif + "pushl %edx\n\t" + "pushl %ecx\n\t" + "call __aio_down\n\t" + "popl %ecx\n\t" + "popl %edx\n\t" +#if defined(CONFIG_FRAME_POINTER) + "movl %ebp,%esp\n\t" + "popl %ebp\n\t" +#endif + "ret" +); +EXPORT_SYMBOL(__aio_down_failed); + +asm( +".section .sched.text\n" +".align 4\n" ".globl __down_failed_interruptible\n" "__down_failed_interruptible:\n\t" #if defined(CONFIG_FRAME_POINTER) diff -purN 00__v2.6.13-rc3-git8/arch/um/Kconfig_i386 78_fs_aio_write/arch/um/Kconfig_i386 --- 00__v2.6.13-rc3-git8/arch/um/Kconfig_i386 2005-07-13 11:25:30.000000000 -0400 +++ 78_fs_aio_write/arch/um/Kconfig_i386 2005-07-27 16:19:35.000000000 -0400 @@ -6,6 +6,10 @@ config 64BIT bool default n +config SEMAPHORE_SLEEPERS + bool + default y + config TOP_ADDR hex default 0xc0000000 if !HOST_2G_2G diff -purN 00__v2.6.13-rc3-git8/arch/um/Kconfig_x86_64 78_fs_aio_write/arch/um/Kconfig_x86_64 --- 00__v2.6.13-rc3-git8/arch/um/Kconfig_x86_64 2005-07-13 11:25:30.000000000 -0400 +++ 78_fs_aio_write/arch/um/Kconfig_x86_64 2005-07-27 16:19:35.000000000 -0400 @@ -6,6 +6,10 @@ config 64BIT bool default y +config SEMAPHORE_SLEEPERS + bool + default y + config TOP_ADDR hex default 0x80000000 diff -purN 00__v2.6.13-rc3-git8/arch/x86_64/Kconfig 78_fs_aio_write/arch/x86_64/Kconfig --- 00__v2.6.13-rc3-git8/arch/x86_64/Kconfig 2005-07-13 11:25:30.000000000 -0400 +++ 78_fs_aio_write/arch/x86_64/Kconfig 2005-07-27 16:19:35.000000000 -0400 @@ -24,6 +24,10 @@ config X86 bool default y +config SEMAPHORE_SLEEPERS + bool + default y + config MMU bool default y diff -purN 00__v2.6.13-rc3-git8/arch/x86_64/kernel/Makefile 78_fs_aio_write/arch/x86_64/kernel/Makefile --- 00__v2.6.13-rc3-git8/arch/x86_64/kernel/Makefile 2005-07-13 11:25:30.000000000 -0400 +++ 78_fs_aio_write/arch/x86_64/kernel/Makefile 2005-07-27 16:19:35.000000000 -0400 @@ -4,7 +4,7 @@ extra-y := head.o head64.o init_task.o vmlinux.lds EXTRA_AFLAGS := -traditional -obj-y := process.o semaphore.o signal.o entry.o traps.o irq.o \ +obj-y := process.o signal.o entry.o traps.o irq.o \ ptrace.o time.o ioport.o ldt.o setup.o i8259.o sys_x86_64.o \ x8664_ksyms.o i387.o syscall.o vsyscall.o \ setup64.o bootflag.o e820.o reboot.o quirks.o diff -purN 00__v2.6.13-rc3-git8/arch/x86_64/kernel/semaphore.c 78_fs_aio_write/arch/x86_64/kernel/semaphore.c --- 00__v2.6.13-rc3-git8/arch/x86_64/kernel/semaphore.c 2005-06-20 13:33:15.000000000 -0400 +++ 78_fs_aio_write/arch/x86_64/kernel/semaphore.c 1969-12-31 19:00:00.000000000 -0500 @@ -1,180 +0,0 @@ -/* - * x86_64 semaphore implementation. - * - * (C) Copyright 1999 Linus Torvalds - * - * Portions Copyright 1999 Red Hat, Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version - * 2 of the License, or (at your option) any later version. - * - * rw semaphores implemented November 1999 by Benjamin LaHaise - */ -#include -#include -#include -#include - -#include - -/* - * Semaphores are implemented using a two-way counter: - * The "count" variable is decremented for each process - * that tries to acquire the semaphore, while the "sleeping" - * variable is a count of such acquires. 
- * - * Notably, the inline "up()" and "down()" functions can - * efficiently test if they need to do any extra work (up - * needs to do something only if count was negative before - * the increment operation. - * - * "sleeping" and the contention routine ordering is protected - * by the spinlock in the semaphore's waitqueue head. - * - * Note that these functions are only called when there is - * contention on the lock, and as such all this is the - * "non-critical" part of the whole semaphore business. The - * critical part is the inline stuff in - * where we want to avoid any extra jumps and calls. - */ - -/* - * Logic: - * - only on a boundary condition do we need to care. When we go - * from a negative count to a non-negative, we wake people up. - * - when we go from a non-negative count to a negative do we - * (a) synchronize with the "sleeper" count and (b) make sure - * that we're on the wakeup list before we synchronize so that - * we cannot lose wakeup events. - */ - -void __up(struct semaphore *sem) -{ - wake_up(&sem->wait); -} - -void __sched __down(struct semaphore * sem) -{ - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_UNINTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * the wait_queue_head. - */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_UNINTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - tsk->state = TASK_RUNNING; -} - -int __sched __down_interruptible(struct semaphore * sem) -{ - int retval = 0; - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_INTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * With signals pending, this turns into - * the trylock failure case - we won't be - * sleeping, and we* can't get the lock as - * it has contention. Just correct the count - * and exit. - */ - if (signal_pending(current)) { - retval = -EINTR; - sem->sleepers = 0; - atomic_add(sleepers, &sem->count); - break; - } - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * wait_queue_head. The "-1" is because we're - * still hoping to get the semaphore. - */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_INTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - - tsk->state = TASK_RUNNING; - return retval; -} - -/* - * Trylock failed - make sure we correct for - * having decremented the count. - * - * We could have done the trylock with a - * single "cmpxchg" without failure cases, - * but then it wouldn't work on a 386. 
- */ -int __down_trylock(struct semaphore * sem) -{ - int sleepers; - unsigned long flags; - - spin_lock_irqsave(&sem->wait.lock, flags); - sleepers = sem->sleepers + 1; - sem->sleepers = 0; - - /* - * Add "everybody else" and us into it. They aren't - * playing, because we own the spinlock in the - * wait_queue_head. - */ - if (!atomic_add_negative(sleepers, &sem->count)) { - wake_up_locked(&sem->wait); - } - - spin_unlock_irqrestore(&sem->wait.lock, flags); - return 1; -} - - diff -purN 00__v2.6.13-rc3-git8/arch/x86_64/lib/thunk.S 78_fs_aio_write/arch/x86_64/lib/thunk.S --- 00__v2.6.13-rc3-git8/arch/x86_64/lib/thunk.S 2004-12-24 16:34:44.000000000 -0500 +++ 78_fs_aio_write/arch/x86_64/lib/thunk.S 2005-07-27 16:20:15.000000000 -0400 @@ -47,6 +47,7 @@ thunk __down_failed,__down thunk_retrax __down_failed_interruptible,__down_interruptible thunk_retrax __down_failed_trylock,__down_trylock + thunk_retrax __aio_down_failed,__aio_down thunk __up_wakeup,__up /* SAVE_ARGS below is used only for the .cfi directives it contains. */ diff -purN 00__v2.6.13-rc3-git8/description 78_fs_aio_write/description --- 00__v2.6.13-rc3-git8/description 1969-12-31 19:00:00.000000000 -0500 +++ 78_fs_aio_write/description 2005-07-31 20:37:08.000000000 -0400 @@ -0,0 +1,15 @@ +On Fri, Jun 24, 2005 at 04:19:28PM +0530, Suparna Bhattacharya wrote: +> On Mon, Jun 20, 2005 at 05:31:54PM +0530, Suparna Bhattacharya wrote: +> > (2) Buffered filesystem AIO read/write (me/Ben) + +Filesystem AIO write + +AIO support for O_SYNC buffered writes, built over O_SYNC-speedup. +It uses the tagged radix tree lookups to writeout just the pages +pertaining to this request, and retries instead of blocking +for writeback to complete on the same range. All the writeout is +issued at the time of io submission, and there is a check to make +sure that retries skip over straight to the wait_on_page_writeback_range. + +Signed-off-by: Suparna Bhattacharya +Signed-off-by: Benjamin LaHaise diff -purN 00__v2.6.13-rc3-git8/drivers/usb/gadget/inode.c 78_fs_aio_write/drivers/usb/gadget/inode.c --- 00__v2.6.13-rc3-git8/drivers/usb/gadget/inode.c 2005-07-13 11:25:54.000000000 -0400 +++ 78_fs_aio_write/drivers/usb/gadget/inode.c 2005-07-27 16:22:54.000000000 -0400 @@ -540,7 +540,6 @@ static int ep_aio_cancel(struct kiocb *i local_irq_disable(); epdata = priv->epdata; // spin_lock(&epdata->dev->lock); - kiocbSetCancelled(iocb); if (likely(epdata && epdata->ep && priv->req)) value = usb_ep_dequeue (epdata->ep, priv->req); else @@ -548,7 +547,6 @@ static int ep_aio_cancel(struct kiocb *i // spin_unlock(&epdata->dev->lock); local_irq_enable(); - aio_put_req(iocb); return value; } @@ -586,10 +584,7 @@ static void ep_aio_complete(struct usb_e kfree(priv); iocb->private = NULL; /* aio_complete() reports bytes-transferred _and_ faults */ - if (unlikely(kiocbIsCancelled(iocb))) - aio_put_req(iocb); - else - aio_complete(iocb, + aio_complete(iocb, req->actual ? 
req->actual : req->status, req->status); } else { diff -purN 00__v2.6.13-rc3-git8/fs/aio.c 78_fs_aio_write/fs/aio.c --- 00__v2.6.13-rc3-git8/fs/aio.c 2005-07-13 11:26:00.000000000 -0400 +++ 78_fs_aio_write/fs/aio.c 2005-07-31 17:04:32.000000000 -0400 @@ -546,6 +546,24 @@ struct kioctx *lookup_ioctx(unsigned lon return ioctx; } +static int lock_kiocb_action(void *param, wait_queue_t *wait) +{ + schedule(); + return 0; +} + +static inline void lock_kiocb(struct kiocb *iocb) +{ + wait_on_bit_lock(&iocb->ki_flags, KIF_LOCKED, lock_kiocb_action, + TASK_UNINTERRUPTIBLE); +} + +static inline void unlock_kiocb(struct kiocb *iocb) +{ + kiocbClearLocked(iocb); + wake_up_bit(&iocb->ki_flags, KIF_LOCKED); +} + /* * use_mm * Makes the calling kernel thread take on the specified @@ -712,14 +730,14 @@ static ssize_t aio_run_iocb(struct kiocb * cause the iocb to be kicked for continuation (through * the aio_wake_function callback). */ - BUG_ON(current->io_wait != NULL); - current->io_wait = &iocb->ki_wait; + BUG_ON(!is_sync_wait(current->io_wait)); + current->io_wait = &iocb->ki_wait.wait; ret = retry(iocb); current->io_wait = NULL; if (-EIOCBRETRY != ret) { if (-EIOCBQUEUED != ret) { - BUG_ON(!list_empty(&iocb->ki_wait.task_list)); + BUG_ON(!list_empty(&iocb->ki_wait.wait.task_list)); aio_complete(iocb, ret, 0); /* must not access the iocb after this */ } @@ -728,7 +746,7 @@ static ssize_t aio_run_iocb(struct kiocb * Issue an additional retry to avoid waiting forever if * no waits were queued (e.g. in case of a short read). */ - if (list_empty(&iocb->ki_wait.task_list)) + if (list_empty(&iocb->ki_wait.wait.task_list)) kiocbSetKicked(iocb); } out: @@ -782,7 +800,9 @@ static int __aio_run_iocbs(struct kioctx * Hold an extra reference while retrying i/o. */ iocb->ki_users++; /* grab extra reference */ + lock_kiocb(iocb); aio_run_iocb(iocb); + unlock_kiocb(iocb); if (__aio_put_req(ctx, iocb)) /* drop extra ref */ put_ioctx(ctx); } @@ -879,7 +899,7 @@ static void queue_kicked_iocb(struct kio unsigned long flags; int run = 0; - WARN_ON((!list_empty(&iocb->ki_wait.task_list))); + WARN_ON((!list_empty(&iocb->ki_wait.wait.task_list))); spin_lock_irqsave(&ctx->ctx_lock, flags); run = __queue_kicked_iocb(iocb); @@ -1299,7 +1319,7 @@ asmlinkage long sys_io_destroy(aio_conte * Default retry method for aio_read (also used for first time submit) * Responsible for updating iocb state as retries progress */ -static ssize_t aio_pread(struct kiocb *iocb) +ssize_t aio_pread(struct kiocb *iocb) { struct file *file = iocb->ki_filp; struct address_space *mapping = file->f_mapping; @@ -1338,7 +1358,7 @@ static ssize_t aio_pread(struct kiocb *i * Default retry method for aio_write (also used for first time submit) * Responsible for updating iocb state as retries progress */ -static ssize_t aio_pwrite(struct kiocb *iocb) +ssize_t aio_pwrite(struct kiocb *iocb) { struct file *file = iocb->ki_filp; ssize_t ret = 0; @@ -1458,11 +1478,26 @@ static ssize_t aio_setup_iocb(struct kio static int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key) { - struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait); + struct wait_bit_queue *wait_bit + = container_of(wait, struct wait_bit_queue, wait); + struct kiocb *iocb = container_of(wait_bit, struct kiocb, ki_wait); + + /* Assumes that a non-NULL key implies wait bit filtering */ + if (key && !test_wait_bit_key(wait, key)) + return 0; list_del_init(&wait->task_list); kick_iocb(iocb); - return 1; + /* + * Avoid exclusive wakeups with retries since an exclusive 
wakeup + * may involve implicit expectations of waking up the next waiter + * and there is no guarantee that the retry will take a path that + * would do so. For example if a page has become up-to-date, then + * a retried read may end up straightaway performing a copyout + * and not go through a lock_page - unlock_page that would have + * passed the baton to the next waiter. + */ + return 0; } int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, @@ -1513,8 +1548,9 @@ int fastcall io_submit_one(struct kioctx req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf; req->ki_left = req->ki_nbytes = iocb->aio_nbytes; req->ki_opcode = iocb->aio_lio_opcode; - init_waitqueue_func_entry(&req->ki_wait, aio_wake_function); - INIT_LIST_HEAD(&req->ki_wait.task_list); + init_waitqueue_func_entry(&req->ki_wait.wait, aio_wake_function); + INIT_LIST_HEAD(&req->ki_wait.wait.task_list); + req->ki_run_list.next = req->ki_run_list.prev = NULL; req->ki_retried = 0; ret = aio_setup_iocb(req); @@ -1532,6 +1568,7 @@ int fastcall io_submit_one(struct kioctx ; } spin_unlock_irq(&ctx->ctx_lock); + unlock_kiocb(req); aio_put_req(req); /* drop extra ref to req */ return 0; @@ -1657,6 +1694,7 @@ asmlinkage long sys_io_cancel(aio_contex if (NULL != cancel) { struct io_event tmp; pr_debug("calling cancel\n"); + lock_kiocb(kiocb); memset(&tmp, 0, sizeof(tmp)); tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user; tmp.data = kiocb->ki_user_data; @@ -1668,6 +1706,13 @@ asmlinkage long sys_io_cancel(aio_contex if (copy_to_user(result, &tmp, sizeof(tmp))) ret = -EFAULT; } + unlock_kiocb(kiocb); + /* If the cancellation was successful, we must discard the + * reference held for completion of the iocb. + */ + if (!ret) + aio_put_req(kiocb); + aio_put_req(kiocb); } else printk(KERN_DEBUG "iocb has no cancel operation\n"); diff -purN 00__v2.6.13-rc3-git8/fs/buffer.c 78_fs_aio_write/fs/buffer.c --- 00__v2.6.13-rc3-git8/fs/buffer.c 2005-07-13 11:26:00.000000000 -0400 +++ 78_fs_aio_write/fs/buffer.c 2005-07-31 16:04:36.000000000 -0400 @@ -53,7 +53,7 @@ init_buffer(struct buffer_head *bh, bh_e bh->b_private = private; } -static int sync_buffer(void *word) +static int sync_buffer(void *word, wait_queue_t *wait) { struct block_device *bd; struct buffer_head *bh diff -purN 00__v2.6.13-rc3-git8/fs/inode.c 78_fs_aio_write/fs/inode.c --- 00__v2.6.13-rc3-git8/fs/inode.c 2005-07-27 16:13:44.000000000 -0400 +++ 78_fs_aio_write/fs/inode.c 2005-07-31 16:04:36.000000000 -0400 @@ -1279,7 +1279,7 @@ void remove_dquot_ref(struct super_block #endif -int inode_wait(void *word) +int inode_wait(void *word, wait_queue_t *wait) { schedule(); return 0; diff -purN 00__v2.6.13-rc3-git8/fs/pipe.c 78_fs_aio_write/fs/pipe.c --- 00__v2.6.13-rc3-git8/fs/pipe.c 2005-06-20 13:33:32.000000000 -0400 +++ 78_fs_aio_write/fs/pipe.c 2005-07-31 16:59:06.000000000 -0400 @@ -46,6 +46,49 @@ void pipe_wait(struct inode * inode) down(PIPE_SEM(*inode)); } +static int pipe_aio_waiter(wait_queue_t *wait, unsigned mode, int sync, + void *key) +{ + struct kiocb *iocb = io_wait_to_kiocb(wait); + + list_del_init(&wait->task_list); + iocb->ki_cancel = NULL; /* We're removed from the wait queue, so our + * cancellation code no longer applies. 
+ */ + kick_iocb(iocb); + return 1; +} + +static int pipe_aio_cancel(struct kiocb *kiocb, struct io_event *event) +{ + struct inode *inode = kiocb->ki_filp->f_dentry->d_inode; + wait_queue_head_t *wq = PIPE_WAIT(*inode); + int ret = 0; + + spin_lock_irq(&wq->lock); + if (kiocb->ki_cancel == pipe_aio_cancel) { + kiocb->ki_cancel = NULL; + list_del_init(&kiocb->ki_wait.wait.task_list); + if (event) { + event->res = -EINTR; + event->res2 = 0; + } + } else + ret = -EAGAIN; + spin_unlock_irq(&wq->lock); + return ret; +} + +static long pipe_aio_wait(struct kiocb *kiocb, struct inode *inode) +{ + kiocb->ki_wait.wait.func = pipe_aio_waiter; + kiocb->ki_cancel = pipe_aio_cancel; + add_wait_queue(PIPE_WAIT(*inode), &kiocb->ki_wait.wait); + aio_up(kiocb, PIPE_SEM(*inode)); + kiocbSetIntr(kiocb); + return -EIOCBRETRY; +} + static inline int pipe_iov_copy_from_user(void *to, struct iovec *iov, unsigned long len) { @@ -115,9 +158,12 @@ static struct pipe_buf_operations anon_p }; static ssize_t -pipe_readv(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) +pipe_aio_read(struct kiocb *kiocb, char __user *buf, size_t len, loff_t pos) { + struct iovec _iov[2] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; + struct file *filp = kiocb->ki_filp; + struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; int do_wakeup; @@ -125,14 +171,25 @@ pipe_readv(struct file *filp, const stru struct iovec *iov = (struct iovec *)_iov; size_t total_len; + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) + pipe_aio_cancel(kiocb, NULL); + total_len = iov_length(iov, nr_segs); /* Null read succeeds. */ if (unlikely(total_len == 0)) return 0; do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) + return ret; + + ret = kiocb->ki_nbytes - kiocb->ki_left; info = inode->i_pipe; for (;;) { int bufs = info->nrbufs; @@ -155,6 +212,8 @@ pipe_readv(struct file *filp, const stru break; } ret += chars; + kiocb->ki_left -= chars; + kiocb->ki_buf += chars; buf->offset += chars; buf->len -= chars; if (!buf->len) { @@ -186,7 +245,7 @@ pipe_readv(struct file *filp, const stru break; } } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -194,9 +253,9 @@ pipe_readv(struct file *filp, const stru wake_up_interruptible_sync(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT); } - pipe_wait(inode); + return pipe_aio_wait(kiocb, inode); } - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); /* Signal writers asynchronously that there is more room. 
*/ if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); @@ -208,16 +267,12 @@ pipe_readv(struct file *filp, const stru } static ssize_t -pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) +pipe_aio_write(struct kiocb *kiocb, const char __user *buf, size_t len, loff_t pos) { - struct iovec iov = { .iov_base = buf, .iov_len = count }; - return pipe_readv(filp, &iov, 1, ppos); -} + struct iovec _iov[2] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; -static ssize_t -pipe_writev(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) -{ + struct file *filp = kiocb->ki_filp; struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; ssize_t ret; @@ -231,13 +286,33 @@ pipe_writev(struct file *filp, const str if (unlikely(total_len == 0)) return 0; + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) + pipe_aio_cancel(kiocb, NULL); + do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) + return ret; + + /* Undo the WRITERS++ done below where we are queued. We use + * kiocb->private to flag if we were waiting, as the higher layers + * initialize it to NULL at the beginning of a request's life. + */ + if (kiocb->ki_user_data) { + PIPE_WAITING_WRITERS(*inode)--; + kiocb->ki_user_data = 0; + } + info = inode->i_pipe; if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); ret = -EPIPE; goto out; } @@ -257,6 +332,8 @@ pipe_writev(struct file *filp, const str do_wakeup = 1; if (error) goto out; + iov->iov_base += chars; + iov->iov_len -= chars; buf->len += chars; total_len -= chars; ret = chars; @@ -267,8 +344,10 @@ pipe_writev(struct file *filp, const str for (;;) { int bufs; + if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } @@ -304,6 +383,8 @@ pipe_writev(struct file *filp, const str break; } ret += chars; + kiocb->ki_left -= chars; + kiocb->ki_buf += chars; /* Insert it into the buffer array */ buf->page = page; @@ -323,7 +404,7 @@ pipe_writev(struct file *filp, const str if (!ret) ret = -EAGAIN; break; } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -333,11 +414,11 @@ pipe_writev(struct file *filp, const str do_wakeup = 0; } PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; + kiocb->ki_user_data = 1; /* Flag for retry. 
*/ + return pipe_aio_wait(kiocb, inode); } out: - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN); @@ -347,6 +428,7 @@ out: return ret; } +#if 0 static ssize_t pipe_write(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) @@ -354,6 +436,7 @@ pipe_write(struct file *filp, const char struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; return pipe_writev(filp, &iov, 1, ppos); } +#endif static ssize_t bad_pipe_r(struct file *filp, char __user *buf, size_t count, loff_t *ppos) @@ -362,11 +445,23 @@ bad_pipe_r(struct file *filp, char __use } static ssize_t +bad_pipe_aio_r(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + +static ssize_t bad_pipe_w(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) { return -EBADF; } +static ssize_t +bad_pipe_aio_w(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + static int pipe_ioctl(struct inode *pino, struct file *filp, unsigned int cmd, unsigned long arg) @@ -565,8 +660,8 @@ pipe_rdwr_open(struct inode *inode, stru */ struct file_operations read_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, + .aio_read = pipe_aio_read, .write = bad_pipe_w, .poll = fifo_poll, .ioctl = pipe_ioctl, @@ -578,8 +673,9 @@ struct file_operations read_fifo_fops = struct file_operations write_fifo_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -589,10 +685,10 @@ struct file_operations write_fifo_fops = struct file_operations rdwr_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, @@ -602,9 +698,10 @@ struct file_operations rdwr_fifo_fops = struct file_operations read_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, .write = bad_pipe_w, + .aio_read = pipe_aio_read, + .aio_write = bad_pipe_aio_w, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_read_open, @@ -615,8 +712,9 @@ struct file_operations read_pipe_fops = struct file_operations write_pipe_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -626,10 +724,10 @@ struct file_operations write_pipe_fops = struct file_operations rdwr_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, diff -purN 00__v2.6.13-rc3-git8/fs/read_write.c 78_fs_aio_write/fs/read_write.c --- 00__v2.6.13-rc3-git8/fs/read_write.c 2005-07-13 11:26:03.000000000 -0400 +++ 78_fs_aio_write/fs/read_write.c 2005-07-31 16:54:40.000000000 -0400 @@ -14,6 +14,7 @@ #include #include #include +#include 
#include #include @@ -203,14 +204,36 @@ Einval: return -EINVAL; } -static void wait_on_retry_sync_kiocb(struct kiocb *iocb) +static long wait_on_retry_sync_kiocb(struct kiocb *iocb) { - set_current_state(TASK_UNINTERRUPTIBLE); + int (*cancel)(struct kiocb *, struct io_event *); + long ret = 0; + set_current_state(kiocbIsIntr(iocb) ? TASK_INTERRUPTIBLE + : TASK_UNINTERRUPTIBLE); if (!kiocbIsKicked(iocb)) schedule(); else kiocbClearKicked(iocb); + + /* If we were interrupted by a signal, issue a cancel to allow the + * operation to clean up. + */ + if (kiocbIsIntr(iocb) && signal_pending(current) && + (cancel = iocb->ki_cancel)) { + struct io_event dummy_event; + dummy_event.res = 0; + if (!cancel(iocb, &dummy_event)) { + ret = dummy_event.res; + if (!ret) + printk(KERN_DEBUG "wait_on_retry_sync_kiocb: ki_cancel method %p is buggy\n", cancel); + goto out; + } + } + kiocbClearIntr(iocb); + ret = iocb->ki_retry(iocb); +out: __set_current_state(TASK_RUNNING); + return ret; } ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos) @@ -220,12 +243,15 @@ ssize_t do_sync_read(struct file *filp, init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = *ppos; - while (-EIOCBRETRY == - (ret = filp->f_op->aio_read(&kiocb, buf, len, kiocb.ki_pos))) - wait_on_retry_sync_kiocb(&kiocb); + kiocb.ki_retry = aio_pread; + kiocb.ki_buf = buf; + kiocb.ki_nbytes = len; + kiocb.ki_left = len; + ret = filp->f_op->aio_read(&kiocb, buf, len, *ppos); + while (ret == -EIOCBRETRY) + ret = wait_on_retry_sync_kiocb(&kiocb); - if (-EIOCBQUEUED == ret) - ret = wait_on_sync_kiocb(&kiocb); + BUG_ON(!list_empty(&kiocb.ki_wait.wait.task_list)); *ppos = kiocb.ki_pos; return ret; } @@ -271,12 +297,15 @@ ssize_t do_sync_write(struct file *filp, init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = *ppos; - while (-EIOCBRETRY == - (ret = filp->f_op->aio_write(&kiocb, buf, len, kiocb.ki_pos))) - wait_on_retry_sync_kiocb(&kiocb); + kiocb.ki_retry = aio_pwrite; + kiocb.ki_buf = (void *)buf; + kiocb.ki_nbytes = len; + kiocb.ki_left = len; + ret = filp->f_op->aio_write(&kiocb, buf, len, kiocb.ki_pos); + while (ret == -EIOCBRETRY) + ret = wait_on_retry_sync_kiocb(&kiocb); - if (-EIOCBQUEUED == ret) - ret = wait_on_sync_kiocb(&kiocb); + BUG_ON(!list_empty(&kiocb.ki_wait.wait.task_list)); *ppos = kiocb.ki_pos; return ret; } diff -purN 00__v2.6.13-rc3-git8/include/asm-i386/semaphore.h 78_fs_aio_write/include/asm-i386/semaphore.h --- 00__v2.6.13-rc3-git8/include/asm-i386/semaphore.h 2005-06-20 13:33:36.000000000 -0400 +++ 78_fs_aio_write/include/asm-i386/semaphore.h 2005-07-27 16:20:15.000000000 -0400 @@ -41,10 +41,12 @@ #include #include +struct kiocb; struct semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; + struct kiocb *aio_owner; }; @@ -52,7 +54,8 @@ struct semaphore { { \ .count = ATOMIC_INIT(n), \ .sleepers = 0, \ - .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait) \ + .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait), \ + .aio_owner = NULL \ } #define __MUTEX_INITIALIZER(name) \ @@ -75,6 +78,7 @@ static inline void sema_init (struct sem atomic_set(&sem->count, val); sem->sleepers = 0; init_waitqueue_head(&sem->wait); + sem->aio_owner = NULL; } static inline void init_MUTEX (struct semaphore *sem) @@ -87,6 +91,7 @@ static inline void init_MUTEX_LOCKED (st sema_init(sem, 0); } +fastcall void __aio_down_failed(void /* special register calling convention */); fastcall void __down_failed(void /* special register calling convention */); fastcall int __down_failed_interruptible(void /* params in registers */); 
fastcall int __down_failed_trylock(void /* params in registers */); @@ -142,6 +147,32 @@ static inline int down_interruptible(str } /* + * Non-blockingly attempt to down() a semaphore for use with aio. + * Returns zero if we acquired it + */ +static inline int aio_down(struct kiocb *iocb, struct semaphore * sem) +{ + int result; + + __asm__ __volatile__( + "# atomic aio down operation\n\t" + LOCK "decl %1\n\t" /* --sem->count */ + "js 2f\n\t" + "movl %3,%2\n" + "xorl %0,%0\n" + "1:\n" + LOCK_SECTION_START("") + "2:\tlea %1,%%edx\n\t" + "call __aio_down_failed\n\t" + "jmp 1b\n" + LOCK_SECTION_END + :"=a" (result), "+m" (sem->count), "=m" (sem->aio_owner) + :"0" (iocb) + :"memory","cc","dx"); + return result; +} + +/* * Non-blockingly attempt to down() a semaphore. * Returns zero if we acquired it */ @@ -190,5 +221,14 @@ static inline void up(struct semaphore * :"memory","ax"); } +static inline void aio_up(struct kiocb *iocb, struct semaphore *sem) +{ +#ifdef CONFIG_DEBUG_KERNEL + BUG_ON(sem->aio_owner != iocb); +#endif + sem->aio_owner = NULL; + up(sem); +} + #endif #endif diff -purN 00__v2.6.13-rc3-git8/include/asm-x86_64/semaphore.h 78_fs_aio_write/include/asm-x86_64/semaphore.h --- 00__v2.6.13-rc3-git8/include/asm-x86_64/semaphore.h 2004-12-24 16:33:48.000000000 -0500 +++ 78_fs_aio_write/include/asm-x86_64/semaphore.h 2005-07-27 16:20:15.000000000 -0400 @@ -43,17 +43,20 @@ #include #include +struct kiocb; struct semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; + struct kiocb *aio_owner; }; #define __SEMAPHORE_INITIALIZER(name, n) \ { \ .count = ATOMIC_INIT(n), \ .sleepers = 0, \ - .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait) \ + .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait), \ + .aio_owner = NULL \ } #define __MUTEX_INITIALIZER(name) \ @@ -76,6 +79,7 @@ static inline void sema_init (struct sem atomic_set(&sem->count, val); sem->sleepers = 0; init_waitqueue_head(&sem->wait); + sem->aio_owner = NULL; } static inline void init_MUTEX (struct semaphore *sem) @@ -88,11 +92,13 @@ static inline void init_MUTEX_LOCKED (st sema_init(sem, 0); } +asmlinkage long __aio_down_failed(void /* special register calling convention */); asmlinkage void __down_failed(void /* special register calling convention */); asmlinkage int __down_failed_interruptible(void /* params in registers */); asmlinkage int __down_failed_trylock(void /* params in registers */); asmlinkage void __up_wakeup(void /* special register calling convention */); +asmlinkage long __aio_down(struct kiocb *iocb, struct semaphore * sem); asmlinkage void __down(struct semaphore * sem); asmlinkage int __down_interruptible(struct semaphore * sem); asmlinkage int __down_trylock(struct semaphore * sem); @@ -148,6 +154,32 @@ static inline int down_interruptible(str } /* + * Non-blockingly attempt to down() a semaphore for use with aio. + * Returns zero if we acquired it, -EIOCBRETRY if the operation was + * queued and the iocb will receive a kick_iocb() on completion. + */ +static inline long aio_down(struct kiocb *iocb, struct semaphore * sem) +{ + long result; + + __asm__ __volatile__( + "# atomic aio_down operation\n\t" + LOCK "decl %1\n\t" /* --sem->count */ + "js 2f\n\t" + "movq %3,%2\n" /* sem->aio_owner = iocb */ + "xorq %0,%0\n\t" + "1:\n" + LOCK_SECTION_START("") + "2:\tcall __aio_down_failed\n\t" + "jmp 1b\n" + LOCK_SECTION_END + :"=a" (result), "+m" (sem->count), "=m" (sem->aio_owner) + : "D" (iocb), "S" (sem) + :"memory"); + return result; +} + +/* * Non-blockingly attempt to down() a semaphore. 
* Returns zero if we acquired it */ @@ -192,5 +224,15 @@ static inline void up(struct semaphore * :"D" (sem) :"memory"); } + +static inline void aio_up(struct kiocb *iocb, struct semaphore *sem) +{ +#ifdef CONFIG_DEBUG_KERNEL + BUG_ON(sem->aio_owner != iocb); +#endif + sem->aio_owner = NULL; + up(sem); +} + #endif /* __KERNEL__ */ #endif diff -purN 00__v2.6.13-rc3-git8/include/linux/aio.h 78_fs_aio_write/include/linux/aio.h --- 00__v2.6.13-rc3-git8/include/linux/aio.h 2004-12-24 16:35:50.000000000 -0500 +++ 78_fs_aio_write/include/linux/aio.h 2005-07-31 16:29:13.000000000 -0400 @@ -27,21 +27,30 @@ struct kioctx; #define KIF_LOCKED 0 #define KIF_KICKED 1 #define KIF_CANCELLED 2 +#define KIF_INTR 3 /* use TASK_INTERRUPTIBLE waits */ +#define KIF_SYNCED 4 #define kiocbTryLock(iocb) test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbTryKick(iocb) test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags) +#define kiocbTrySync(iocb) test_and_set_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbSetLocked(iocb) set_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbSetKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbSetIntr(iocb) set_bit(KIF_INTR, &(iocb)->ki_flags) +#define kiocbSetSynced(iocb) set_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbClearLocked(iocb) clear_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbClearKicked(iocb) clear_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbClearIntr(iocb) clear_bit(KIF_INTR, &(iocb)->ki_flags) +#define kiocbClearSynced(iocb) clear_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbIsLocked(iocb) test_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbIsKicked(iocb) test_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbIsIntr(iocb) test_bit(KIF_INTR, &(iocb)->ki_flags) +#define kiocbIsSynced(iocb) test_bit(KIF_SYNCED, &(iocb)->ki_flags) struct kiocb { struct list_head ki_run_list; @@ -69,7 +78,7 @@ struct kiocb { size_t ki_nbytes; /* copy of iocb->aio_nbytes */ char __user *ki_buf; /* remaining iocb->aio_buf */ size_t ki_left; /* remaining bytes */ - wait_queue_t ki_wait; + struct wait_bit_queue ki_wait; long ki_retried; /* just for testing */ long ki_kicked; /* just for testing */ long ki_queued; /* just for testing */ @@ -90,7 +99,7 @@ struct kiocb { (x)->ki_dtor = NULL; \ (x)->ki_obj.tsk = tsk; \ (x)->ki_user_data = 0; \ - init_wait((&(x)->ki_wait)); \ + init_wait_bit_task((&(x)->ki_wait), current);\ } while (0) #define AIO_RING_MAGIC 0xa10a10a1 @@ -164,6 +173,8 @@ extern void FASTCALL(exit_aio(struct mm_ extern struct kioctx *lookup_ioctx(unsigned long ctx_id); extern int FASTCALL(io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, struct iocb *iocb)); +extern ssize_t aio_pread(struct kiocb *iocb); +extern ssize_t aio_pwrite(struct kiocb *iocb); /* semi private, but used by the 32bit emulations: */ struct kioctx *lookup_ioctx(unsigned long ctx_id); @@ -184,7 +195,8 @@ do { \ } \ } while (0) -#define io_wait_to_kiocb(wait) container_of(wait, struct kiocb, ki_wait) +#define io_wait_to_kiocb(io_wait) container_of(container_of(io_wait, \ + struct wait_bit_queue, wait), struct kiocb, ki_wait) #define is_retried_kiocb(iocb) ((iocb)->ki_retried > 1) #include diff -purN 00__v2.6.13-rc3-git8/include/linux/pagemap.h 78_fs_aio_write/include/linux/pagemap.h --- 00__v2.6.13-rc3-git8/include/linux/pagemap.h 2005-07-13 
11:26:11.000000000 -0400 +++ 78_fs_aio_write/include/linux/pagemap.h 2005-07-31 16:16:28.000000000 -0400 @@ -159,21 +159,25 @@ static inline pgoff_t linear_page_index( return pgoff >> (PAGE_CACHE_SHIFT - PAGE_SHIFT); } -extern void FASTCALL(__lock_page(struct page *page)); +extern int FASTCALL(lock_page_slow(struct page *page, wait_queue_t *wait)); extern void FASTCALL(unlock_page(struct page *page)); -static inline void lock_page(struct page *page) +static inline int __lock_page(struct page *page, wait_queue_t *wait) { might_sleep(); if (TestSetPageLocked(page)) - __lock_page(page); + return lock_page_slow(page, wait); + return 0; } + +#define lock_page(page) __lock_page(page, ¤t->__wait.wait) /* * This is exported only for wait_on_page_locked/wait_on_page_writeback. * Never use this directly! */ -extern void FASTCALL(wait_on_page_bit(struct page *page, int bit_nr)); +extern int FASTCALL(wait_on_page_bit(struct page *page, int bit_nr, + wait_queue_t *wait)); /* * Wait for a page to be unlocked. @@ -182,21 +186,30 @@ extern void FASTCALL(wait_on_page_bit(st * ie with increased "page->count" so that the page won't * go away during the wait.. */ -static inline void wait_on_page_locked(struct page *page) +static inline int __wait_on_page_locked(struct page *page, wait_queue_t *wait) { if (PageLocked(page)) - wait_on_page_bit(page, PG_locked); + return wait_on_page_bit(page, PG_locked, wait); + return 0; } +#define wait_on_page_locked(page) \ + __wait_on_page_locked(page, ¤t->__wait.wait) + /* * Wait for a page to complete writeback */ -static inline void wait_on_page_writeback(struct page *page) +static inline int __wait_on_page_writeback(struct page *page, + wait_queue_t *wait) { if (PageWriteback(page)) - wait_on_page_bit(page, PG_writeback); + return wait_on_page_bit(page, PG_writeback, wait); + return 0; } +#define wait_on_page_writeback(page) \ + __wait_on_page_writeback(page, ¤t->__wait.wait) + extern void end_page_writeback(struct page *page); /* diff -purN 00__v2.6.13-rc3-git8/include/linux/sched.h 78_fs_aio_write/include/linux/sched.h --- 00__v2.6.13-rc3-git8/include/linux/sched.h 2005-07-13 11:26:12.000000000 -0400 +++ 78_fs_aio_write/include/linux/sched.h 2005-07-31 16:16:28.000000000 -0400 @@ -170,6 +170,7 @@ extern void show_stack(struct task_struc void io_schedule(void); long io_schedule_timeout(long timeout); +int io_wait_schedule(wait_queue_t *wait); extern void cpu_init (void); extern void trap_init(void); @@ -746,11 +747,14 @@ struct task_struct { unsigned long ptrace_message; siginfo_t *last_siginfo; /* For ptrace use. */ + +/* Space for default IO wait bit entry used for synchronous IO waits */ + struct wait_bit_queue __wait; /* - * current io wait handle: wait queue entry to use for io waits - * If this thread is processing aio, this points at the waitqueue - * inside the currently handled kiocb. It may be NULL (i.e. default - * to a stack based synchronous wait) if its doing sync IO. + * Current IO wait handle: wait queue entry to use for IO waits + * If this thread is processing AIO, this points at the waitqueue + * inside the currently handled kiocb. Otherwise, points to the + * default IO wait field (i.e &__wait.wait above). 
*/ wait_queue_t *io_wait; /* i/o counters(bytes read/written, #syscalls */ diff -purN 00__v2.6.13-rc3-git8/include/linux/wait.h 78_fs_aio_write/include/linux/wait.h --- 00__v2.6.13-rc3-git8/include/linux/wait.h 2005-07-13 11:26:13.000000000 -0400 +++ 78_fs_aio_write/include/linux/wait.h 2005-07-31 16:49:07.000000000 -0400 @@ -103,6 +103,17 @@ static inline int waitqueue_active(wait_ return !list_empty(&q->task_list); } +static inline int test_wait_bit_key(wait_queue_t *wait, + struct wait_bit_key *key) +{ + struct wait_bit_queue *wait_bit + = container_of(wait, struct wait_bit_queue, wait); + + return (wait_bit->key.flags == key->flags && + wait_bit->key.bit_nr == key->bit_nr && + !test_bit(key->bit_nr, key->flags)); +} + /* * Used to distinguish between sync and async io wait context: * sync i/o typically specifies a NULL wait queue entry or a wait @@ -140,11 +151,15 @@ void FASTCALL(__wake_up(wait_queue_head_ extern void FASTCALL(__wake_up_locked(wait_queue_head_t *q, unsigned int mode)); extern void FASTCALL(__wake_up_sync(wait_queue_head_t *q, unsigned int mode, int nr)); void FASTCALL(__wake_up_bit(wait_queue_head_t *, void *, int)); -int FASTCALL(__wait_on_bit(wait_queue_head_t *, struct wait_bit_queue *, int (*)(void *), unsigned)); -int FASTCALL(__wait_on_bit_lock(wait_queue_head_t *, struct wait_bit_queue *, int (*)(void *), unsigned)); +int FASTCALL(__wait_on_bit(wait_queue_head_t *, struct wait_bit_queue *, + int (*)(void *, wait_queue_t *), unsigned)); +int FASTCALL(__wait_on_bit_lock(wait_queue_head_t *, struct wait_bit_queue *, + int (*)(void *, wait_queue_t *), unsigned)); void FASTCALL(wake_up_bit(void *, int)); -int FASTCALL(out_of_line_wait_on_bit(void *, int, int (*)(void *), unsigned)); -int FASTCALL(out_of_line_wait_on_bit_lock(void *, int, int (*)(void *), unsigned)); +int FASTCALL(out_of_line_wait_on_bit(void *, int, int (*)(void *, + wait_queue_t *), unsigned)); +int FASTCALL(out_of_line_wait_on_bit_lock(void *, int, int (*)(void *, + wait_queue_t *), unsigned)); wait_queue_head_t *FASTCALL(bit_waitqueue(void *, int)); #define wake_up(x) __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1, NULL) @@ -407,6 +422,19 @@ int wake_bit_function(wait_queue_t *wait INIT_LIST_HEAD(&(wait)->task_list); \ } while (0) +#define init_wait_bit_key(waitbit, word, bit) \ + do { \ + (waitbit)->key.flags = word; \ + (waitbit)->key.bit_nr = bit; \ + } while (0) + +#define init_wait_bit_task(waitbit, tsk) \ + do { \ + (waitbit)->wait.private = tsk; \ + (waitbit)->wait.func = wake_bit_function; \ + INIT_LIST_HEAD(&(waitbit)->wait.task_list); \ + } while (0) + /** * wait_on_bit - wait for a bit to be cleared * @word: the word being waited on, a kernel virtual address @@ -422,7 +450,8 @@ int wake_bit_function(wait_queue_t *wait * but has no intention of setting it. */ static inline int wait_on_bit(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), + unsigned mode) { if (!test_bit(bit, word)) return 0; @@ -446,7 +475,8 @@ static inline int wait_on_bit(void *word * clear with the intention of setting it, and when done, clearing it. 
*/ static inline int wait_on_bit_lock(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), + unsigned mode) { if (!test_and_set_bit(bit, word)) return 0; diff -purN 00__v2.6.13-rc3-git8/include/linux/writeback.h 78_fs_aio_write/include/linux/writeback.h --- 00__v2.6.13-rc3-git8/include/linux/writeback.h 2005-07-13 11:26:13.000000000 -0400 +++ 78_fs_aio_write/include/linux/writeback.h 2005-07-31 16:04:36.000000000 -0400 @@ -70,7 +70,7 @@ struct writeback_control { */ void writeback_inodes(struct writeback_control *wbc); void wake_up_inode(struct inode *inode); -int inode_wait(void *); +int inode_wait(void *, wait_queue_t *); void sync_inodes_sb(struct super_block *, int wait); void sync_inodes(int wait); diff -purN 00__v2.6.13-rc3-git8/kernel/fork.c 78_fs_aio_write/kernel/fork.c --- 00__v2.6.13-rc3-git8/kernel/fork.c 2005-07-13 11:26:14.000000000 -0400 +++ 78_fs_aio_write/kernel/fork.c 2005-07-31 16:12:32.000000000 -0400 @@ -943,7 +943,8 @@ static task_t *copy_process(unsigned lon do_posix_clock_monotonic_gettime(&p->start_time); p->security = NULL; p->io_context = NULL; - p->io_wait = NULL; + init_wait_bit_task(&p->__wait, p); + p->io_wait = &p->__wait.wait; p->audit_context = NULL; #ifdef CONFIG_NUMA p->mempolicy = mpol_copy(p->mempolicy); diff -purN 00__v2.6.13-rc3-git8/kernel/sched.c 78_fs_aio_write/kernel/sched.c --- 00__v2.6.13-rc3-git8/kernel/sched.c 2005-07-27 16:13:47.000000000 -0400 +++ 78_fs_aio_write/kernel/sched.c 2005-07-31 16:16:28.000000000 -0400 @@ -3993,6 +3993,20 @@ long __sched io_schedule_timeout(long ti return ret; } +/* + * Sleep only if the wait context passed is not async, + * otherwise return so that a retry can be issued later. + */ +int __sched io_wait_schedule(wait_queue_t *wait) +{ + if (!is_sync_wait(wait)) + return -EIOCBRETRY; + io_schedule(); + return 0; +} + +EXPORT_SYMBOL(io_wait_schedule); + /** * sys_sched_get_priority_max - return maximum RT priority. * @policy: scheduling class. 
diff -purN 00__v2.6.13-rc3-git8/kernel/wait.c 78_fs_aio_write/kernel/wait.c --- 00__v2.6.13-rc3-git8/kernel/wait.c 2004-12-24 16:35:27.000000000 -0500 +++ 78_fs_aio_write/kernel/wait.c 2005-07-31 16:15:40.000000000 -0400 @@ -132,16 +132,10 @@ EXPORT_SYMBOL(autoremove_wake_function); int wake_bit_function(wait_queue_t *wait, unsigned mode, int sync, void *arg) { - struct wait_bit_key *key = arg; - struct wait_bit_queue *wait_bit - = container_of(wait, struct wait_bit_queue, wait); - - if (wait_bit->key.flags != key->flags || - wait_bit->key.bit_nr != key->bit_nr || - test_bit(key->bit_nr, key->flags)) + /* Assumes that a non-NULL key implies wait bit filtering */ + if (arg && !test_wait_bit_key(wait, arg)) return 0; - else - return autoremove_wake_function(wait, mode, sync, key); + return autoremove_wake_function(wait, mode, sync, arg); } EXPORT_SYMBOL(wake_bit_function); @@ -152,22 +146,28 @@ EXPORT_SYMBOL(wake_bit_function); */ int __sched fastcall __wait_on_bit(wait_queue_head_t *wq, struct wait_bit_queue *q, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), unsigned mode) { int ret = 0; do { prepare_to_wait(wq, &q->wait, mode); if (test_bit(q->key.bit_nr, q->key.flags)) - ret = (*action)(q->key.flags); + ret = (*action)(q->key.flags, &q->wait); } while (test_bit(q->key.bit_nr, q->key.flags) && !ret); - finish_wait(wq, &q->wait); + /* + * AIO retries require the wait queue entry to remain queued + * for async notification + */ + if (ret != -EIOCBRETRY) + finish_wait(wq, &q->wait); return ret; } EXPORT_SYMBOL(__wait_on_bit); int __sched fastcall out_of_line_wait_on_bit(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), + unsigned mode) { wait_queue_head_t *wq = bit_waitqueue(word, bit); DEFINE_WAIT_BIT(wait, word, bit); @@ -178,24 +178,30 @@ EXPORT_SYMBOL(out_of_line_wait_on_bit); int __sched fastcall __wait_on_bit_lock(wait_queue_head_t *wq, struct wait_bit_queue *q, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), unsigned mode) { int ret = 0; do { prepare_to_wait_exclusive(wq, &q->wait, mode); if (test_bit(q->key.bit_nr, q->key.flags)) { - if ((ret = (*action)(q->key.flags))) + if ((ret = (*action)(q->key.flags, &q->wait))) break; } } while (test_and_set_bit(q->key.bit_nr, q->key.flags)); - finish_wait(wq, &q->wait); + /* + * AIO retries require the wait queue entry to remain queued + * for async notification + */ + if (ret != -EIOCBRETRY) + finish_wait(wq, &q->wait); return ret; } EXPORT_SYMBOL(__wait_on_bit_lock); int __sched fastcall out_of_line_wait_on_bit_lock(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *wait), + unsigned mode) { wait_queue_head_t *wq = bit_waitqueue(word, bit); DEFINE_WAIT_BIT(wait, word, bit); diff -purN 00__v2.6.13-rc3-git8/lib/Makefile 78_fs_aio_write/lib/Makefile --- 00__v2.6.13-rc3-git8/lib/Makefile 2005-07-13 11:26:14.000000000 -0400 +++ 78_fs_aio_write/lib/Makefile 2005-07-27 16:19:35.000000000 -0400 @@ -18,6 +18,7 @@ endif lib-$(CONFIG_RWSEM_GENERIC_SPINLOCK) += rwsem-spinlock.o lib-$(CONFIG_RWSEM_XCHGADD_ALGORITHM) += rwsem.o +lib-$(CONFIG_SEMAPHORE_SLEEPERS) += semaphore-sleepers.o lib-$(CONFIG_GENERIC_FIND_NEXT_BIT) += find_next_bit.o obj-$(CONFIG_LOCK_KERNEL) += kernel_lock.o obj-$(CONFIG_DEBUG_PREEMPT) += smp_processor_id.o diff -purN 00__v2.6.13-rc3-git8/lib/semaphore-sleepers.c 78_fs_aio_write/lib/semaphore-sleepers.c --- 00__v2.6.13-rc3-git8/lib/semaphore-sleepers.c 1969-12-31 
19:00:00.000000000 -0500 +++ 78_fs_aio_write/lib/semaphore-sleepers.c 2005-07-31 17:07:29.000000000 -0400 @@ -0,0 +1,251 @@ +/* + * i386 and x86-64 semaphore implementation. + * + * (C) Copyright 1999 Linus Torvalds + * + * Portions Copyright 1999 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + * rw semaphores implemented November 1999 by Benjamin LaHaise + */ +#include +#include +#include +#include +#include + +/* + * Semaphores are implemented using a two-way counter: + * The "count" variable is decremented for each process + * that tries to acquire the semaphore, while the "sleeping" + * variable is a count of such acquires. + * + * Notably, the inline "up()" and "down()" functions can + * efficiently test if they need to do any extra work (up + * needs to do something only if count was negative before + * the increment operation. + * + * "sleeping" and the contention routine ordering is protected + * by the spinlock in the semaphore's waitqueue head. + * + * Note that these functions are only called when there is + * contention on the lock, and as such all this is the + * "non-critical" part of the whole semaphore business. The + * critical part is the inline stuff in + * where we want to avoid any extra jumps and calls. + */ + +/* + * Logic: + * - only on a boundary condition do we need to care. When we go + * from a negative count to a non-negative, we wake people up. + * - when we go from a non-negative count to a negative do we + * (a) synchronize with the "sleeper" count and (b) make sure + * that we're on the wakeup list before we synchronize so that + * we cannot lose wakeup events. + */ + +fastcall void __up(struct semaphore *sem) +{ + wake_up(&sem->wait); +} + +fastcall void __sched __down(struct semaphore * sem) +{ + struct task_struct *tsk = current; + DECLARE_WAITQUEUE(wait, tsk); + unsigned long flags; + + tsk->state = TASK_UNINTERRUPTIBLE; + spin_lock_irqsave(&sem->wait.lock, flags); + add_wait_queue_exclusive_locked(&sem->wait, &wait); + + sem->sleepers++; + for (;;) { + int sleepers = sem->sleepers; + + /* + * Add "everybody else" into it. They aren't + * playing, because we own the spinlock in + * the wait_queue_head. + */ + if (!atomic_add_negative(sleepers - 1, &sem->count)) { + sem->sleepers = 0; + break; + } + sem->sleepers = 1; /* us - see -1 above */ + spin_unlock_irqrestore(&sem->wait.lock, flags); + + schedule(); + + spin_lock_irqsave(&sem->wait.lock, flags); + tsk->state = TASK_UNINTERRUPTIBLE; + } + remove_wait_queue_locked(&sem->wait, &wait); + wake_up_locked(&sem->wait); + spin_unlock_irqrestore(&sem->wait.lock, flags); + tsk->state = TASK_RUNNING; +} + +static int aio_down_wait(wait_queue_t *wait, unsigned mode, int sync, void *key) +{ + struct kiocb *iocb = io_wait_to_kiocb(wait); + struct semaphore *sem = wait->private; + int sleepers = sem->sleepers; + + /* + * Add "everybody else" into it. They aren't + * playing, because we own the spinlock in + * the wait_queue_head. 
+
+static int aio_down_wait(wait_queue_t *wait, unsigned mode, int sync, void *key)
+{
+	struct kiocb *iocb = io_wait_to_kiocb(wait);
+	struct semaphore *sem = wait->private;
+	int sleepers = sem->sleepers;
+
+	/*
+	 * Add "everybody else" into it. They aren't
+	 * playing, because we own the spinlock in
+	 * the wait_queue_head.
+	 */
+	if (!atomic_add_negative(sleepers - 1, &sem->count)) {
+		iocb->ki_cancel = NULL;
+		sem->sleepers = 0;
+		sem->aio_owner = iocb;
+		list_del_init(&wait->task_list);
+		wake_up_locked(&sem->wait);
+		kick_iocb(iocb);
+		return 1;
+	}
+	sem->sleepers = 1;	/* us - see -1 above */
+
+	return 1;
+}
+
+static void fixup_down_trylock_locked(struct semaphore *sem);
+static int cancel_aio_down(struct kiocb *iocb, struct io_event *event)
+{
+	/* At this point, the kiocb is locked and even if we have kicked
+	 * it, the pointer to the semaphore is still valid.
+	 */
+	struct semaphore *sem = iocb->ki_wait.wait.private;
+	unsigned long flags;
+	int ret = 0;
+
+	spin_lock_irqsave(&sem->wait.lock, flags);
+	if (!list_empty(&iocb->ki_wait.wait.task_list)) {
+		/* Ensure aio_down_wait() can no longer be called. */
+		list_del_init(&iocb->ki_wait.wait.task_list);
+		fixup_down_trylock_locked(sem);
+		event->res = is_sync_kiocb(iocb) ? -ERESTARTSYS : -EINTR;
+	} else
+		ret = -EAGAIN;	/* we lost the race with aio_down_wait(). */
+	spin_unlock_irqrestore(&sem->wait.lock, flags);
+
+	return ret;
+}
+
+fastcall long __sched __aio_down(struct kiocb *iocb, struct semaphore * sem)
+{
+	unsigned long flags;
+
+	if (sem->aio_owner == iocb) {
+		atomic_inc(&sem->count);	/* undo dec in aio_down() */
+		return 0;
+	}
+
+	iocb->ki_wait.wait.private = sem;
+	iocb->ki_wait.wait.func = aio_down_wait;
+	spin_lock_irqsave(&sem->wait.lock, flags);
+	add_wait_queue_exclusive_locked(&sem->wait, &iocb->ki_wait.wait);
+
+	sem->sleepers++;
+
+	iocb->ki_cancel = cancel_aio_down;
+
+	aio_down_wait(&iocb->ki_wait.wait, 0, 0, NULL);
+	spin_unlock_irqrestore(&sem->wait.lock, flags);
+	return -EIOCBRETRY;
+}
+
+fastcall int __sched __down_interruptible(struct semaphore * sem)
+{
+	int retval = 0;
+	struct task_struct *tsk = current;
+	DECLARE_WAITQUEUE(wait, tsk);
+	unsigned long flags;
+
+	tsk->state = TASK_INTERRUPTIBLE;
+	spin_lock_irqsave(&sem->wait.lock, flags);
+	add_wait_queue_exclusive_locked(&sem->wait, &wait);
+
+	sem->sleepers++;
+	for (;;) {
+		int sleepers = sem->sleepers;
+
+		/*
+		 * With signals pending, this turns into
+		 * the trylock failure case - we won't be
+		 * sleeping, and we can't get the lock as
+		 * it has contention. Just correct the count
+		 * and exit.
+		 */
+		if (signal_pending(current)) {
+			retval = -EINTR;
+			sem->sleepers = 0;
+			atomic_add(sleepers, &sem->count);
+			break;
+		}
+
+		/*
+		 * Add "everybody else" into it. They aren't
+		 * playing, because we own the spinlock in
+		 * wait_queue_head. The "-1" is because we're
+		 * still hoping to get the semaphore.
+		 */
+		if (!atomic_add_negative(sleepers - 1, &sem->count)) {
+			sem->sleepers = 0;
+			break;
+		}
+		sem->sleepers = 1;	/* us - see -1 above */
+		spin_unlock_irqrestore(&sem->wait.lock, flags);
+
+		schedule();
+
+		spin_lock_irqsave(&sem->wait.lock, flags);
+		tsk->state = TASK_INTERRUPTIBLE;
+	}
+	remove_wait_queue_locked(&sem->wait, &wait);
+	wake_up_locked(&sem->wait);
+	spin_unlock_irqrestore(&sem->wait.lock, flags);
+
+	tsk->state = TASK_RUNNING;
+	return retval;
+}
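Illustration (not part of the patch): __aio_down() above never blocks; it parks the iocb on the semaphore's wait queue, returns -EIOCBRETRY, and lets aio_down_wait() or cancel_aio_down() resolve the request later. Those two can race, and the patch decides the winner by whether the wait entry is still on the list. Below is a user-space sketch of that race resolution with invented names; a plain flag stands in for list membership, whereas the kernel serializes both paths with the wait-queue spinlock.

/* user-space sketch only */
#include <stdio.h>

#define EAGAIN_SKETCH 11		/* stands in for -EAGAIN */

struct request {
	int queued;			/* !list_empty(&wait.task_list) */
	int result;
};

/* wake-up side (aio_down_wait): hand ownership to the parked request */
static int complete(struct request *r)
{
	if (!r->queued)
		return 0;		/* already cancelled, nothing to do */
	r->queued = 0;
	r->result = 0;			/* acquired the semaphore */
	return 1;
}

/* cancel side (cancel_aio_down): only valid while the request is parked */
static int cancel(struct request *r)
{
	if (!r->queued)
		return -EAGAIN_SKETCH;	/* lost the race with complete() */
	r->queued = 0;
	r->result = -1;			/* interrupted, count fixed up */
	return 0;
}

int main(void)
{
	struct request a = { 1, 0 }, b = { 1, 0 };

	complete(&a);
	printf("cancel after completion: %d (caller must back off)\n", cancel(&a));

	cancel(&b);
	printf("completion after cancel: %d (wake-up is ignored)\n", complete(&b));
	return 0;
}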
+
+/*
+ * Trylock failed - make sure we correct for
+ * having decremented the count.
+ *
+ * We could have done the trylock with a
+ * single "cmpxchg" without failure cases,
+ * but then it wouldn't work on a 386.
+ */
+static void fixup_down_trylock_locked(struct semaphore *sem)
+{
+	int sleepers;
+	sleepers = sem->sleepers + 1;
+	sem->sleepers = 0;
+
+	/*
+	 * Add "everybody else" and us into it. They aren't
+	 * playing, because we own the spinlock in the
+	 * wait_queue_head.
+	 */
+	if (!atomic_add_negative(sleepers, &sem->count))
+		wake_up_locked(&sem->wait);
+}
+
+fastcall int __down_trylock(struct semaphore * sem)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&sem->wait.lock, flags);
+	fixup_down_trylock_locked(sem);
+	spin_unlock_irqrestore(&sem->wait.lock, flags);
+	return 1;
+}
diff -purN 00__v2.6.13-rc3-git8/mm/filemap.c 78_fs_aio_write/mm/filemap.c
--- 00__v2.6.13-rc3-git8/mm/filemap.c	2005-07-13 11:26:14.000000000 -0400
+++ 78_fs_aio_write/mm/filemap.c	2005-07-31 16:34:07.000000000 -0400
@@ -126,7 +126,7 @@ void remove_from_page_cache(struct page
 	write_unlock_irq(&mapping->tree_lock);
 }
 
-static int sync_page(void *word)
+static int sync_page(void *word, wait_queue_t *wait)
 {
 	struct address_space *mapping;
 	struct page *page;
@@ -158,8 +158,7 @@ static int sync_page(void *word)
 	mapping = page_mapping(page);
 	if (mapping && mapping->a_ops && mapping->a_ops->sync_page)
 		mapping->a_ops->sync_page(page);
-	io_schedule();
-	return 0;
+	return io_wait_schedule(wait);
 }
 
 /**
@@ -223,10 +222,11 @@ EXPORT_SYMBOL(filemap_flush);
 
 /*
  * Wait for writeback to complete against pages indexed by start->end
- * inclusive
+ * inclusive. In AIO context, this may queue an async notification
+ * and retry callback and return, instead of blocking the caller.
  */
-static int wait_on_page_writeback_range(struct address_space *mapping,
-				pgoff_t start, pgoff_t end)
+static int __wait_on_page_writeback_range(struct address_space *mapping,
+				pgoff_t start, pgoff_t end, wait_queue_t *wait)
 {
 	struct pagevec pvec;
 	int nr_pages;
@@ -238,20 +238,20 @@ static int wait_on_page_writeback_range(
 
 	pagevec_init(&pvec, 0);
 	index = start;
-	while ((index <= end) &&
+	while (!ret && (index <= end) &&
 			(nr_pages = pagevec_lookup_tag(&pvec, mapping, &index,
 			PAGECACHE_TAG_WRITEBACK,
 			min(end - index, (pgoff_t)PAGEVEC_SIZE-1) + 1)) != 0) {
 		unsigned i;
 
-		for (i = 0; i < nr_pages; i++) {
+		for (i = 0; !ret && (i < nr_pages); i++) {
 			struct page *page = pvec.pages[i];
 
 			/* until radix tree lookup accepts end_index */
 			if (page->index > end)
 				continue;
 
-			wait_on_page_writeback(page);
+			ret = __wait_on_page_writeback(page, wait);
 			if (PageError(page))
 				ret = -EIO;
 		}
@@ -268,6 +268,14 @@ static int wait_on_page_writeback_range(
 	return ret;
 }
 
+static inline int wait_on_page_writeback_range(struct address_space *mapping,
+				pgoff_t start, pgoff_t end)
+{
+	return __wait_on_page_writeback_range(mapping, start, end,
+				&current->__wait.wait);
+}
+
+
 /*
  * Write and wait upon all the pages in the passed range. This is a "data
  * integrity" operation. It waits upon in-flight writeout before starting and
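Illustration (not part of the patch): with the hunks above, __wait_on_page_writeback_range() threads the caller's wait entry through each per-page wait and stops scanning as soon as one of them asks for a retry, so the whole range wait can be resumed later. A tiny user-space sketch of that early-exit loop, with invented names and an array index standing in for the pagevec scan:

/* user-space sketch only */
#include <stdio.h>

#define RETRY 1				/* stands in for -EIOCBRETRY */

/* pretend page 2 is still under writeback and the waiter cannot block */
static int wait_on_one_page(int index)
{
	return index == 2 ? RETRY : 0;
}

static int wait_on_range(int start, int end)
{
	int ret = 0;
	int index;

	/* the "!ret &&" added by the patch stops the scan on a retry */
	for (index = start; !ret && index <= end; index++)
		ret = wait_on_one_page(index);
	return ret;
}

int main(void)
{
	/* prints 1: the scan stopped at page 2 and can be retried later */
	printf("wait_on_range(0, 5) = %d\n", wait_on_range(0, 5));
	return 0;
}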
@@ -281,18 +289,27 @@ int sync_page_range(struct inode *inode,
 {
 	pgoff_t start = pos >> PAGE_CACHE_SHIFT;
 	pgoff_t end = (pos + count - 1) >> PAGE_CACHE_SHIFT;
-	int ret;
+	int ret = 0;
 
 	if (!mapping_cap_writeback_dirty(mapping) || !count)
 		return 0;
+	if (in_aio()) {
+		/* Already issued writeouts for this iocb ? */
+		if (kiocbTrySync(io_wait_to_kiocb(current->io_wait)))
+			goto do_wait;	/* just need to check if done */
+	}
 	ret = filemap_fdatawrite_range(mapping, pos, pos + count - 1);
-	if (ret == 0) {
+
+	if (ret >= 0) {
 		down(&inode->i_sem);
 		ret = generic_osync_inode(inode, mapping, OSYNC_METADATA);
 		up(&inode->i_sem);
 	}
-	if (ret == 0)
-		ret = wait_on_page_writeback_range(mapping, start, end);
+do_wait:
+	if (ret >= 0) {
+		ret = __wait_on_page_writeback_range(mapping, start, end,
+				current->io_wait);
+	}
 	return ret;
 }
 EXPORT_SYMBOL(sync_page_range);
@@ -307,15 +324,23 @@ int sync_page_range_nolock(struct inode
 {
 	pgoff_t start = pos >> PAGE_CACHE_SHIFT;
 	pgoff_t end = (pos + count - 1) >> PAGE_CACHE_SHIFT;
-	int ret;
+	int ret = 0;
 
 	if (!mapping_cap_writeback_dirty(mapping) || !count)
 		return 0;
+	if (in_aio()) {
+		/* Already issued writeouts for this iocb ? */
+		if (kiocbTrySync(io_wait_to_kiocb(current->io_wait)))
+			goto do_wait;	/* just need to check if done */
+	}
 	ret = filemap_fdatawrite_range(mapping, pos, pos + count - 1);
-	if (ret == 0)
+	if (ret >= 0)
 		ret = generic_osync_inode(inode, mapping, OSYNC_METADATA);
-	if (ret == 0)
-		ret = wait_on_page_writeback_range(mapping, start, end);
+do_wait:
+	if (ret >= 0) {
+		ret = __wait_on_page_writeback_range(mapping, start, end,
+				current->io_wait);
+	}
 	return ret;
 }
 EXPORT_SYMBOL(sync_page_range_nolock);
@@ -428,13 +453,17 @@ static inline void wake_up_page(struct p
 	__wake_up_bit(page_waitqueue(page), &page->flags, bit);
 }
 
-void fastcall wait_on_page_bit(struct page *page, int bit_nr)
+int fastcall wait_on_page_bit(struct page *page, int bit_nr,
+			wait_queue_t *wait)
 {
-	DEFINE_WAIT_BIT(wait, &page->flags, bit_nr);
-
-	if (test_bit(bit_nr, &page->flags))
-		__wait_on_bit(page_waitqueue(page), &wait, sync_page,
+	if (test_bit(bit_nr, &page->flags)) {
+		struct wait_bit_queue *wait_bit
+			= container_of(wait, struct wait_bit_queue, wait);
+		init_wait_bit_key(wait_bit, &page->flags, bit_nr);
+		return __wait_on_bit(page_waitqueue(page), wait_bit, sync_page,
 				TASK_UNINTERRUPTIBLE);
+	}
+	return 0;
 }
 EXPORT_SYMBOL(wait_on_page_bit);
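Illustration (not part of the patch): the sync_page_range()/sync_page_range_nolock() hunks above use kiocbTrySync() so that only the first pass of an AIO request issues writeback; a retried pass jumps straight to the wait phase via the do_wait label. A user-space sketch of that issue-once/wait-on-retry gating; the struct and helpers are invented, and kiocbTrySync() is modelled as a plain test-and-set.

/* user-space sketch only */
#include <stdio.h>

struct req {
	int synced;			/* models the kiocb "synced" flag */
};

/* test-and-set: returns the old value, so only the first pass issues I/O */
static int try_sync(struct req *r)
{
	int old = r->synced;

	r->synced = 1;
	return old;
}

static void issue_writeback(void)	{ printf("issuing writeback\n"); }
static int  wait_for_writeback(void)	{ printf("checking/waiting\n"); return 0; }

static int sync_range(struct req *r)
{
	if (!try_sync(r))
		issue_writeback();	/* first pass only */
	/* both passes end up here, i.e. at the do_wait label */
	return wait_for_writeback();
}

int main(void)
{
	struct req r = { 0 };

	sync_range(&r);			/* first submission: issues and waits */
	sync_range(&r);			/* AIO retry: only checks completion */
	return 0;
}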
@@ -478,21 +507,23 @@ void end_page_writeback(struct page *pag
 EXPORT_SYMBOL(end_page_writeback);
 
 /*
- * Get a lock on the page, assuming we need to sleep to get it.
+ * Get a lock on the page, assuming we need to wait to get it.
  *
  * Ugly: running sync_page() in state TASK_UNINTERRUPTIBLE is scary.  If some
  * random driver's requestfn sets TASK_RUNNING, we could busywait.  However
  * chances are that on the second loop, the block layer's plug list is empty,
 * so sync_page() will then return in state TASK_UNINTERRUPTIBLE.
 */
-void fastcall __lock_page(struct page *page)
+int fastcall lock_page_slow(struct page *page, wait_queue_t *wait)
 {
-	DEFINE_WAIT_BIT(wait, &page->flags, PG_locked);
+	struct wait_bit_queue *wait_bit
+		= container_of(wait, struct wait_bit_queue, wait);
 
-	__wait_on_bit_lock(page_waitqueue(page), &wait, sync_page,
+	init_wait_bit_key(wait_bit, &page->flags, PG_locked);
+	return __wait_on_bit_lock(page_waitqueue(page), wait_bit, sync_page,
 							TASK_UNINTERRUPTIBLE);
 }
-EXPORT_SYMBOL(__lock_page);
+EXPORT_SYMBOL(lock_page_slow);
 
 /*
  * a rather lightweight function, finding and getting a reference to a
@@ -740,6 +771,11 @@ void do_generic_mapping_read(struct addr
 	if (!isize)
 		goto out;
 
+	if (in_aio()) {
+		/* Avoid repeat readahead */
+		if (is_retried_kiocb(io_wait_to_kiocb(current->io_wait)))
+			next_index = last_index;
+	}
 	end_index = (isize - 1) >> PAGE_CACHE_SHIFT;
 	for (;;) {
 		struct page *page;
@@ -809,7 +845,11 @@ page_ok:
 
 page_not_up_to_date:
 	/* Get exclusive access to the page ... */
-	lock_page(page);
+
+	if ((error = __lock_page(page, current->io_wait))) {
+		pr_debug("queued lock page \n");
+		goto readpage_error;
+	}
 
 	/* Did it get unhashed before we got the lock? */
 	if (!page->mapping) {
@@ -832,7 +872,8 @@ readpage:
 		goto readpage_error;
 
 	if (!PageUptodate(page)) {
-		lock_page(page);
+		if ((error = __lock_page(page, current->io_wait)))
+			goto readpage_error;
 		if (!PageUptodate(page)) {
 			if (page->mapping == NULL) {
 				/*
@@ -877,7 +918,11 @@ readpage:
 		goto page_ok;
 
 readpage_error:
-	/* UHHUH! A synchronous read error occurred. Report it */
+	/* We don't have uptodate data in the page yet */
+	/* Could be due to an error or because we need to
+	 * retry when we get an async i/o notification.
+	 * Report the reason.
+	 */
 	desc->error = error;
 	page_cache_release(page);
 	goto out;
@@ -1983,7 +2028,7 @@ generic_file_buffered_write(struct kiocb
 	 */
 	if (likely(status >= 0)) {
 		if (unlikely((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
-			if (!a_ops->writepage || !is_sync_kiocb(iocb))
+			if (!a_ops->writepage)
 				status = generic_osync_inode(inode, mapping,
 						OSYNC_METADATA|OSYNC_DATA);
 		}
@@ -2090,14 +2135,23 @@ generic_file_aio_write_nolock(struct kio
 	ssize_t ret;
 	loff_t pos = *ppos;
 
+	if (!is_sync_kiocb(iocb) && kiocbIsSynced(iocb)) {
+		/* nothing to transfer, may just need to sync data */
+		ret = iov->iov_len;	/* vector AIO not supported yet */
+		goto osync;
+	}
+
 	ret = __generic_file_aio_write_nolock(iocb, iov, nr_segs, ppos);
 
+osync:
 	if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
 		int err;
 
 		err = sync_page_range_nolock(inode, mapping, pos, ret);
-		if (err < 0)
-			ret = err;
+		if (err < 0) {
+			ret = err;
+			*ppos = pos;
+		}
 	}
 	return ret;
 }
@@ -2141,19 +2195,28 @@ ssize_t generic_file_aio_write(struct ki
 	struct iovec local_iov = { .iov_base = (void __user *)buf,
					.iov_len = count };
 
-	BUG_ON(iocb->ki_pos != pos);
+	if (!is_sync_kiocb(iocb) && kiocbIsSynced(iocb)) {
+		/* nothing to transfer, may just need to sync data */
+		ret = count;
+		goto osync;
+	}
 
-	down(&inode->i_sem);
+	ret = aio_down(iocb, &inode->i_sem);
+	if (ret)
+		return ret;
 	ret = __generic_file_aio_write_nolock(iocb, &local_iov, 1,
						&iocb->ki_pos);
-	up(&inode->i_sem);
+	aio_up(iocb, &inode->i_sem);
 
+osync:
 	if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
 		ssize_t err;
 
 		err = sync_page_range(inode, mapping, pos, ret);
-		if (err < 0)
+		if (err < 0) {
 			ret = err;
+			iocb->ki_pos = pos;
+		}
 	}
 	return ret;
 }
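Illustration (not part of the patch): generic_file_aio_write() above now takes i_sem via aio_down(), and a retried iocb (kiocbIsSynced) skips the data transfer and goes straight to the osync step; if the sync step fails, ki_pos is rolled back so the write is not reported as complete. A rough user-space sketch of that two-phase flow; all names are invented and the error handling is reduced to the position roll-back.

/* user-space sketch only -- not a kernel interface */
#include <stdio.h>

struct aio_write {
	int  transferred;		/* models kiocbIsSynced() */
	long pos;
};

static long transfer(struct aio_write *w, long count)
{
	w->pos += count;		/* pretend the copy succeeded */
	w->transferred = 1;
	return count;
}

static long sync_range(int fail)
{
	return fail ? -5 : 0;		/* -5 stands in for -EIO */
}

static long aio_write_pass(struct aio_write *w, long count, int sync_fails)
{
	long pos = w->pos;
	long ret;

	if (w->transferred)
		ret = count;		/* retry: nothing left to transfer */
	else
		ret = transfer(w, count);

	if (ret > 0) {
		long err = sync_range(sync_fails);

		if (err < 0) {
			ret = err;
			w->pos = pos;	/* roll the position back */
		}
	}
	return ret;
}

int main(void)
{
	struct aio_write w = { 0, 0 };
	long ret;

	/* first pass: data copied, sync step fails, position rolled back */
	ret = aio_write_pass(&w, 4096, 1);
	printf("first pass: ret=%ld pos=%ld\n", ret, w.pos);

	/* retry: the copy is skipped, only the sync step is redone */
	ret = aio_write_pass(&w, 4096, 0);
	printf("retry pass: ret=%ld\n", ret);
	return 0;
}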