diff -purN v2.6.12-rc4/fs/aio.c aio-min_nr-rc4/fs/aio.c
--- v2.6.12-rc4/fs/aio.c	2005-05-09 15:46:57.000000000 -0400
+++ aio-min_nr-rc4/fs/aio.c	2005-05-17 16:46:46.000000000 -0400
@@ -221,6 +221,7 @@ static struct kioctx *ioctx_alloc(unsign
 	atomic_set(&ctx->users, 1);
 	spin_lock_init(&ctx->ctx_lock);
 	spin_lock_init(&ctx->ring_info.ring_lock);
+	INIT_LIST_HEAD(&ctx->ring_info.waiters);
 	init_waitqueue_head(&ctx->wait);
 
 	INIT_LIST_HEAD(&ctx->active_reqs);
@@ -915,7 +916,7 @@ int fastcall aio_complete(struct kiocb *
 	struct aio_ring	*ring;
 	struct io_event	*event;
 	unsigned long	flags;
-	unsigned long	tail;
+	unsigned long	tail, head;
 	int		ret;
 
 	/* Special case handling for sync iocbs: events go directly
@@ -985,6 +986,7 @@ int fastcall aio_complete(struct kiocb *
 
 	info->tail = tail;
 	ring->tail = tail;
+	head = ring->head;
 
 	put_aio_ring_event(event, KM_IRQ0);
 	kunmap_atomic(ring, KM_IRQ1);
@@ -993,15 +995,26 @@ int fastcall aio_complete(struct kiocb *
 
 	pr_debug("%ld retries: %d of %d\n", iocb->ki_retried,
 		iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
+
+	if (unlikely(ctx->nr_wait)) {
+		unsigned long nr_events;
+		if (head <= tail)
+			nr_events = tail - head;
+		else
+			nr_events = tail + info->nr - head;
+		if (nr_events >= ctx->nr_wait)
+			wake_up_process(ctx->waiter_process);
+	}
+
 put_rq:
 	/* everything turned out well, dispose of the aiocb. */
 	ret = __aio_put_req(ctx, iocb);
 
-	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 
+	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+
 	if (ret)
 		put_ioctx(ctx);
 
@@ -1029,8 +1042,6 @@ static int aio_read_evt(struct kioctx *i
 	if (ring->head == ring->tail)
 		goto out;
 
-	spin_lock(&info->ring_lock);
-
 	head = ring->head % info->nr;
 	if (head != ring->tail) {
 		struct io_event *evp = aio_ring_event(info, head, KM_USER1);
@@ -1041,7 +1052,6 @@ static int aio_read_evt(struct kioctx *i
 		ret = 1;
 		put_aio_ring_event(evp, KM_USER1);
 	}
-	spin_unlock(&info->ring_lock);
 
 out:
 	kunmap_atomic(ring, KM_USER0);
@@ -1050,6 +1060,22 @@ out:
 	return ret;
 }
 
+static inline long ring_events_avail(struct kioctx *ctx)
+{
+	struct aio_ring *ring;
+	long head, tail;
+
+	ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
+	head = ring->head;
+	tail = ring->tail;
+	kunmap_atomic(ring, KM_USER0);
+
+	barrier();
+	if (head <= tail)
+		return tail - head;
+	return ctx->ring_info.nr + tail - head;
+}
+
 struct aio_timeout {
 	struct timer_list	timer;
 	int			timed_out;
@@ -1088,20 +1114,126 @@ static inline void clear_timeout(struct
 	del_singleshot_timer_sync(&to->timer);
 }
 
+static inline int set_timeout_user(long start_jiffies, struct aio_timeout *to,
+				   const struct timespec __user *timeout)
+{
+	struct timespec ts;
+	int ret;
+
+	ret = copy_from_user(&ts, timeout, sizeof(ts));
+	if (ret)
+		return ret;
+
+	set_timeout(start_jiffies, to, &ts);
+	return 0;
+}
+
+/* wait_aio_ring
+ *	Waits to become head of the aio ring waiters.  Must be called
+ *	with ring_lock held.  Returns with ring_lock held.
+ */
+static int wait_aio_ring(struct kioctx *ctx,
+			 long start_jiffies,
+			 struct aio_ring_waiter *waiter,
+			 long min_nr,
+			 struct timespec __user *timeout)
+{
+	struct aio_ring_info *info = &ctx->ring_info;
+	struct aio_timeout to;
+	int ret = 0;
+
+	waiter->min_nr = min_nr;
+	waiter->process = current;
+	waiter->am_head = 0;
+	list_add_tail(&waiter->list, &info->waiters);
+	spin_unlock(&info->ring_lock);
+
+	init_timeout(&to);
+	if (timeout)
+		ret = set_timeout_user(start_jiffies, &to, timeout);
+
+	while (!ret) {
+		set_current_state(TASK_INTERRUPTIBLE);
+
+		schedule();
+
+		set_current_state(TASK_RUNNING);
+
+		if (waiter->am_head)
+			break;
+
+		if (signal_pending(current)) {
+			ret = -EINTR;
+			break;
+		}
+		if (to.timed_out) {
+			ret = -ETIMEDOUT;
+			break;
+		}
+	}
+
+	if (timeout)
+		clear_timeout(&to);
+
+	spin_lock(&info->ring_lock);
+	list_del(&waiter->list);
+
+	return ret;
+}
+
+static void drop_waiter(struct kioctx *ctx, struct aio_ring_waiter *waiter)
+{
+	struct aio_ring_info *info = &ctx->ring_info;
+
+	spin_lock(&info->ring_lock);
+	list_del(&waiter->list);
+
+	spin_lock_irq(&ctx->ctx_lock);	/* We muck with ctx->nr_wait and
+					 * waiter_process */
+	if (!list_empty(&info->waiters)) {
+		struct aio_ring_waiter *next_waiter;
+		next_waiter = (struct aio_ring_waiter *)info->waiters.next;
+		next_waiter->am_head = 1;
+		ctx->nr_wait = next_waiter->min_nr;
+		ctx->waiter_process = next_waiter->process;
+	} else {
+		ctx->nr_wait = 0;
+		ctx->waiter_process = NULL;
+	}
+
+	spin_unlock_irq(&ctx->ctx_lock);
+	spin_unlock(&info->ring_lock);
+}
+
 static int read_events(struct kioctx *ctx,
 			long min_nr, long nr,
 			struct io_event __user *event,
 			struct timespec __user *timeout)
 {
+	struct aio_ring_info	*info = &ctx->ring_info;
 	long			start_jiffies = jiffies;
 	struct task_struct	*tsk = current;
-	DECLARE_WAITQUEUE(wait, tsk);
+	struct aio_ring_waiter	waiter;
 	int			ret;
 	int			i = 0;
 	struct io_event		ent;
 	struct aio_timeout	to;
 	int			retry = 0;
 
+	spin_lock(&info->ring_lock);
+
+	/* If someone else is already on the waiters list, we must delay */
+	if (!list_empty(&ctx->ring_info.waiters)) {
+		ret = wait_aio_ring(ctx, start_jiffies, &waiter, min_nr, timeout);
+		if (unlikely(ret)) {
+			spin_unlock(&info->ring_lock);
+			return ret;
+		}
+	}
+
+	list_add(&waiter.list, &ctx->ring_info.waiters);
+	spin_unlock(&info->ring_lock);
+
 	/* needed to zero any padding within an entry (there shouldn't be
 	 * any, but C is fun!
 	 */
@@ -1129,10 +1261,10 @@ retry:
 		i ++;
 	}
 
-	if (min_nr <= i)
-		return i;
-	if (ret)
-		return ret;
+	if (min_nr <= i || ret) {
+		drop_waiter(ctx, &waiter);
+		return i ? i : ret;
+	}
 
 	/* End fast path */
 
@@ -1145,17 +1277,22 @@ retry:
 
 	init_timeout(&to);
 	if (timeout) {
-		struct timespec	ts;
-		ret = -EFAULT;
-		if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
+		ret = set_timeout_user(start_jiffies, &to, timeout);
+		if (unlikely(ret))
 			goto out;
-
-		set_timeout(start_jiffies, &to, &ts);
 	}
 
+	/* Make sure we're receiving the next wakeup. */
+	ctx->waiter_process = current;
+
 	while (likely(i < nr)) {
-		add_wait_queue_exclusive(&ctx->wait, &wait);
 		do {
+			/* Setting ctx->nr_wait allows aio_complete to only
+			 * issue a wakeup when enough events to satisfy our
+			 * read are available.
+			 */
+			ctx->nr_wait = min_nr - i;
+
 			set_task_state(tsk, TASK_INTERRUPTIBLE);
 			ret = aio_read_evt(ctx, &ent);
 			if (ret)
@@ -1165,7 +1302,10 @@ retry:
 			ret = 0;
 			if (to.timed_out)	/* Only check after read evt */
 				break;
-			schedule();
+
+			if (ring_events_avail(ctx) < min_nr - i)
+				schedule();
+
 			if (signal_pending(tsk)) {
 				ret = -EINTR;
 				break;
@@ -1174,7 +1314,6 @@ retry:
 		} while (1) ;
 
 		set_task_state(tsk, TASK_RUNNING);
-		remove_wait_queue(&ctx->wait, &wait);
 
 		if (unlikely(ret <= 0))
 			break;
@@ -1192,7 +1331,9 @@ retry:
 
 	if (timeout)
 		clear_timeout(&to);
+
 out:
+	drop_waiter(ctx, &waiter);
 	return i ? i : ret;
 }
 
diff -purN v2.6.12-rc4/include/linux/aio.h aio-min_nr-rc4/include/linux/aio.h
--- v2.6.12-rc4/include/linux/aio.h	2004-12-24 16:35:50.000000000 -0500
+++ aio-min_nr-rc4/include/linux/aio.h	2005-05-17 15:23:44.000000000 -0400
@@ -11,6 +11,7 @@
 #define AIO_KIOGRP_NR_ATOMIC	8
 
 struct kioctx;
+struct task_struct;
 
 /* Notes on cancelling a kiocb:
  *	If a kiocb is cancelled, aio_complete may return 0 to indicate
@@ -113,6 +114,13 @@ struct aio_ring {
 
 #define aio_ring_avail(info, ring)	(((ring)->head + (info)->nr - 1 - (ring)->tail) % (info)->nr)
 
+struct aio_ring_waiter {
+	struct list_head	list;
+	long			min_nr;
+	struct task_struct	*process;
+	int			am_head;
+};
+
 #define AIO_RING_PAGES	8
 struct aio_ring_info {
 	unsigned long		mmap_base;
@@ -121,6 +129,7 @@ struct aio_ring_info {
 	struct page		**ring_pages;
 	spinlock_t		ring_lock;
 	long			nr_pages;
+	struct list_head	waiters;	/* list of aio_ring_waiter */
 
 	unsigned		nr, tail;
 
@@ -145,6 +154,8 @@ struct kioctx {
 	struct list_head	run_list;	/* used for kicked reqs */
 
 	unsigned		max_reqs;
+	long			nr_wait;
+	struct task_struct	*waiter_process;
 
 	struct aio_ring_info	ring_info;
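
For reference (not part of the patch), here is a minimal userspace sketch of the
batching pattern this change targets, assuming the libaio wrappers
(io_setup/io_prep_pread/io_submit/io_getevents); the file name and batch size
are arbitrary example values.  With min_nr equal to the batch size, the caller
should now be woken once when the whole batch has completed rather than once
per event:

#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

#define BATCH	8
#define BUFSZ	4096

int main(void)
{
	static char bufs[BATCH][BUFSZ];
	io_context_t ctx = 0;
	struct iocb iocbs[BATCH], *iocbp[BATCH];
	struct io_event events[BATCH];
	int fd, i, n;

	fd = open("/tmp/testfile", O_RDONLY);	/* arbitrary test file */
	if (fd < 0 || io_setup(BATCH, &ctx) < 0) {
		fprintf(stderr, "setup failed\n");
		return 1;
	}

	for (i = 0; i < BATCH; i++) {
		io_prep_pread(&iocbs[i], fd, bufs[i], BUFSZ,
			      (long long)i * BUFSZ);
		iocbp[i] = &iocbs[i];
	}

	if (io_submit(ctx, BATCH, iocbp) != BATCH) {
		fprintf(stderr, "io_submit failed\n");
		return 1;
	}

	/* min_nr == nr == BATCH: one blocking call that only returns (and,
	 * with this patch, should only be woken) once the whole batch of
	 * completions is in the ring. */
	n = io_getevents(ctx, BATCH, BATCH, events, NULL);
	printf("got %d events\n", n);

	io_destroy(ctx);
	close(fd);
	return 0;
}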