diff -purN v2.6.9/fs/aio.c aio-2.6.9/fs/aio.c
--- v2.6.9/fs/aio.c	2005-06-01 14:00:56.000000000 -0400
+++ aio-2.6.9/fs/aio.c	2005-06-01 15:02:38.000000000 -0400
@@ -225,6 +225,7 @@ static struct kioctx *ioctx_alloc(unsign
 	atomic_set(&ctx->users, 1);
 	spin_lock_init(&ctx->ctx_lock);
 	spin_lock_init(&ctx->ring_info.ring_lock);
+	INIT_LIST_HEAD(&ctx->ring_info.waiters);
 	init_waitqueue_head(&ctx->wait);
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
@@ -908,7 +909,7 @@ void fastcall kick_iocb(struct kiocb *io
 	 * single context. */
 	if (is_sync_kiocb(iocb)) {
 		kiocbSetKicked(iocb);
-	        wake_up_process(iocb->ki_obj.tsk);
+		wake_up_process(iocb->ki_obj.tsk);
 		return;
 	}
 
@@ -932,7 +933,7 @@ int fastcall aio_complete(struct kiocb *
 	struct aio_ring	*ring;
 	struct io_event	*event;
 	unsigned long	flags;
-	unsigned long	tail;
+	unsigned long	tail, head;
 	int		ret;
 
 	/* Special case handling for sync iocbs: events go directly
@@ -1001,6 +1002,7 @@ int fastcall aio_complete(struct kiocb *
 	info->tail = tail;
 	ring->tail = tail;
+	head = ring->head;
 
 	put_aio_ring_event(event, KM_IRQ0);
 	kunmap_atomic(ring, KM_IRQ1);
 
@@ -1011,15 +1013,26 @@ int fastcall aio_complete(struct kiocb *
 		iocb->ki_retried,
 		iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes,
 		iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups);
+
+	if (unlikely(ctx->nr_wait)) {
+		unsigned long nr_events;
+		if (head <= tail)
+			nr_events = tail - head;
+		else
+			nr_events = tail + info->nr - head;
+		if (nr_events >= ctx->nr_wait)
+			wake_up_process(ctx->waiter_process);
+	}
+
 put_rq:
 	/* everything turned out well, dispose of the aiocb. */
 	ret = __aio_put_req(ctx, iocb);
 
-	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 
+	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+
 	if (ret)
 		put_ioctx(ctx);
@@ -1047,8 +1060,6 @@ static int aio_read_evt(struct kioctx *i
 	if (ring->head == ring->tail)
 		goto out;
 
-	spin_lock(&info->ring_lock);
-
 	head = ring->head % info->nr;
 	if (head != ring->tail) {
 		struct io_event *evp = aio_ring_event(info, head, KM_USER1);
@@ -1059,7 +1070,6 @@ static int aio_read_evt(struct kioctx *i
 		ret = 1;
 		put_aio_ring_event(evp, KM_USER1);
 	}
-	spin_unlock(&info->ring_lock);
 
 out:
 	kunmap_atomic(ring, KM_USER0);
@@ -1068,6 +1078,22 @@ out:
 	return ret;
 }
 
+static inline long ring_events_avail(struct kioctx *ctx)
+{
+	struct aio_ring *ring;
+	long head, tail;
+
+	ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
+	head = ring->head;
+	tail = ring->tail;
+	kunmap_atomic(ring, KM_USER0);
+
+	barrier();
+	if (head <= tail)
+		return tail - head;
+	return ctx->ring_info.nr + tail - head;
+}
+
 struct aio_timeout {
 	struct timer_list	timer;
 	int			timed_out;
@@ -1106,14 +1132,108 @@ static inline void clear_timeout(struct
 	del_singleshot_timer_sync(&to->timer);
 }
 
+static inline int set_timeout_user(long start_jiffies, struct aio_timeout *to,
+				   const struct timespec __user *timeout)
+{
+	struct timespec	ts;
+	int ret;
+
+	ret = copy_from_user(&ts, timeout, sizeof(ts));
+	if (ret)
+		return ret;
+
+	set_timeout(start_jiffies, to, &ts);
+	return 0;
+}
+
+/* wait_aio_ring
+ *	Waits to become head of the aio ring waiters.  Must be called
+ *	with ring_lock held.  Returns with ring_lock held.
+ */
+static int wait_aio_ring(struct kioctx *ctx,
+			 long start_jiffies,
+			 struct aio_ring_waiter *waiter,
+			 long min_nr,
+			 struct timespec __user *timeout)
+{
+	struct aio_ring_info *info = &ctx->ring_info;
+	struct aio_timeout to;
+	int ret = 0;
+
+	waiter->min_nr = min_nr;
+	waiter->process = current;
+	list_add_tail(&waiter->list, &info->waiters);
+	spin_unlock(&info->ring_lock);
+
+	init_timeout(&to);
+	if (timeout)
+		ret = set_timeout_user(start_jiffies, &to, timeout);
+
+	while (!ret) {
+		set_current_state(TASK_INTERRUPTIBLE);
+
+		schedule();
+
+		set_current_state(TASK_RUNNING);
+
+		/* Are we head of the list? */
+		if (info->waiters.next == &waiter->list)
+			break;
+
+		if (signal_pending(current)) {
+			ret = -ERESTARTSYS;
+			break;
+		}
+		if (to.timed_out) {
+			ret = -ETIMEDOUT;
+			break;
+		}
+	}
+
+	if (timeout)
+		clear_timeout(&to);
+
+	spin_lock(&info->ring_lock);
+	list_del(&waiter->list);
+
+	return ret;
+}
+
+static void drop_waiter(struct kioctx *ctx, struct aio_ring_waiter *waiter)
+{
+	struct aio_ring_info *info = &ctx->ring_info;
+
+	spin_lock(&info->ring_lock);
+	list_del(&waiter->list);
+
+	spin_lock_irq(&ctx->ctx_lock);	/* We muck with ctx->nr_wait and
+					 * waiter_process */
+	if (!list_empty(&info->waiters)) {
+		struct aio_ring_waiter *next_waiter;
+		next_waiter = list_entry(info->waiters.next, struct aio_ring_waiter, list);
+		ctx->nr_wait = next_waiter->min_nr;
+		ctx->waiter_process = next_waiter->process;
+
+		if (ring_events_avail(ctx) >= ctx->nr_wait)
+			wake_up_process(ctx->waiter_process);
+	} else {
+		ctx->nr_wait = 0;
+		ctx->waiter_process = NULL;
+	}
+
+	spin_unlock_irq(&ctx->ctx_lock);
+	spin_unlock(&info->ring_lock);
+}
+
 static int read_events(struct kioctx *ctx,
 			long min_nr, long nr,
 			struct io_event __user *event,
 			struct timespec __user *timeout)
 {
+	struct aio_ring_info *info = &ctx->ring_info;
 	long			start_jiffies = jiffies;
 	struct task_struct	*tsk = current;
-	DECLARE_WAITQUEUE(wait, tsk);
+	struct aio_ring_waiter	waiter;
 	int			ret;
 	int			i = 0;
 	struct io_event		ent;
@@ -1121,6 +1241,21 @@ static int read_events(struct kioctx *ct
 	int			event_loop = 0; /* testing only */
 	int			retry = 0;
 
+	spin_lock(&info->ring_lock);
+
+	/* If someone else is already on the waiters list, we must delay */
+	if (!list_empty(&ctx->ring_info.waiters)) {
+		ret = wait_aio_ring(ctx, start_jiffies, &waiter, min_nr,
+				    timeout);
+		if (unlikely(ret)) {
+			spin_unlock(&info->ring_lock);
+			return ret;
+		}
+	}
+
+	list_add(&waiter.list, &ctx->ring_info.waiters);
+	spin_unlock(&info->ring_lock);
+
 	/* needed to zero any padding within an entry (there shouldn't
 	 * be any, but C is fun!
 	 */
@@ -1148,10 +1283,10 @@ retry:
 		i ++;
 	}
 
-	if (min_nr <= i)
-		return i;
-	if (ret)
-		return ret;
+	if (min_nr <= i || ret) {
+		drop_waiter(ctx, &waiter);
+		return i ? i : ret;
+	}
 
 	/* End fast path */
 
@@ -1164,17 +1299,22 @@ retry:
 	init_timeout(&to);
 	if (timeout) {
-		struct timespec	ts;
-		ret = -EFAULT;
-		if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
+		ret = set_timeout_user(start_jiffies, &to, timeout);
+		if (unlikely(ret))
 			goto out;
-
-		set_timeout(start_jiffies, &to, &ts);
 	}
 
+	/* Make sure we're receiving the next wakeup. */
+	ctx->waiter_process = current;
+
 	while (likely(i < nr)) {
-		add_wait_queue_exclusive(&ctx->wait, &wait);
 		do {
+			/* Setting ctx->nr_wait allows aio_complete to only
+			 * issue a wakeup when enough events to satisfy our
+			 * read are available.
+			 */
+			ctx->nr_wait = min_nr - i;
+
 			set_task_state(tsk, TASK_INTERRUPTIBLE);
 
 			ret = aio_read_evt(ctx, &ent);
 			if (ret)
@@ -1184,17 +1324,19 @@ retry:
 			ret = 0;
 			if (to.timed_out)	/* Only check after read evt */
 				break;
-			schedule();
+
+			if (ring_events_avail(ctx) < min_nr - i)
+				schedule();
+
 			event_loop++;
 			if (signal_pending(tsk)) {
-				ret = -EINTR;
+				ret = -ERESTARTSYS;
 				break;
 			}
 			/*ret = aio_read_evt(ctx, &ent);*/
 		} while (1) ;
 
 		set_task_state(tsk, TASK_RUNNING);
-		remove_wait_queue(&ctx->wait, &wait);
 
 		if (unlikely(ret <= 0))
 			break;
@@ -1216,6 +1358,7 @@ out:
 	pr_debug("event loop executed %d times\n", event_loop);
 	pr_debug("aio_run %ld\n", aio_run);
 	pr_debug("aio_wakeups %ld\n", aio_wakeups);
+	drop_waiter(ctx, &waiter);
 
 	return i ? i : ret;
 }
diff -purN v2.6.9/include/linux/aio.h aio-2.6.9/include/linux/aio.h
--- v2.6.9/include/linux/aio.h	2004-12-24 16:35:50.000000000 -0500
+++ aio-2.6.9/include/linux/aio.h	2005-06-01 14:16:58.000000000 -0400
@@ -11,6 +11,7 @@
 #define AIO_KIOGRP_NR_ATOMIC	8
 
 struct kioctx;
+struct task_struct;
 
 /* Notes on cancelling a kiocb:
  *	If a kiocb is cancelled, aio_complete may return 0 to indicate
@@ -113,6 +114,12 @@ struct aio_ring {
 
 #define aio_ring_avail(info, ring)	(((ring)->head + (info)->nr - 1 - (ring)->tail) % (info)->nr)
 
+struct aio_ring_waiter {
+	struct list_head	list;
+	long			min_nr;
+	struct task_struct	*process;
+};
+
 #define AIO_RING_PAGES	8
 struct aio_ring_info {
 	unsigned long		mmap_base;
@@ -121,6 +128,7 @@ struct aio_ring_info {
 	struct page		**ring_pages;
 	spinlock_t		ring_lock;
 	long			nr_pages;
+	struct list_head	waiters;	/* list of aio_ring_waiter */
 
 	unsigned		nr, tail;
 
@@ -145,6 +153,8 @@ struct kioctx {
 	struct list_head	run_list;	/* used for kicked reqs */
 
 	unsigned		max_reqs;
+	long			nr_wait;
+	struct task_struct	*waiter_process;
 
 	struct aio_ring_info	ring_info;
 
diff -purN v2.6.9/lib/rwsem.c aio-2.6.9/lib/rwsem.c
--- v2.6.9/lib/rwsem.c	2004-12-24 16:35:50.000000000 -0500
+++ aio-2.6.9/lib/rwsem.c	2005-06-01 16:16:55.000000000 -0400
@@ -219,15 +219,16 @@ rwsem_down_write_failed(struct rw_semaph
  */
 struct rw_semaphore fastcall *rwsem_wake(struct rw_semaphore *sem)
 {
+	unsigned long flags;
 	rwsemtrace(sem, "Entering rwsem_wake");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	/* do nothing if list empty */
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 0);
 
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	rwsemtrace(sem, "Leaving rwsem_wake");
 
@@ -241,15 +242,16 @@ struct rw_semaphore fastcall *rwsem_wake
  */
 struct rw_semaphore fastcall *rwsem_downgrade_wake(struct rw_semaphore *sem)
 {
+	unsigned long flags;
 	rwsemtrace(sem, "Entering rwsem_downgrade_wake");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	/* do nothing if list empty */
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 1);
 
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	rwsemtrace(sem, "Leaving rwsem_downgrade_wake");
 
 	return sem;
diff -purN v2.6.9/lib/rwsem-spinlock.c aio-2.6.9/lib/rwsem-spinlock.c
--- v2.6.9/lib/rwsem-spinlock.c	2004-12-24 16:35:50.000000000 -0500
+++ aio-2.6.9/lib/rwsem-spinlock.c	2005-06-01 16:20:57.000000000 -0400
@@ -137,15 +137,16 @@ void fastcall __sched __down_read(struct
 {
 	struct rwsem_waiter waiter;
 	struct task_struct *tsk;
+	unsigned long flags;
 
 	rwsemtrace(sem, "Entering __down_read");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	if (sem->activity >= 0 && list_empty(&sem->wait_list)) {
 		/* granted */
 		sem->activity++;
-		spin_unlock(&sem->wait_lock);
+		spin_unlock_irqrestore(&sem->wait_lock, flags);
 		goto out;
 	}
 
@@ -160,7 +161,7 @@ void fastcall __sched __down_read(struct
 	list_add_tail(&waiter.list, &sem->wait_list);
 
 	/* we don't need to touch the semaphore struct anymore */
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	/* wait to be given the lock */
 	for (;;) {
@@ -181,10 +182,11 @@ void fastcall __sched __down_read(struct
  */
 int fastcall __down_read_trylock(struct rw_semaphore *sem)
 {
+	unsigned long flags;
 	int ret = 0;
 	rwsemtrace(sem, "Entering __down_read_trylock");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	if (sem->activity >= 0 && list_empty(&sem->wait_list)) {
 		/* granted */
@@ -192,7 +194,7 @@ int fastcall __down_read_trylock(struct
 		ret = 1;
 	}
 
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	rwsemtrace(sem, "Leaving __down_read_trylock");
 	return ret;
@@ -250,10 +252,11 @@ void fastcall __sched __down_write(struc
  */
 int fastcall __down_write_trylock(struct rw_semaphore *sem)
 {
+	unsigned long flags;
 	int ret = 0;
 	rwsemtrace(sem, "Entering __down_write_trylock");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	if (sem->activity == 0 && list_empty(&sem->wait_list)) {
 		/* granted */
@@ -261,7 +264,7 @@ int fastcall __down_write_trylock(struct
 		ret = 1;
 	}
 
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	rwsemtrace(sem, "Leaving __down_write_trylock");
 	return ret;
@@ -272,14 +275,15 @@ int fastcall __down_write_trylock(struct
  */
 void fastcall __up_read(struct rw_semaphore *sem)
 {
+	unsigned long flags;
 	rwsemtrace(sem, "Entering __up_read");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	if (--sem->activity == 0 && !list_empty(&sem->wait_list))
 		sem = __rwsem_wake_one_writer(sem);
 
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	rwsemtrace(sem, "Leaving __up_read");
 }
@@ -289,15 +293,16 @@ void fastcall __up_read(struct rw_semaph
  */
 void fastcall __up_write(struct rw_semaphore *sem)
 {
+	unsigned long flags;
 	rwsemtrace(sem, "Entering __up_write");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	sem->activity = 0;
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 1);
 
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	rwsemtrace(sem, "Leaving __up_write");
 }
@@ -308,15 +313,16 @@ void fastcall __up_write(struct rw_semap
  */
 void fastcall __downgrade_write(struct rw_semaphore *sem)
 {
+	unsigned long flags;
 	rwsemtrace(sem, "Entering __downgrade_write");
 
-	spin_lock(&sem->wait_lock);
+	spin_lock_irqsave(&sem->wait_lock, flags);
 
 	sem->activity = 1;
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 0);
 
-	spin_unlock(&sem->wait_lock);
+	spin_unlock_irqrestore(&sem->wait_lock, flags);
 
 	rwsemtrace(sem, "Leaving __downgrade_write");
 }
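
For reference (not part of the patch): a minimal userspace sketch that drives the min_nr batching path the fs/aio.c changes target. It assumes the libaio wrapper API (io_setup/io_prep_pread/io_submit/io_getevents, linked with -laio) and an arbitrary pre-existing file named "testfile" of at least 16KB, opened with O_DIRECT so the reads complete asynchronously. With the patch applied, aio_complete() should wake the io_getevents() caller once, when all four completions are in the ring, rather than on every completion.

/* getevents-demo.c -- hypothetical test program, not part of the patch.
 * Build (assumption): gcc -o getevents-demo getevents-demo.c -laio
 */
#define _GNU_SOURCE
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

#define NR_REQS	4
#define BUF_SZ	4096

int main(void)
{
	io_context_t ctx = 0;
	struct iocb cbs[NR_REQS], *cbp[NR_REQS];
	struct io_event events[NR_REQS];
	void *bufs[NR_REQS];
	int fd, i, ret;

	/* O_DIRECT so completions arrive via aio_complete() at interrupt time */
	fd = open("testfile", O_RDONLY | O_DIRECT);
	if (fd < 0) {
		perror("open testfile");
		return 1;
	}

	ret = io_setup(NR_REQS, &ctx);
	if (ret < 0) {
		fprintf(stderr, "io_setup: %d\n", ret);
		return 1;
	}

	for (i = 0; i < NR_REQS; i++) {
		if (posix_memalign(&bufs[i], 512, BUF_SZ))
			return 1;
		io_prep_pread(&cbs[i], fd, bufs[i], BUF_SZ, (long long)i * BUF_SZ);
		cbp[i] = &cbs[i];
	}

	ret = io_submit(ctx, NR_REQS, cbp);
	if (ret != NR_REQS) {
		fprintf(stderr, "io_submit: %d\n", ret);
		return 1;
	}

	/* min_nr == nr == NR_REQS: the caller sleeps until the ring holds all
	 * four events, which is exactly the ctx->nr_wait case added above. */
	ret = io_getevents(ctx, NR_REQS, NR_REQS, events, NULL);
	printf("io_getevents returned %d events\n", ret);

	io_destroy(ctx);
	return 0;
}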