From: Xinyu Zheng <zhengxinyu6@huawei.com>

hulk inclusion
category: feature
bugzilla: https://gitee.com/openeuler/release-management/issues/ID5CMS

--------------------------------

This kernel module targets the Redis scenario. For each Redis server
process, reads on fds 0~99 can be sped up by prefetching.

In this kernel module, we hijack five syscalls to implement this
feature: epoll_create1/epoll_ctl/epoll_pwait/read/close. There are also
two new procfs interfaces, /proc/xcall_feature/cpu_list and
/proc/xcall_feature/prefetch. You can choose which CPUs the prefetch
read work runs on through the cpu_list procfs file, and read the
prefetch hit ratio from the prefetch procfs file.

Signed-off-by: Xinyu Zheng <zhengxinyu6@huawei.com>
Signed-off-by: Jinjie Ruan <ruanjinjie@huawei.com>
---
 drivers/staging/xcall/Makefile   |   2 +-
 drivers/staging/xcall/prefetch.c | 573 +++++++++++++++++++++++++++++++
 2 files changed, 574 insertions(+), 1 deletion(-)
 create mode 100644 drivers/staging/xcall/prefetch.c

diff --git a/drivers/staging/xcall/Makefile b/drivers/staging/xcall/Makefile
index 668ac4f3b471..d8c6137e2945 100644
--- a/drivers/staging/xcall/Makefile
+++ b/drivers/staging/xcall/Makefile
@@ -1 +1 @@
-obj-$(CONFIG_DYNAMIC_XCALL_TESTCASE) += dynamic_xcall_test.o
+obj-$(CONFIG_DYNAMIC_XCALL_TESTCASE) += dynamic_xcall_test.o prefetch.o
diff --git a/drivers/staging/xcall/prefetch.c b/drivers/staging/xcall/prefetch.c
new file mode 100644
index 000000000000..eed746e41f2a
--- /dev/null
+++ b/drivers/staging/xcall/prefetch.c
@@ -0,0 +1,573 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * xcall epollwait prefetch module
+ *
+ * The data structures and functions marked as MANDATORY have to
+ * be included in all kernel xcall modules.
+ *
+ * Copyright (C) 2025 Huawei Limited.
+ */
+
+#define pr_fmt(fmt) "xcall_prefetch: " fmt
+
+#include <linux/module.h>
+#include <linux/proc_fs.h>
+#include <linux/xcall.h>
+#include <net/sock.h>
+
+#include <asm/xcall.h>
+
+#define MAX_FD 100
+
+#define XCALL_CACHE_PAGE_ORDER	2
+#define XCALL_CACHE_BUF_SIZE	((1 << XCALL_CACHE_PAGE_ORDER) * PAGE_SIZE)
+
+#define current_prefetch_items()					\
+	((struct prefetch_item *)					\
+	 ((((struct xcall_area *)(current->mm->xcall))->sys_call_data)[__NR_epoll_pwait]))
+
+static DEFINE_PER_CPU_ALIGNED(unsigned long, xcall_cache_hit);
+static DEFINE_PER_CPU_ALIGNED(unsigned long, xcall_cache_miss);
+
+static struct workqueue_struct *rc_work;
+static struct cpumask xcall_mask;
+struct proc_dir_entry *xcall_proc_dir, *prefetch_dir, *xcall_mask_dir;
+
+enum cache_state {
+	XCALL_CACHE_NONE = 0,
+	XCALL_CACHE_PREFETCH,
+	XCALL_CACHE_READY,
+	XCALL_CACHE_CANCEL
+};
+
+struct prefetch_item {
+	struct file *file;
+	struct work_struct work;
+	int cpu;
+	cpumask_t related_cpus;
+	struct page *cache_pages;
+	char *cache;
+	ssize_t len;
+	/* cache state in epoll_wait */
+	atomic_t state;
+	loff_t pos;
+};
+
+static ssize_t xcall_mask_proc_write(struct file *file, const char __user *buf,
+				     size_t count, loff_t *ppos)
+{
+	struct cpumask tmp;
+	int err;
+
+	err = cpumask_parselist_user(buf, count, &tmp);
+	if (err)
+		return err;
+
+	if (cpumask_empty(&tmp))
+		return -EINVAL;
+
+	if (!cpumask_intersects(&tmp, cpu_online_mask)) {
+		pr_warn("cpu %*pbl is not online.\n", cpumask_pr_args(&tmp));
+		return -EINVAL;
+	}
+
+	cpumask_copy(&xcall_mask, &tmp);
+	return count;
+}
+
+static int xcall_mask_proc_show(struct seq_file *m, void *v)
+{
+	seq_printf(m, "%*pbl\n", cpumask_pr_args(&xcall_mask));
+	return 0;
+}
+
+static int xcall_mask_proc_open(struct inode *inode, struct file *file)
+{
+	return single_open(file, xcall_mask_proc_show, pde_data(inode));
+}
+
+static const struct proc_ops xcall_mask_fops = {
+	.proc_open = xcall_mask_proc_open,
+	.proc_read = seq_read,
+	.proc_lseek = seq_lseek,
+	.proc_release = single_release,
+	.proc_write = xcall_mask_proc_write,
+};
+
+static ssize_t xcall_prefetch_write(struct file *file, const char __user *buf,
+				    size_t count, loff_t *pos)
+{
+	int cpu;
+
+	for_each_cpu(cpu, cpu_online_mask) {
+		*per_cpu_ptr(&xcall_cache_hit, cpu) = 0;
+		*per_cpu_ptr(&xcall_cache_miss, cpu) = 0;
+	}
+
+	return count;
+}
+
+static int xcall_prefetch_show(struct seq_file *m, void *v)
+{
+	unsigned long hit = 0, miss = 0;
+	unsigned int cpu;
+	u64 percent;
+
+	for_each_cpu(cpu, cpu_online_mask) {
+		hit = *per_cpu_ptr(&xcall_cache_hit, cpu);
+		miss = *per_cpu_ptr(&xcall_cache_miss, cpu);
+
+		if (hit == 0 && miss == 0)
+			continue;
+
+		percent = DIV_ROUND_CLOSEST(hit * 100ULL, hit + miss);
+		seq_printf(m, "cpu%u epoll cache_{hit,miss}: %lu,%lu, hit ratio: %llu%%\n",
+			   cpu, hit, miss, percent);
+	}
+	return 0;
+}
+
+static int xcall_prefetch_open(struct inode *inode, struct file *file)
+{
+	return single_open(file, xcall_prefetch_show, NULL);
+}
+
+static const struct proc_ops xcall_prefetch_fops = {
+	.proc_open = xcall_prefetch_open,
+	.proc_read = seq_read,
+	.proc_write = xcall_prefetch_write,
+	.proc_lseek = seq_lseek,
+	.proc_release = single_release
+};
+
+static inline bool transition_state(struct prefetch_item *pfi,
+				    enum cache_state old, enum cache_state new)
+{
+	return atomic_cmpxchg(&pfi->state, old, new) == old;
+}
+
+static void prefetch_work_fn(struct work_struct *work)
+{
+	struct prefetch_item *pfi = container_of(work, struct prefetch_item, work);
+
+	if (!transition_state(pfi,
+			      XCALL_CACHE_NONE, XCALL_CACHE_PREFETCH))
+		return;
+
+	if (!pfi->cache)
+		return;
+
+	pfi->pos = 0;
+	pfi->len = kernel_read(pfi->file, pfi->cache,
+			       XCALL_CACHE_BUF_SIZE, &pfi->file->f_pos);
+	transition_state(pfi, XCALL_CACHE_PREFETCH, XCALL_CACHE_READY);
+}
+
+static int prefetch_cpus[MAX_NUMNODES] = { [0 ... MAX_NUMNODES - 1] = -1 };
+
+static void set_prefetch_numa_cpu(struct prefetch_item *pfi)
+{
+	int cur_cpu = smp_processor_id();
+	int cur_nid = numa_node_id();
+	int old_cpu, new_cpu;
+	struct cpumask tmp;
+
+	cpumask_copy(&tmp, &xcall_mask);
+	cpumask_and(&pfi->related_cpus, cpu_cpu_mask(cur_cpu), cpu_online_mask);
+	if (cpumask_intersects(&tmp, &pfi->related_cpus))
+		cpumask_and(&pfi->related_cpus, &pfi->related_cpus, &tmp);
+
+	do {
+		old_cpu = prefetch_cpus[cur_nid];
+		new_cpu = cpumask_next(old_cpu, &pfi->related_cpus);
+		if (new_cpu > cpumask_last(&pfi->related_cpus))
+			new_cpu = cpumask_first(&pfi->related_cpus);
+	} while (cmpxchg(&prefetch_cpus[cur_nid], old_cpu, new_cpu) != old_cpu);
+
+	pfi->cpu = new_cpu;
+}
+
+static int get_async_prefetch_cpu(struct prefetch_item *pfi)
+{
+	int cpu;
+
+	if (pfi->cpu != smp_processor_id())
+		return pfi->cpu;
+
+	cpu = cpumask_next(pfi->cpu, &pfi->related_cpus);
+	if (cpu > cpumask_last(&pfi->related_cpus))
+		cpu = cpumask_first(&pfi->related_cpus);
+	pfi->cpu = cpu;
+	return pfi->cpu;
+}
+
+static void xcall_mm_release(struct mmu_notifier *mn, struct mm_struct *mm)
+{
+	struct xcall_area *area = mm_xcall_area(mm);
+	void *area_private_data = NULL;
+
+	area_private_data = xchg(&area->sys_call_data[__NR_epoll_pwait], NULL);
+	kfree(area_private_data);
+}
+
+static struct mmu_notifier_ops xcall_mmu_notifier_ops = {
+	.release = xcall_mm_release,
+};
+
+static struct mmu_notifier xcall_mmu_notifier = {
+	.ops = &xcall_mmu_notifier_ops,
+};
+
+static void xcall_cancel_work(unsigned int fd)
+{
+	struct prefetch_item *pfi = current_prefetch_items() + fd;
+
+	if (fd < MAX_FD && pfi->file)
+		cancel_work_sync(&pfi->work);
+}
+
+#define MAX_READY_WAIT_TIME	msecs_to_jiffies(2)
+static int xcall_read(struct prefetch_item *pfi, char __user *buf, size_t count)
+{
+	unsigned long end = jiffies + MAX_READY_WAIT_TIME;
+	ssize_t copy_len = 0;
+
+	/*
+	 * Every time it copies data out of the prefetch buffer, the pfi
+	 * state has to be kept at "CANCEL" to avoid racing on the prefetch
+	 * buffer between the prefetch worker calling kernel_read() and
+	 * other threads calling copy_to_user(), and to avoid racing on the
+	 * prefetched file between the prefetch worker calling kernel_read()
+	 * and other threads calling vfs_read().
+	 */
+	while (!transition_state(pfi, XCALL_CACHE_READY, XCALL_CACHE_CANCEL)) {
+		/*
+		 * Once the prefetch read has returned an error code or the
+		 * prefetch has not started yet, there is no need to waste
+		 * CPU waiting here; do a slow vfs_read() instead to make
+		 * sure no newly arrived data is missed.
+		 */
+		if (transition_state(pfi, XCALL_CACHE_NONE, XCALL_CACHE_CANCEL))
+			goto slow_read;
+
+		if (time_after(jiffies, end)) {
+			pr_warn("xcall read wait prefetch state %d more than 2ms\n",
+				atomic_read(&pfi->state));
+			cond_resched();
+			end = jiffies + MAX_READY_WAIT_TIME;
+		}
+	}
+
+	copy_len = pfi->len;
+	if (unlikely(copy_len < 0))
+		goto slow_read;
+
+	if (copy_len == 0) {
+		this_cpu_inc(xcall_cache_hit);
+		transition_state(pfi, XCALL_CACHE_CANCEL, XCALL_CACHE_NONE);
+		return 0;
+	}
+
+	copy_len = (copy_len >= count) ?
+		   count : copy_len;
+	copy_len -= copy_to_user(buf, (void *)(pfi->cache + pfi->pos), copy_len);
+	pfi->len -= copy_len;
+	pfi->pos += copy_len;
+	if (pfi->len == 0)
+		transition_state(pfi, XCALL_CACHE_CANCEL, XCALL_CACHE_NONE);
+	else
+		transition_state(pfi, XCALL_CACHE_CANCEL, XCALL_CACHE_READY);
+
+	this_cpu_inc(xcall_cache_hit);
+	return copy_len;
+
+slow_read:
+	this_cpu_inc(xcall_cache_miss);
+	pfi->len = 0;
+	pfi->pos = 0;
+	cancel_work(&pfi->work);
+
+	return -EAGAIN;
+}
+
+static inline int xcall_read_begin(unsigned int fd, char __user *buf, size_t count)
+{
+	struct prefetch_item *pfi = NULL;
+
+	if (fd >= MAX_FD || !current_prefetch_items())
+		return -EAGAIN;
+
+	pfi = current_prefetch_items() + fd;
+	if (!pfi->file)
+		return -EAGAIN;
+
+	return xcall_read(pfi, buf, count);
+}
+
+static inline void xcall_read_end(unsigned int fd)
+{
+	struct prefetch_item *pfi = NULL;
+
+	if (fd >= MAX_FD || !current_prefetch_items())
+		return;
+
+	pfi = current_prefetch_items() + fd;
+	if (!pfi->file)
+		return;
+
+	transition_state(pfi, XCALL_CACHE_CANCEL, XCALL_CACHE_NONE);
+}
+
+static long __do_sys_epoll_create(struct pt_regs *regs)
+{
+	long ret;
+	int i;
+	struct xcall_area *area = mm_xcall_area(current->mm);
+	struct prefetch_item *items = NULL;
+
+	ret = default_sys_call_table()[__NR_epoll_create1](regs);
+	if (ret < 0)
+		return ret;
+
+	if (current_prefetch_items())
+		return ret;
+
+	items = kcalloc(MAX_FD, sizeof(struct prefetch_item), GFP_KERNEL);
+	if (!items)
+		return -ENOMEM;
+
+	if (cmpxchg(&area->sys_call_data[__NR_epoll_pwait], NULL, items)) {
+		kfree(items);
+		return ret;
+	}
+
+	for (i = 0; i < MAX_FD; i++) {
+		items[i].cache_pages = alloc_pages(GFP_KERNEL_ACCOUNT | __GFP_ZERO,
+						   XCALL_CACHE_PAGE_ORDER);
+		if (!items[i].cache_pages) {
+			items[i].cache = NULL;
+			continue;
+		}
+
+		INIT_WORK(&items[i].work, prefetch_work_fn);
+		atomic_set(&items[i].state, XCALL_CACHE_NONE);
+		items[i].cache = page_address(items[i].cache_pages);
+		items[i].len = 0;
+		items[i].pos = 0;
+		items[i].file = NULL;
+		set_prefetch_numa_cpu(&items[i]);
+	}
+	mmu_notifier_register(&xcall_mmu_notifier, current->mm);
+	return ret;
+}
+
+static long __do_sys_epoll_ctl(struct pt_regs *regs)
+{
+	struct prefetch_item *pfi = NULL;
+	unsigned int fd = regs->regs[2];
+	int op = regs->regs[1];
+	struct file *file;
+	long ret;
+
+	ret = default_sys_call_table()[__NR_epoll_ctl](regs);
+	if (ret || fd >= MAX_FD)
+		return ret;
+
+	if (!current_prefetch_items())
+		return ret;
+
+	pfi = current_prefetch_items() + fd;
+	switch (op) {
+	case EPOLL_CTL_ADD:
+		file = fget(fd);
+		if (!file)
+			return ret;
+
+		if (sock_from_file(file) && cmpxchg(&pfi->file, NULL, file))
+			fput(file);
+		break;
+	case EPOLL_CTL_DEL:
+		xcall_cancel_work(fd);
+		break;
+	}
+
+	return ret;
+}
+
+static long __do_sys_epoll_pwait(struct pt_regs *regs)
+{
+	void __user *buf = (void *)regs->regs[1];
+	struct prefetch_item *pfi = NULL;
+	struct epoll_event events[MAX_FD] = {0};
+	int i, fd, cpu, prefetch_task_num;
+	long ret;
+
+	ret = default_sys_call_table()[__NR_epoll_pwait](regs);
+	if (ret <= 0)
+		return ret;
+
+	if (!current_prefetch_items())
+		return ret;
+
+	prefetch_task_num = ret > MAX_FD ?
+			    MAX_FD : ret;
+	if (copy_from_user(events, buf, prefetch_task_num * sizeof(struct epoll_event)))
+		return ret;
+
+	for (i = 0; i < prefetch_task_num; i++) {
+		fd = events[i].data;
+		if (!(events[i].events & EPOLLIN) || fd >= MAX_FD)
+			continue;
+
+		pfi = current_prefetch_items() + fd;
+		if (!(pfi->file) || !(pfi->file->f_mode & FMODE_READ))
+			continue;
+		if (atomic_read(&pfi->state) != XCALL_CACHE_NONE)
+			continue;
+
+		cpu = get_async_prefetch_cpu(pfi);
+		queue_work_on(cpu, rc_work, &pfi->work);
+	}
+	return ret;
+}
+
+static long __do_sys_close(struct pt_regs *regs)
+{
+	unsigned int fd = regs->regs[0];
+	struct prefetch_item *pfi = NULL;
+	struct file *pfi_old_file = NULL;
+	struct file *pfi_new_file = NULL;
+
+	if (!current_prefetch_items())
+		return default_sys_call_table()[__NR_close](regs);
+
+	pfi = current_prefetch_items() + fd;
+	if (fd < MAX_FD && pfi->file) {
+		pfi_old_file = pfi->file;
+		pfi_new_file = cmpxchg(&pfi->file, pfi_old_file, NULL);
+		if (pfi_new_file == pfi_old_file) {
+			fput(pfi_old_file);
+			atomic_set(&pfi->state, XCALL_CACHE_NONE);
+			pfi->len = 0;
+			pfi->pos = 0;
+		}
+	}
+	return default_sys_call_table()[__NR_close](regs);
+}
+
+static long __do_sys_read(struct pt_regs *regs)
+{
+	unsigned int fd = regs->regs[0];
+	void __user *buf = (void *)regs->regs[1];
+	size_t count = regs->regs[2];
+	size_t ret = -EBADF;
+
+	ret = xcall_read_begin(fd, buf, count);
+	if (ret != -EAGAIN)
+		return ret;
+
+	ret = default_sys_call_table()[__NR_read](regs);
+
+	xcall_read_end(fd);
+	return ret;
+}
+
+/* MANDATORY */
+struct xcall_prog xcall_prefetch_prog = {
+	.name = "xcall_prefetch",
+	.owner = THIS_MODULE,
+	.objs = {
+		{
+			.scno = (unsigned long)__NR_epoll_create1,
+			.func = (unsigned long)__do_sys_epoll_create,
+		},
+		{
+			.scno = (unsigned long)__NR_epoll_ctl,
+			.func = (unsigned long)__do_sys_epoll_ctl,
+		},
+		{
+			.scno = (unsigned long)__NR_epoll_pwait,
+			.func = (unsigned long)__do_sys_epoll_pwait,
+		},
+		{
+			.scno = (unsigned long)__NR_close,
+			.func = (unsigned long)__do_sys_close,
+		},
+		{
+			.scno = (unsigned long)__NR_read,
+			.func = (unsigned long)__do_sys_read,
+		},
+		{}
+	}
+};
+
+static int __init init_xcall_prefetch_procfs(void)
+{
+	xcall_proc_dir = proc_mkdir("xcall_feature", NULL);
+	if (!xcall_proc_dir)
+		return -ENOMEM;
+	prefetch_dir = proc_create("prefetch", 0640, xcall_proc_dir,
+				   &xcall_prefetch_fops);
+	if (!prefetch_dir)
+		goto rm_xcall_proc_dir;
+	xcall_mask_dir = proc_create("cpu_list", 0640, xcall_proc_dir,
+				     &xcall_mask_fops);
+	if (!xcall_mask_dir)
+		goto rm_prefetch_dir;
+
+	cpumask_copy(&xcall_mask, cpu_online_mask);
+	return 0;
+
+rm_prefetch_dir:
+	proc_remove(prefetch_dir);
+rm_xcall_proc_dir:
+	proc_remove(xcall_proc_dir);
+	return -ENOMEM;
+}
+
+/* MANDATORY */
+static int __init xcall_prefetch_init(void)
+{
+	int ret;
+
+	rc_work = alloc_workqueue("eventpoll_rc", 0, 0);
+	if (!rc_work) {
+		pr_warn("alloc eventpoll_rc workqueue failed.\n");
+		return -ENOMEM;
+	}
+
+	ret = init_xcall_prefetch_procfs();
+	if (ret)
+		goto destroy_queue;
+
+	ret = xcall_prog_register(&xcall_prefetch_prog);
+	if (ret)
+		goto remove_dir;
+
+	return ret;
+
+remove_dir:
+	proc_remove(prefetch_dir);
+	proc_remove(xcall_mask_dir);
+	proc_remove(xcall_proc_dir);
+destroy_queue:
+	destroy_workqueue(rc_work);
+	return ret;
+}
+
+/* MANDATORY */
+static void __exit xcall_prefetch_exit(void)
+{
+	if (rc_work)
+		destroy_workqueue(rc_work);
+	if (prefetch_dir)
+		proc_remove(prefetch_dir);
+	if (xcall_mask_dir)
+		proc_remove(xcall_mask_dir);
+	if (xcall_proc_dir)
+		proc_remove(xcall_proc_dir);
+
+	xcall_prog_unregister(&xcall_prefetch_prog);
+}
+
+module_init(xcall_prefetch_init);
+module_exit(xcall_prefetch_exit);
+MODULE_AUTHOR("");
+MODULE_DESCRIPTION("Xcall Prefetch");
+MODULE_LICENSE("GPL");
--
2.34.1
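
For reviewers: below is a minimal userspace sketch, not part of this patch, showing
one way the two procfs files described in the commit message could be exercised.
It assumes the module is loaded, that /proc/xcall_feature/cpu_list and
/proc/xcall_feature/prefetch exist as created above, and that the caller has
permission to open them; the "0-3" CPU list is just an example value.

/* usage sketch (hypothetical test program), build with: gcc -o xcall_test xcall_test.c */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	char buf[4096];
	ssize_t n;
	int fd;

	/* Restrict the prefetch read work to CPUs 0-3 (example cpulist). */
	fd = open("/proc/xcall_feature/cpu_list", O_WRONLY);
	if (fd < 0) {
		perror("open cpu_list");
		return 1;
	}
	if (write(fd, "0-3\n", 4) < 0)
		perror("write cpu_list");
	close(fd);

	/* Dump per-CPU hit/miss counters and the hit ratio. */
	fd = open("/proc/xcall_feature/prefetch", O_RDONLY);
	if (fd < 0) {
		perror("open prefetch");
		return 1;
	}
	while ((n = read(fd, buf, sizeof(buf) - 1)) > 0) {
		buf[n] = '\0';
		fputs(buf, stdout);
	}
	close(fd);

	/* Per xcall_prefetch_write(), any write resets the counters. */
	fd = open("/proc/xcall_feature/prefetch", O_WRONLY);
	if (fd >= 0) {
		write(fd, "0\n", 2);
		close(fd);
	}
	return 0;
}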