From: Yu Kuai <yukuai3@huawei.com>
In certain environments, specific CPUs handle a large number of tasks
and become a bottleneck, hurting overall system performance. Introduce
a feature that hands bios off to a configurable set of CPUs, where
per-cpu kthreads dispatch them asynchronously, relieving the pressure
on the busy CPUs.
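
The feature is gated per-queue by the new QUEUE_FLAG_DISPATCH_ASYNC flag,
which can be toggled through the "dispatch_async" sysfs attribute. The
target CPU set defaults to all CPUs and is narrowed by writing a CPU
bitmask, e.g. "f" for CPUs 0-3, to the new "dispatch_async_cpus"
attribute. A driver can also set the flag itself; a minimal sketch with
a hypothetical helper name (only blk_queue_flag_set() and the new flag
come from this patch):

    /* Illustrative only: opt a queue in to asynchronous bio dispatch. */
    static void example_enable_async_dispatch(struct request_queue *q)
    {
            /*
             * Bios submitted on CPUs outside q->dispatch_async_cpus are
             * queued to a per-cpu list and resubmitted by a per-cpu
             * "bio_dispatch_work" kthread bound to one of the configured
             * CPUs, chosen round-robin.
             */
            blk_queue_flag_set(QUEUE_FLAG_DISPATCH_ASYNC, q);
    }
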
Signed-off-by: Yu Kuai <yukuai3@huawei.com>
Signed-off-by: Li Nan <linan122@huawei.com>
---
 block/blk.h               |   7 ++
 include/linux/blk_types.h |   1 +
 include/linux/blkdev.h    |   8 ++
 block/blk-core.c          | 229 +++++++++++++++++++++++++++++++++++++-
 block/blk-mq-debugfs.c    |   1 +
 block/blk-sysfs.c         |  58 ++++++++++
 block/Kconfig             |  12 ++
 7 files changed, 315 insertions(+), 1 deletion(-)
diff --git a/block/blk.h b/block/blk.h
index 4bbcc971d4f7..5e7c00356ddc 100644
--- a/block/blk.h
+++ b/block/blk.h
@@ -450,4 +450,11 @@ int bio_add_hw_page(struct request_queue *q, struct bio *bio,
 		struct page *page, unsigned int len, unsigned int offset,
 		unsigned int max_sectors, bool *same_page);
 
+#ifdef CONFIG_BLK_BIO_DISPATCH_ASYNC
+void blk_free_queue_dispatch_async(struct request_queue *q);
+#else
+static inline void blk_free_queue_dispatch_async(struct request_queue *q)
+{
+}
+#endif
 #endif /* BLK_INTERNAL_H */
diff --git a/include/linux/blk_types.h b/include/linux/blk_types.h
index 1853ec569b72..865e1198e837 100644
--- a/include/linux/blk_types.h
+++ b/include/linux/blk_types.h
@@ -301,6 +301,7 @@ enum {
 				 * of this bio. */
 	BIO_CGROUP_ACCT,	/* has been accounted to a cgroup */
 	BIO_TRACKED,		/* set if bio goes through the rq_qos path */
+	BIO_ASYNC,		/* already considered for async dispatch */
 	BIO_FLAG_LAST
 };
 
diff --git a/include/linux/blkdev.h b/include/linux/blkdev.h
index 50b4fd0a0687..453bae093b65 100644
--- a/include/linux/blkdev.h
+++ b/include/linux/blkdev.h
@@ -601,6 +601,12 @@ struct request_queue {
 #define BLK_MAX_WRITE_HINTS	5
 	u64			write_hints[BLK_MAX_WRITE_HINTS];
 
+
+#ifdef CONFIG_BLK_BIO_DISPATCH_ASYNC
+	/* used when QUEUE_FLAG_DISPATCH_ASYNC is set */
+	struct cpumask          dispatch_async_cpus;
+	int __percpu            *last_dispatch_cpu;
+#endif
 	KABI_REPLACE(unsigned long dtag_wait_time,
 		     struct blk_mq_tags *shared_sbitmap_tags)
 	KABI_RESERVE(1)
@@ -643,6 +649,8 @@ struct request_queue {
 #define QUEUE_FLAG_NOWAIT       29	/* device supports NOWAIT */
 /*at least one blk-mq hctx can't get driver tag */
 #define QUEUE_FLAG_HCTX_WAIT	30
+/* support to dispatch bio asynchronously */
+#define QUEUE_FLAG_DISPATCH_ASYNC 31
 
 #define QUEUE_FLAG_MQ_DEFAULT	((1 << QUEUE_FLAG_IO_STAT) |		\
 				 (1 << QUEUE_FLAG_SAME_COMP) |		\
diff --git a/block/blk-core.c b/block/blk-core.c
index f91f8e8be482..03375d28d664 100644
--- a/block/blk-core.c
+++ b/block/blk-core.c
@@ -87,6 +87,223 @@ struct kmem_cache *blk_requestq_cachep;
  */
 static struct workqueue_struct *kblockd_workqueue;
 
+#ifdef CONFIG_BLK_BIO_DISPATCH_ASYNC
+
+#define BIO_DISPATCH_MAX_LOOP 16
+
+struct async_bio {
+	struct bio_list list;
+	spinlock_t lock;
+} ____cacheline_aligned_in_smp;
+
+struct bio_dispatch_async_ctl {
+	/*
+	 * Array of nr_cpu_ids entries; each list holds bios queued from
+	 * another cpu, to be dispatched asynchronously on the cpu this
+	 * structure serves.
+	 */
+	struct async_bio	*bios;
+	/* kthread to handle bios dispatched from other cpus. */
+	struct task_struct	*thread;
+	wait_queue_head_t       wait;
+};
+
+static struct bio_dispatch_async_ctl __percpu *bio_dispatch_async_ctl;
+
+static int blk_alloc_queue_dispatch_async(struct request_queue *q)
+{
+	int cpu;
+
+	q->last_dispatch_cpu = alloc_percpu(int);
+	if (!q->last_dispatch_cpu)
+		return -ENOMEM;
+
+	cpumask_setall(&q->dispatch_async_cpus);
+	for_each_possible_cpu(cpu)
+		*per_cpu_ptr(q->last_dispatch_cpu, cpu) = cpu;
+
+	return 0;
+}
+
+void blk_free_queue_dispatch_async(struct request_queue *q)
+{
+	free_percpu(q->last_dispatch_cpu);
+	q->last_dispatch_cpu = NULL;
+}
+
+static int get_dispatch_cpu(struct request_queue *q, int cpu)
+{
+	int *last_dispatch_cpu =
+		per_cpu_ptr(q->last_dispatch_cpu, cpu);
+
+	cpu = cpumask_next(*last_dispatch_cpu, &q->dispatch_async_cpus);
+	if (cpu >= nr_cpu_ids)
+		cpu = cpumask_first(&q->dispatch_async_cpus);
+
+	*last_dispatch_cpu = cpu;
+
+	return cpu;
+}
+
+static bool __submit_bio_noacct_async(struct bio *bio)
+{
+	int cpu = smp_processor_id();
+	int dispatch_cpu = get_dispatch_cpu(bio->bi_disk->queue, cpu);
+	struct bio_dispatch_async_ctl *ctl;
+
+	if (dispatch_cpu >= nr_cpu_ids || dispatch_cpu == cpu)
+		return false;
+
+	ctl = per_cpu_ptr(bio_dispatch_async_ctl, dispatch_cpu);
+	spin_lock_irq(&ctl->bios[cpu].lock);
+	bio_list_add(&ctl->bios[cpu].list, bio);
+	spin_unlock_irq(&ctl->bios[cpu].lock);
+
+	if (wq_has_sleeper(&ctl->wait))
+		wake_up(&ctl->wait);
+
+	return true;
+}
+
+static bool submit_bio_noacct_async(struct bio *bio)
+{
+	struct request_queue *q;
+
+	if (bio_flagged(bio, BIO_ASYNC))
+		return false;
+
+	bio_set_flag(bio, BIO_ASYNC);
+	/*
+	 * Don't dispatch the bio asynchronously in the following cases:
+	 *
+	 * - QUEUE_FLAG_DISPATCH_ASYNC is not set;
+	 * - the current cpu is already one of the dispatch cpus;
+	 * - the bio is flagged REQ_NOWAIT;
+	 * - io polling is enabled;
+	 */
+	q = bio->bi_disk->queue;
+	if (!test_bit(QUEUE_FLAG_DISPATCH_ASYNC, &q->queue_flags) ||
+	    test_bit(QUEUE_FLAG_POLL, &q->queue_flags) ||
+	    cpumask_test_cpu(smp_processor_id(), &q->dispatch_async_cpus) ||
+	    bio->bi_opf & REQ_NOWAIT)
+		return false;
+
+	return __submit_bio_noacct_async(bio);
+}
+
+static bool collect_bio(struct bio_dispatch_async_ctl *ctl,
+			struct bio_list *list)
+{
+	bool has_bio = false;
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		struct async_bio *abio = &ctl->bios[cpu];
+
+		if (bio_list_empty(&abio->list))
+			continue;
+
+		has_bio = true;
+
+		spin_lock_irq(&abio->lock);
+		bio_list_merge(list, &abio->list);
+		bio_list_init(&abio->list);
+		spin_unlock_irq(&abio->lock);
+	}
+
+	return has_bio;
+}
+
+static int bio_dispatch_work(void *data)
+{
+	int loop_count = 0;
+	struct bio_list bio_list_on_stack;
+	struct blk_plug plug;
+	struct bio_dispatch_async_ctl *ctl;
+
+	bio_list_init(&bio_list_on_stack);
+	ctl = this_cpu_ptr(bio_dispatch_async_ctl);
+
+	for (;; loop_count++) {
+		struct bio *bio;
+		bool has_bio = collect_bio(ctl, &bio_list_on_stack);
+
+		if (!has_bio) {
+			DEFINE_WAIT(wait);
+
+			for (;;) {
+				prepare_to_wait(&ctl->wait, &wait,
+						TASK_INTERRUPTIBLE);
+				has_bio = collect_bio(ctl, &bio_list_on_stack);
+				if (has_bio)
+					break;
+				schedule();
+				loop_count = 0;
+			}
+			finish_wait(&ctl->wait, &wait);
+		}
+
+		blk_start_plug(&plug);
+		while ((bio = bio_list_pop(&bio_list_on_stack)))
+			submit_bio_noacct(bio);
+		blk_finish_plug(&plug);
+
+		/* prevent soft lockup. */
+		if (loop_count >= BIO_DISPATCH_MAX_LOOP) {
+			loop_count = 0;
+			cond_resched();
+		}
+	}
+
+	return 0;
+}
+
+static void init_blk_queue_async_dispatch(void)
+{
+	int cpu;
+
+	bio_dispatch_async_ctl = alloc_percpu(struct bio_dispatch_async_ctl);
+	if (!bio_dispatch_async_ctl)
+		panic("Failed to alloc bio_dispatch_async_ctl\n");
+
+	for_each_possible_cpu(cpu) {
+		int i;
+		struct bio_dispatch_async_ctl *ctl =
+			per_cpu_ptr(bio_dispatch_async_ctl, cpu);
+
+		init_waitqueue_head(&ctl->wait);
+		ctl->bios = kmalloc_array(nr_cpu_ids, sizeof(struct async_bio),
+					  GFP_KERNEL | __GFP_NOFAIL);
+		for (i = 0; i < nr_cpu_ids; ++i) {
+			bio_list_init(&ctl->bios[i].list);
+			spin_lock_init(&ctl->bios[i].lock);
+		}
+
+		ctl->thread =
+			kthread_create_on_cpu(bio_dispatch_work, NULL, cpu,
+					      "bio_dispatch_work_%u");
+		if (IS_ERR_OR_NULL(ctl->thread))
+			panic("Failed to create bio dispatch thread\n");
+
+		wake_up_process(ctl->thread);
+	}
+}
+#else
+static int blk_alloc_queue_dispatch_async(struct request_queue *q)
+{
+	return 0;
+}
+
+static bool submit_bio_noacct_async(struct bio *bio)
+{
+	return false;
+}
+
+static void init_blk_queue_async_dispatch(void)
+{
+}
+#endif
+
 /**
  * blk_queue_flag_set - atomically set a queue flag
  * @flag: flag to be set
@@ -539,9 +756,12 @@ struct request_queue *blk_alloc_queue(int node_id)
 
 	q->last_merge = NULL;
 
+	if (blk_alloc_queue_dispatch_async(q))
+		goto fail_q;
+
 	q->id = ida_simple_get(&blk_queue_ida, 0, 0, GFP_KERNEL);
 	if (q->id < 0)
-		goto fail_q;
+		goto fail_dispatch_async;
 
 	ret = bioset_init(&q->bio_split, BIO_POOL_SIZE, 0, BIOSET_NEED_BVECS);
 	if (ret)
@@ -606,6 +826,8 @@ struct request_queue *blk_alloc_queue(int node_id)
 	bioset_exit(&q->bio_split);
 fail_id:
 	ida_simple_remove(&blk_queue_ida, q->id);
+fail_dispatch_async:
+	blk_free_queue_dispatch_async(q);
 fail_q:
 	kmem_cache_free(blk_requestq_cachep, q);
 	return NULL;
@@ -1055,6 +1277,9 @@ static blk_qc_t __submit_bio_noacct_mq(struct bio *bio)
  */
 blk_qc_t submit_bio_noacct(struct bio *bio)
 {
+	if (submit_bio_noacct_async(bio))
+		return BLK_QC_T_NONE;
+
 	if (!submit_bio_checks(bio))
 		return BLK_QC_T_NONE;
 
@@ -1905,5 +2130,7 @@ int __init blk_dev_init(void)
 
 	blk_debugfs_root = debugfs_create_dir("block", NULL);
 
+	init_blk_queue_async_dispatch();
+
 	return 0;
 }
diff --git a/block/blk-mq-debugfs.c b/block/blk-mq-debugfs.c
index a879f94782e4..b5b17c6ee650 100644
--- a/block/blk-mq-debugfs.c
+++ b/block/blk-mq-debugfs.c
@@ -131,6 +131,7 @@ static const char *const blk_queue_flag_name[] = {
 	QUEUE_FLAG_NAME(RQ_ALLOC_TIME),
 	QUEUE_FLAG_NAME(HCTX_ACTIVE),
 	QUEUE_FLAG_NAME(NOWAIT),
+	QUEUE_FLAG_NAME(DISPATCH_ASYNC),
 };
 #undef QUEUE_FLAG_NAME
 
diff --git a/block/blk-sysfs.c b/block/blk-sysfs.c
index c95be9626a09..e363222858b4 100644
--- a/block/blk-sysfs.c
+++ b/block/blk-sysfs.c
@@ -288,6 +288,9 @@ QUEUE_SYSFS_BIT_FNS(nonrot, NONROT, 1);
 QUEUE_SYSFS_BIT_FNS(random, ADD_RANDOM, 0);
 QUEUE_SYSFS_BIT_FNS(iostats, IO_STAT, 0);
 QUEUE_SYSFS_BIT_FNS(stable_writes, STABLE_WRITES, 0);
+#ifdef CONFIG_BLK_BIO_DISPATCH_ASYNC
+QUEUE_SYSFS_BIT_FNS(dispatch_async, DISPATCH_ASYNC, 0);
+#endif
 #undef QUEUE_SYSFS_BIT_FNS
 
 static ssize_t queue_zoned_show(struct request_queue *q, char *page)
@@ -619,6 +622,56 @@ QUEUE_RW_ENTRY(queue_iostats, "iostats");
 QUEUE_RW_ENTRY(queue_random, "add_random");
 QUEUE_RW_ENTRY(queue_stable_writes, "stable_writes");
 
+#ifdef CONFIG_BLK_BIO_DISPATCH_ASYNC
+
+static ssize_t queue_dispatch_async_cpus_show(struct request_queue *q,
+					      char *page)
+{
+	return sprintf(page, "%*pb\n", nr_cpu_ids,
+		       cpumask_bits(&q->dispatch_async_cpus));
+}
+
+static ssize_t queue_dispatch_async_cpus_store(struct request_queue *q,
+					       const char *page, size_t count)
+{
+	cpumask_var_t cpumask;
+	ssize_t ret;
+
+	if (!alloc_cpumask_var(&cpumask, GFP_KERNEL))
+		return -ENOMEM;
+
+	ret = bitmap_parse(page, count, cpumask_bits(cpumask),
+			   nr_cpumask_bits);
+	if (ret < 0)
+		goto out;
+
+	ret = -EINVAL;
+	if (cpumask_empty(cpumask) ||
+	    !cpumask_subset(cpumask, cpu_online_mask))
+		goto out;
+
+	blk_mq_freeze_queue(q);
+	blk_mq_quiesce_queue(q);
+
+	cpumask_copy(&q->dispatch_async_cpus, cpumask);
+
+	blk_mq_unquiesce_queue(q);
+	blk_mq_unfreeze_queue(q);
+	ret = count;
+out:
+	free_cpumask_var(cpumask);
+	return ret;
+}
+
+static struct queue_sysfs_entry queue_dispatch_async_cpus_entry = {
+	.attr = {.name = "dispatch_async_cpus", .mode = 0644 },
+	.show = queue_dispatch_async_cpus_show,
+	.store = queue_dispatch_async_cpus_store,
+};
+
+QUEUE_RW_ENTRY(queue_dispatch_async, "dispatch_async");
+#endif
+
 static struct attribute *queue_attrs[] = {
 	&queue_requests_entry.attr,
 	&queue_ra_entry.attr,
@@ -659,6 +712,10 @@ static struct attribute *queue_attrs[] = {
 	&queue_wb_lat_entry.attr,
 	&queue_poll_delay_entry.attr,
 	&queue_io_timeout_entry.attr,
+#ifdef CONFIG_BLK_BIO_DISPATCH_ASYNC
+	&queue_dispatch_async_cpus_entry.attr,
+	&queue_dispatch_async_entry.attr,
+#endif
 #ifdef CONFIG_BLK_DEV_THROTTLING_LOW
 	&blk_throtl_sample_time_entry.attr,
 #endif
@@ -795,6 +852,7 @@ static void blk_release_queue(struct kobject *kobj)
 		blk_stat_remove_callback(q, q->poll_cb);
 	blk_stat_free_callback(q->poll_cb);
 
+	blk_free_queue_dispatch_async(q);
 	blk_free_queue_stats(q->stats);
 
 	blk_exit_queue(q);
diff --git a/block/Kconfig b/block/Kconfig
index e5c965f1ea25..bda77b006352 100644
--- a/block/Kconfig
+++ b/block/Kconfig
@@ -209,6 +209,18 @@ config BLK_DEV_DUMPINFO
 	  Dump info when open an write opened block device exclusively
 	  or open an exclusive opened device for write
 
+config BLK_BIO_DISPATCH_ASYNC
+	bool "Dispatch bios asynchronously on specific cpus"
+	default n
+	help
+	  If there are multiple nodes, memory access across nodes is rather
+	  expensive compared to access within the local node, and if a driver
+	  uses internal spinlocks, io performance will suffer when bios are
+	  issued concurrently from different nodes. This feature dispatches
+	  bios asynchronously to a configurable set of CPUs to avoid
+	  cross-node memory access in the driver. Note that drivers need
+	  special care for this feature to work. If unsure, say N here.
+
 menu "Partition Types"
 
 source "block/partitions/Kconfig"
-- 
2.39.2