diff -purN 00_v2.6.13-rc6/arch/i386/Kconfig 90_pipe_aio/arch/i386/Kconfig --- 00_v2.6.13-rc6/arch/i386/Kconfig 2005-08-08 15:45:47.000000000 -0400 +++ 90_pipe_aio/arch/i386/Kconfig 2005-08-08 17:15:28.000000000 -0400 @@ -14,6 +14,10 @@ config X86 486, 586, Pentiums, and various instruction-set-compatible chips by AMD, Cyrix, and others. +config SEMAPHORE_SLEEPERS + bool + default y + config MMU bool default y diff -purN 00_v2.6.13-rc6/arch/i386/kernel/semaphore.c 90_pipe_aio/arch/i386/kernel/semaphore.c --- 00_v2.6.13-rc6/arch/i386/kernel/semaphore.c 2005-06-20 13:33:10.000000000 -0400 +++ 90_pipe_aio/arch/i386/kernel/semaphore.c 2005-08-08 17:15:30.000000000 -0400 @@ -13,171 +13,10 @@ * rw semaphores implemented November 1999 by Benjamin LaHaise */ #include -#include -#include -#include +#include #include /* - * Semaphores are implemented using a two-way counter: - * The "count" variable is decremented for each process - * that tries to acquire the semaphore, while the "sleeping" - * variable is a count of such acquires. - * - * Notably, the inline "up()" and "down()" functions can - * efficiently test if they need to do any extra work (up - * needs to do something only if count was negative before - * the increment operation. - * - * "sleeping" and the contention routine ordering is protected - * by the spinlock in the semaphore's waitqueue head. - * - * Note that these functions are only called when there is - * contention on the lock, and as such all this is the - * "non-critical" part of the whole semaphore business. The - * critical part is the inline stuff in - * where we want to avoid any extra jumps and calls. - */ - -/* - * Logic: - * - only on a boundary condition do we need to care. When we go - * from a negative count to a non-negative, we wake people up. - * - when we go from a non-negative count to a negative do we - * (a) synchronize with the "sleeper" count and (b) make sure - * that we're on the wakeup list before we synchronize so that - * we cannot lose wakeup events. - */ - -static fastcall void __attribute_used__ __up(struct semaphore *sem) -{ - wake_up(&sem->wait); -} - -static fastcall void __attribute_used__ __sched __down(struct semaphore * sem) -{ - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_UNINTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * the wait_queue_head. - */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_UNINTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - tsk->state = TASK_RUNNING; -} - -static fastcall int __attribute_used__ __sched __down_interruptible(struct semaphore * sem) -{ - int retval = 0; - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_INTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * With signals pending, this turns into - * the trylock failure case - we won't be - * sleeping, and we* can't get the lock as - * it has contention. Just correct the count - * and exit. - */ - if (signal_pending(current)) { - retval = -EINTR; - sem->sleepers = 0; - atomic_add(sleepers, &sem->count); - break; - } - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * wait_queue_head. The "-1" is because we're - * still hoping to get the semaphore. - */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_INTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - - tsk->state = TASK_RUNNING; - return retval; -} - -/* - * Trylock failed - make sure we correct for - * having decremented the count. - * - * We could have done the trylock with a - * single "cmpxchg" without failure cases, - * but then it wouldn't work on a 386. - */ -static fastcall int __attribute_used__ __down_trylock(struct semaphore * sem) -{ - int sleepers; - unsigned long flags; - - spin_lock_irqsave(&sem->wait.lock, flags); - sleepers = sem->sleepers + 1; - sem->sleepers = 0; - - /* - * Add "everybody else" and us into it. They aren't - * playing, because we own the spinlock in the - * wait_queue_head. - */ - if (!atomic_add_negative(sleepers, &sem->count)) { - wake_up_locked(&sem->wait); - } - - spin_unlock_irqrestore(&sem->wait.lock, flags); - return 1; -} - - -/* * The semaphore operations have a special calling sequence that * allow us to do a simpler in-line version of them. These routines * need to convert that sequence back into the C sequence when @@ -211,6 +50,28 @@ asm( asm( ".section .sched.text\n" ".align 4\n" +".globl __aio_down_failed\n" +"__aio_down_failed:\n\t" +#if defined(CONFIG_FRAME_POINTER) + "pushl %ebp\n\t" + "movl %esp,%ebp\n\t" +#endif + "pushl %edx\n\t" + "pushl %ecx\n\t" + "call __aio_down\n\t" + "popl %ecx\n\t" + "popl %edx\n\t" +#if defined(CONFIG_FRAME_POINTER) + "movl %ebp,%esp\n\t" + "popl %ebp\n\t" +#endif + "ret" +); +EXPORT_SYMBOL(__aio_down_failed); + +asm( +".section .sched.text\n" +".align 4\n" ".globl __down_failed_interruptible\n" "__down_failed_interruptible:\n\t" #if defined(CONFIG_FRAME_POINTER) diff -purN 00_v2.6.13-rc6/arch/um/Kconfig_i386 90_pipe_aio/arch/um/Kconfig_i386 --- 00_v2.6.13-rc6/arch/um/Kconfig_i386 2005-08-04 15:54:42.000000000 -0400 +++ 90_pipe_aio/arch/um/Kconfig_i386 2005-08-08 17:15:28.000000000 -0400 @@ -6,6 +6,10 @@ config 64BIT bool default n +config SEMAPHORE_SLEEPERS + bool + default y + config TOP_ADDR hex default 0xc0000000 if !HOST_2G_2G diff -purN 00_v2.6.13-rc6/arch/um/Kconfig_x86_64 90_pipe_aio/arch/um/Kconfig_x86_64 --- 00_v2.6.13-rc6/arch/um/Kconfig_x86_64 2005-08-04 15:54:42.000000000 -0400 +++ 90_pipe_aio/arch/um/Kconfig_x86_64 2005-08-08 17:15:28.000000000 -0400 @@ -6,6 +6,10 @@ config 64BIT bool default y +config SEMAPHORE_SLEEPERS + bool + default y + config TOP_ADDR hex default 0x80000000 diff -purN 00_v2.6.13-rc6/arch/x86_64/Kconfig 90_pipe_aio/arch/x86_64/Kconfig --- 00_v2.6.13-rc6/arch/x86_64/Kconfig 2005-08-04 15:54:44.000000000 -0400 +++ 90_pipe_aio/arch/x86_64/Kconfig 2005-08-08 17:15:28.000000000 -0400 @@ -24,6 +24,10 @@ config X86 bool default y +config SEMAPHORE_SLEEPERS + bool + default y + config MMU bool default y diff -purN 00_v2.6.13-rc6/arch/x86_64/kernel/Makefile 90_pipe_aio/arch/x86_64/kernel/Makefile --- 00_v2.6.13-rc6/arch/x86_64/kernel/Makefile 2005-08-04 15:54:44.000000000 -0400 +++ 90_pipe_aio/arch/x86_64/kernel/Makefile 2005-08-08 17:15:28.000000000 -0400 @@ -4,7 +4,7 @@ extra-y := head.o head64.o init_task.o vmlinux.lds EXTRA_AFLAGS := -traditional -obj-y := process.o semaphore.o signal.o entry.o traps.o irq.o \ +obj-y := process.o signal.o entry.o traps.o irq.o \ ptrace.o time.o ioport.o ldt.o setup.o i8259.o sys_x86_64.o \ x8664_ksyms.o i387.o syscall.o vsyscall.o \ setup64.o bootflag.o e820.o reboot.o quirks.o diff -purN 00_v2.6.13-rc6/arch/x86_64/kernel/semaphore.c 90_pipe_aio/arch/x86_64/kernel/semaphore.c --- 00_v2.6.13-rc6/arch/x86_64/kernel/semaphore.c 2005-06-20 13:33:15.000000000 -0400 +++ 90_pipe_aio/arch/x86_64/kernel/semaphore.c 1969-12-31 19:00:00.000000000 -0500 @@ -1,180 +0,0 @@ -/* - * x86_64 semaphore implementation. - * - * (C) Copyright 1999 Linus Torvalds - * - * Portions Copyright 1999 Red Hat, Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version - * 2 of the License, or (at your option) any later version. - * - * rw semaphores implemented November 1999 by Benjamin LaHaise - */ -#include -#include -#include -#include - -#include - -/* - * Semaphores are implemented using a two-way counter: - * The "count" variable is decremented for each process - * that tries to acquire the semaphore, while the "sleeping" - * variable is a count of such acquires. - * - * Notably, the inline "up()" and "down()" functions can - * efficiently test if they need to do any extra work (up - * needs to do something only if count was negative before - * the increment operation. - * - * "sleeping" and the contention routine ordering is protected - * by the spinlock in the semaphore's waitqueue head. - * - * Note that these functions are only called when there is - * contention on the lock, and as such all this is the - * "non-critical" part of the whole semaphore business. The - * critical part is the inline stuff in - * where we want to avoid any extra jumps and calls. - */ - -/* - * Logic: - * - only on a boundary condition do we need to care. When we go - * from a negative count to a non-negative, we wake people up. - * - when we go from a non-negative count to a negative do we - * (a) synchronize with the "sleeper" count and (b) make sure - * that we're on the wakeup list before we synchronize so that - * we cannot lose wakeup events. - */ - -void __up(struct semaphore *sem) -{ - wake_up(&sem->wait); -} - -void __sched __down(struct semaphore * sem) -{ - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_UNINTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * the wait_queue_head. - */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_UNINTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - tsk->state = TASK_RUNNING; -} - -int __sched __down_interruptible(struct semaphore * sem) -{ - int retval = 0; - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); - unsigned long flags; - - tsk->state = TASK_INTERRUPTIBLE; - spin_lock_irqsave(&sem->wait.lock, flags); - add_wait_queue_exclusive_locked(&sem->wait, &wait); - - sem->sleepers++; - for (;;) { - int sleepers = sem->sleepers; - - /* - * With signals pending, this turns into - * the trylock failure case - we won't be - * sleeping, and we* can't get the lock as - * it has contention. Just correct the count - * and exit. - */ - if (signal_pending(current)) { - retval = -EINTR; - sem->sleepers = 0; - atomic_add(sleepers, &sem->count); - break; - } - - /* - * Add "everybody else" into it. They aren't - * playing, because we own the spinlock in - * wait_queue_head. The "-1" is because we're - * still hoping to get the semaphore. - */ - if (!atomic_add_negative(sleepers - 1, &sem->count)) { - sem->sleepers = 0; - break; - } - sem->sleepers = 1; /* us - see -1 above */ - spin_unlock_irqrestore(&sem->wait.lock, flags); - - schedule(); - - spin_lock_irqsave(&sem->wait.lock, flags); - tsk->state = TASK_INTERRUPTIBLE; - } - remove_wait_queue_locked(&sem->wait, &wait); - wake_up_locked(&sem->wait); - spin_unlock_irqrestore(&sem->wait.lock, flags); - - tsk->state = TASK_RUNNING; - return retval; -} - -/* - * Trylock failed - make sure we correct for - * having decremented the count. - * - * We could have done the trylock with a - * single "cmpxchg" without failure cases, - * but then it wouldn't work on a 386. - */ -int __down_trylock(struct semaphore * sem) -{ - int sleepers; - unsigned long flags; - - spin_lock_irqsave(&sem->wait.lock, flags); - sleepers = sem->sleepers + 1; - sem->sleepers = 0; - - /* - * Add "everybody else" and us into it. They aren't - * playing, because we own the spinlock in the - * wait_queue_head. - */ - if (!atomic_add_negative(sleepers, &sem->count)) { - wake_up_locked(&sem->wait); - } - - spin_unlock_irqrestore(&sem->wait.lock, flags); - return 1; -} - - diff -purN 00_v2.6.13-rc6/arch/x86_64/lib/thunk.S 90_pipe_aio/arch/x86_64/lib/thunk.S --- 00_v2.6.13-rc6/arch/x86_64/lib/thunk.S 2004-12-24 16:34:44.000000000 -0500 +++ 90_pipe_aio/arch/x86_64/lib/thunk.S 2005-08-08 17:15:30.000000000 -0400 @@ -47,6 +47,7 @@ thunk __down_failed,__down thunk_retrax __down_failed_interruptible,__down_interruptible thunk_retrax __down_failed_trylock,__down_trylock + thunk_retrax __aio_down_failed,__aio_down thunk __up_wakeup,__up /* SAVE_ARGS below is used only for the .cfi directives it contains. */ diff -purN 00_v2.6.13-rc6/description 90_pipe_aio/description --- 00_v2.6.13-rc6/description 1969-12-31 19:00:00.000000000 -0500 +++ 90_pipe_aio/description 2005-08-16 16:41:52.000000000 -0400 @@ -0,0 +1,9 @@ +[AIO] add aio support to pipes using aio_down and retry + +This patch implements aio pipes, and has the standard pipe code use the +sync kiocb retry mechanism to complete synchronous read/write calls. The +async pipe implementation depends on the earlier aio_down patch. Pipes +are fully featured in their support for cancellation and signal handling, +but do not current send a SIGPIPE for aio requests, but they do get -EPIPE. + +Signed-off-by: Benjamin LaHaise diff -purN 00_v2.6.13-rc6/drivers/usb/gadget/inode.c 90_pipe_aio/drivers/usb/gadget/inode.c --- 00_v2.6.13-rc6/drivers/usb/gadget/inode.c 2005-08-04 15:55:46.000000000 -0400 +++ 90_pipe_aio/drivers/usb/gadget/inode.c 2005-08-08 17:15:34.000000000 -0400 @@ -540,7 +540,6 @@ static int ep_aio_cancel(struct kiocb *i local_irq_disable(); epdata = priv->epdata; // spin_lock(&epdata->dev->lock); - kiocbSetCancelled(iocb); if (likely(epdata && epdata->ep && priv->req)) value = usb_ep_dequeue (epdata->ep, priv->req); else @@ -548,7 +547,6 @@ static int ep_aio_cancel(struct kiocb *i // spin_unlock(&epdata->dev->lock); local_irq_enable(); - aio_put_req(iocb); return value; } @@ -586,10 +584,7 @@ static void ep_aio_complete(struct usb_e kfree(priv); iocb->private = NULL; /* aio_complete() reports bytes-transferred _and_ faults */ - if (unlikely(kiocbIsCancelled(iocb))) - aio_put_req(iocb); - else - aio_complete(iocb, + aio_complete(iocb, req->actual ? req->actual : req->status, req->status); } else { diff -purN 00_v2.6.13-rc6/fs/aio.c 90_pipe_aio/fs/aio.c --- 00_v2.6.13-rc6/fs/aio.c 2005-08-04 15:55:50.000000000 -0400 +++ 90_pipe_aio/fs/aio.c 2005-08-17 12:40:36.000000000 -0400 @@ -29,6 +29,9 @@ #include #include #include +#include +#include +#include #include #include @@ -59,6 +62,7 @@ static LIST_HEAD(fput_head); static void aio_kick_handler(void *); static void aio_queue_work(struct kioctx *); +static long aio_cancel_kiocb(struct kiocb *iocb, struct io_event __user *event); /* aio_setup * Creates the slab caches used by the aio routines, panic on @@ -135,7 +139,7 @@ static int aio_setup_ring(struct kioctx 0); if (IS_ERR((void *)info->mmap_base)) { up_write(&ctx->mm->mmap_sem); - printk("mmap err: %ld\n", -info->mmap_base); + dprintk("mmap err: %ld\n", -info->mmap_base); info->mmap_size = 0; aio_free_ring(ctx); return -EAGAIN; @@ -268,22 +272,18 @@ out_freectx: */ static void aio_cancel_all(struct kioctx *ctx) { - int (*cancel)(struct kiocb *, struct io_event *); - struct io_event res; spin_lock_irq(&ctx->ctx_lock); ctx->dead = 1; while (!list_empty(&ctx->active_reqs)) { struct list_head *pos = ctx->active_reqs.next; struct kiocb *iocb = list_kiocb(pos); list_del_init(&iocb->ki_list); - cancel = iocb->ki_cancel; kiocbSetCancelled(iocb); - if (cancel) { - iocb->ki_users++; - spin_unlock_irq(&ctx->ctx_lock); - cancel(iocb, &res); - spin_lock_irq(&ctx->ctx_lock); - } + iocb->ki_users++; + spin_unlock_irq(&ctx->ctx_lock); + aio_cancel_kiocb(iocb, NULL); + aio_put_req(iocb); + spin_lock_irq(&ctx->ctx_lock); } spin_unlock_irq(&ctx->ctx_lock); } @@ -404,7 +404,9 @@ static struct kiocb fastcall *__aio_get_ req->ki_cancel = NULL; req->ki_retry = NULL; req->ki_dtor = NULL; + req->ki_signo = 0; req->private = NULL; + req->ki_iovec = NULL; INIT_LIST_HEAD(&req->ki_run_list); /* Check if the completion queue has enough free space to @@ -448,6 +450,7 @@ static inline void really_put_req(struct { if (req->ki_dtor) req->ki_dtor(req); + kfree(req->ki_iovec); kmem_cache_free(kiocb_cachep, req); ctx->reqs_active--; @@ -492,6 +495,7 @@ static int __aio_put_req(struct kioctx * BUG(); if (likely(req->ki_users)) return 0; + BUG_ON(!list_empty(&req->ki_wait.wait.task_list)); list_del(&req->ki_list); /* remove from active_reqs */ req->ki_cancel = NULL; req->ki_retry = NULL; @@ -546,6 +550,24 @@ struct kioctx *lookup_ioctx(unsigned lon return ioctx; } +static int lock_kiocb_action(void *param, wait_queue_t *wait) +{ + schedule(); + return 0; +} + +static inline void lock_kiocb(struct kiocb *iocb) +{ + wait_on_bit_lock(&iocb->ki_flags, KIF_LOCKED, lock_kiocb_action, + TASK_UNINTERRUPTIBLE); +} + +static inline void unlock_kiocb(struct kiocb *iocb) +{ + kiocbClearLocked(iocb); + wake_up_bit(&iocb->ki_flags, KIF_LOCKED); +} + /* * use_mm * Makes the calling kernel thread take on the specified @@ -649,7 +671,7 @@ static ssize_t aio_run_iocb(struct kiocb ssize_t ret; if (iocb->ki_retried++ > 1024*1024) { - printk("Maximal retry count. Bytes done %Zd\n", + printk(KERN_DEBUG "Maximal retry count. Bytes done %Zd\n", iocb->ki_nbytes - iocb->ki_left); return -EAGAIN; } @@ -660,7 +682,7 @@ static ssize_t aio_run_iocb(struct kiocb } if (!(retry = iocb->ki_retry)) { - printk("aio_run_iocb: iocb->ki_retry = NULL\n"); + printk(KERN_DEBUG "aio_run_iocb: iocb->ki_retry = NULL\n"); return 0; } @@ -712,14 +734,14 @@ static ssize_t aio_run_iocb(struct kiocb * cause the iocb to be kicked for continuation (through * the aio_wake_function callback). */ - BUG_ON(current->io_wait != NULL); - current->io_wait = &iocb->ki_wait; + BUG_ON(!is_sync_wait(current->io_wait)); + current->io_wait = &iocb->ki_wait.wait; ret = retry(iocb); - current->io_wait = NULL; + current->io_wait = ¤t->__wait.wait; if (-EIOCBRETRY != ret) { if (-EIOCBQUEUED != ret) { - BUG_ON(!list_empty(&iocb->ki_wait.task_list)); + BUG_ON(!list_empty(&iocb->ki_wait.wait.task_list)); aio_complete(iocb, ret, 0); /* must not access the iocb after this */ } @@ -728,7 +750,7 @@ static ssize_t aio_run_iocb(struct kiocb * Issue an additional retry to avoid waiting forever if * no waits were queued (e.g. in case of a short read). */ - if (list_empty(&iocb->ki_wait.task_list)) + if (list_empty(&iocb->ki_wait.wait.task_list)) kiocbSetKicked(iocb); } out: @@ -782,7 +804,9 @@ static int __aio_run_iocbs(struct kioctx * Hold an extra reference while retrying i/o. */ iocb->ki_users++; /* grab extra reference */ + lock_kiocb(iocb); aio_run_iocb(iocb); + unlock_kiocb(iocb); if (__aio_put_req(ctx, iocb)) /* drop extra ref */ put_ioctx(ctx); } @@ -879,7 +903,7 @@ static void queue_kicked_iocb(struct kio unsigned long flags; int run = 0; - WARN_ON((!list_empty(&iocb->ki_wait.task_list))); + WARN_ON((!list_empty(&iocb->ki_wait.wait.task_list))); spin_lock_irqsave(&ctx->ctx_lock, flags); run = __queue_kicked_iocb(iocb); @@ -912,6 +936,97 @@ void fastcall kick_iocb(struct kiocb *io } EXPORT_SYMBOL(kick_iocb); +static void __aio_send_signal(struct kiocb *iocb) +{ + struct siginfo info; + struct task_struct *p; + unsigned long flags; + int ret = -1; + + memset(&info, 0, sizeof(struct siginfo)); + + info.si_signo = iocb->ki_signo; + info.si_errno = 0; + info.si_code = SI_ASYNCIO; + info.si_pid = 0; + info.si_uid = 0; + info.si_value = iocb->ki_sigev_value; + + read_lock(&tasklist_lock); + p = find_task_by_pid(iocb->ki_pid); + if (!p || !p->sighand) + goto out_unlock; + + /* Do we have permission to signal this task? */ + if ((iocb->ki_euid ^ p->suid) && (iocb->ki_euid ^ p->uid) + && (iocb->ki_uid ^ p->suid) && (iocb->ki_uid ^ p->uid)) + goto out_unlock; /* No. */ + + spin_lock_irqsave(&p->sighand->siglock, flags); + + switch(iocb->ki_notify) { + case IO_NOTIFY_SIGNAL: + ret = __group_send_sig_info(iocb->ki_signo, &info, p); + break; + case IO_NOTIFY_THREAD_ID: + //ret = specific_send_sig_info(iocb->ki_signo, &info, p); + ret = __group_send_sig_info(iocb->ki_signo, &info, p); + break; + } + + spin_unlock_irqrestore(&p->sighand->siglock, flags); + + if (ret) + printk(KERN_DEBUG "__aio_send_signal: failed to send signal %d to %d\n", + iocb->ki_signo, iocb->ki_pid); + +out_unlock: + read_unlock(&tasklist_lock); +} + +static void __aio_write_evt(struct kioctx *ctx, struct io_event *event) +{ + struct aio_ring_info *info; + struct aio_ring *ring; + struct io_event *ring_event; + unsigned long tail; + + info = &ctx->ring_info; + + /* add a completion event to the ring buffer. + * must be done holding ctx->ctx_lock to prevent + * other code from messing with the tail + * pointer since we might be called from irq + * context. + */ + + ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); + + tail = info->tail; + ring_event = aio_ring_event(info, tail, KM_IRQ0); + if (++tail >= info->nr) + tail = 0; + + *ring_event = *event; + + dprintk("aio_write_evt: %p[%lu]: %Lx %Lx %Lx %Lx\n", + ctx, tail, event->obj, event->data, event->res, event->res2); + + /* after flagging the request as done, we + * must never even look at it again + */ + + smp_wmb(); /* make event visible before updating tail */ + + info->tail = tail; + ring->tail = tail; + + put_aio_ring_event(ring_event, KM_IRQ0); + kunmap_atomic(ring, KM_IRQ1); + + pr_debug("added to ring at [%lu]\n", tail); +} + /* aio_complete * Called when the io request on the given iocb is complete. * Returns true if this is the last user of the request. The @@ -920,11 +1035,8 @@ EXPORT_SYMBOL(kick_iocb); int fastcall aio_complete(struct kiocb *iocb, long res, long res2) { struct kioctx *ctx = iocb->ki_ctx; - struct aio_ring_info *info; - struct aio_ring *ring; - struct io_event *event; + struct io_event event; unsigned long flags; - unsigned long tail; int ret; /* Special case handling for sync iocbs: events go directly @@ -951,14 +1063,13 @@ int fastcall aio_complete(struct kiocb * return ret; } - info = &ctx->ring_info; + /* insert event in the event ring */ + + event.obj = (u64)(unsigned long)iocb->ki_obj.user; + event.data = iocb->ki_user_data; + event.res = res; + event.res2 = res2; - /* add a completion event to the ring buffer. - * must be done holding ctx->ctx_lock to prevent - * other code from messing with the tail - * pointer since we might be called from irq - * context. - */ spin_lock_irqsave(&ctx->ctx_lock, flags); if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list)) @@ -971,34 +1082,10 @@ int fastcall aio_complete(struct kiocb * if (kiocbIsCancelled(iocb)) goto put_rq; - ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); + __aio_write_evt(ctx, &event); - tail = info->tail; - event = aio_ring_event(info, tail, KM_IRQ0); - if (++tail >= info->nr) - tail = 0; - - event->obj = (u64)(unsigned long)iocb->ki_obj.user; - event->data = iocb->ki_user_data; - event->res = res; - event->res2 = res2; - - dprintk("aio_complete: %p[%lu]: %p: %p %Lx %lx %lx\n", - ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data, - res, res2); - - /* after flagging the request as done, we - * must never even look at it again - */ - smp_wmb(); /* make event visible before updating tail */ - - info->tail = tail; - ring->tail = tail; - - put_aio_ring_event(event, KM_IRQ0); - kunmap_atomic(ring, KM_IRQ1); - - pr_debug("added to ring %p at [%lu]\n", iocb, tail); + if (iocb->ki_signo) + __aio_send_signal(iocb); pr_debug("%ld retries: %d of %d\n", iocb->ki_retried, iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes); @@ -1295,11 +1382,70 @@ asmlinkage long sys_io_destroy(aio_conte return -EINVAL; } +static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret) +{ + struct iovec *iov = &iocb->ki_iovec[iocb->ki_cur_seg]; + + BUG_ON(ret <= 0); + + while (iocb->ki_cur_seg < iocb->ki_nr_segs && ret > 0) { + ssize_t this = min(iov->iov_len, (size_t)ret); + iov->iov_base += this; + iov->iov_len -= this; + iocb->ki_left -= this; + ret -= this; + if (iov->iov_len == 0) { + iocb->ki_cur_seg++; + iov++; + } + } + + /* the caller should not have done more io than what fit in + * the remaining iovecs */ + BUG_ON(ret > 0 && iocb->ki_left == 0); +} + +static ssize_t aio_rw_vect_retry(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + struct address_space *mapping = file->f_mapping; + struct inode *inode = mapping->host; + ssize_t ret = 0; + + if (iocb->ki_opcode == IOCB_CMD_PREADV) + ret = file->f_op->aio_readv(iocb, + &iocb->ki_iovec[iocb->ki_cur_seg], + iocb->ki_nr_segs - iocb->ki_cur_seg, + iocb->ki_pos); + else + ret = file->f_op->aio_writev(iocb, + &iocb->ki_iovec[iocb->ki_cur_seg], + iocb->ki_nr_segs - iocb->ki_cur_seg, + iocb->ki_pos); + + if (ret > 0) { + aio_advance_iovec(iocb, ret); + /* turn partial completion into retries. full completion + * gets ret = 0 below. only retry partial reads if they + * were to a regular file. */ + if (iocb->ki_opcode == IOCB_CMD_PWRITEV || + (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))) + ret = -EIOCBRETRY; + } + + /* This means we must have transferred all that we could */ + /* No need to retry anymore */ + if ((ret == 0) || (iocb->ki_left == 0)) + ret = iocb->ki_nbytes - iocb->ki_left; + + return ret; +} + /* * Default retry method for aio_read (also used for first time submit) * Responsible for updating iocb state as retries progress */ -static ssize_t aio_pread(struct kiocb *iocb) +ssize_t aio_pread(struct kiocb *iocb) { struct file *file = iocb->ki_filp; struct address_space *mapping = file->f_mapping; @@ -1338,7 +1484,7 @@ static ssize_t aio_pread(struct kiocb *i * Default retry method for aio_write (also used for first time submit) * Responsible for updating iocb state as retries progress */ -static ssize_t aio_pwrite(struct kiocb *iocb) +ssize_t aio_pwrite(struct kiocb *iocb) { struct file *file = iocb->ki_filp; ssize_t ret = 0; @@ -1361,6 +1507,128 @@ static ssize_t aio_pwrite(struct kiocb * return ret; } +static int aio_thread_cancel(struct kiocb *iocb, struct io_event *res) +{ + force_sig_specific(SIGKILL, iocb->private); + return -EAGAIN; +} + +static int aio_thread_rw_v_fn(void *arg) +{ + struct kiocb *iocb = arg; + long ret; + + current->exit_signal = -1; + set_fs(USER_DS); + + /* setup cancellation */ + iocb->private = current; + iocb->ki_cancel = aio_thread_cancel; + unlock_kiocb(iocb); + + ret = -EINTR; + if (kiocbIsCancelled(iocb)) + goto out; + /* We don't count against the aio usage count. */ + if (atomic_dec_and_test(&iocb->ki_ctx->mm->default_kioctx.users)) + goto out; + + switch (iocb->ki_opcode) { + case IOCB_CMD_PREAD: + ret = iocb->ki_filp->f_op->read(iocb->ki_filp, iocb->ki_buf, + iocb->ki_left, &iocb->ki_pos); + break; + case IOCB_CMD_PREADV: + ret = vfs_readv(iocb->ki_filp, + &iocb->ki_iovec[iocb->ki_cur_seg], + iocb->ki_nr_segs - iocb->ki_cur_seg, + &iocb->ki_pos); + break; + case IOCB_CMD_PWRITE: + ret = iocb->ki_filp->f_op->write(iocb->ki_filp, iocb->ki_buf, + iocb->ki_left, &iocb->ki_pos); + break; + case IOCB_CMD_PWRITEV: + ret = vfs_writev(iocb->ki_filp, + &iocb->ki_iovec[iocb->ki_cur_seg], + iocb->ki_nr_segs - iocb->ki_cur_seg, + &iocb->ki_pos); + break; + case IOCB_CMD_SENDMSG: + ret = vfs_sendmsg(iocb->ki_filp, + (void __user *)iocb->ki_buf, iocb->ki_pos); + break; + case IOCB_CMD_RECVMSG: + ret = vfs_recvmsg(iocb->ki_filp, + (void __user *)iocb->ki_buf, iocb->ki_pos); + break; + default: + ret = -EINVAL; + break; + } +out: + current->aio_mm = NULL; /* prevent mmput from dec'ing aio_mm */ + aio_complete(iocb, ret, 0); + + return 0; +} + +static int aio_thread_fsync_fn(void *arg) +{ + struct kiocb *iocb = arg; + long ret; + + ret = iocb->ki_filp->f_op->fsync(iocb->ki_filp, + iocb->ki_filp->f_dentry, 0); + aio_complete(iocb, ret, 0); + + return 0; +} + +static int aio_thread_fdsync_fn(void *arg) +{ + struct kiocb *iocb = arg; + long ret; + + ret = iocb->ki_filp->f_op->fsync(iocb->ki_filp, + iocb->ki_filp->f_dentry, 1); + aio_complete(iocb, ret, 0); + + return 0; +} + +static ssize_t aio_kernel_thread(struct kiocb *iocb, int (*fn)(void *)) +{ + long ret; + + /* Setting up cancellation depends on actions of the thread, so + * we need to prevent the submit path from unlocking the iocb + * until our cancel method is in place. + */ + kiocbSetDontUnlock(iocb); + + ret = kernel_thread(fn, iocb, CLONE_VM|CLONE_FS|CLONE_FILES); + if (ret < 0) + return ret; + + return -EIOCBQUEUED; +} + +static ssize_t aio_thread_rw_v(struct kiocb *iocb) +{ + return aio_kernel_thread(iocb, aio_thread_rw_v_fn); +} + +static ssize_t aio_thread_fsync(struct kiocb *iocb) +{ + return aio_kernel_thread(iocb, aio_thread_fsync_fn); +} + +static ssize_t aio_thread_fdsync(struct kiocb *iocb) +{ + return aio_kernel_thread(iocb, aio_thread_fdsync_fn); +} + static ssize_t aio_fdsync(struct kiocb *iocb) { struct file *file = iocb->ki_filp; @@ -1381,6 +1649,27 @@ static ssize_t aio_fsync(struct kiocb *i return ret; } +static ssize_t aio_setup_vectored_rw(struct kiocb *kiocb, int type) +{ + ssize_t ret; + + ret = rw_copy_check_uvector(type, (struct iovec __user *)kiocb->ki_buf, + kiocb->ki_nbytes, 0, NULL, + &kiocb->ki_iovec); + if (ret < 0) + goto out; + + kiocb->ki_nr_segs = kiocb->ki_nbytes; + kiocb->ki_cur_seg = 0; + /* ki_nbytes/left now reflect bytes instead of segs */ + kiocb->ki_nbytes = ret; + kiocb->ki_left = ret; + + ret = 0; +out: + return ret; +} + /* * aio_setup_iocb: * Performs the initial checks and aio retry method @@ -1403,6 +1692,8 @@ static ssize_t aio_setup_iocb(struct kio ret = -EINVAL; if (file->f_op->aio_read) kiocb->ki_retry = aio_pread; + else if (file->f_op->read) + kiocb->ki_retry = aio_thread_rw_v; break; case IOCB_CMD_PWRITE: ret = -EBADF; @@ -1415,16 +1706,55 @@ static ssize_t aio_setup_iocb(struct kio ret = -EINVAL; if (file->f_op->aio_write) kiocb->ki_retry = aio_pwrite; + else if (file->f_op->write) + kiocb->ki_retry = aio_thread_rw_v; + break; + case IOCB_CMD_PREADV: + ret = -EBADF; + if (unlikely(!(file->f_mode & FMODE_READ))) + break; + ret = aio_setup_vectored_rw(kiocb, READ); + if (ret) + break; + ret = EINVAL; + if (file->f_op->aio_readv) + kiocb->ki_retry = aio_rw_vect_retry; + else if (file->f_op->readv || file->f_op->read) + kiocb->ki_retry = aio_thread_rw_v; + break; + case IOCB_CMD_PWRITEV: + ret = -EBADF; + if (unlikely(!(file->f_mode & FMODE_WRITE))) + break; + ret = aio_setup_vectored_rw(kiocb, WRITE); + if (ret) + break; + ret = EINVAL; + if (file->f_op->aio_writev) + kiocb->ki_retry = aio_rw_vect_retry; + else if (file->f_op->writev || file->f_op->write) + kiocb->ki_retry = aio_thread_rw_v; break; case IOCB_CMD_FDSYNC: ret = -EINVAL; if (file->f_op->aio_fsync) kiocb->ki_retry = aio_fdsync; + else if (file->f_op->fsync) + kiocb->ki_retry = aio_thread_fdsync; break; case IOCB_CMD_FSYNC: ret = -EINVAL; if (file->f_op->aio_fsync) kiocb->ki_retry = aio_fsync; + else if (file->f_op->fsync) + kiocb->ki_retry = aio_thread_fsync; + break; + case IOCB_CMD_SENDMSG: + case IOCB_CMD_RECVMSG: + ret = -EINVAL; + if (kiocb->ki_nbytes != sizeof(struct msghdr)) + break; + kiocb->ki_retry = aio_thread_rw_v; break; default: dprintk("EINVAL: io_submit: no operation provided\n"); @@ -1455,14 +1785,93 @@ static ssize_t aio_setup_iocb(struct kio * because this callback isn't used for wait queues which * are nested inside ioctx lock (i.e. ctx->wait) */ -static int aio_wake_function(wait_queue_t *wait, unsigned mode, - int sync, void *key) +int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key) { - struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait); + struct wait_bit_queue *wait_bit + = container_of(wait, struct wait_bit_queue, wait); + struct kiocb *iocb = container_of(wait_bit, struct kiocb, ki_wait); + + /* Assumes that a non-NULL key implies wait bit filtering */ + if (key && !test_wait_bit_key(wait, key)) + return 0; list_del_init(&wait->task_list); kick_iocb(iocb); - return 1; + /* + * Avoid exclusive wakeups with retries since an exclusive wakeup + * may involve implicit expectations of waking up the next waiter + * and there is no guarantee that the retry will take a path that + * would do so. For example if a page has become up-to-date, then + * a retried read may end up straightaway performing a copyout + * and not go through a lock_page - unlock_page that would have + * passed the baton to the next waiter. + */ + return 0; +} + +static long iocb_setup_sigevent(struct kiocb *req, + struct sigevent __user *user_event) +{ + int notify = 0; + pid_t aio_pid; + int aio_signo = 0; + + req->ki_uid = current->uid; + req->ki_euid = current->euid; + + if (!access_ok(VERIFY_READ, user_event, sizeof(struct sigevent))) + return -EFAULT; + /* + * We avoid copying the whole sigevent bunch and only get the + * needed fields. + */ + if (unlikely(__get_user(aio_pid, &user_event->sigev_notify_thread_id))) + return -EFAULT; + + if (unlikely(__get_user(aio_signo, &user_event->sigev_signo))) + return -EFAULT; + + if (unlikely(__copy_from_user(&req->ki_sigev_value, + &user_event->sigev_value, + sizeof(sigval_t)))) + return -EFAULT; + + if (!aio_signo) + return 0; /* no signal number, we do nothing */ + + if (aio_pid == 0) { + /* notify itself */ + aio_pid = current->pid; + notify = IO_NOTIFY_SIGNAL; + } else { + pid_t group_id; + task_t *ptask; + /* notify given thread */ + + /* caller thread and target thread must be in same + * thread group + */ + read_lock(&tasklist_lock); + + ptask = find_task_by_pid(aio_pid); + if (ptask) + group_id = ptask->tgid; + read_unlock(&tasklist_lock); + + if (unlikely (ptask == NULL)) + return -ESRCH; + + if (group_id != current->tgid) + return -EINVAL; + + notify = IO_NOTIFY_THREAD_ID; + } + + req->ki_pid = aio_pid; + req->ki_signo = aio_signo; + req->ki_notify = notify; + + return 0; } int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, @@ -1473,8 +1882,7 @@ int fastcall io_submit_one(struct kioctx ssize_t ret; /* enforce forwards compatibility on users */ - if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 || - iocb->aio_reserved3)) { + if (unlikely(iocb->aio_reserved1)) { pr_debug("EINVAL: io_submit: reserve field set\n"); return -EINVAL; } @@ -1483,6 +1891,7 @@ int fastcall io_submit_one(struct kioctx if (unlikely( (iocb->aio_buf != (unsigned long)iocb->aio_buf) || (iocb->aio_nbytes != (size_t)iocb->aio_nbytes) || + (iocb->aio_sigeventp != (unsigned long)iocb->aio_sigeventp) || ((ssize_t)iocb->aio_nbytes < 0) )) { pr_debug("EINVAL: io_submit: overflow check\n"); @@ -1513,20 +1922,29 @@ int fastcall io_submit_one(struct kioctx req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf; req->ki_left = req->ki_nbytes = iocb->aio_nbytes; req->ki_opcode = iocb->aio_lio_opcode; - init_waitqueue_func_entry(&req->ki_wait, aio_wake_function); - INIT_LIST_HEAD(&req->ki_wait.task_list); + init_waitqueue_func_entry(&req->ki_wait.wait, aio_wake_function); + INIT_LIST_HEAD(&req->ki_wait.wait.task_list); + req->ki_run_list.next = req->ki_run_list.prev = NULL; req->ki_retried = 0; - ret = aio_setup_iocb(req); + /* handle setting up the sigevent for POSIX AIO signals */ + if (iocb->aio_sigeventp) { + ret = iocb_setup_sigevent(req, + (struct sigevent __user *)(unsigned long) + iocb->aio_sigeventp); + if (ret) + goto out_put_req; + } + ret = aio_setup_iocb(req); if (ret) goto out_put_req; spin_lock_irq(&ctx->ctx_lock); - if (likely(list_empty(&ctx->run_list))) { - aio_run_iocb(req); - } else { - list_add_tail(&req->ki_run_list, &ctx->run_list); + aio_run_iocb(req); + if (!kiocbIsDontUnlock(req)) + unlock_kiocb(req); + if (!list_empty(&ctx->run_list)) { /* drain the run list */ while (__aio_run_iocbs(ctx)) ; @@ -1575,6 +1993,9 @@ asmlinkage long sys_io_submit(aio_contex /* * AKPM: should this return a partial result if some of the IOs were * successfully submitted? + * bcrl: yes. By returning either the partial result or an error, + * applications can resubmit the requests that were not completed + * to discover the error code returned by the submit path. */ for (i=0; iki_cancel); + memset(&tmp, 0, sizeof(tmp)); + tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user; + tmp.data = kiocb->ki_user_data; + + lock_kiocb(kiocb); + cancel = kiocb->ki_cancel; + if (cancel) { + ret = cancel(kiocb, &tmp); + if (!ret) { + /* Cancellation succeeded -- copy the result + * into the user's buffer. + */ + if (result && copy_to_user(result, &tmp, sizeof(tmp))) + ret = -EFAULT; + } + } + unlock_kiocb(kiocb); + /* If the cancellation was successful, we must discard the + * reference held for completion of the iocb. + */ + if (!ret) + aio_put_req(kiocb); + return ret; +} + /* sys_io_cancel: * Attempts to cancel an iocb previously passed to io_submit. If * the operation is successfully cancelled, the resulting event is @@ -1655,19 +2108,8 @@ asmlinkage long sys_io_cancel(aio_contex spin_unlock_irq(&ctx->ctx_lock); if (NULL != cancel) { - struct io_event tmp; - pr_debug("calling cancel\n"); - memset(&tmp, 0, sizeof(tmp)); - tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user; - tmp.data = kiocb->ki_user_data; - ret = cancel(kiocb, &tmp); - if (!ret) { - /* Cancellation succeeded -- copy the result - * into the user's buffer. - */ - if (copy_to_user(result, &tmp, sizeof(tmp))) - ret = -EFAULT; - } + ret = aio_cancel_kiocb(kiocb, result); + aio_put_req(kiocb); } else printk(KERN_DEBUG "iocb has no cancel operation\n"); diff -purN 00_v2.6.13-rc6/fs/bad_inode.c 90_pipe_aio/fs/bad_inode.c --- 00_v2.6.13-rc6/fs/bad_inode.c 2005-06-20 13:33:31.000000000 -0400 +++ 90_pipe_aio/fs/bad_inode.c 2005-08-09 19:13:07.000000000 -0400 @@ -26,9 +26,11 @@ static struct file_operations bad_file_o { .llseek = EIO_ERROR, .aio_read = EIO_ERROR, + .aio_readv = EIO_ERROR, .read = EIO_ERROR, .write = EIO_ERROR, .aio_write = EIO_ERROR, + .aio_writev = EIO_ERROR, .readdir = EIO_ERROR, .poll = EIO_ERROR, .ioctl = EIO_ERROR, diff -purN 00_v2.6.13-rc6/fs/block_dev.c 90_pipe_aio/fs/block_dev.c --- 00_v2.6.13-rc6/fs/block_dev.c 2005-08-04 15:55:50.000000000 -0400 +++ 90_pipe_aio/fs/block_dev.c 2005-08-09 19:15:45.000000000 -0400 @@ -777,6 +777,13 @@ static ssize_t blkdev_file_aio_write(str return generic_file_aio_write_nolock(iocb, &local_iov, 1, &iocb->ki_pos); } +static ssize_t blkdev_file_aio_writev(struct kiocb *iocb, + const struct iovec *iov, + unsigned long nr_segs, loff_t pos) +{ + return generic_file_aio_write_nolock(iocb, iov, nr_segs, &iocb->ki_pos); +} + static long block_ioctl(struct file *file, unsigned cmd, unsigned long arg) { return blkdev_ioctl(file->f_mapping->host, file, cmd, arg); @@ -799,7 +806,9 @@ struct file_operations def_blk_fops = { .read = generic_file_read, .write = blkdev_file_write, .aio_read = generic_file_aio_read, + .aio_readv = generic_file_aio_readv, .aio_write = blkdev_file_aio_write, + .aio_writev = blkdev_file_aio_writev, .mmap = generic_file_mmap, .fsync = block_fsync, .unlocked_ioctl = block_ioctl, diff -purN 00_v2.6.13-rc6/fs/buffer.c 90_pipe_aio/fs/buffer.c --- 00_v2.6.13-rc6/fs/buffer.c 2005-08-04 15:55:50.000000000 -0400 +++ 90_pipe_aio/fs/buffer.c 2005-08-08 17:15:51.000000000 -0400 @@ -53,7 +53,7 @@ init_buffer(struct buffer_head *bh, bh_e bh->b_private = private; } -static int sync_buffer(void *word) +static int sync_buffer(void *word, wait_queue_t *wait) { struct block_device *bd; struct buffer_head *bh diff -purN 00_v2.6.13-rc6/fs/ext2/file.c 90_pipe_aio/fs/ext2/file.c --- 00_v2.6.13-rc6/fs/ext2/file.c 2005-08-04 15:55:50.000000000 -0400 +++ 90_pipe_aio/fs/ext2/file.c 2005-08-09 19:13:07.000000000 -0400 @@ -44,7 +44,9 @@ struct file_operations ext2_file_operati .read = generic_file_read, .write = generic_file_write, .aio_read = generic_file_aio_read, + .aio_readv = generic_file_aio_readv, .aio_write = generic_file_aio_write, + .aio_writev = generic_file_aio_writev, .ioctl = ext2_ioctl, .mmap = generic_file_mmap, .open = generic_file_open, diff -purN 00_v2.6.13-rc6/fs/ext3/file.c 90_pipe_aio/fs/ext3/file.c --- 00_v2.6.13-rc6/fs/ext3/file.c 2005-08-04 15:55:51.000000000 -0400 +++ 90_pipe_aio/fs/ext3/file.c 2005-08-09 19:13:07.000000000 -0400 @@ -23,6 +23,7 @@ #include #include #include +#include #include "xattr.h" #include "acl.h" @@ -48,14 +49,15 @@ static int ext3_release_file (struct ino } static ssize_t -ext3_file_write(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos) +ext3_file_aio_writev(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, loff_t pos) { struct file *file = iocb->ki_filp; struct inode *inode = file->f_dentry->d_inode; ssize_t ret; int err; - ret = generic_file_aio_write(iocb, buf, count, pos); + ret = generic_file_aio_writev(iocb, iov, nr_segs, pos); /* * Skip flushing if there was an error, or if nothing was written. @@ -105,12 +107,22 @@ force_commit: return ret; } +static ssize_t +ext3_file_write(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos) +{ + struct iovec local_iov = { .iov_base = (void __user *)buf, + .iov_len = count }; + return ext3_file_aio_writev(iocb, &local_iov, 1, pos); +} + struct file_operations ext3_file_operations = { .llseek = generic_file_llseek, .read = do_sync_read, .write = do_sync_write, .aio_read = generic_file_aio_read, + .aio_readv = generic_file_aio_readv, .aio_write = ext3_file_write, + .aio_writev = ext3_file_aio_writev, .readv = generic_file_readv, .writev = generic_file_writev, .ioctl = ext3_ioctl, diff -purN 00_v2.6.13-rc6/fs/inode.c 90_pipe_aio/fs/inode.c --- 00_v2.6.13-rc6/fs/inode.c 2005-08-04 15:55:51.000000000 -0400 +++ 90_pipe_aio/fs/inode.c 2005-08-08 17:15:51.000000000 -0400 @@ -1279,7 +1279,7 @@ void remove_dquot_ref(struct super_block #endif -int inode_wait(void *word) +int inode_wait(void *word, wait_queue_t *wait) { schedule(); return 0; diff -purN 00_v2.6.13-rc6/fs/jfs/file.c 90_pipe_aio/fs/jfs/file.c --- 00_v2.6.13-rc6/fs/jfs/file.c 2005-08-04 15:55:52.000000000 -0400 +++ 90_pipe_aio/fs/jfs/file.c 2005-08-09 19:13:07.000000000 -0400 @@ -106,7 +106,9 @@ struct file_operations jfs_file_operatio .write = generic_file_write, .read = generic_file_read, .aio_read = generic_file_aio_read, + .aio_readv = generic_file_aio_readv, .aio_write = generic_file_aio_write, + .aio_writev = generic_file_aio_writev, .mmap = generic_file_mmap, .readv = generic_file_readv, .writev = generic_file_writev, diff -purN 00_v2.6.13-rc6/fs/ntfs/file.c 90_pipe_aio/fs/ntfs/file.c --- 00_v2.6.13-rc6/fs/ntfs/file.c 2005-08-04 15:55:56.000000000 -0400 +++ 90_pipe_aio/fs/ntfs/file.c 2005-08-09 19:13:07.000000000 -0400 @@ -111,10 +111,12 @@ struct file_operations ntfs_file_ops = { .llseek = generic_file_llseek, /* Seek inside file. */ .read = generic_file_read, /* Read from file. */ .aio_read = generic_file_aio_read, /* Async read from file. */ + .aio_readv = generic_file_aio_readv, /* Async readv from file. */ .readv = generic_file_readv, /* Read from file. */ #ifdef NTFS_RW .write = generic_file_write, /* Write to file. */ .aio_write = generic_file_aio_write, /* Async write to file. */ + .aio_writev = generic_file_aio_writev,/* Async writev to file. */ .writev = generic_file_writev, /* Write to file. */ /*.release = ,*/ /* Last file is closed. See fs/ext2/file.c:: diff -purN 00_v2.6.13-rc6/fs/pipe.c 90_pipe_aio/fs/pipe.c --- 00_v2.6.13-rc6/fs/pipe.c 2005-06-20 13:33:32.000000000 -0400 +++ 90_pipe_aio/fs/pipe.c 2005-08-16 14:05:23.000000000 -0400 @@ -46,6 +46,67 @@ void pipe_wait(struct inode * inode) down(PIPE_SEM(*inode)); } +static int pipe_aio_waiter(wait_queue_t *wait, unsigned mode, int sync, + void *key) +{ + struct kiocb *iocb = io_wait_to_kiocb(wait); + + list_del_init(&wait->task_list); + iocb->ki_cancel = NULL; /* We're removed from the wait queue, so our + * cancellation code no longer applies. + */ + kick_iocb(iocb); + return 1; +} + +static int pipe_aio_cancel(struct kiocb *kiocb, struct io_event *event); +static int pipe_aio_clear_cancel(struct kiocb *kiocb, struct io_event *event) +{ + struct inode *inode = kiocb->ki_filp->f_dentry->d_inode; + wait_queue_head_t *wq = PIPE_WAIT(*inode); + int ret = 0; + + spin_lock_irq(&wq->lock); + if (kiocb->ki_cancel == pipe_aio_cancel) { + kiocb->ki_cancel = NULL; + list_del_init(&kiocb->ki_wait.wait.task_list); + if (event) { + long transferred = kiocb->ki_nbytes - kiocb->ki_left; + event->res = (transferred > 0) ? transferred : -EINTR; + event->res2 = 0; + } + } else + ret = -EAGAIN; + spin_unlock_irq(&wq->lock); + return ret; +} + +static int pipe_aio_cancel(struct kiocb *kiocb, struct io_event *event) +{ + int ret = pipe_aio_clear_cancel(kiocb, event); + /* Undo the WRITERS++ done in pipe_aio_write if cancel was okay. + */ + if (!ret && kiocb->ki_user_data) { + struct inode *inode = kiocb->ki_filp->f_dentry->d_inode; + kiocb->ki_user_data = 0; + down(PIPE_SEM(*inode)); + PIPE_WAITING_WRITERS(*inode)--; + up(PIPE_SEM(*inode)); + } + + return ret; +} + +static long pipe_aio_wait(struct kiocb *kiocb, struct inode *inode) +{ + kiocb->ki_wait.wait.func = pipe_aio_waiter; + kiocb->ki_cancel = pipe_aio_cancel; + add_wait_queue(PIPE_WAIT(*inode), &kiocb->ki_wait.wait); + aio_up(kiocb, PIPE_SEM(*inode)); + kiocbSetIntr(kiocb); + return -EIOCBRETRY; +} + static inline int pipe_iov_copy_from_user(void *to, struct iovec *iov, unsigned long len) { @@ -115,9 +176,12 @@ static struct pipe_buf_operations anon_p }; static ssize_t -pipe_readv(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) +pipe_aio_read(struct kiocb *kiocb, char __user *buf, size_t len, loff_t pos) { + struct iovec _iov[2] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; + struct file *filp = kiocb->ki_filp; + struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; int do_wakeup; @@ -125,14 +189,25 @@ pipe_readv(struct file *filp, const stru struct iovec *iov = (struct iovec *)_iov; size_t total_len; + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) + pipe_aio_clear_cancel(kiocb, NULL); + total_len = iov_length(iov, nr_segs); /* Null read succeeds. */ if (unlikely(total_len == 0)) return 0; do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) + return ret; + + ret = kiocb->ki_nbytes - kiocb->ki_left; info = inode->i_pipe; for (;;) { int bufs = info->nrbufs; @@ -155,6 +230,8 @@ pipe_readv(struct file *filp, const stru break; } ret += chars; + kiocb->ki_left -= chars; + kiocb->ki_buf += chars; buf->offset += chars; buf->len -= chars; if (!buf->len) { @@ -186,7 +263,7 @@ pipe_readv(struct file *filp, const stru break; } } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -194,9 +271,9 @@ pipe_readv(struct file *filp, const stru wake_up_interruptible_sync(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT); } - pipe_wait(inode); + return pipe_aio_wait(kiocb, inode); } - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); /* Signal writers asynchronously that there is more room. */ if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); @@ -208,16 +285,12 @@ pipe_readv(struct file *filp, const stru } static ssize_t -pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) +pipe_aio_write(struct kiocb *kiocb, const char __user *buf, size_t len, loff_t pos) { - struct iovec iov = { .iov_base = buf, .iov_len = count }; - return pipe_readv(filp, &iov, 1, ppos); -} + struct iovec _iov[2] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; -static ssize_t -pipe_writev(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) -{ + struct file *filp = kiocb->ki_filp; struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; ssize_t ret; @@ -231,13 +304,33 @@ pipe_writev(struct file *filp, const str if (unlikely(total_len == 0)) return 0; + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) + pipe_aio_clear_cancel(kiocb, NULL); + do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) + return ret; + + /* Undo the WRITERS++ done below where we are queued. We use + * kiocb->private to flag if we were waiting, as the higher layers + * initialize it to NULL at the beginning of a request's life. + */ + if (kiocb->ki_user_data) { + PIPE_WAITING_WRITERS(*inode)--; + kiocb->ki_user_data = 0; + } + info = inode->i_pipe; if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); ret = -EPIPE; goto out; } @@ -257,6 +350,8 @@ pipe_writev(struct file *filp, const str do_wakeup = 1; if (error) goto out; + iov->iov_base += chars; + iov->iov_len -= chars; buf->len += chars; total_len -= chars; ret = chars; @@ -267,8 +362,10 @@ pipe_writev(struct file *filp, const str for (;;) { int bufs; + if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } @@ -304,6 +401,8 @@ pipe_writev(struct file *filp, const str break; } ret += chars; + kiocb->ki_left -= chars; + kiocb->ki_buf += chars; /* Insert it into the buffer array */ buf->page = page; @@ -323,7 +422,7 @@ pipe_writev(struct file *filp, const str if (!ret) ret = -EAGAIN; break; } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -333,11 +432,11 @@ pipe_writev(struct file *filp, const str do_wakeup = 0; } PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; + kiocb->ki_user_data = 1; /* Flag for retry. */ + return pipe_aio_wait(kiocb, inode); } out: - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN); @@ -347,6 +446,7 @@ out: return ret; } +#if 0 static ssize_t pipe_write(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) @@ -354,6 +454,7 @@ pipe_write(struct file *filp, const char struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; return pipe_writev(filp, &iov, 1, ppos); } +#endif static ssize_t bad_pipe_r(struct file *filp, char __user *buf, size_t count, loff_t *ppos) @@ -362,11 +463,23 @@ bad_pipe_r(struct file *filp, char __use } static ssize_t +bad_pipe_aio_r(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + +static ssize_t bad_pipe_w(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) { return -EBADF; } +static ssize_t +bad_pipe_aio_w(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + static int pipe_ioctl(struct inode *pino, struct file *filp, unsigned int cmd, unsigned long arg) @@ -565,8 +678,8 @@ pipe_rdwr_open(struct inode *inode, stru */ struct file_operations read_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, + .aio_read = pipe_aio_read, .write = bad_pipe_w, .poll = fifo_poll, .ioctl = pipe_ioctl, @@ -578,8 +691,9 @@ struct file_operations read_fifo_fops = struct file_operations write_fifo_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -589,10 +703,10 @@ struct file_operations write_fifo_fops = struct file_operations rdwr_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, @@ -602,9 +716,10 @@ struct file_operations rdwr_fifo_fops = struct file_operations read_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, .write = bad_pipe_w, + .aio_read = pipe_aio_read, + .aio_write = bad_pipe_aio_w, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_read_open, @@ -615,8 +730,9 @@ struct file_operations read_pipe_fops = struct file_operations write_pipe_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -626,10 +742,10 @@ struct file_operations write_pipe_fops = struct file_operations rdwr_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, diff -purN 00_v2.6.13-rc6/fs/read_write.c 90_pipe_aio/fs/read_write.c --- 00_v2.6.13-rc6/fs/read_write.c 2005-08-04 15:55:57.000000000 -0400 +++ 90_pipe_aio/fs/read_write.c 2005-08-09 19:41:35.000000000 -0400 @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -203,14 +204,36 @@ Einval: return -EINVAL; } -static void wait_on_retry_sync_kiocb(struct kiocb *iocb) +static long wait_on_retry_sync_kiocb(struct kiocb *iocb) { - set_current_state(TASK_UNINTERRUPTIBLE); + int (*cancel)(struct kiocb *, struct io_event *); + long ret = 0; + set_current_state(kiocbIsIntr(iocb) ? TASK_INTERRUPTIBLE + : TASK_UNINTERRUPTIBLE); if (!kiocbIsKicked(iocb)) schedule(); else kiocbClearKicked(iocb); + + /* If we were interrupted by a signal, issue a cancel to allow the + * operation to clean up. + */ + if (kiocbIsIntr(iocb) && signal_pending(current) && + (cancel = iocb->ki_cancel)) { + struct io_event dummy_event; + dummy_event.res = 0; + if (!cancel(iocb, &dummy_event)) { + ret = dummy_event.res; + if (!ret) + printk(KERN_DEBUG "wait_on_retry_sync_kiocb: ki_cancel method %p is buggy\n", cancel); + goto out; + } + } + kiocbClearIntr(iocb); + ret = iocb->ki_retry(iocb); +out: __set_current_state(TASK_RUNNING); + return ret; } ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos) @@ -220,12 +243,15 @@ ssize_t do_sync_read(struct file *filp, init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = *ppos; - while (-EIOCBRETRY == - (ret = filp->f_op->aio_read(&kiocb, buf, len, kiocb.ki_pos))) - wait_on_retry_sync_kiocb(&kiocb); + kiocb.ki_retry = aio_pread; + kiocb.ki_buf = buf; + kiocb.ki_nbytes = len; + kiocb.ki_left = len; + ret = filp->f_op->aio_read(&kiocb, buf, len, *ppos); + while (ret == -EIOCBRETRY) + ret = wait_on_retry_sync_kiocb(&kiocb); - if (-EIOCBQUEUED == ret) - ret = wait_on_sync_kiocb(&kiocb); + BUG_ON(!list_empty(&kiocb.ki_wait.wait.task_list)); *ppos = kiocb.ki_pos; return ret; } @@ -271,12 +297,15 @@ ssize_t do_sync_write(struct file *filp, init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = *ppos; - while (-EIOCBRETRY == - (ret = filp->f_op->aio_write(&kiocb, buf, len, kiocb.ki_pos))) - wait_on_retry_sync_kiocb(&kiocb); + kiocb.ki_retry = aio_pwrite; + kiocb.ki_buf = (void *)buf; + kiocb.ki_nbytes = len; + kiocb.ki_left = len; + ret = filp->f_op->aio_write(&kiocb, buf, len, kiocb.ki_pos); + while (ret == -EIOCBRETRY) + ret = wait_on_retry_sync_kiocb(&kiocb); - if (-EIOCBQUEUED == ret) - ret = wait_on_sync_kiocb(&kiocb); + BUG_ON(!list_empty(&kiocb.ki_wait.wait.task_list)); *ppos = kiocb.ki_pos; return ret; } @@ -427,6 +456,77 @@ EXPORT_SYMBOL(iov_shorten); /* A write operation does a read from user space and vice versa */ #define vrfy_dir(type) ((type) == READ ? VERIFY_WRITE : VERIFY_READ) +ssize_t rw_copy_check_uvector(int type, const struct iovec __user * uvector, + unsigned long nr_segs, unsigned long fast_segs, + struct iovec *fast_pointer, + struct iovec **ret_pointer) + { + unsigned long seg; + ssize_t ret; + struct iovec *iov = fast_pointer; + + /* + * SuS says "The readv() function *may* fail if the iovcnt argument + * was less than or equal to 0, or greater than {IOV_MAX}. Linux has + * traditionally returned zero for zero segments, so... + */ + if (nr_segs == 0) { + ret = 0; + goto out; + } + + /* + * First get the "struct iovec" from user memory and + * verify all the pointers + */ + if ((nr_segs > UIO_MAXIOV) || (nr_segs <= 0)) { + ret = -EINVAL; + goto out; + } + if (nr_segs > fast_segs) { + iov = kmalloc(nr_segs*sizeof(struct iovec), GFP_KERNEL); + if (iov == NULL) { + ret = -ENOMEM; + goto out; + } + } + if (copy_from_user(iov, uvector, nr_segs*sizeof(*uvector))) { + ret = -EFAULT; + goto out; + } + + /* + * According to the Single Unix Specification we should return EINVAL + * if an element length is < 0 when cast to ssize_t or if the + * total length would overflow the ssize_t return value of the + * system call. + */ + ret = 0; + for (seg = 0; seg < nr_segs; seg++) { + void __user *buf = iov[seg].iov_base; + ssize_t len = (ssize_t)iov[seg].iov_len; + + /* see if we we're about to use an invalid len or if + * it's about to overflow ssize_t */ + if (len < 0 || (ret + len < ret)) { + ret = -EINVAL; + goto out; + } + if (unlikely(!access_ok(vrfy_dir(type), buf, len))) { + ret = -EFAULT; + goto out; + } + + ret += len; + } +out: + *ret_pointer = iov; + return ret; +} + +/* A write operation does a read from user space and vice versa */ +#define vrfy_dir(type) ((type) == READ ? VERIFY_WRITE : VERIFY_READ) + static ssize_t do_readv_writev(int type, struct file *file, const struct iovec __user * uvector, unsigned long nr_segs, loff_t *pos) @@ -434,62 +534,23 @@ static ssize_t do_readv_writev(int type, typedef ssize_t (*io_fn_t)(struct file *, char __user *, size_t, loff_t *); typedef ssize_t (*iov_fn_t)(struct file *, const struct iovec *, unsigned long, loff_t *); - size_t tot_len; + ssize_t tot_len; struct iovec iovstack[UIO_FASTIOV]; struct iovec *iov=iovstack, *vector; ssize_t ret; - int seg; io_fn_t fn; iov_fn_t fnv; - /* - * SuS says "The readv() function *may* fail if the iovcnt argument - * was less than or equal to 0, or greater than {IOV_MAX}. Linux has - * traditionally returned zero for zero segments, so... - */ - ret = 0; - if (nr_segs == 0) - goto out; - - /* - * First get the "struct iovec" from user memory and - * verify all the pointers - */ - ret = -EINVAL; - if ((nr_segs > UIO_MAXIOV) || (nr_segs <= 0)) + if (!file->f_op) { + ret = -EINVAL; goto out; - if (!file->f_op) - goto out; - if (nr_segs > UIO_FASTIOV) { - ret = -ENOMEM; - iov = kmalloc(nr_segs*sizeof(struct iovec), GFP_KERNEL); - if (!iov) - goto out; } - ret = -EFAULT; - if (copy_from_user(iov, uvector, nr_segs*sizeof(*uvector))) - goto out; - /* - * Single unix specification: - * We should -EINVAL if an element length is not >= 0 and fitting an - * ssize_t. The total length is fitting an ssize_t - * - * Be careful here because iov_len is a size_t not an ssize_t - */ - tot_len = 0; - ret = -EINVAL; - for (seg = 0; seg < nr_segs; seg++) { - void __user *buf = iov[seg].iov_base; - ssize_t len = (ssize_t)iov[seg].iov_len; - - if (len < 0) /* size_t not fitting an ssize_t .. */ - goto out; - if (unlikely(!access_ok(vrfy_dir(type), buf, len))) - goto Efault; - tot_len += len; - if ((ssize_t)tot_len < 0) /* maths overflow on the ssize_t */ - goto out; + tot_len = rw_copy_check_uvector(type, uvector, nr_segs, + ARRAY_SIZE(iovstack), iovstack, &iov); + if (tot_len < 0) { + ret = tot_len; + goto out; } if (tot_len == 0) { ret = 0; @@ -546,9 +607,6 @@ out: fsnotify_modify(file->f_dentry); } return ret; -Efault: - ret = -EFAULT; - goto out; } ssize_t vfs_readv(struct file *file, const struct iovec __user *vec, diff -purN 00_v2.6.13-rc6/fs/reiserfs/file.c 90_pipe_aio/fs/reiserfs/file.c --- 00_v2.6.13-rc6/fs/reiserfs/file.c 2005-08-04 15:55:57.000000000 -0400 +++ 90_pipe_aio/fs/reiserfs/file.c 2005-08-09 19:29:57.000000000 -0400 @@ -1540,12 +1540,6 @@ static ssize_t reiserfs_file_write(struc return res; } -static ssize_t reiserfs_aio_write(struct kiocb *iocb, const char __user * buf, - size_t count, loff_t pos) -{ - return generic_file_aio_write(iocb, buf, count, pos); -} - struct file_operations reiserfs_file_operations = { .read = generic_file_read, .write = reiserfs_file_write, @@ -1555,7 +1549,9 @@ struct file_operations reiserfs_file_ope .fsync = reiserfs_sync_file, .sendfile = generic_file_sendfile, .aio_read = generic_file_aio_read, - .aio_write = reiserfs_aio_write, + .aio_readv = generic_file_aio_readv, + .aio_write = generic_file_aio_write, + .aio_writev = generic_file_aio_writev, }; struct inode_operations reiserfs_file_inode_operations = { diff -purN 00_v2.6.13-rc6/include/asm-i386/semaphore.h 90_pipe_aio/include/asm-i386/semaphore.h --- 00_v2.6.13-rc6/include/asm-i386/semaphore.h 2005-06-20 13:33:36.000000000 -0400 +++ 90_pipe_aio/include/asm-i386/semaphore.h 2005-08-08 17:15:30.000000000 -0400 @@ -41,10 +41,12 @@ #include #include +struct kiocb; struct semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; + struct kiocb *aio_owner; }; @@ -52,7 +54,8 @@ struct semaphore { { \ .count = ATOMIC_INIT(n), \ .sleepers = 0, \ - .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait) \ + .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait), \ + .aio_owner = NULL \ } #define __MUTEX_INITIALIZER(name) \ @@ -75,6 +78,7 @@ static inline void sema_init (struct sem atomic_set(&sem->count, val); sem->sleepers = 0; init_waitqueue_head(&sem->wait); + sem->aio_owner = NULL; } static inline void init_MUTEX (struct semaphore *sem) @@ -87,6 +91,7 @@ static inline void init_MUTEX_LOCKED (st sema_init(sem, 0); } +fastcall void __aio_down_failed(void /* special register calling convention */); fastcall void __down_failed(void /* special register calling convention */); fastcall int __down_failed_interruptible(void /* params in registers */); fastcall int __down_failed_trylock(void /* params in registers */); @@ -142,6 +147,32 @@ static inline int down_interruptible(str } /* + * Non-blockingly attempt to down() a semaphore for use with aio. + * Returns zero if we acquired it + */ +static inline int aio_down(struct kiocb *iocb, struct semaphore * sem) +{ + int result; + + __asm__ __volatile__( + "# atomic aio down operation\n\t" + LOCK "decl %1\n\t" /* --sem->count */ + "js 2f\n\t" + "movl %3,%2\n" + "xorl %0,%0\n" + "1:\n" + LOCK_SECTION_START("") + "2:\tlea %1,%%edx\n\t" + "call __aio_down_failed\n\t" + "jmp 1b\n" + LOCK_SECTION_END + :"=a" (result), "+m" (sem->count), "=m" (sem->aio_owner) + :"0" (iocb) + :"memory","cc","dx"); + return result; +} + +/* * Non-blockingly attempt to down() a semaphore. * Returns zero if we acquired it */ @@ -190,5 +221,14 @@ static inline void up(struct semaphore * :"memory","ax"); } +static inline void aio_up(struct kiocb *iocb, struct semaphore *sem) +{ +#ifdef CONFIG_DEBUG_KERNEL + BUG_ON(sem->aio_owner != iocb); +#endif + sem->aio_owner = NULL; + up(sem); +} + #endif #endif diff -purN 00_v2.6.13-rc6/include/asm-x86_64/semaphore.h 90_pipe_aio/include/asm-x86_64/semaphore.h --- 00_v2.6.13-rc6/include/asm-x86_64/semaphore.h 2004-12-24 16:33:48.000000000 -0500 +++ 90_pipe_aio/include/asm-x86_64/semaphore.h 2005-08-08 17:15:30.000000000 -0400 @@ -43,17 +43,20 @@ #include #include +struct kiocb; struct semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; + struct kiocb *aio_owner; }; #define __SEMAPHORE_INITIALIZER(name, n) \ { \ .count = ATOMIC_INIT(n), \ .sleepers = 0, \ - .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait) \ + .wait = __WAIT_QUEUE_HEAD_INITIALIZER((name).wait), \ + .aio_owner = NULL \ } #define __MUTEX_INITIALIZER(name) \ @@ -76,6 +79,7 @@ static inline void sema_init (struct sem atomic_set(&sem->count, val); sem->sleepers = 0; init_waitqueue_head(&sem->wait); + sem->aio_owner = NULL; } static inline void init_MUTEX (struct semaphore *sem) @@ -88,11 +92,13 @@ static inline void init_MUTEX_LOCKED (st sema_init(sem, 0); } +asmlinkage long __aio_down_failed(void /* special register calling convention */); asmlinkage void __down_failed(void /* special register calling convention */); asmlinkage int __down_failed_interruptible(void /* params in registers */); asmlinkage int __down_failed_trylock(void /* params in registers */); asmlinkage void __up_wakeup(void /* special register calling convention */); +asmlinkage long __aio_down(struct kiocb *iocb, struct semaphore * sem); asmlinkage void __down(struct semaphore * sem); asmlinkage int __down_interruptible(struct semaphore * sem); asmlinkage int __down_trylock(struct semaphore * sem); @@ -148,6 +154,32 @@ static inline int down_interruptible(str } /* + * Non-blockingly attempt to down() a semaphore for use with aio. + * Returns zero if we acquired it, -EIOCBRETRY if the operation was + * queued and the iocb will receive a kick_iocb() on completion. + */ +static inline long aio_down(struct kiocb *iocb, struct semaphore * sem) +{ + long result; + + __asm__ __volatile__( + "# atomic aio_down operation\n\t" + LOCK "decl %1\n\t" /* --sem->count */ + "js 2f\n\t" + "movq %3,%2\n" /* sem->aio_owner = iocb */ + "xorq %0,%0\n\t" + "1:\n" + LOCK_SECTION_START("") + "2:\tcall __aio_down_failed\n\t" + "jmp 1b\n" + LOCK_SECTION_END + :"=a" (result), "+m" (sem->count), "=m" (sem->aio_owner) + : "D" (iocb), "S" (sem) + :"memory"); + return result; +} + +/* * Non-blockingly attempt to down() a semaphore. * Returns zero if we acquired it */ @@ -192,5 +224,15 @@ static inline void up(struct semaphore * :"D" (sem) :"memory"); } + +static inline void aio_up(struct kiocb *iocb, struct semaphore *sem) +{ +#ifdef CONFIG_DEBUG_KERNEL + BUG_ON(sem->aio_owner != iocb); +#endif + sem->aio_owner = NULL; + up(sem); +} + #endif /* __KERNEL__ */ #endif diff -purN 00_v2.6.13-rc6/include/linux/aio.h 90_pipe_aio/include/linux/aio.h --- 00_v2.6.13-rc6/include/linux/aio.h 2004-12-24 16:35:50.000000000 -0500 +++ 90_pipe_aio/include/linux/aio.h 2005-08-15 15:39:08.000000000 -0400 @@ -6,6 +6,12 @@ #include #include +#include + +enum { + IO_NOTIFY_SIGNAL = 0, /* send signal to a processe */ + IO_NOTIFY_THREAD_ID = 1, /* send signal to a specific thread */ +}; #define AIO_MAXSEGS 4 #define AIO_KIOGRP_NR_ATOMIC 8 @@ -27,21 +33,33 @@ struct kioctx; #define KIF_LOCKED 0 #define KIF_KICKED 1 #define KIF_CANCELLED 2 +#define KIF_INTR 3 /* use TASK_INTERRUPTIBLE waits */ +#define KIF_SYNCED 4 +#define KIF_DONT_UNLOCK 5 /* don't unlock the iocb after submit */ #define kiocbTryLock(iocb) test_and_set_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbTryKick(iocb) test_and_set_bit(KIF_KICKED, &(iocb)->ki_flags) +#define kiocbTrySync(iocb) test_and_set_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbSetLocked(iocb) set_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbSetKicked(iocb) set_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbSetCancelled(iocb) set_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbSetIntr(iocb) set_bit(KIF_INTR, &(iocb)->ki_flags) +#define kiocbSetSynced(iocb) set_bit(KIF_SYNCED, &(iocb)->ki_flags) +#define kiocbSetDontUnlock(iocb) set_bit(KIF_DONT_UNLOCK, &(iocb)->ki_flags) #define kiocbClearLocked(iocb) clear_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbClearKicked(iocb) clear_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbClearCancelled(iocb) clear_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbClearIntr(iocb) clear_bit(KIF_INTR, &(iocb)->ki_flags) +#define kiocbClearSynced(iocb) clear_bit(KIF_SYNCED, &(iocb)->ki_flags) #define kiocbIsLocked(iocb) test_bit(KIF_LOCKED, &(iocb)->ki_flags) #define kiocbIsKicked(iocb) test_bit(KIF_KICKED, &(iocb)->ki_flags) #define kiocbIsCancelled(iocb) test_bit(KIF_CANCELLED, &(iocb)->ki_flags) +#define kiocbIsIntr(iocb) test_bit(KIF_INTR, &(iocb)->ki_flags) +#define kiocbIsSynced(iocb) test_bit(KIF_SYNCED, &(iocb)->ki_flags) +#define kiocbIsDontUnlock(iocb) test_bit(KIF_DONT_UNLOCK, &(iocb)->ki_flags) struct kiocb { struct list_head ki_run_list; @@ -69,12 +87,22 @@ struct kiocb { size_t ki_nbytes; /* copy of iocb->aio_nbytes */ char __user *ki_buf; /* remaining iocb->aio_buf */ size_t ki_left; /* remaining bytes */ - wait_queue_t ki_wait; + struct wait_bit_queue ki_wait; long ki_retried; /* just for testing */ long ki_kicked; /* just for testing */ long ki_queued; /* just for testing */ + struct iovec *ki_iovec; + unsigned long ki_nr_segs; + unsigned long ki_cur_seg; void *private; + + /* to notify a process on I/O event only valid if ki_signo != 0 */ + pid_t ki_pid; + __u16 ki_signo; + __u16 ki_notify; + uid_t ki_uid, ki_euid; + sigval_t ki_sigev_value; }; #define is_sync_kiocb(iocb) ((iocb)->ki_key == KIOCB_SYNC_KEY) @@ -90,7 +118,7 @@ struct kiocb { (x)->ki_dtor = NULL; \ (x)->ki_obj.tsk = tsk; \ (x)->ki_user_data = 0; \ - init_wait((&(x)->ki_wait)); \ + init_wait_bit_task((&(x)->ki_wait), current);\ } while (0) #define AIO_RING_MAGIC 0xa10a10a1 @@ -154,6 +182,7 @@ struct kioctx { /* prototypes */ extern unsigned aio_max_size; +extern int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key); extern ssize_t FASTCALL(wait_on_sync_kiocb(struct kiocb *iocb)); extern int FASTCALL(aio_put_req(struct kiocb *iocb)); extern void FASTCALL(kick_iocb(struct kiocb *iocb)); @@ -164,6 +193,8 @@ extern void FASTCALL(exit_aio(struct mm_ extern struct kioctx *lookup_ioctx(unsigned long ctx_id); extern int FASTCALL(io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, struct iocb *iocb)); +extern ssize_t aio_pread(struct kiocb *iocb); +extern ssize_t aio_pwrite(struct kiocb *iocb); /* semi private, but used by the 32bit emulations: */ struct kioctx *lookup_ioctx(unsigned long ctx_id); @@ -184,7 +215,8 @@ do { \ } \ } while (0) -#define io_wait_to_kiocb(wait) container_of(wait, struct kiocb, ki_wait) +#define io_wait_to_kiocb(io_wait) container_of(container_of(io_wait, \ + struct wait_bit_queue, wait), struct kiocb, ki_wait) #define is_retried_kiocb(iocb) ((iocb)->ki_retried > 1) #include diff -purN 00_v2.6.13-rc6/include/linux/aio_abi.h 90_pipe_aio/include/linux/aio_abi.h --- 00_v2.6.13-rc6/include/linux/aio_abi.h 2005-06-20 13:33:38.000000000 -0400 +++ 90_pipe_aio/include/linux/aio_abi.h 2005-08-10 10:47:36.000000000 -0400 @@ -41,6 +41,16 @@ enum { * IOCB_CMD_POLL = 5, */ IOCB_CMD_NOOP = 6, + IOCB_CMD_PREADV = 7, + IOCB_CMD_PWRITEV = 8, + + /* For these network operations: + * aio_buf points to struct msghdr *, + * aio_nbytes is sizeof(struct msghdr) + * aio_offset is the flags value. + */ + IOCB_CMD_SENDMSG = 9, + IOCB_CMD_RECVMSG = 10, }; /* read() from /dev/aio returns these structures. */ @@ -80,8 +90,9 @@ struct iocb { __u64 aio_nbytes; __s64 aio_offset; + __u64 aio_sigeventp; /* pointer to struct sigevent */ + /* extra parameters */ - __u64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */ __u64 aio_reserved3; }; /* 64 bytes */ diff -purN 00_v2.6.13-rc6/include/linux/fs.h 90_pipe_aio/include/linux/fs.h --- 00_v2.6.13-rc6/include/linux/fs.h 2005-08-04 15:56:06.000000000 -0400 +++ 90_pipe_aio/include/linux/fs.h 2005-08-09 19:38:19.000000000 -0400 @@ -956,8 +956,10 @@ struct file_operations { loff_t (*llseek) (struct file *, loff_t, int); ssize_t (*read) (struct file *, char __user *, size_t, loff_t *); ssize_t (*aio_read) (struct kiocb *, char __user *, size_t, loff_t); + ssize_t (*aio_readv) (struct kiocb *, const struct iovec *, unsigned long, loff_t); ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *); ssize_t (*aio_write) (struct kiocb *, const char __user *, size_t, loff_t); + ssize_t (*aio_writev) (struct kiocb *, const struct iovec *, unsigned long, loff_t); int (*readdir) (struct file *, void *, filldir_t); unsigned int (*poll) (struct file *, struct poll_table_struct *); int (*ioctl) (struct inode *, struct file *, unsigned int, unsigned long); @@ -1007,6 +1009,11 @@ struct inode_operations { struct seq_file; +ssize_t rw_copy_check_uvector(int type, const struct iovec __user * uvector, + unsigned long nr_segs, unsigned long fast_segs, + struct iovec *fast_pointer, + struct iovec **ret_pointer); + extern ssize_t vfs_read(struct file *, char __user *, size_t, loff_t *); extern ssize_t vfs_write(struct file *, const char __user *, size_t, loff_t *); extern ssize_t vfs_readv(struct file *, const struct iovec __user *, @@ -1498,8 +1505,10 @@ extern ssize_t generic_file_read(struct int generic_write_checks(struct file *file, loff_t *pos, size_t *count, int isblk); extern ssize_t generic_file_write(struct file *, const char __user *, size_t, loff_t *); extern ssize_t generic_file_aio_read(struct kiocb *, char __user *, size_t, loff_t); +extern ssize_t generic_file_aio_readv(struct kiocb *, const struct iovec *, unsigned long, loff_t); extern ssize_t __generic_file_aio_read(struct kiocb *, const struct iovec *, unsigned long, loff_t *); extern ssize_t generic_file_aio_write(struct kiocb *, const char __user *, size_t, loff_t); +extern ssize_t generic_file_aio_writev(struct kiocb *, const struct iovec *, unsigned long, loff_t); extern ssize_t generic_file_aio_write_nolock(struct kiocb *, const struct iovec *, unsigned long, loff_t *); extern ssize_t generic_file_direct_write(struct kiocb *, const struct iovec *, diff -purN 00_v2.6.13-rc6/include/linux/net.h 90_pipe_aio/include/linux/net.h --- 00_v2.6.13-rc6/include/linux/net.h 2005-06-20 13:33:40.000000000 -0400 +++ 90_pipe_aio/include/linux/net.h 2005-08-16 16:22:51.000000000 -0400 @@ -186,6 +186,10 @@ extern int sock_sendmsg(struct s size_t len); extern int sock_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, int flags); +extern long vfs_sendmsg(struct file *file, struct msghdr __user *msg, + unsigned flags); +extern long vfs_recvmsg(struct file *file, struct msghdr __user *msg, + unsigned flags); extern int sock_map_fd(struct socket *sock); extern struct socket *sockfd_lookup(int fd, int *err); #define sockfd_put(sock) fput(sock->file) diff -purN 00_v2.6.13-rc6/include/linux/pagemap.h 90_pipe_aio/include/linux/pagemap.h --- 00_v2.6.13-rc6/include/linux/pagemap.h 2005-08-04 15:56:06.000000000 -0400 +++ 90_pipe_aio/include/linux/pagemap.h 2005-08-08 17:16:00.000000000 -0400 @@ -159,21 +159,25 @@ static inline pgoff_t linear_page_index( return pgoff >> (PAGE_CACHE_SHIFT - PAGE_SHIFT); } -extern void FASTCALL(__lock_page(struct page *page)); +extern int FASTCALL(lock_page_slow(struct page *page, wait_queue_t *wait)); extern void FASTCALL(unlock_page(struct page *page)); -static inline void lock_page(struct page *page) +static inline int __lock_page(struct page *page, wait_queue_t *wait) { might_sleep(); if (TestSetPageLocked(page)) - __lock_page(page); + return lock_page_slow(page, wait); + return 0; } + +#define lock_page(page) __lock_page(page, ¤t->__wait.wait) /* * This is exported only for wait_on_page_locked/wait_on_page_writeback. * Never use this directly! */ -extern void FASTCALL(wait_on_page_bit(struct page *page, int bit_nr)); +extern int FASTCALL(wait_on_page_bit(struct page *page, int bit_nr, + wait_queue_t *wait)); /* * Wait for a page to be unlocked. @@ -182,21 +186,30 @@ extern void FASTCALL(wait_on_page_bit(st * ie with increased "page->count" so that the page won't * go away during the wait.. */ -static inline void wait_on_page_locked(struct page *page) +static inline int __wait_on_page_locked(struct page *page, wait_queue_t *wait) { if (PageLocked(page)) - wait_on_page_bit(page, PG_locked); + return wait_on_page_bit(page, PG_locked, wait); + return 0; } +#define wait_on_page_locked(page) \ + __wait_on_page_locked(page, ¤t->__wait.wait) + /* * Wait for a page to complete writeback */ -static inline void wait_on_page_writeback(struct page *page) +static inline int __wait_on_page_writeback(struct page *page, + wait_queue_t *wait) { if (PageWriteback(page)) - wait_on_page_bit(page, PG_writeback); + return wait_on_page_bit(page, PG_writeback, wait); + return 0; } +#define wait_on_page_writeback(page) \ + __wait_on_page_writeback(page, ¤t->__wait.wait) + extern void end_page_writeback(struct page *page); /* diff -purN 00_v2.6.13-rc6/include/linux/sched.h 90_pipe_aio/include/linux/sched.h --- 00_v2.6.13-rc6/include/linux/sched.h 2005-08-04 15:56:07.000000000 -0400 +++ 90_pipe_aio/include/linux/sched.h 2005-08-16 14:03:31.000000000 -0400 @@ -170,6 +170,7 @@ extern void show_stack(struct task_struc void io_schedule(void); long io_schedule_timeout(long timeout); +int io_wait_schedule(wait_queue_t *wait); extern void cpu_init (void); extern void trap_init(void); @@ -636,6 +637,7 @@ struct task_struct { struct list_head ptrace_list; struct mm_struct *mm, *active_mm; + struct mm_struct *aio_mm; /* task state */ struct linux_binfmt *binfmt; @@ -746,11 +748,14 @@ struct task_struct { unsigned long ptrace_message; siginfo_t *last_siginfo; /* For ptrace use. */ + +/* Space for default IO wait bit entry used for synchronous IO waits */ + struct wait_bit_queue __wait; /* - * current io wait handle: wait queue entry to use for io waits - * If this thread is processing aio, this points at the waitqueue - * inside the currently handled kiocb. It may be NULL (i.e. default - * to a stack based synchronous wait) if its doing sync IO. + * Current IO wait handle: wait queue entry to use for IO waits + * If this thread is processing AIO, this points at the waitqueue + * inside the currently handled kiocb. Otherwise, points to the + * default IO wait field (i.e &__wait.wait above). */ wait_queue_t *io_wait; /* i/o counters(bytes read/written, #syscalls */ diff -purN 00_v2.6.13-rc6/include/linux/wait.h 90_pipe_aio/include/linux/wait.h --- 00_v2.6.13-rc6/include/linux/wait.h 2005-08-04 15:56:07.000000000 -0400 +++ 90_pipe_aio/include/linux/wait.h 2005-08-08 17:15:55.000000000 -0400 @@ -103,6 +103,17 @@ static inline int waitqueue_active(wait_ return !list_empty(&q->task_list); } +static inline int test_wait_bit_key(wait_queue_t *wait, + struct wait_bit_key *key) +{ + struct wait_bit_queue *wait_bit + = container_of(wait, struct wait_bit_queue, wait); + + return (wait_bit->key.flags == key->flags && + wait_bit->key.bit_nr == key->bit_nr && + !test_bit(key->bit_nr, key->flags)); +} + /* * Used to distinguish between sync and async io wait context: * sync i/o typically specifies a NULL wait queue entry or a wait @@ -140,11 +151,15 @@ void FASTCALL(__wake_up(wait_queue_head_ extern void FASTCALL(__wake_up_locked(wait_queue_head_t *q, unsigned int mode)); extern void FASTCALL(__wake_up_sync(wait_queue_head_t *q, unsigned int mode, int nr)); void FASTCALL(__wake_up_bit(wait_queue_head_t *, void *, int)); -int FASTCALL(__wait_on_bit(wait_queue_head_t *, struct wait_bit_queue *, int (*)(void *), unsigned)); -int FASTCALL(__wait_on_bit_lock(wait_queue_head_t *, struct wait_bit_queue *, int (*)(void *), unsigned)); +int FASTCALL(__wait_on_bit(wait_queue_head_t *, struct wait_bit_queue *, + int (*)(void *, wait_queue_t *), unsigned)); +int FASTCALL(__wait_on_bit_lock(wait_queue_head_t *, struct wait_bit_queue *, + int (*)(void *, wait_queue_t *), unsigned)); void FASTCALL(wake_up_bit(void *, int)); -int FASTCALL(out_of_line_wait_on_bit(void *, int, int (*)(void *), unsigned)); -int FASTCALL(out_of_line_wait_on_bit_lock(void *, int, int (*)(void *), unsigned)); +int FASTCALL(out_of_line_wait_on_bit(void *, int, int (*)(void *, + wait_queue_t *), unsigned)); +int FASTCALL(out_of_line_wait_on_bit_lock(void *, int, int (*)(void *, + wait_queue_t *), unsigned)); wait_queue_head_t *FASTCALL(bit_waitqueue(void *, int)); #define wake_up(x) __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1, NULL) @@ -407,6 +422,19 @@ int wake_bit_function(wait_queue_t *wait INIT_LIST_HEAD(&(wait)->task_list); \ } while (0) +#define init_wait_bit_key(waitbit, word, bit) \ + do { \ + (waitbit)->key.flags = word; \ + (waitbit)->key.bit_nr = bit; \ + } while (0) + +#define init_wait_bit_task(waitbit, tsk) \ + do { \ + (waitbit)->wait.private = tsk; \ + (waitbit)->wait.func = wake_bit_function; \ + INIT_LIST_HEAD(&(waitbit)->wait.task_list); \ + } while (0) + /** * wait_on_bit - wait for a bit to be cleared * @word: the word being waited on, a kernel virtual address @@ -422,7 +450,8 @@ int wake_bit_function(wait_queue_t *wait * but has no intention of setting it. */ static inline int wait_on_bit(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), + unsigned mode) { if (!test_bit(bit, word)) return 0; @@ -446,7 +475,8 @@ static inline int wait_on_bit(void *word * clear with the intention of setting it, and when done, clearing it. */ static inline int wait_on_bit_lock(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), + unsigned mode) { if (!test_and_set_bit(bit, word)) return 0; diff -purN 00_v2.6.13-rc6/include/linux/writeback.h 90_pipe_aio/include/linux/writeback.h --- 00_v2.6.13-rc6/include/linux/writeback.h 2005-08-04 15:56:07.000000000 -0400 +++ 90_pipe_aio/include/linux/writeback.h 2005-08-08 17:15:51.000000000 -0400 @@ -70,7 +70,7 @@ struct writeback_control { */ void writeback_inodes(struct writeback_control *wbc); void wake_up_inode(struct inode *inode); -int inode_wait(void *); +int inode_wait(void *, wait_queue_t *); void sync_inodes_sb(struct super_block *, int wait); void sync_inodes(int wait); diff -purN 00_v2.6.13-rc6/kernel/exit.c 90_pipe_aio/kernel/exit.c --- 00_v2.6.13-rc6/kernel/exit.c 2005-08-08 15:45:49.000000000 -0400 +++ 90_pipe_aio/kernel/exit.c 2005-08-16 14:03:31.000000000 -0400 @@ -485,6 +485,8 @@ static void exit_mm(struct task_struct * { struct mm_struct *mm = tsk->mm; + if (tsk->aio_mm && atomic_dec_and_test(&mm->default_kioctx.users)) + exit_aio(mm); mm_release(tsk, mm); if (!mm) return; diff -purN 00_v2.6.13-rc6/kernel/fork.c 90_pipe_aio/kernel/fork.c --- 00_v2.6.13-rc6/kernel/fork.c 2005-08-04 15:56:08.000000000 -0400 +++ 90_pipe_aio/kernel/fork.c 2005-08-16 14:03:31.000000000 -0400 @@ -370,7 +370,6 @@ void fastcall __mmdrop(struct mm_struct void mmput(struct mm_struct *mm) { if (atomic_dec_and_test(&mm->mm_users)) { - exit_aio(mm); exit_mmap(mm); if (!list_empty(&mm->mmlist)) { spin_lock(&mmlist_lock); @@ -457,6 +456,7 @@ static int copy_mm(unsigned long clone_f tsk->mm = NULL; tsk->active_mm = NULL; + tsk->aio_mm = NULL; /* * Are we cloning a kernel thread? @@ -469,6 +469,7 @@ static int copy_mm(unsigned long clone_f if (clone_flags & CLONE_VM) { atomic_inc(&oldmm->mm_users); + atomic_inc(&oldmm->default_kioctx.users); mm = oldmm; /* * There are cases where the PTL is held to ensure no @@ -502,6 +503,7 @@ static int copy_mm(unsigned long clone_f good_mm: tsk->mm = mm; + tsk->aio_mm = mm; tsk->active_mm = mm; return 0; @@ -943,7 +945,8 @@ static task_t *copy_process(unsigned lon do_posix_clock_monotonic_gettime(&p->start_time); p->security = NULL; p->io_context = NULL; - p->io_wait = NULL; + init_wait_bit_task(&p->__wait, p); + p->io_wait = &p->__wait.wait; p->audit_context = NULL; #ifdef CONFIG_NUMA p->mempolicy = mpol_copy(p->mempolicy); diff -purN 00_v2.6.13-rc6/kernel/sched.c 90_pipe_aio/kernel/sched.c --- 00_v2.6.13-rc6/kernel/sched.c 2005-08-04 15:56:08.000000000 -0400 +++ 90_pipe_aio/kernel/sched.c 2005-08-08 17:16:00.000000000 -0400 @@ -3993,6 +3993,20 @@ long __sched io_schedule_timeout(long ti return ret; } +/* + * Sleep only if the wait context passed is not async, + * otherwise return so that a retry can be issued later. + */ +int __sched io_wait_schedule(wait_queue_t *wait) +{ + if (!is_sync_wait(wait)) + return -EIOCBRETRY; + io_schedule(); + return 0; +} + +EXPORT_SYMBOL(io_wait_schedule); + /** * sys_sched_get_priority_max - return maximum RT priority. * @policy: scheduling class. diff -purN 00_v2.6.13-rc6/kernel/wait.c 90_pipe_aio/kernel/wait.c --- 00_v2.6.13-rc6/kernel/wait.c 2004-12-24 16:35:27.000000000 -0500 +++ 90_pipe_aio/kernel/wait.c 2005-08-08 17:15:58.000000000 -0400 @@ -132,16 +132,10 @@ EXPORT_SYMBOL(autoremove_wake_function); int wake_bit_function(wait_queue_t *wait, unsigned mode, int sync, void *arg) { - struct wait_bit_key *key = arg; - struct wait_bit_queue *wait_bit - = container_of(wait, struct wait_bit_queue, wait); - - if (wait_bit->key.flags != key->flags || - wait_bit->key.bit_nr != key->bit_nr || - test_bit(key->bit_nr, key->flags)) + /* Assumes that a non-NULL key implies wait bit filtering */ + if (arg && !test_wait_bit_key(wait, arg)) return 0; - else - return autoremove_wake_function(wait, mode, sync, key); + return autoremove_wake_function(wait, mode, sync, arg); } EXPORT_SYMBOL(wake_bit_function); @@ -152,22 +146,28 @@ EXPORT_SYMBOL(wake_bit_function); */ int __sched fastcall __wait_on_bit(wait_queue_head_t *wq, struct wait_bit_queue *q, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), unsigned mode) { int ret = 0; do { prepare_to_wait(wq, &q->wait, mode); if (test_bit(q->key.bit_nr, q->key.flags)) - ret = (*action)(q->key.flags); + ret = (*action)(q->key.flags, &q->wait); } while (test_bit(q->key.bit_nr, q->key.flags) && !ret); - finish_wait(wq, &q->wait); + /* + * AIO retries require the wait queue entry to remain queued + * for async notification + */ + if (ret != -EIOCBRETRY) + finish_wait(wq, &q->wait); return ret; } EXPORT_SYMBOL(__wait_on_bit); int __sched fastcall out_of_line_wait_on_bit(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), + unsigned mode) { wait_queue_head_t *wq = bit_waitqueue(word, bit); DEFINE_WAIT_BIT(wait, word, bit); @@ -178,24 +178,30 @@ EXPORT_SYMBOL(out_of_line_wait_on_bit); int __sched fastcall __wait_on_bit_lock(wait_queue_head_t *wq, struct wait_bit_queue *q, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *), unsigned mode) { int ret = 0; do { prepare_to_wait_exclusive(wq, &q->wait, mode); if (test_bit(q->key.bit_nr, q->key.flags)) { - if ((ret = (*action)(q->key.flags))) + if ((ret = (*action)(q->key.flags, &q->wait))) break; } } while (test_and_set_bit(q->key.bit_nr, q->key.flags)); - finish_wait(wq, &q->wait); + /* + * AIO retries require the wait queue entry to remain queued + * for async notification + */ + if (ret != -EIOCBRETRY) + finish_wait(wq, &q->wait); return ret; } EXPORT_SYMBOL(__wait_on_bit_lock); int __sched fastcall out_of_line_wait_on_bit_lock(void *word, int bit, - int (*action)(void *), unsigned mode) + int (*action)(void *, wait_queue_t *wait), + unsigned mode) { wait_queue_head_t *wq = bit_waitqueue(word, bit); DEFINE_WAIT_BIT(wait, word, bit); diff -purN 00_v2.6.13-rc6/lib/Makefile 90_pipe_aio/lib/Makefile --- 00_v2.6.13-rc6/lib/Makefile 2005-08-04 15:56:09.000000000 -0400 +++ 90_pipe_aio/lib/Makefile 2005-08-08 17:15:28.000000000 -0400 @@ -18,6 +18,7 @@ endif lib-$(CONFIG_RWSEM_GENERIC_SPINLOCK) += rwsem-spinlock.o lib-$(CONFIG_RWSEM_XCHGADD_ALGORITHM) += rwsem.o +lib-$(CONFIG_SEMAPHORE_SLEEPERS) += semaphore-sleepers.o lib-$(CONFIG_GENERIC_FIND_NEXT_BIT) += find_next_bit.o obj-$(CONFIG_LOCK_KERNEL) += kernel_lock.o obj-$(CONFIG_DEBUG_PREEMPT) += smp_processor_id.o diff -purN 00_v2.6.13-rc6/lib/semaphore-sleepers.c 90_pipe_aio/lib/semaphore-sleepers.c --- 00_v2.6.13-rc6/lib/semaphore-sleepers.c 1969-12-31 19:00:00.000000000 -0500 +++ 90_pipe_aio/lib/semaphore-sleepers.c 2005-08-08 17:15:58.000000000 -0400 @@ -0,0 +1,253 @@ +/* + * i386 and x86-64 semaphore implementation. + * + * (C) Copyright 1999 Linus Torvalds + * + * Portions Copyright 1999 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + * rw semaphores implemented November 1999 by Benjamin LaHaise + */ +#include +#include +#include +#include +#include +#include + +/* + * Semaphores are implemented using a two-way counter: + * The "count" variable is decremented for each process + * that tries to acquire the semaphore, while the "sleeping" + * variable is a count of such acquires. + * + * Notably, the inline "up()" and "down()" functions can + * efficiently test if they need to do any extra work (up + * needs to do something only if count was negative before + * the increment operation. + * + * "sleeping" and the contention routine ordering is protected + * by the spinlock in the semaphore's waitqueue head. + * + * Note that these functions are only called when there is + * contention on the lock, and as such all this is the + * "non-critical" part of the whole semaphore business. The + * critical part is the inline stuff in + * where we want to avoid any extra jumps and calls. + */ + +/* + * Logic: + * - only on a boundary condition do we need to care. When we go + * from a negative count to a non-negative, we wake people up. + * - when we go from a non-negative count to a negative do we + * (a) synchronize with the "sleeper" count and (b) make sure + * that we're on the wakeup list before we synchronize so that + * we cannot lose wakeup events. + */ + +fastcall void __up(struct semaphore *sem) +{ + wake_up(&sem->wait); +} + +fastcall void __sched __down(struct semaphore * sem) +{ + struct task_struct *tsk = current; + DECLARE_WAITQUEUE(wait, tsk); + unsigned long flags; + + tsk->state = TASK_UNINTERRUPTIBLE; + spin_lock_irqsave(&sem->wait.lock, flags); + add_wait_queue_exclusive_locked(&sem->wait, &wait); + + sem->sleepers++; + for (;;) { + int sleepers = sem->sleepers; + + /* + * Add "everybody else" into it. They aren't + * playing, because we own the spinlock in + * the wait_queue_head. + */ + if (!atomic_add_negative(sleepers - 1, &sem->count)) { + sem->sleepers = 0; + break; + } + sem->sleepers = 1; /* us - see -1 above */ + spin_unlock_irqrestore(&sem->wait.lock, flags); + + schedule(); + + spin_lock_irqsave(&sem->wait.lock, flags); + tsk->state = TASK_UNINTERRUPTIBLE; + } + remove_wait_queue_locked(&sem->wait, &wait); + wake_up_locked(&sem->wait); + spin_unlock_irqrestore(&sem->wait.lock, flags); + tsk->state = TASK_RUNNING; +} + +static int aio_down_wait(wait_queue_t *wait, unsigned mode, int sync, void *key) +{ + struct kiocb *iocb = io_wait_to_kiocb(wait); + struct semaphore *sem = wait->private; + int sleepers = sem->sleepers; + + /* + * Add "everybody else" into it. They aren't + * playing, because we own the spinlock in + * the wait_queue_head. + */ + if (!atomic_add_negative(sleepers - 1, &sem->count)) { + iocb->ki_wait.wait.func = aio_wake_function; + iocb->ki_cancel = NULL; + sem->sleepers = 0; + sem->aio_owner = iocb; + list_del_init(&wait->task_list); + wake_up_locked(&sem->wait); + kick_iocb(iocb); + return 1; + } + sem->sleepers = 1; /* us - see -1 above */ + + return 1; +} + +static void fixup_down_trylock_locked(struct semaphore *sem); +static int cancel_aio_down(struct kiocb *iocb, struct io_event *event) +{ + /* At this point, the kiocb is locked and even if we have kicked + * it, the pointer to the semaphore is still valid. + */ + struct semaphore *sem = iocb->ki_wait.wait.private; + unsigned long flags; + int ret = 0; + + spin_lock_irqsave(&sem->wait.lock, flags); + if (!list_empty(&iocb->ki_wait.wait.task_list)) { + /* Ensure aio_down_wait() can no longer be called. */ + list_del_init(&iocb->ki_wait.wait.task_list); + fixup_down_trylock_locked(sem); + event->res = is_sync_kiocb(iocb) ? -ERESTARTSYS : -EINTR; + } else + ret = -EAGAIN; /* we lost the race with aio_down_wait(). */ + spin_unlock_irqrestore(&sem->wait.lock, flags); + + return ret; +} + +fastcall long __sched __aio_down(struct kiocb *iocb, struct semaphore * sem) +{ + unsigned long flags; + + if (sem->aio_owner == iocb) { + atomic_inc(&sem->count); /* undo dec in aio_down() */ + return 0; + } + + iocb->ki_wait.wait.private = sem; + iocb->ki_wait.wait.func = aio_down_wait; + spin_lock_irqsave(&sem->wait.lock, flags); + add_wait_queue_exclusive_locked(&sem->wait, &iocb->ki_wait.wait); + + sem->sleepers++; + + iocb->ki_cancel = cancel_aio_down; + + aio_down_wait(&iocb->ki_wait.wait, 0, 0, NULL); + spin_unlock_irqrestore(&sem->wait.lock, flags); + return -EIOCBRETRY; +} + +fastcall int __sched __down_interruptible(struct semaphore * sem) +{ + int retval = 0; + struct task_struct *tsk = current; + DECLARE_WAITQUEUE(wait, tsk); + unsigned long flags; + + tsk->state = TASK_INTERRUPTIBLE; + spin_lock_irqsave(&sem->wait.lock, flags); + add_wait_queue_exclusive_locked(&sem->wait, &wait); + + sem->sleepers++; + for (;;) { + int sleepers = sem->sleepers; + + /* + * With signals pending, this turns into + * the trylock failure case - we won't be + * sleeping, and we* can't get the lock as + * it has contention. Just correct the count + * and exit. + */ + if (signal_pending(current)) { + retval = -EINTR; + sem->sleepers = 0; + atomic_add(sleepers, &sem->count); + break; + } + + /* + * Add "everybody else" into it. They aren't + * playing, because we own the spinlock in + * wait_queue_head. The "-1" is because we're + * still hoping to get the semaphore. + */ + if (!atomic_add_negative(sleepers - 1, &sem->count)) { + sem->sleepers = 0; + break; + } + sem->sleepers = 1; /* us - see -1 above */ + spin_unlock_irqrestore(&sem->wait.lock, flags); + + schedule(); + + spin_lock_irqsave(&sem->wait.lock, flags); + tsk->state = TASK_INTERRUPTIBLE; + } + remove_wait_queue_locked(&sem->wait, &wait); + wake_up_locked(&sem->wait); + spin_unlock_irqrestore(&sem->wait.lock, flags); + + tsk->state = TASK_RUNNING; + return retval; +} + +/* + * Trylock failed - make sure we correct for + * having decremented the count. + * + * We could have done the trylock with a + * single "cmpxchg" without failure cases, + * but then it wouldn't work on a 386. + */ +static void fixup_down_trylock_locked(struct semaphore *sem) +{ + int sleepers; + sleepers = sem->sleepers + 1; + sem->sleepers = 0; + + /* + * Add "everybody else" and us into it. They aren't + * playing, because we own the spinlock in the + * wait_queue_head. + */ + if (!atomic_add_negative(sleepers, &sem->count)) + wake_up_locked(&sem->wait); +} + +fastcall int __down_trylock(struct semaphore * sem) +{ + unsigned long flags; + + spin_lock_irqsave(&sem->wait.lock, flags); + fixup_down_trylock_locked(sem); + spin_unlock_irqrestore(&sem->wait.lock, flags); + return 1; +} diff -purN 00_v2.6.13-rc6/mm/filemap.c 90_pipe_aio/mm/filemap.c --- 00_v2.6.13-rc6/mm/filemap.c 2005-08-04 15:56:09.000000000 -0400 +++ 90_pipe_aio/mm/filemap.c 2005-08-09 19:34:31.000000000 -0400 @@ -126,7 +126,7 @@ void remove_from_page_cache(struct page write_unlock_irq(&mapping->tree_lock); } -static int sync_page(void *word) +static int sync_page(void *word, wait_queue_t *wait) { struct address_space *mapping; struct page *page; @@ -158,8 +158,7 @@ static int sync_page(void *word) mapping = page_mapping(page); if (mapping && mapping->a_ops && mapping->a_ops->sync_page) mapping->a_ops->sync_page(page); - io_schedule(); - return 0; + return io_wait_schedule(wait); } /** @@ -223,10 +222,11 @@ EXPORT_SYMBOL(filemap_flush); /* * Wait for writeback to complete against pages indexed by start->end - * inclusive + * inclusive. In AIO context, this may queue an async notification + * and retry callback and return, instead of blocking the caller. */ -static int wait_on_page_writeback_range(struct address_space *mapping, - pgoff_t start, pgoff_t end) +static int __wait_on_page_writeback_range(struct address_space *mapping, + pgoff_t start, pgoff_t end, wait_queue_t *wait) { struct pagevec pvec; int nr_pages; @@ -238,20 +238,20 @@ static int wait_on_page_writeback_range( pagevec_init(&pvec, 0); index = start; - while ((index <= end) && + while (!ret && (index <= end) && (nr_pages = pagevec_lookup_tag(&pvec, mapping, &index, PAGECACHE_TAG_WRITEBACK, min(end - index, (pgoff_t)PAGEVEC_SIZE-1) + 1)) != 0) { unsigned i; - for (i = 0; i < nr_pages; i++) { + for (i = 0; !ret && (i < nr_pages); i++) { struct page *page = pvec.pages[i]; /* until radix tree lookup accepts end_index */ if (page->index > end) continue; - wait_on_page_writeback(page); + ret = __wait_on_page_writeback(page, wait); if (PageError(page)) ret = -EIO; } @@ -268,6 +268,14 @@ static int wait_on_page_writeback_range( return ret; } +static inline int wait_on_page_writeback_range(struct address_space *mapping, + pgoff_t start, pgoff_t end) +{ + return __wait_on_page_writeback_range(mapping, start, end, + ¤t->__wait.wait); +} + + /* * Write and wait upon all the pages in the passed range. This is a "data * integrity" operation. It waits upon in-flight writeout before starting and @@ -281,18 +289,27 @@ int sync_page_range(struct inode *inode, { pgoff_t start = pos >> PAGE_CACHE_SHIFT; pgoff_t end = (pos + count - 1) >> PAGE_CACHE_SHIFT; - int ret; + int ret = 0; if (!mapping_cap_writeback_dirty(mapping) || !count) return 0; + if (in_aio()) { + /* Already issued writeouts for this iocb ? */ + if (kiocbTrySync(io_wait_to_kiocb(current->io_wait))) + goto do_wait; /* just need to check if done */ + } ret = filemap_fdatawrite_range(mapping, pos, pos + count - 1); - if (ret == 0) { + + if (ret >= 0) { down(&inode->i_sem); ret = generic_osync_inode(inode, mapping, OSYNC_METADATA); up(&inode->i_sem); } - if (ret == 0) - ret = wait_on_page_writeback_range(mapping, start, end); +do_wait: + if (ret >= 0) { + ret = __wait_on_page_writeback_range(mapping, start, end, + current->io_wait); + } return ret; } EXPORT_SYMBOL(sync_page_range); @@ -307,15 +324,23 @@ int sync_page_range_nolock(struct inode { pgoff_t start = pos >> PAGE_CACHE_SHIFT; pgoff_t end = (pos + count - 1) >> PAGE_CACHE_SHIFT; - int ret; + int ret = 0; if (!mapping_cap_writeback_dirty(mapping) || !count) return 0; + if (in_aio()) { + /* Already issued writeouts for this iocb ? */ + if (kiocbTrySync(io_wait_to_kiocb(current->io_wait))) + goto do_wait; /* just need to check if done */ + } ret = filemap_fdatawrite_range(mapping, pos, pos + count - 1); - if (ret == 0) + if (ret >= 0) ret = generic_osync_inode(inode, mapping, OSYNC_METADATA); - if (ret == 0) - ret = wait_on_page_writeback_range(mapping, start, end); +do_wait: + if (ret >= 0) { + ret = __wait_on_page_writeback_range(mapping, start, end, + current->io_wait); + } return ret; } EXPORT_SYMBOL(sync_page_range_nolock); @@ -428,13 +453,17 @@ static inline void wake_up_page(struct p __wake_up_bit(page_waitqueue(page), &page->flags, bit); } -void fastcall wait_on_page_bit(struct page *page, int bit_nr) +int fastcall wait_on_page_bit(struct page *page, int bit_nr, + wait_queue_t *wait) { - DEFINE_WAIT_BIT(wait, &page->flags, bit_nr); - - if (test_bit(bit_nr, &page->flags)) - __wait_on_bit(page_waitqueue(page), &wait, sync_page, + if (test_bit(bit_nr, &page->flags)) { + struct wait_bit_queue *wait_bit + = container_of(wait, struct wait_bit_queue, wait); + init_wait_bit_key(wait_bit, &page->flags, bit_nr); + return __wait_on_bit(page_waitqueue(page), wait_bit, sync_page, TASK_UNINTERRUPTIBLE); + } + return 0; } EXPORT_SYMBOL(wait_on_page_bit); @@ -478,21 +507,23 @@ void end_page_writeback(struct page *pag EXPORT_SYMBOL(end_page_writeback); /* - * Get a lock on the page, assuming we need to sleep to get it. + * Get a lock on the page, assuming we need to wait to get it. * * Ugly: running sync_page() in state TASK_UNINTERRUPTIBLE is scary. If some * random driver's requestfn sets TASK_RUNNING, we could busywait. However * chances are that on the second loop, the block layer's plug list is empty, * so sync_page() will then return in state TASK_UNINTERRUPTIBLE. */ -void fastcall __lock_page(struct page *page) +int fastcall lock_page_slow(struct page *page, wait_queue_t *wait) { - DEFINE_WAIT_BIT(wait, &page->flags, PG_locked); + struct wait_bit_queue *wait_bit + = container_of(wait, struct wait_bit_queue, wait); - __wait_on_bit_lock(page_waitqueue(page), &wait, sync_page, + init_wait_bit_key(wait_bit, &page->flags, PG_locked); + return __wait_on_bit_lock(page_waitqueue(page), wait_bit, sync_page, TASK_UNINTERRUPTIBLE); } -EXPORT_SYMBOL(__lock_page); +EXPORT_SYMBOL(lock_page_slow); /* * a rather lightweight function, finding and getting a reference to a @@ -740,6 +771,11 @@ void do_generic_mapping_read(struct addr if (!isize) goto out; + if (in_aio()) { + /* Avoid repeat readahead */ + if (is_retried_kiocb(io_wait_to_kiocb(current->io_wait))) + next_index = last_index; + } end_index = (isize - 1) >> PAGE_CACHE_SHIFT; for (;;) { struct page *page; @@ -809,7 +845,11 @@ page_ok: page_not_up_to_date: /* Get exclusive access to the page ... */ - lock_page(page); + + if ((error = __lock_page(page, current->io_wait))) { + pr_debug("queued lock page \n"); + goto readpage_error; + } /* Did it get unhashed before we got the lock? */ if (!page->mapping) { @@ -832,7 +872,8 @@ readpage: goto readpage_error; if (!PageUptodate(page)) { - lock_page(page); + if ((error = __lock_page(page, current->io_wait))) + goto readpage_error; if (!PageUptodate(page)) { if (page->mapping == NULL) { /* @@ -877,7 +918,11 @@ readpage: goto page_ok; readpage_error: - /* UHHUH! A synchronous read error occurred. Report it */ + /* We don't have uptodate data in the page yet */ + /* Could be due to an error or because we need to + * retry when we get an async i/o notification. + * Report the reason. + */ desc->error = error; page_cache_release(page); goto out; @@ -1051,6 +1096,15 @@ generic_file_aio_read(struct kiocb *iocb EXPORT_SYMBOL(generic_file_aio_read); ssize_t +generic_file_aio_readv(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, loff_t pos) +{ + BUG_ON(iocb->ki_pos != pos); + return __generic_file_aio_read(iocb, iov, nr_segs, &iocb->ki_pos); +} +EXPORT_SYMBOL(generic_file_aio_readv); + +ssize_t generic_file_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) { struct iovec local_iov = { .iov_base = buf, .iov_len = count }; @@ -1983,7 +2037,7 @@ generic_file_buffered_write(struct kiocb */ if (likely(status >= 0)) { if (unlikely((file->f_flags & O_SYNC) || IS_SYNC(inode))) { - if (!a_ops->writepage || !is_sync_kiocb(iocb)) + if (!a_ops->writepage) status = generic_osync_inode(inode, mapping, OSYNC_METADATA|OSYNC_DATA); } @@ -2090,14 +2144,23 @@ generic_file_aio_write_nolock(struct kio ssize_t ret; loff_t pos = *ppos; + if (!is_sync_kiocb(iocb) && kiocbIsSynced(iocb)) { + /* nothing to transfer, may just need to sync data */ + ret = iov->iov_len; /* vector AIO not supported yet */ + goto osync; + } + ret = __generic_file_aio_write_nolock(iocb, iov, nr_segs, ppos); +osync: if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { int err; err = sync_page_range_nolock(inode, mapping, pos, ret); - if (err < 0) - ret = err; + if (err < 0) { + ret = err; + *ppos = pos; + } } return ret; } @@ -2131,32 +2194,47 @@ generic_file_write_nolock(struct file *f } EXPORT_SYMBOL(generic_file_write_nolock); -ssize_t generic_file_aio_write(struct kiocb *iocb, const char __user *buf, - size_t count, loff_t pos) +ssize_t generic_file_aio_writev(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, loff_t pos) { struct file *file = iocb->ki_filp; struct address_space *mapping = file->f_mapping; struct inode *inode = mapping->host; ssize_t ret; - struct iovec local_iov = { .iov_base = (void __user *)buf, - .iov_len = count }; - BUG_ON(iocb->ki_pos != pos); + if (!is_sync_kiocb(iocb) && kiocbIsSynced(iocb)) { + /* nothing to transfer, may just need to sync data */ + ret = iocb->ki_left; + goto osync; + } - down(&inode->i_sem); - ret = __generic_file_aio_write_nolock(iocb, &local_iov, 1, - &iocb->ki_pos); - up(&inode->i_sem); + ret = aio_down(iocb, &inode->i_sem); + if (ret) + return ret; + ret = generic_file_aio_write_nolock(iocb, iov, nr_segs, &iocb->ki_pos); + aio_up(iocb, &inode->i_sem); +osync: if (ret > 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) { ssize_t err; err = sync_page_range(inode, mapping, pos, ret); - if (err < 0) + if (err < 0) { ret = err; + iocb->ki_pos = pos; + } } return ret; } +EXPORT_SYMBOL(generic_file_aio_writev); + +ssize_t generic_file_aio_write(struct kiocb *iocb, const char __user *buf, + size_t count, loff_t pos) +{ + struct iovec local_iov = { .iov_base = (void __user *)buf, + .iov_len = count }; + return generic_file_aio_writev(iocb, &local_iov, 1, pos); +} EXPORT_SYMBOL(generic_file_aio_write); ssize_t generic_file_write(struct file *file, const char __user *buf, diff -purN 00_v2.6.13-rc6/net/socket.c 90_pipe_aio/net/socket.c --- 00_v2.6.13-rc6/net/socket.c 2005-08-04 15:56:11.000000000 -0400 +++ 90_pipe_aio/net/socket.c 2005-08-17 14:26:24.000000000 -0400 @@ -82,6 +82,7 @@ #include #include #include +#include #ifdef CONFIG_NET_RADIO #include /* Note : will define WIRELESS_EXT */ @@ -108,6 +109,10 @@ static unsigned int sock_poll(struct fil static long sock_ioctl(struct file *file, unsigned int cmd, unsigned long arg); static int sock_fasync(int fd, struct file *filp, int on); +static ssize_t sock_read(struct file *file, char __user *buf, + size_t len, loff_t *ppos); +static ssize_t sock_write(struct file *file, const char __user *buf, + size_t len, loff_t *ppos); static ssize_t sock_readv(struct file *file, const struct iovec *vector, unsigned long count, loff_t *ppos); static ssize_t sock_writev(struct file *file, const struct iovec *vector, @@ -124,8 +129,12 @@ static ssize_t sock_sendpage(struct file static struct file_operations socket_file_ops = { .owner = THIS_MODULE, .llseek = no_llseek, +#if 0 .aio_read = sock_aio_read, .aio_write = sock_aio_write, +#endif + .read = sock_read, + .write = sock_write, .poll = sock_poll, .unlocked_ioctl = sock_ioctl, .mmap = sock_mmap, @@ -404,6 +413,7 @@ int sock_map_fd(struct socket *sock) file->f_mode = FMODE_READ | FMODE_WRITE; file->f_flags = O_RDWR; file->f_pos = 0; + file->private_data = sock; fd_install(fd, file); } @@ -411,6 +421,13 @@ out: return fd; } +static inline struct socket *file_to_socket(struct file *file) +{ + if (file->f_op == &socket_file_ops) + return file->private_data; + return ERR_PTR(-ENOTSOCK); +} + /** * sockfd_lookup - Go from a file number to its socket slot * @fd: file handle @@ -436,6 +453,10 @@ struct socket *sockfd_lookup(int fd, int return NULL; } + sock = file_to_socket(file); + if (!IS_ERR(sock)) + return sock; + inode = file->f_dentry->d_inode; if (!S_ISSOCK(inode->i_mode)) { *err = -ENOTSOCK; @@ -638,7 +659,6 @@ static void sock_aio_dtor(struct kiocb * * Read data from a socket. ubuf is a user mode pointer. We make sure the user * area ubuf...ubuf+size-1 is writable before asking the protocol. */ - static ssize_t sock_aio_read(struct kiocb *iocb, char __user *ubuf, size_t size, loff_t pos) { @@ -772,7 +792,13 @@ static ssize_t sock_readv(struct file *f return sock_readv_writev(VERIFY_WRITE, file->f_dentry->d_inode, file, vector, count, tot_len); } - + +static ssize_t sock_read(struct file *file, char __user *buf, size_t len, loff_t *ppos) +{ + struct iovec local_iov = { .iov_base = buf, .iov_len = len }; + return sock_readv(file, &local_iov, 1, ppos); +} + static ssize_t sock_writev(struct file *file, const struct iovec *vector, unsigned long count, loff_t *ppos) { @@ -784,6 +810,13 @@ static ssize_t sock_writev(struct file * file, vector, count, tot_len); } +static ssize_t sock_write(struct file *file, const char __user *buf, size_t len, loff_t *ppos) +{ + struct iovec local_iov = { .iov_base = (void __user *)buf, + .iov_len = len }; + return sock_writev(file, &local_iov, 1, ppos); +} + /* * Atomic setting of ioctl hooks to avoid race @@ -1690,15 +1723,31 @@ asmlinkage long sys_shutdown(int fd, int asmlinkage long sys_sendmsg(int fd, struct msghdr __user *msg, unsigned flags) { - struct compat_msghdr __user *msg_compat = (struct compat_msghdr __user *)msg; struct socket *sock; + int err; + + sock = sockfd_lookup(fd, &err); + if (!sock) + goto out; + err = vfs_sendmsg(sock->file, msg, flags); + sockfd_put(sock); +out: + return err; +} + +long vfs_sendmsg(struct file *file, struct msghdr __user *msg, unsigned flags) +{ + struct compat_msghdr __user *msg_compat = (struct compat_msghdr __user *)msg; + struct socket *sock = file_to_socket(file); char address[MAX_SOCK_ADDR]; struct iovec iovstack[UIO_FASTIOV], *iov = iovstack; unsigned char ctl[sizeof(struct cmsghdr) + 20]; /* 20 is size of ipv6_pktinfo */ unsigned char *ctl_buf = ctl; struct msghdr msg_sys; int err, ctl_len, iov_size, total_len; - + + if (IS_ERR(sock)) + return PTR_ERR(sock); err = -EFAULT; if (MSG_CMSG_COMPAT & flags) { if (get_compat_msghdr(&msg_sys, msg_compat)) @@ -1706,14 +1755,10 @@ asmlinkage long sys_sendmsg(int fd, stru } else if (copy_from_user(&msg_sys, msg, sizeof(struct msghdr))) return -EFAULT; - sock = sockfd_lookup(fd, &err); - if (!sock) - goto out; - /* do not move before msg_sys is valid */ err = -EMSGSIZE; if (msg_sys.msg_iovlen > UIO_MAXIOV) - goto out_put; + goto out; /* Check whether to allocate the iovec area*/ err = -ENOMEM; @@ -1721,7 +1766,7 @@ asmlinkage long sys_sendmsg(int fd, stru if (msg_sys.msg_iovlen > UIO_FASTIOV) { iov = sock_kmalloc(sock->sk, iov_size, GFP_KERNEL); if (!iov) - goto out_put; + goto out; } /* This will also move the address data into kernel space */ @@ -1772,8 +1817,6 @@ out_freectl: out_freeiov: if (iov != iovstack) sock_kfree_s(sock->sk, iov, iov_size); -out_put: - sockfd_put(sock); out: return err; } @@ -1784,8 +1827,22 @@ out: asmlinkage long sys_recvmsg(int fd, struct msghdr __user *msg, unsigned int flags) { - struct compat_msghdr __user *msg_compat = (struct compat_msghdr __user *)msg; struct socket *sock; + int err; + + sock = sockfd_lookup(fd, &err); + if (!sock) + goto out; + err = vfs_recvmsg(sock->file, msg, flags); + sockfd_put(sock); +out: + return err; +} + +long vfs_recvmsg(struct file *file, struct msghdr __user *msg, unsigned int flags) +{ + struct compat_msghdr __user *msg_compat = (struct compat_msghdr __user *)msg; + struct socket *sock = file_to_socket(file); struct iovec iovstack[UIO_FASTIOV]; struct iovec *iov=iovstack; struct msghdr msg_sys; @@ -1798,7 +1855,9 @@ asmlinkage long sys_recvmsg(int fd, stru /* user mode address pointers */ struct sockaddr __user *uaddr; int __user *uaddr_len; - + + if (IS_ERR(sock)) + return PTR_ERR(sock); if (MSG_CMSG_COMPAT & flags) { if (get_compat_msghdr(&msg_sys, msg_compat)) return -EFAULT; @@ -1806,13 +1865,9 @@ asmlinkage long sys_recvmsg(int fd, stru if (copy_from_user(&msg_sys,msg,sizeof(struct msghdr))) return -EFAULT; - sock = sockfd_lookup(fd, &err); - if (!sock) - goto out; - err = -EMSGSIZE; if (msg_sys.msg_iovlen > UIO_MAXIOV) - goto out_put; + goto out; /* Check whether to allocate the iovec area*/ err = -ENOMEM; @@ -1820,7 +1875,7 @@ asmlinkage long sys_recvmsg(int fd, stru if (msg_sys.msg_iovlen > UIO_FASTIOV) { iov = sock_kmalloc(sock->sk, iov_size, GFP_KERNEL); if (!iov) - goto out_put; + goto out; } /* @@ -1871,8 +1926,6 @@ asmlinkage long sys_recvmsg(int fd, stru out_freeiov: if (iov != iovstack) sock_kfree_s(sock->sk, iov, iov_size); -out_put: - sockfd_put(sock); out: return err; }