diff --git a/fs/aio.c b/fs/aio.c
index 24ba228..c41355d 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -38,6 +38,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -73,6 +74,8 @@ struct kioctx {
 	unsigned long		user_id;
 	struct hlist_node	list;
 
+	struct mm_struct	*mm;
+
 	struct __percpu kioctx_cpu *cpu;
 
 	unsigned		req_batch;
@@ -101,6 +104,11 @@ struct kioctx {
 	} ____cacheline_aligned_in_smp;
 
 	struct {
+		spinlock_t	worker_lock;
+		struct list_head worker_list;
+	} ____cacheline_aligned_in_smp;
+
+	struct {
 		struct mutex	ring_lock;
 		wait_queue_head_t wait;
@@ -135,6 +143,8 @@ unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
 static struct kmem_cache	*kiocb_cachep;
 static struct kmem_cache	*kioctx_cachep;
 
+static int make_helper_thread(struct kioctx *ctx);
+
 /* aio_setup
  *	Creates the slab caches used by the aio routines, panic on
  *	failure as this is done early during the boot sequence.
@@ -294,9 +304,25 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 static void free_ioctx_rcu(struct rcu_head *head)
 {
 	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+	struct task_struct *task;
+	int nr = 0;
 
 	free_percpu(ctx->cpu);
+	do {
+		spin_lock(&ctx->worker_lock);
+		if (!list_empty(&ctx->worker_list)) {
+			task = list_entry(ctx->worker_list.next,
+					  struct task_struct, aio_list);
+			list_del(&task->aio_list);
+			nr++;
+		} else
+			task = NULL;
+		spin_unlock(&ctx->worker_lock);
+		if (task)
+			wake_up_process(task);
+	} while (task);
 	kmem_cache_free(kioctx_cachep, ctx);
+	printk("free_ioctx_rcu: nr of worker threads: %d\n", nr);
 }
 
 /*
@@ -355,6 +381,10 @@ static void free_ioctx(struct kioctx *ctx)
 
 	pr_debug("freeing %p\n", ctx);
 
+	if (ctx->mm)
+		mmdrop(ctx->mm);
+	ctx->mm = NULL;
+
 	/*
 	 * Here the call_rcu() is between the wait_event() for reqs_active to
 	 * hit 0, and freeing the ioctx.
@@ -402,6 +432,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	rcu_read_unlock();
 
 	spin_lock_init(&ctx->ctx_lock);
+	spin_lock_init(&ctx->worker_lock);
+	INIT_LIST_HEAD(&ctx->worker_list);
 	mutex_init(&ctx->ring_lock);
 	init_waitqueue_head(&ctx->wait);
@@ -428,6 +460,9 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	aio_nr += ctx->max_reqs;
 	spin_unlock(&aio_nr_lock);
 
+	ctx->mm = current->mm;
+	atomic_inc(&current->mm->mm_count);
+
 	/* now link into global list. */
 	spin_lock(&mm->ioctx_lock);
 	hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
@@ -624,6 +659,7 @@ static void kiocb_free(struct kiocb *req)
 
 void aio_put_req(struct kiocb *req)
 {
+	BUG_ON(atomic_read(&req->ki_users) <= 0);
 	if (atomic_dec_and_test(&req->ki_users))
 		kiocb_free(req);
 }
@@ -676,6 +712,7 @@ static inline unsigned kioctx_ring_put(struct kioctx *ctx, struct kiocb *req,
 
 static inline unsigned kioctx_ring_lock(struct kioctx *ctx)
 {
+	struct aio_ring *ring;
 	unsigned tail;
 
 	/*
@@ -685,6 +722,13 @@ static inline unsigned kioctx_ring_lock(struct kioctx *ctx)
 	while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
 		cpu_relax();
 
+	ring = kmap_atomic(ctx->ring_pages[0]);
+	if (ring->head == ring->tail) {
+		ring->head = ring->tail = 0;
+		tail = 0;
+	}
+	kunmap_atomic(ring);
+
 	return tail;
 }
@@ -1023,6 +1067,9 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 		put_ioctx(ioctx);
 	}
 
+	if (!ret)
+		make_helper_thread(ioctx);
+
 out:
 	return ret;
 }
@@ -1153,7 +1200,7 @@ static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
  * Performs the initial checks and aio retry method
  * setup for the kiocb at the time of io submission.
  */
-static ssize_t aio_run_iocb(struct kiocb *req, bool compat)
+static ssize_t aio_run_iocb(struct kiocb *req)
 {
 	struct file *file = req->ki_filp;
 	ssize_t ret;
@@ -1179,12 +1226,14 @@ rw_common:
 		if (unlikely(!(file->f_mode & mode)))
 			return -EBADF;
 
-		if (!rw_op)
-			return -EINVAL;
+		//if (!current->aio_data && !rw_op)
+		//	return -EINVAL;
+		printk("current->aio_data = %p, rw_op = %p\n",
+			current->aio_data, rw_op);
 
 		ret = (req->ki_opcode == IOCB_CMD_PREADV ||
 		       req->ki_opcode == IOCB_CMD_PWRITEV)
-			? aio_setup_vectored_rw(rw, req, compat)
+			? aio_setup_vectored_rw(rw, req, req->ki_compat)
 			: aio_setup_single_vector(rw, req);
 		if (ret)
 			return ret;
@@ -1196,23 +1245,36 @@ rw_common:
 		req->ki_nbytes = ret;
 		req->ki_left = ret;
 
+		if (current->aio_data)
+			goto aio_submit_task;
+		if (!rw_op)
+			return -EINVAL;
 		ret = aio_rw_vect_retry(req, rw, rw_op);
 		break;
 
 	case IOCB_CMD_FDSYNC:
-		if (!file->f_op->aio_fsync)
-			return -EINVAL;
-
-		ret = file->f_op->aio_fsync(req, 1);
-		break;
-
 	case IOCB_CMD_FSYNC:
-		if (!file->f_op->aio_fsync)
-			return -EINVAL;
-
-		ret = file->f_op->aio_fsync(req, 0);
+	{
+		struct task_struct *task;
+
+aio_submit_task:
+		task = current->aio_data;
+		if (task) {
+			BUG_ON(task->aio_data != NULL);
+			//if (!file->f_op->fsync)
+			//	return -EINVAL;
+			//printk("doing aio submit of fsync to %p\n", task);
+			current->aio_data = NULL;
+			task->aio_data = req;
+			wake_up_process(task);
+			ret = -EIOCBQUEUED;
+		} else {
+			if (!file->f_op->aio_fsync)
+				return -EINVAL;
+			ret = file->f_op->aio_fsync(req,
					req->ki_opcode == IOCB_CMD_FDSYNC);
+		}
 		break;
-
+	}
 	default:
 		pr_debug("EINVAL: no operation provided\n");
 		return -EINVAL;
@@ -1232,6 +1294,155 @@ rw_common:
 	return 0;
 }
 
+static int aio_thread_cancel_fn(struct kiocb *iocb, struct io_event *event)
+{
+	struct task_struct *task = iocb->private;
+
+	barrier();
+	aio_put_req(iocb);
+	if (task == NULL)
+		return -EAGAIN;
+	force_sig(SIGSEGV, task);
+	return -EAGAIN;	/* the cancelled iocb will generate a completion */
+}
+
+static int aio_thread_fn(void *data)
+{
+	kiocb_cancel_fn *cancel;
+	struct kiocb *iocb;
+	struct kioctx *ctx;
+	ssize_t ret;
+
+again:
+	iocb = current->aio_data;
+	//printk("%p: aio_thread_fn(%p) aio_data = %p pid = %d\n",
+	//	current, ctx, iocb, current->pid);
+	current->aio_data = NULL;
+
+	if (!iocb) {
+		printk("%p/%d: no iocb -- quitting!\n", current, current->pid);
+		return 0;
+	}
+
+	ctx = iocb->ki_ctx;
+	use_mm(ctx->mm);
+	set_fs(USER_DS);
+
+	iocb->private = current;
+	kiocb_set_cancel_fn(iocb, aio_thread_cancel_fn);
+	ret = -EINVAL;
+
+	switch (iocb->ki_opcode) {
+	case IOCB_CMD_PREAD:
+		if (!iocb->ki_filp->f_op->read)
+			break;
+		ret = iocb->ki_filp->f_op->read(iocb->ki_filp, iocb->ki_buf,
+						iocb->ki_nbytes,
+						&iocb->ki_pos);
+		break;
+
+	case IOCB_CMD_PWRITE:
+		if (!iocb->ki_filp->f_op->write)
+			break;
+		ret = iocb->ki_filp->f_op->write(iocb->ki_filp,
+						 iocb->ki_buf,
+						 iocb->ki_nbytes,
+						 &iocb->ki_pos);
+		break;
+
+	case IOCB_CMD_FSYNC:
+	case IOCB_CMD_FDSYNC:
+		ret = iocb->ki_filp->f_op->fsync(iocb->ki_filp, 0, LLONG_MAX,
+				iocb->ki_opcode == IOCB_CMD_FDSYNC);
+	default:
+		break;
+	}
+
+	cancel = cmpxchg(&iocb->ki_cancel, aio_thread_cancel_fn, NULL);
+	if (cancel == KIOCB_CANCELLED) {
+		printk("%p/%d: Cancel in progress iocb = %p\n",
+			current, current->pid, iocb);
+		set_current_state(TASK_INTERRUPTIBLE);
+		while (!signal_pending(current)) {
+			schedule();
+			if (signal_pending(current))
+				break;
+			set_current_state(TASK_INTERRUPTIBLE);
+		}
+	} else
+		BUG_ON(cancel != aio_thread_cancel_fn);
+
+	if (signal_pending(current)) {
+		flush_signals(current);
+		printk("%p/%d: signals flushed for iocb = %p pending = %d\n",
+			current, current->pid, iocb,
+			signal_pending(current));
+	}
+
+	set_current_state(TASK_INTERRUPTIBLE);
+
+	spin_lock(&ctx->worker_lock);
+	list_add(&current->aio_list, &ctx->worker_list);
+	spin_unlock(&ctx->worker_lock);
+
+	printk("%p: aio_thread_fn(%p): ret = %ld\n", current, ctx, (long)ret);
+	if (ret != -EIOCBQUEUED) {
+		/*
+		 * There's no easy way to restart the syscall since other AIO's
+		 * may be already running. Just fail this IO with EINTR.
+		 */
+		if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+			     ret == -ERESTARTNOHAND ||
+			     ret == -ERESTART_RESTARTBLOCK))
+			ret = -EINTR;
+		aio_complete(iocb, ret, 0);
+	}
+
+	set_fs(KERNEL_DS);
+	unuse_mm(current->mm);
+
+	if (current->aio_data) {
+		set_current_state(TASK_RUNNING);
+		goto again;
+	}
+
+	schedule();
+	if (current->aio_data)
+		goto again;
+	return 0;
+}
+
+static int make_helper_thread(struct kioctx *ctx)
+{
+	struct task_struct *task;
+	char name[32];
+
+	if (current->aio_data)
+		return 0;
+
+	spin_lock(&ctx->worker_lock);
+	if (!list_empty(&ctx->worker_list)) {
+		struct task_struct *task;
+
+		task = list_entry(ctx->worker_list.next, struct task_struct,
+				  aio_list);
+		list_del(&task->aio_list);
+		spin_unlock(&ctx->worker_lock);
+		current->aio_data = task;
+		return 0;
+	}
+	spin_unlock(&ctx->worker_lock);
+
+	snprintf(name, sizeof(name), "aio-helper-%d", current->pid);
+	//printk("creating %s\n", name);
+
+	//percpu_ref_get(&ctx->users);
+	//task = kthread_create_on_cpu(aio_thread_fn, ctx,
+	//			     smp_processor_id(), name);
+	task = kthread_create(aio_thread_fn, NULL, name);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+
+	current->aio_data = task;
+	return 0;
+}
+
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 			 struct iocb *iocb, bool compat)
 {
@@ -1285,6 +1496,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		goto out_put_req;
 	}
 
+	ret = -ENOMEM;
+	if (make_helper_thread(ctx))
+		goto out_put_req;
+
 	req->ki_obj.user = user_iocb;
 	req->ki_user_data = iocb->aio_data;
 	req->ki_pos = iocb->aio_offset;
@@ -1292,8 +1507,11 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
 	req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
 	req->ki_opcode = iocb->aio_lio_opcode;
+	req->ki_compat = compat;
 
-	ret = aio_run_iocb(req, compat);
+	current->in_aio_submit = 1;
+	ret = aio_run_iocb(req);
+	current->in_aio_submit = 0;
 	if (ret)
 		goto out_put_req;
@@ -1480,3 +1698,5 @@ SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
 	asmlinkage_protect(5, ret, ctx_id, min_nr, nr, events, timeout);
 	return ret;
 }
+
+/* foo */
diff --git a/fs/exec.c b/fs/exec.c
index 83dde5b..68d3531 100644
--- a/fs/exec.c
+++ b/fs/exec.c
@@ -826,6 +826,13 @@ static int exec_mmap(struct mm_struct *mm)
 			return -EINTR;
 		}
 	}
+	if (tsk->aio_data) {
+		struct task_struct *p = tsk->aio_data;
+		tsk->aio_data = NULL;
+		printk("exec_mmap: waking up aio helper %p\n", p);
+		wake_up_process(p);
+	}
+
 	task_lock(tsk);
 	active_mm = tsk->active_mm;
 	tsk->mm = mm;
diff --git a/fs/ext3/ext3.h b/fs/ext3/ext3.h
index e85ff15..a9bf6be 100644
--- a/fs/ext3/ext3.h
+++ b/fs/ext3/ext3.h
@@ -1020,6 +1020,7 @@ extern void ext3_htree_free_dir_info(struct dir_private_info *p);
 
 /* fsync.c */
 extern int ext3_sync_file(struct file *, loff_t, loff_t, int);
+extern int ext3_aio_sync_file(struct kiocb *, int);
 
 /* hash.c */
 extern int ext3fs_dirhash(const char *name, int len, struct
diff --git a/fs/ext3/file.c b/fs/ext3/file.c
index 25cb413..aaf1668 100644
--- a/fs/ext3/file.c
+++ b/fs/ext3/file.c
@@ -62,6 +62,7 @@ const struct file_operations ext3_file_operations = {
 	.open		= dquot_file_open,
 	.release	= ext3_release_file,
 	.fsync		= ext3_sync_file,
+	.aio_fsync	= ext3_aio_sync_file,
 	.splice_read	= generic_file_splice_read,
 	.splice_write	= generic_file_splice_write,
 };
diff --git a/fs/ext3/fsync.c b/fs/ext3/fsync.c
index b31dbd4..c0828f3 100644
--- a/fs/ext3/fsync.c
+++ b/fs/ext3/fsync.c
@@ -24,6 +24,7 @@
 #include
 #include
+#include
 #include "ext3.h"
 
 /*
@@ -103,3 +104,8 @@ out:
 	trace_ext3_sync_file_exit(inode, ret);
 	return ret;
 }
+
+int ext3_aio_sync_file(struct kiocb *iocb, int datasync)
+{
+	return ext3_sync_file(iocb->ki_filp, 0, LLONG_MAX, datasync);
+}
diff --git a/include/linux/aio.h b/include/linux/aio.h
index a7e4c59..c2ac93f 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -54,6 +54,7 @@ struct kiocb {
 	void			*private;
 	/* State that we remember to be able to restart/retry  */
 	unsigned short		ki_opcode;
+	unsigned short		ki_compat;
 	size_t			ki_nbytes;	/* copy of iocb->aio_nbytes */
 	char __user		*ki_buf;	/* remaining iocb->aio_buf */
 	size_t			ki_left;	/* remaining bytes */
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 13f2cf4..372c26c 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1323,6 +1323,7 @@ struct task_struct {
 	/* Revert to default priority/policy when forking */
 	unsigned sched_reset_on_fork:1;
 	unsigned sched_contributes_to_load:1;
+	unsigned in_aio_submit:1;
 
 	pid_t pid;
 	pid_t tgid;
@@ -1603,6 +1604,8 @@ struct task_struct {
 #ifdef CONFIG_UPROBES
 	struct uprobe_task *utask;
 #endif
+	void *aio_data;
+	struct list_head aio_list;
 };
 
 /* Future-safe accessor for struct task_struct's cpus_allowed. */
diff --git a/kernel/exit.c b/kernel/exit.c
index b4df219..1e911c4 100644
--- a/kernel/exit.c
+++ b/kernel/exit.c
@@ -783,6 +783,14 @@ void do_exit(long code)
 	tsk->exit_code = code;
 	taskstats_exit(tsk, group_dead);
 
+	if (tsk->aio_data) {
+		printk("%p/%d: do_exit: waking up aio thread %p at exit\n",
+			tsk, tsk->pid, tsk->aio_data);
+		wake_up_process(tsk->aio_data);
+		tsk->aio_data = NULL;
+	}
+
+
 	exit_mm(tsk);
 
 	if (group_dead)
diff --git a/kernel/fork.c b/kernel/fork.c
index 2741178..f51652f 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -207,6 +207,12 @@ static void account_kernel_stack(struct thread_info *ti, int account)
 
 void free_task(struct task_struct *tsk)
 {
+	if (current->aio_data) {
+		printk("%p/%d: free_task: waking up aio thread %p at exit\n",
+			tsk, tsk->pid, current->aio_data);
+		wake_up_process(current->aio_data);
+		current->aio_data = NULL;
+	}
 	account_kernel_stack(tsk->stack, -1);
 	arch_release_thread_info(tsk->stack);
 	free_thread_info(tsk->stack);
@@ -332,6 +338,7 @@ static struct task_struct *dup_task_struct(struct task_struct *orig)
 #endif
 	tsk->splice_pipe = NULL;
 	tsk->task_frag.page = NULL;
+	tsk->aio_data = NULL;
 
 	account_kernel_stack(ti, 1);
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 86490f2..d0447f6 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -2895,6 +2895,8 @@ static void __sched __schedule(void)
 	struct rq *rq;
 	int cpu;
 
+	WARN_ON(current->in_aio_submit);
+
 need_resched:
 	preempt_disable();
 	cpu = smp_processor_id();
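
For reference, a minimal userspace sketch (editor's addition, not part of the patch) that exercises the new helper-thread path by submitting an IOCB_CMD_FSYNC through the raw io_setup/io_submit/io_getevents syscalls. The scratch file path and the sys_* wrapper names are illustrative assumptions; without this patch, io_submit() of IOCB_CMD_FSYNC on ext3 simply fails with -EINVAL.

/*
 * Hypothetical test program, not part of the patch. Builds with
 * "gcc -o aio-fsync-test aio-fsync-test.c"; needs no libaio.
 */
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

static long sys_io_setup(unsigned nr, aio_context_t *ctxp)
{
	return syscall(__NR_io_setup, nr, ctxp);
}

static long sys_io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
	return syscall(__NR_io_submit, ctx, nr, iocbpp);
}

static long sys_io_getevents(aio_context_t ctx, long min_nr, long nr,
			     struct io_event *events,
			     struct timespec *timeout)
{
	return syscall(__NR_io_getevents, ctx, min_nr, nr, events, timeout);
}

int main(void)
{
	aio_context_t ctx = 0;
	struct iocb cb;
	struct iocb *cbs[1] = { &cb };
	struct io_event ev;
	int fd;

	/* scratch file name is arbitrary */
	fd = open("/tmp/aio-fsync-test", O_RDWR | O_CREAT, 0644);
	if (fd < 0 || sys_io_setup(1, &ctx) < 0)
		return 1;

	memset(&cb, 0, sizeof(cb));
	cb.aio_fildes = fd;
	cb.aio_lio_opcode = IOCB_CMD_FSYNC;

	/*
	 * With the patch, aio_run_iocb() hands this off to an aio helper
	 * thread and returns -EIOCBQUEUED internally; userspace just
	 * waits for the completion event.
	 */
	if (sys_io_submit(ctx, 1, cbs) != 1)
		return 1;
	if (sys_io_getevents(ctx, 1, 1, &ev, NULL) != 1)
		return 1;

	printf("fsync completed, res = %lld\n", (long long)ev.res);
	return 0;
}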