From: Jens Axboe <axboe@kernel.dk>
mainline inclusion
from mainline-5.6-rc1
commit e94f141bd248ebdadcb7351f1e70b31cee5add53
category: feature
bugzilla: https://bugzilla.openeuler.org/show_bug.cgi?id=27
CVE: NA

---------------------------
For busy IORING_OP_POLL_ADD workloads, there can be enough contention on the completion lock that the inline completion path fails quite often, because the trylock on that lock fails. Add a list for deferred completions that we can use in that case. This helps reduce the number of async offloads we have to do: if we get multiple completions in a row, we piggyback onto the poll_llist instead of each completion having to queue its own offload.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
Signed-off-by: yangerkun <yangerkun@huawei.com>
Reviewed-by: zhangyi (F) <yi.zhang@huawei.com>
Signed-off-by: Cheng Jian <cj.chengjian@huawei.com>
---
 fs/io_uring.c | 108 ++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 88 insertions(+), 20 deletions(-)
diff --git a/fs/io_uring.c b/fs/io_uring.c index 44a0166f7d85..c96694d7b0fb 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -286,7 +286,8 @@ struct io_ring_ctx {
struct { spinlock_t completion_lock; - bool poll_multi_file; + struct llist_head poll_llist; + /* * ->poll_list is protected by the ctx->uring_lock for * io_uring instances that don't use IORING_SETUP_SQPOLL. @@ -296,6 +297,7 @@ struct io_ring_ctx { struct list_head poll_list; struct hlist_head *cancel_hash; unsigned cancel_hash_bits; + bool poll_multi_file;
spinlock_t inflight_lock; struct list_head inflight_list; @@ -453,7 +455,14 @@ struct io_kiocb { };
struct io_async_ctx *io; - struct file *ring_file; + union { + /* + * ring_file is only used in the submission path, and + * llist_node is only used for poll deferred completions + */ + struct file *ring_file; + struct llist_node llist_node; + }; int ring_fd; bool has_user; bool in_async; @@ -724,6 +733,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->wait); spin_lock_init(&ctx->completion_lock); + init_llist_head(&ctx->poll_llist); INIT_LIST_HEAD(&ctx->poll_list); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); @@ -1319,6 +1329,20 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; }
+static inline bool io_req_multi_free(struct io_kiocb *req) +{ + /* + * If we're not using fixed files, we have to pair the completion part + * with the file put. Use regular completions for those, only batch + * free for fixed file and non-linked commands. + */ + if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == REQ_F_FIXED_FILE) + && !io_is_fallback_req(req) && !req->io) + return true; + + return false; +} + /* * Find and free completed poll iocbs */ @@ -1338,14 +1362,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, (*nr_events)++;
if (refcount_dec_and_test(&req->refs)) { - /* If we're not using fixed files, we have to pair the - * completion part with the file put. Use regular - * completions for those, only batch free for fixed - * file and non-linked commands. - */ - if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) == - REQ_F_FIXED_FILE) && !io_is_fallback_req(req) && - !req->io) { + if (io_req_multi_free(req)) { reqs[to_free++] = req; if (to_free == ARRAY_SIZE(reqs)) io_free_req_many(ctx, reqs, &to_free); @@ -3078,6 +3095,44 @@ static void io_poll_complete_work(struct io_wq_work **workptr) io_wq_assign_next(workptr, nxt); }
+static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes) +{ + void *reqs[IO_IOPOLL_BATCH]; + struct io_kiocb *req, *tmp; + int to_free = 0; + + spin_lock_irq(&ctx->completion_lock); + llist_for_each_entry_safe(req, tmp, nodes, llist_node) { + hash_del(&req->hash_node); + io_poll_complete(req, req->result, 0); + + if (refcount_dec_and_test(&req->refs)) { + if (io_req_multi_free(req)) { + reqs[to_free++] = req; + if (to_free == ARRAY_SIZE(reqs)) + io_free_req_many(ctx, reqs, &to_free); + } else { + req->flags |= REQ_F_COMP_LOCKED; + io_free_req(req); + } + } + } + spin_unlock_irq(&ctx->completion_lock); + + io_cqring_ev_posted(ctx); + io_free_req_many(ctx, reqs, &to_free); +} + +static void io_poll_flush(struct io_wq_work **workptr) +{ + struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); + struct llist_node *nodes; + + nodes = llist_del_all(&req->ctx->poll_llist); + if (nodes) + __io_poll_flush(req->ctx, nodes); +} + static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { @@ -3085,7 +3140,6 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, struct io_kiocb *req = container_of(poll, struct io_kiocb, poll); struct io_ring_ctx *ctx = req->ctx; __poll_t mask = key_to_poll(key); - unsigned long flags;
/* for instances that support it check for an event match first: */ if (mask && !(mask & poll->events)) @@ -3099,17 +3153,31 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, * If we have a link timeout we're going to need the completion_lock * for finalizing the request, mark us as having grabbed that already. */ - if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) { - hash_del(&req->hash_node); - io_poll_complete(req, mask, 0); - req->flags |= REQ_F_COMP_LOCKED; - io_put_req(req); - spin_unlock_irqrestore(&ctx->completion_lock, flags); + if (mask) { + unsigned long flags;
- io_cqring_ev_posted(ctx); - } else { - io_queue_async_work(req); + if (llist_empty(&ctx->poll_llist) && + spin_trylock_irqsave(&ctx->completion_lock, flags)) { + hash_del(&req->hash_node); + io_poll_complete(req, mask, 0); + req->flags |= REQ_F_COMP_LOCKED; + io_put_req(req); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + + io_cqring_ev_posted(ctx); + req = NULL; + } else { + req->result = mask; + req->llist_node.next = NULL; + /* if the list wasn't empty, we're done */ + if (!llist_add(&req->llist_node, &ctx->poll_llist)) + req = NULL; + else + req->work.func = io_poll_flush; + } } + if (req) + io_queue_async_work(req);
return 1; }