[AIO] add cancellation support to aio_down() Now that kiocbs serialise the cancel and retry paths, add support for cancelling a aio_down() operation that is still blocked by undoing the aio_down() steps that were already completed. Signed-off-by: Benjamin LaHaise diff -purN --exclude=description 55_iocb_intr/fs/pipe.c 60_pipe_aio/fs/pipe.c --- 55_iocb_intr/fs/pipe.c 2005-06-20 13:33:32.000000000 -0400 +++ 60_pipe_aio/fs/pipe.c 2005-06-29 18:09:49.000000000 -0400 @@ -46,6 +46,56 @@ void pipe_wait(struct inode * inode) down(PIPE_SEM(*inode)); } +static int pipe_aio_waiter(wait_queue_t *wait, unsigned mode, int sync, + void *key) +{ + struct kiocb *iocb = io_wait_to_kiocb(wait); + + printk("pipe_aio_waiter(%p)\n", iocb); + list_del_init(&wait->task_list); + iocb->ki_cancel = NULL; /* We're removed from the wait queue, so our + * cancellation code no longer applies. + */ + kick_iocb(iocb); + return 1; +} + +static int pipe_aio_cancel(struct kiocb *kiocb, struct io_event *event) +{ + struct inode *inode = kiocb->ki_filp->f_dentry->d_inode; + wait_queue_head_t *wq = PIPE_WAIT(*inode); + int ret = 0; + + spin_lock_irq(&wq->lock); + printk("pipe_aio_cancel(%p)\n", kiocb); + if (kiocb->ki_cancel == pipe_aio_cancel) { + printk("pipe_aio_cancel: success\n"); + kiocb->ki_cancel = NULL; + list_del_init(&kiocb->ki_wait.task_list); + if (event) { + event->res = -EINTR; + event->res2 = 0; + } + } else { + printk("pipe_aio_cancel: eagain\n"); + ret = -EAGAIN; + } + spin_unlock_irq(&wq->lock); + return ret; +} + +static long pipe_aio_wait(struct kiocb *kiocb, struct inode *inode) +{ + kiocb->ki_wait.func = pipe_aio_waiter; + kiocb->ki_cancel = pipe_aio_cancel; + add_wait_queue(PIPE_WAIT(*inode), &kiocb->ki_wait); + aio_up(kiocb, PIPE_SEM(*inode)); + kiocbSetIntr(kiocb); + printk("pipe_aio_wait(%p, %p) cancel = %p\n", kiocb, inode, + kiocb->ki_cancel); + return -EIOCBRETRY; +} + static inline int pipe_iov_copy_from_user(void *to, struct iovec *iov, unsigned long len) { @@ -114,10 +164,18 @@ static struct pipe_buf_operations anon_p .release = anon_pipe_buf_release, }; +static void dummy_printk(const char *fmt, ...) +{ + return; +} + static ssize_t -pipe_readv(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) +pipe_aio_read(struct kiocb *kiocb, char __user *buf, size_t len, loff_t pos) { + struct iovec _iov[] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; + struct file *filp = kiocb->ki_filp; + struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; int do_wakeup; @@ -125,14 +183,31 @@ pipe_readv(struct file *filp, const stru struct iovec *iov = (struct iovec *)_iov; size_t total_len; +printk("pipe_aio_read(%p %p) cancel = %p\n", kiocb, buf, kiocb->ki_cancel); + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) { + if (!is_sync_kiocb(kiocb)) + printk("pipe_aio_read(%p) -- removing cancel\n", kiocb); + pipe_aio_cancel(kiocb, NULL); + } + total_len = iov_length(iov, nr_segs); /* Null read succeeds. */ if (unlikely(total_len == 0)) return 0; do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) { + if (!is_sync_kiocb(kiocb)) + printk("pipe_aio_read(%p) -- aio_down early ret\n", kiocb); + return ret; + } + info = inode->i_pipe; for (;;) { int bufs = info->nrbufs; @@ -186,7 +261,7 @@ pipe_readv(struct file *filp, const stru break; } } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -194,9 +269,9 @@ pipe_readv(struct file *filp, const stru wake_up_interruptible_sync(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_WRITERS(*inode), SIGIO, POLL_OUT); } - pipe_wait(inode); + return pipe_aio_wait(kiocb, inode); } - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); /* Signal writers asynchronously that there is more room. */ if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); @@ -204,20 +279,17 @@ pipe_readv(struct file *filp, const stru } if (ret > 0) file_accessed(filp); +dummy_printk("pipe_aio_read(%p %p) done = %ld\n", kiocb, buf, (long)ret); return ret; } static ssize_t -pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) +pipe_aio_write(struct kiocb *kiocb, const char __user *buf, size_t len, loff_t pos) { - struct iovec iov = { .iov_base = buf, .iov_len = count }; - return pipe_readv(filp, &iov, 1, ppos); -} + struct iovec _iov[] = {{ .iov_base = (void __user *)buf, .iov_len = len }}; + unsigned long nr_segs = 1; -static ssize_t -pipe_writev(struct file *filp, const struct iovec *_iov, - unsigned long nr_segs, loff_t *ppos) -{ + struct file *filp = kiocb->ki_filp; struct inode *inode = filp->f_dentry->d_inode; struct pipe_inode_info *info; ssize_t ret; @@ -226,18 +298,39 @@ pipe_writev(struct file *filp, const str size_t total_len; ssize_t chars; +printk("pipe_aio_write(%p)\n", kiocb); total_len = iov_length(iov, nr_segs); /* Null write succeeds. */ if (unlikely(total_len == 0)) return 0; + /* In retries we need to remove ourself from the wait queue at this + * point. Checking ki_cancel is a convenient way of checking for + * this case, as we clear the cancel operation when the iocb is + * removed from the wait queue. + */ + if (kiocb->ki_cancel == pipe_aio_cancel) + pipe_aio_cancel(kiocb, NULL); + do_wakeup = 0; - ret = 0; - down(PIPE_SEM(*inode)); + ret = aio_down(kiocb, PIPE_SEM(*inode)); + if (ret) + return ret; + + /* Undo the WRITERS++ done below where we are queued. We use + * kiocb->private to flag if we were waiting, as the higher layers + * initialize it to NULL at the beginning of a request's life. + */ + if (kiocb->private != NULL) { + PIPE_WAITING_WRITERS(*inode)--; + kiocb->private = NULL; + } + info = inode->i_pipe; if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); ret = -EPIPE; goto out; } @@ -267,8 +360,10 @@ pipe_writev(struct file *filp, const str for (;;) { int bufs; + if (!PIPE_READERS(*inode)) { - send_sig(SIGPIPE, current, 0); + if (is_sync_kiocb(kiocb)) + send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } @@ -323,7 +418,7 @@ pipe_writev(struct file *filp, const str if (!ret) ret = -EAGAIN; break; } - if (signal_pending(current)) { + if (is_sync_kiocb(kiocb) && signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } @@ -333,20 +428,22 @@ pipe_writev(struct file *filp, const str do_wakeup = 0; } PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; + kiocb->private = kiocb; /* Flag for retry. */ + return pipe_aio_wait(kiocb, inode); } out: - up(PIPE_SEM(*inode)); + aio_up(kiocb, PIPE_SEM(*inode)); if (do_wakeup) { wake_up_interruptible(PIPE_WAIT(*inode)); kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN); } if (ret > 0) inode_update_time(inode, 1); /* mtime and ctime */ +dummy_printk("pipe_aio_write(%p %p) done = %ld\n", kiocb, buf, (long)ret); return ret; } +#if 0 static ssize_t pipe_write(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) @@ -354,6 +451,7 @@ pipe_write(struct file *filp, const char struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count }; return pipe_writev(filp, &iov, 1, ppos); } +#endif static ssize_t bad_pipe_r(struct file *filp, char __user *buf, size_t count, loff_t *ppos) @@ -362,11 +460,23 @@ bad_pipe_r(struct file *filp, char __use } static ssize_t +bad_pipe_aio_r(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + +static ssize_t bad_pipe_w(struct file *filp, const char __user *buf, size_t count, loff_t *ppos) { return -EBADF; } +static ssize_t +bad_pipe_aio_w(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos) +{ + return -EBADF; +} + static int pipe_ioctl(struct inode *pino, struct file *filp, unsigned int cmd, unsigned long arg) @@ -565,8 +675,8 @@ pipe_rdwr_open(struct inode *inode, stru */ struct file_operations read_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, + .aio_read = pipe_aio_read, .write = bad_pipe_w, .poll = fifo_poll, .ioctl = pipe_ioctl, @@ -578,8 +688,9 @@ struct file_operations read_fifo_fops = struct file_operations write_fifo_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -589,10 +700,10 @@ struct file_operations write_fifo_fops = struct file_operations rdwr_fifo_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = fifo_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, @@ -602,9 +713,10 @@ struct file_operations rdwr_fifo_fops = struct file_operations read_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, + .read = do_sync_read, .write = bad_pipe_w, + .aio_read = pipe_aio_read, + .aio_write = bad_pipe_aio_w, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_read_open, @@ -615,8 +727,9 @@ struct file_operations read_pipe_fops = struct file_operations write_pipe_fops = { .llseek = no_llseek, .read = bad_pipe_r, - .write = pipe_write, - .writev = pipe_writev, + .write = do_sync_write, + .aio_read = bad_pipe_aio_r, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_write_open, @@ -626,10 +739,10 @@ struct file_operations write_pipe_fops = struct file_operations rdwr_pipe_fops = { .llseek = no_llseek, - .read = pipe_read, - .readv = pipe_readv, - .write = pipe_write, - .writev = pipe_writev, + .read = do_sync_read, + .write = do_sync_write, + .aio_read = pipe_aio_read, + .aio_write = pipe_aio_write, .poll = pipe_poll, .ioctl = pipe_ioctl, .open = pipe_rdwr_open, diff -purN --exclude=description 55_iocb_intr/fs/read_write.c 60_pipe_aio/fs/read_write.c --- 55_iocb_intr/fs/read_write.c 2005-06-23 17:31:01.000000000 -0400 +++ 60_pipe_aio/fs/read_write.c 2005-06-29 13:16:16.000000000 -0400 @@ -203,16 +203,32 @@ Einval: return -EINVAL; } -static void wait_on_retry_sync_kiocb(struct kiocb *iocb) +static long wait_on_retry_sync_kiocb(struct kiocb *iocb) { + long ret = 0; set_current_state(kiocbIsIntr(iocb) ? TASK_INTERRUPTIBLE : TASK_UNINTERRUPTIBLE); if (!kiocbIsKicked(iocb)) schedule(); else kiocbClearKicked(iocb); + + /* If we were interrupted by a signal, issue a cancel to allow the + * operation to clean up. + */ + if (kiocbIsIntr(iocb) && signal_pending(current) && iocb->ki_cancel) { + struct io_event dummy_event; + printk("iocb(%p) was interrupted and has a cancel\n", iocb); + if (!iocb->ki_cancel(iocb, &dummy_event)) { + ret = dummy_event.res; + printk("cancel success(%ld)\n", (long)ret); + } + printk("complete(%p)\n", iocb); + } else if (kiocbIsIntr(iocb) && signal_pending(current)) + printk("iocb(%p) interrupted, no cancel\n", iocb); kiocbClearIntr(iocb); __set_current_state(TASK_RUNNING); + return ret; } ssize_t do_sync_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos) @@ -220,15 +236,21 @@ ssize_t do_sync_read(struct file *filp, struct kiocb kiocb; ssize_t ret; + memset(&kiocb, 0, sizeof kiocb); init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = *ppos; while (-EIOCBRETRY == - (ret = filp->f_op->aio_read(&kiocb, buf, len, kiocb.ki_pos))) - wait_on_retry_sync_kiocb(&kiocb); + (ret = filp->f_op->aio_read(&kiocb, buf, len, kiocb.ki_pos))) { + ret = wait_on_retry_sync_kiocb(&kiocb); + if (ret) + break; + } if (-EIOCBQUEUED == ret) ret = wait_on_sync_kiocb(&kiocb); + BUG_ON(!list_empty(&kiocb.ki_wait.task_list)); *ppos = kiocb.ki_pos; + memset(&kiocb, 0x24, sizeof kiocb); return ret; } @@ -271,15 +293,21 @@ ssize_t do_sync_write(struct file *filp, struct kiocb kiocb; ssize_t ret; + memset(&kiocb, 0, sizeof kiocb); init_sync_kiocb(&kiocb, filp); kiocb.ki_pos = *ppos; while (-EIOCBRETRY == - (ret = filp->f_op->aio_write(&kiocb, buf, len, kiocb.ki_pos))) - wait_on_retry_sync_kiocb(&kiocb); + (ret = filp->f_op->aio_write(&kiocb, buf, len, kiocb.ki_pos))) { + ret = wait_on_retry_sync_kiocb(&kiocb); + if (ret) + break; + } if (-EIOCBQUEUED == ret) ret = wait_on_sync_kiocb(&kiocb); + BUG_ON(!list_empty(&kiocb.ki_wait.task_list)); *ppos = kiocb.ki_pos; + memset(&kiocb, 0x25, sizeof kiocb); return ret; } diff -purN --exclude=description 55_iocb_intr/include/linux/aio.h 60_pipe_aio/include/linux/aio.h --- 55_iocb_intr/include/linux/aio.h 2005-06-23 17:32:12.000000000 -0400 +++ 60_pipe_aio/include/linux/aio.h 2005-06-28 18:01:48.000000000 -0400 @@ -95,6 +95,7 @@ struct kiocb { (x)->ki_obj.tsk = tsk; \ (x)->ki_user_data = 0; \ init_wait((&(x)->ki_wait)); \ + (x)->private = NULL; \ } while (0) #define AIO_RING_MAGIC 0xa10a10a1