From: Longfang Liu <liulongfang@huawei.com> After the uadk framework updated its heterogeneous scheduling function, the internal implementation functions of the hash-join and gather algorithms need to be adapted accordingly. Signed-off-by: Longfang Liu <liulongfang@huawei.com> --- wd_join_gather.c | 124 ++++++++++++++++++++++++++--------------------- 1 file changed, 69 insertions(+), 55 deletions(-) diff --git a/wd_join_gather.c b/wd_join_gather.c index 86190c4..52cd5f9 100644 --- a/wd_join_gather.c +++ b/wd_join_gather.c @@ -32,7 +32,6 @@ struct wd_join_gather_setting { struct wd_ctx_config_internal config; struct wd_sched sched; struct wd_async_msg_pool pool; - struct wd_alg_driver *driver; void *priv; void *dlhandle; void *dlh_list; @@ -79,7 +78,6 @@ static void wd_join_gather_close_driver(void) wd_dlclose_drv(wd_join_gather_setting.dlh_list); wd_join_gather_setting.dlh_list = NULL; #else - wd_release_drv(wd_join_gather_setting.driver); hisi_dae_join_gather_remove(); #endif } @@ -406,33 +404,64 @@ free_join: static void wd_join_gather_uninit_sess(struct wd_join_gather_sess *sess) { + struct wd_ctx_config_internal *config = &wd_join_gather_setting.config; + struct wd_join_gather_ops *eops; + __u32 i; + if (sess->gather_conf.batch_row_size) free(sess->gather_conf.batch_row_size); - if (sess->ops.sess_uninit) - sess->ops.sess_uninit(sess->priv); + for (i = 0; i < config->ctx_num; i++) { + /* At this point, extend_ops has completed its initialization. 
*/ + eops = config->ctxs[i].extend_ops; + if (!eops) + continue; + + if (eops->sess_uninit) + eops->sess_uninit(sess->priv); + } } static int wd_join_gather_init_sess(struct wd_join_gather_sess *sess, struct wd_join_gather_sess_setup *setup) { - __u32 array_size; + struct wd_ctx_config_internal *config = &wd_join_gather_setting.config; + struct wd_join_gather_ops *eops = NULL; + struct wd_alg_driver *drv; + __u32 i, array_size; int ret; - if (sess->ops.sess_init) { - if (!sess->ops.sess_uninit) { + for (i = 0; i < config->ctx_num; i++) { + drv = config->ctxs[i].drv; + if (!drv->get_extend_ops) + continue; + + ret = drv->get_extend_ops(config->ctxs[i].extend_ops); + if (!ret && config->ctxs[i].extend_ops) { + eops = config->ctxs[i].extend_ops; + break; + } + } + if (!eops) { + WD_ERR("failed to get extend ops!\n"); + return -WD_EINVAL; + } + + memcpy(&sess->ops, eops, sizeof(*eops)); + if (eops->sess_init) { + if (!eops->sess_uninit) { WD_ERR("failed to get session uninit ops!\n"); return -WD_EINVAL; } - ret = sess->ops.sess_init(setup, &sess->priv); + ret = eops->sess_init(setup, &sess->priv); if (ret) { WD_ERR("failed to init session priv!\n"); return ret; } } - if (sess->ops.get_table_row_size && setup->alg != WD_GATHER) { - ret = sess->ops.get_table_row_size(sess->priv); + if (eops->get_table_row_size && setup->alg != WD_GATHER) { + ret = eops->get_table_row_size(sess->priv); if (ret <= 0) { WD_ERR("failed to get hash table row size: %d!\n", ret); goto uninit; @@ -440,13 +469,13 @@ static int wd_join_gather_init_sess(struct wd_join_gather_sess *sess, sess->hash_table.table_row_size = ret; } - if (sess->ops.get_batch_row_size && setup->alg != WD_JOIN) { + if (eops->get_batch_row_size && setup->alg != WD_JOIN) { array_size = setup->gather_table_num * sizeof(__u32); sess->gather_conf.batch_row_size = malloc(array_size); if (!sess->gather_conf.batch_row_size) goto uninit; - ret = sess->ops.get_batch_row_size(sess->priv, + ret = eops->get_batch_row_size(sess->priv, 
sess->gather_conf.batch_row_size, array_size); if (ret) { @@ -460,8 +489,8 @@ static int wd_join_gather_init_sess(struct wd_join_gather_sess *sess, free_batch: free(sess->gather_conf.batch_row_size); uninit: - if (sess->ops.sess_uninit) - sess->ops.sess_uninit(sess->priv); + if (eops->sess_uninit) + eops->sess_uninit(sess->priv); return -WD_EINVAL; } @@ -485,7 +514,7 @@ handle_t wd_join_gather_alloc_sess(struct wd_join_gather_sess_setup *setup) sess->index_type = setup->index_type; sess->join_conf.key_output_enable = setup->join_table.key_output_enable; - ret = wd_drv_alg_support(wd_join_gather_alg[sess->alg], wd_join_gather_setting.driver); + ret = wd_drv_alg_support(wd_join_gather_alg[sess->alg], &wd_join_gather_setting.config); if (!ret) { WD_ERR("failed to check driver alg: %s!\n", wd_join_gather_alg[sess->alg]); goto free_sess; @@ -499,14 +528,6 @@ handle_t wd_join_gather_alloc_sess(struct wd_join_gather_sess_setup *setup) goto free_sess; } - if (wd_join_gather_setting.driver->get_extend_ops) { - ret = wd_join_gather_setting.driver->get_extend_ops(&sess->ops); - if (ret) { - WD_ERR("failed to get join gather extend ops!\n"); - goto free_key; - } - } - ret = wd_join_gather_init_sess(sess, setup); if (ret) goto free_key; @@ -697,16 +718,10 @@ static int wd_join_gather_alg_init(struct wd_ctx_config *config, struct wd_sched if (ret < 0) goto out_clear_sched; - ret = wd_alg_init_driver(&wd_join_gather_setting.config, - wd_join_gather_setting.driver, - &wd_join_gather_setting.priv); - if (ret) - goto out_clear_pool; + wd_join_gather_setting.priv = STATUS_ENABLE; return WD_SUCCESS; -out_clear_pool: - wd_uninit_async_request_pool(&wd_join_gather_setting.pool); out_clear_sched: wd_clear_sched(&wd_join_gather_setting.sched); out_clear_ctx_config: @@ -728,9 +743,7 @@ static int wd_join_gather_alg_uninit(void) /* Unset config, sched, driver */ wd_clear_sched(&wd_join_gather_setting.sched); - wd_alg_uninit_driver(&wd_join_gather_setting.config, - 
wd_join_gather_setting.driver, - &wd_join_gather_setting.priv); + wd_join_gather_setting.priv = NULL; return WD_SUCCESS; } @@ -769,21 +782,12 @@ int wd_join_gather_init(char *alg, __u32 sched_type, int task_type, while (ret != 0) { memset(&wd_join_gather_setting.config, 0, sizeof(struct wd_ctx_config_internal)); - /* Get alg driver and dev name */ - wd_join_gather_setting.driver = wd_alg_drv_bind(task_type, alg); - if (!wd_join_gather_setting.driver) { - WD_ERR("failed to bind %s driver.\n", alg); - goto out_dlopen; - } - + /* Init ctx param and prepare for ctx request */ join_gather_ctx_params.ctx_set_num = &join_gather_ctx_num; - ret = wd_ctx_param_init(&join_gather_ctx_params, ctx_params, - wd_join_gather_setting.driver, - WD_JOIN_GATHER_TYPE, 1); + ret = wd_ctx_param_init_nw(&join_gather_ctx_params, ctx_params, + alg, task_type, WD_JOIN_GATHER_TYPE, 1); if (ret) { if (ret == -WD_EAGAIN) { - wd_disable_drv(wd_join_gather_setting.driver); - wd_alg_drv_unbind(wd_join_gather_setting.driver); continue; } goto out_driver; @@ -791,15 +795,13 @@ int wd_join_gather_init(char *alg, __u32 sched_type, int task_type, (void)strcpy(wd_join_gather_init_attrs.alg, alg); wd_join_gather_init_attrs.sched_type = sched_type; - wd_join_gather_init_attrs.driver = wd_join_gather_setting.driver; + wd_join_gather_init_attrs.task_type = task_type; wd_join_gather_init_attrs.ctx_params = &join_gather_ctx_params; wd_join_gather_init_attrs.alg_init = wd_join_gather_alg_init; wd_join_gather_init_attrs.alg_poll_ctx = wd_join_gather_poll_ctx; ret = wd_alg_attrs_init(&wd_join_gather_init_attrs); if (ret) { if (ret == -WD_ENODEV) { - wd_disable_drv(wd_join_gather_setting.driver); - wd_alg_drv_unbind(wd_join_gather_setting.driver); wd_ctx_param_uninit(&join_gather_ctx_params); continue; } @@ -807,17 +809,27 @@ int wd_join_gather_init(char *alg, __u32 sched_type, int task_type, goto out_params_uninit; } } + ret = wd_ctx_drv_config(alg, &wd_join_gather_setting.config); + if (ret) + goto 
out_uninit_nolock; + + ret = wd_alg_init_driver_nw(&wd_join_gather_setting.config); + if (ret) + goto out_drv_deconfig; wd_alg_set_init(&wd_join_gather_setting.status); wd_ctx_param_uninit(&join_gather_ctx_params); return WD_SUCCESS; +out_drv_deconfig: + wd_ctx_drv_deconfig(&wd_join_gather_setting.config); +out_uninit_nolock: + wd_join_gather_alg_uninit(); + wd_alg_attrs_uninit(&wd_join_gather_init_attrs); out_params_uninit: wd_ctx_param_uninit(&join_gather_ctx_params); out_driver: - wd_alg_drv_unbind(wd_join_gather_setting.driver); -out_dlopen: wd_join_gather_close_driver(); out_uninit: wd_alg_clear_init(&wd_join_gather_setting.status); @@ -832,8 +844,10 @@ void wd_join_gather_uninit(void) if (ret) return; + wd_alg_uninit_driver_nw(&wd_join_gather_setting.config); + wd_ctx_drv_deconfig(&wd_join_gather_setting.config); wd_alg_attrs_uninit(&wd_join_gather_init_attrs); - wd_alg_drv_unbind(wd_join_gather_setting.driver); + wd_join_gather_close_driver(); wd_alg_clear_init(&wd_join_gather_setting.status); } @@ -1213,8 +1227,8 @@ static int wd_join_gather_sync_job(struct wd_join_gather_sess *sess, wd_dfx_msg_cnt(config, WD_CTX_CNT_NUM, idx); ctx = config->ctxs + idx; - msg_handle.send = setting->driver->send; - msg_handle.recv = setting->driver->recv; + msg_handle.send = ctx->drv->send; + msg_handle.recv = ctx->drv->recv; pthread_spin_lock(&ctx->lock); ret = wd_handle_msg_sync(&msg_handle, ctx->ctx, @@ -1313,7 +1327,7 @@ static int wd_join_gather_async_job(struct wd_join_gather_sess *sess, fill_join_gather_msg(msg, req, sess); msg->tag = msg_id; - ret = wd_join_gather_setting.driver->send(ctx->ctx, msg); + ret = ctx->drv->send(ctx->ctx, msg); if (ret < 0) { if (ret != -WD_EBUSY) WD_ERR("wd join gather async send err!\n"); @@ -1783,7 +1797,7 @@ static int wd_join_gather_poll_ctx(__u32 idx, __u32 expt, __u32 *count) ctx = config->ctxs + idx; do { - ret = wd_join_gather_setting.driver->recv(ctx->ctx, &resp_msg); + ret = ctx->drv->recv(ctx->ctx, &resp_msg); if (ret == 
-WD_EAGAIN) { return ret; } else if (ret < 0) { -- 2.43.0