From: Chenghai Huang <huangchenghai2@huawei.com>
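Ensure that the packets in the message pool have been received before the test ends (the wait tolerates at most MAX_UNRECV_PACKET_NUM outstanding packets per thread). In V2, resources such as tags are now released centrally, after the send and poll threads have been joined, to avoid errors caused by an out-of-order asynchronous release. In V1, the send thread waits until its corresponding poll thread has finished before releasing its tags.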
Signed-off-by: Chenghai Huang <huangchenghai2@huawei.com> --- uadk_tool/benchmark/zip_uadk_benchmark.c | 151 ++++++++++++----------- uadk_tool/benchmark/zip_wd_benchmark.c | 25 ++-- 2 files changed, 96 insertions(+), 80 deletions(-)
diff --git a/uadk_tool/benchmark/zip_uadk_benchmark.c b/uadk_tool/benchmark/zip_uadk_benchmark.c index 63fbdab..1dd3990 100644 --- a/uadk_tool/benchmark/zip_uadk_benchmark.c +++ b/uadk_tool/benchmark/zip_uadk_benchmark.c @@ -16,7 +16,7 @@ #define MAX_POOL_LENTH_COMP 1 #define COMPRESSION_RATIO_FACTOR 0.7 #define CHUNK_SIZE (128 * 1024) - +#define MAX_UNRECV_PACKET_NUM 2 struct uadk_bd { u8 *src; u8 *dst; @@ -37,11 +37,17 @@ enum ZIP_OP_MODE { STREAM_MODE };
+enum ZIP_THREAD_STATE { + THREAD_PROCESSING, + THREAD_COMPLETED +}; + struct zip_async_tag { handle_t sess; u32 td_id; u32 bd_idx; u32 cm_len; + u32 recv_cnt; ZSTD_CCtx *cctx; };
@@ -52,6 +58,10 @@ typedef struct uadk_thread_res { u32 td_id; u32 win_sz; u32 comp_lv; + u32 send_cnt; + struct zip_async_tag *tag; + COMP_TUPLE_TAG *ftuple; + char *hw_buff_out; } thread_data;
struct zip_file_head { @@ -67,6 +77,7 @@ static unsigned int g_thread_num; static unsigned int g_ctxnum; static unsigned int g_pktlen; static unsigned int g_prefetch; +static unsigned int g_state;
#ifndef ZLIB_FSE static ZSTD_CCtx* zstd_soft_fse_init(unsigned int level) @@ -541,6 +552,7 @@ static void *zip_lz77_async_cb(struct wd_comp_req *req, void *data) zstd_output.dst = uadk_pool->bds[idx].dst; zstd_output.size = tag->cm_len; zstd_output.pos = 0; + __atomic_add_fetch(&tag->recv_cnt, 1, __ATOMIC_RELAXED); fse_size = zstd_soft_fse(req->priv, &zstd_input, &zstd_output, cctx, ZSTD_e_end);
uadk_pool->bds[idx].dst_len = fse_size; @@ -554,6 +566,7 @@ static void *zip_async_cb(struct wd_comp_req *req, void *data) struct bd_pool *uadk_pool; int td_id = tag->td_id; int idx = tag->bd_idx; + __atomic_add_fetch(&tag->recv_cnt, 1, __ATOMIC_RELAXED);
uadk_pool = &g_zip_pool.pool[td_id]; uadk_pool->bds[idx].dst_len = req->dst_len; @@ -566,15 +579,14 @@ static void *zip_uadk_poll(void *data) thread_data *pdata = (thread_data *)data; u32 expt = ACC_QUEUE_SIZE * g_thread_num; u32 id = pdata->td_id; - u32 last_time = 2; // poll need one more recv time u32 count = 0; u32 recv = 0; - int ret; + int ret;
if (id > g_ctxnum) return NULL;
- while (last_time) { + while (g_state == THREAD_PROCESSING) { ret = wd_comp_poll_ctx(id, expt, &recv); count += recv; recv = 0; @@ -582,9 +594,6 @@ static void *zip_uadk_poll(void *data) ZIP_TST_PRT("poll ret: %d!\n", ret); goto recv_error; } - - if (get_run_state() == 0) - last_time--; }
recv_error: @@ -596,12 +605,11 @@ recv_error: static void *zip_uadk_poll2(void *data) { u32 expt = ACC_QUEUE_SIZE * g_thread_num; - u32 last_time = 2; // poll need one more recv time u32 count = 0; u32 recv = 0; int ret;
- while (last_time) { + while (g_state == THREAD_PROCESSING) { ret = wd_comp_poll(expt, &recv); count += recv; recv = 0; @@ -609,9 +617,6 @@ static void *zip_uadk_poll2(void *data) ZIP_TST_PRT("poll ret: %d!\n", ret); goto recv_error; } - - if (get_run_state() == 0) - last_time--; }
recv_error: @@ -803,11 +808,8 @@ static void *zip_uadk_blk_lz77_async_run(void *arg) thread_data *pdata = (thread_data *)arg; struct wd_comp_sess_setup comp_setup = {0}; ZSTD_CCtx *cctx = zstd_soft_fse_init(15); - COMP_TUPLE_TAG *ftuple = NULL; struct bd_pool *uadk_pool; struct wd_comp_req creq; - struct zip_async_tag *tag; - char *hw_buff_out = NULL; handle_t h_sess; u32 out_len = 0; u32 count = 0; @@ -838,37 +840,22 @@ static void *zip_uadk_blk_lz77_async_run(void *arg) creq.data_fmt = 0; creq.status = 0;
- ftuple = malloc(sizeof(COMP_TUPLE_TAG) * MAX_POOL_LENTH_COMP); - if (!ftuple) - goto fse_err; - - hw_buff_out = malloc(out_len * MAX_POOL_LENTH_COMP); - if (!hw_buff_out) - goto hw_buff_err; - memset(hw_buff_out, 0x0, out_len * MAX_POOL_LENTH_COMP); - - tag = malloc(sizeof(*tag) * MAX_POOL_LENTH_COMP); - if (!tag) { - ZIP_TST_PRT("failed to malloc zip tag!\n"); - goto tag_err; - } - while(1) { if (get_run_state() == 0) break;
i = count % MAX_POOL_LENTH_COMP; creq.src = uadk_pool->bds[i].src; - creq.dst = &hw_buff_out[i]; //temp out + creq.dst = &pdata->hw_buff_out[i]; //temp out creq.src_len = uadk_pool->bds[i].src_len; creq.dst_len = out_len; - creq.priv = &ftuple[i]; + creq.priv = &pdata->ftuple[i];
- tag[i].td_id = pdata->td_id; - tag[i].bd_idx = i; - tag[i].cm_len = out_len; - tag[i].cctx = cctx; - creq.cb_param = &tag[i]; + pdata->tag[i].td_id = pdata->td_id; + pdata->tag[i].bd_idx = i; + pdata->tag[i].cm_len = out_len; + pdata->tag[i].cctx = cctx; + creq.cb_param = &pdata->tag[i];
ret = wd_do_comp_async(h_sess, &creq); if (ret == -WD_EBUSY) { @@ -884,20 +871,8 @@ static void *zip_uadk_blk_lz77_async_run(void *arg) } try_cnt = 0; count++; + __atomic_add_fetch(&pdata->send_cnt, 1, __ATOMIC_RELAXED); } - - while (1) { - if (get_recv_time() > 0) // wait Async mode finish recv - break; - usleep(SEND_USLEEP); - } - -tag_err: - free(tag); -hw_buff_err: - free(hw_buff_out); -fse_err: - free(ftuple); wd_comp_free_sess(h_sess); add_send_complete();
@@ -1033,7 +1008,6 @@ static void *zip_uadk_blk_async_run(void *arg) thread_data *pdata = (thread_data *)arg; struct wd_comp_sess_setup comp_setup = {0}; struct bd_pool *uadk_pool; - struct zip_async_tag *tag; struct wd_comp_req creq; handle_t h_sess; int try_cnt = 0; @@ -1066,13 +1040,6 @@ static void *zip_uadk_blk_async_run(void *arg) creq.priv = 0; creq.status = 0;
- tag = malloc(sizeof(*tag) * MAX_POOL_LENTH_COMP); - if (!tag) { - ZIP_TST_PRT("failed to malloc zip tag!\n"); - wd_comp_free_sess(h_sess); - return NULL; - } - while(1) { if (get_run_state() == 0) break; @@ -1083,9 +1050,9 @@ static void *zip_uadk_blk_async_run(void *arg) creq.src_len = uadk_pool->bds[i].src_len; creq.dst_len = out_len;
- tag[i].td_id = pdata->td_id; - tag[i].bd_idx = i; - creq.cb_param = &tag[i]; + pdata->tag[i].td_id = pdata->td_id; + pdata->tag[i].bd_idx = i; + creq.cb_param = &pdata->tag[i];
ret = wd_do_comp_async(h_sess, &creq); if (ret == -WD_EBUSY) { @@ -1101,15 +1068,9 @@ static void *zip_uadk_blk_async_run(void *arg) } try_cnt = 0; count++; + __atomic_add_fetch(&pdata->send_cnt, 1, __ATOMIC_RELAXED); }
- while (1) { - if (get_recv_time() > 0) // wait Async mode finish recv - break; - usleep(SEND_USLEEP); - } - - free(tag); wd_comp_free_sess(h_sess);
add_send_complete(); @@ -1215,10 +1176,35 @@ static int zip_uadk_async_threads(struct acc_option *options) threads_args[i].win_sz = threads_option.win_sz; threads_args[i].comp_lv = threads_option.comp_lv; threads_args[i].td_id = i; + if (threads_option.alg == LZ77_ZSTD) { + struct bd_pool *uadk_pool = &g_zip_pool.pool[i]; + u32 out_len = uadk_pool->bds[0].dst_len; + + threads_args[i].ftuple = malloc(sizeof(COMP_TUPLE_TAG) * + MAX_POOL_LENTH_COMP); + if (!threads_args[i].ftuple) { + ZIP_TST_PRT("failed to malloc lz77 ftuple!\n"); + goto lz77_free; + } + + threads_args[i].hw_buff_out = malloc(out_len * MAX_POOL_LENTH_COMP); + if (!threads_args[i].hw_buff_out) { + ZIP_TST_PRT("failed to malloc lz77 hw_buff_out!\n"); + goto lz77_free; + } + memset(threads_args[i].hw_buff_out, 0x0, out_len * MAX_POOL_LENTH_COMP); + } + threads_args[i].tag = malloc(sizeof(struct zip_async_tag) * MAX_POOL_LENTH_COMP); + if (!threads_args[i].tag) { + ZIP_TST_PRT("failed to malloc zip tag!\n"); + goto tag_free; + } + threads_args[i].tag->recv_cnt = 0; + threads_args[i].send_cnt = 0; ret = pthread_create(&tdid[i], NULL, uadk_zip_async_run, &threads_args[i]); if (ret) { ZIP_TST_PRT("Create async thread fail!\n"); - goto async_error; + goto tag_free; } }
@@ -1227,18 +1213,41 @@ static int zip_uadk_async_threads(struct acc_option *options) ret = pthread_join(tdid[i], NULL); if (ret) { ZIP_TST_PRT("Join async thread fail!\n"); - goto async_error; + goto tag_free; } }
+ /* wait for the poll to clear packets */ + g_state = THREAD_PROCESSING; + for (i = 0; i < g_thread_num;) { + if (threads_args[i].send_cnt <= threads_args[i].tag->recv_cnt + MAX_UNRECV_PACKET_NUM) + i++; + } + g_state = THREAD_COMPLETED; // finish poll + for (i = 0; i < g_ctxnum; i++) { ret = pthread_join(pollid[i], NULL); if (ret) { ZIP_TST_PRT("Join poll thread fail!\n"); - goto async_error; + goto tag_free; } }
+tag_free: + for (i = 0; i < g_thread_num; i++) { + if (threads_args[i].tag) + free(threads_args[i].tag); + } +lz77_free: + if (threads_option.alg == LZ77_ZSTD) { + for (i = 0; i < g_thread_num; i++) { + if (threads_args[i].ftuple) + free(threads_args[i].ftuple); + + if (threads_args[i].hw_buff_out) + free(threads_args[i].hw_buff_out); + } + } async_error: return ret; } diff --git a/uadk_tool/benchmark/zip_wd_benchmark.c b/uadk_tool/benchmark/zip_wd_benchmark.c index 7f85b3c..a65447a 100644 --- a/uadk_tool/benchmark/zip_wd_benchmark.c +++ b/uadk_tool/benchmark/zip_wd_benchmark.c @@ -21,6 +21,7 @@ #define COMPRESSION_RATIO_FACTOR 0.7 #define MAX_POOL_LENTH_COMP 512 #define CHUNK_SIZE (128 * 1024) +#define MAX_UNRECV_PACKET_NUM 2
#define __ALIGN_MASK(x, mask) (((x) + (mask)) & ~(mask)) #define ALIGN(x, a) __ALIGN_MASK(x, (typeof(x))(a)-1) @@ -49,6 +50,11 @@ enum ZIP_OP_MODE { STREAM_MODE };
+enum ZIP_THREAD_STATE { + THREAD_PROCESSING, + THREAD_COMPLETED +}; + struct zip_async_tag { void *ctx; u32 td_id; @@ -75,6 +81,8 @@ struct zip_file_head {
static unsigned int g_thread_num; static unsigned int g_pktlen; +static unsigned int g_send_cnt[THREADS_NUM]; +static unsigned int g_recv_state[THREADS_NUM];
static int save_file_data(const char *alg, u32 pkg_len, u32 optype) { @@ -470,9 +478,10 @@ static void *zip_wd_poll(void *data) count += recv; recv = 0;
- if (get_run_state() == 0) + if (get_run_state() == 0 && g_send_cnt[id] <= count + MAX_UNRECV_PACKET_NUM) last_time--; } + g_recv_state[id] = THREAD_COMPLETED;
recv_error: add_recv_data(count, g_pktlen); @@ -746,13 +755,11 @@ static void *zip_wd_blk_lz77_async_run(void *arg) } try_cnt = 0; count++; + __atomic_add_fetch(&g_send_cnt[pdata->td_id], 1, __ATOMIC_RELAXED); }
- while (1) { - if (get_recv_time() > 0) // wait Async mode finish recv - break; + while (g_recv_state[pdata->td_id] == THREAD_PROCESSING) usleep(SEND_USLEEP); - }
free(tag); tag_err: @@ -1011,13 +1018,11 @@ static void *zip_wd_blk_async_run(void *arg) } try_cnt = 0; count++; + __atomic_add_fetch(&g_send_cnt[pdata->td_id], 1, __ATOMIC_RELAXED); }
- while (1) { - if (get_recv_time() > 0) // wait Async mode finish recv - break; + while (g_recv_state[pdata->td_id] == THREAD_PROCESSING) usleep(SEND_USLEEP); - }
tag_release: free(tag); @@ -1107,6 +1112,7 @@ static int zip_wd_async_threads(struct acc_option *options)
for (i = 0; i < g_thread_num; i++) { threads_args[i].td_id = i; + g_recv_state[i] = THREAD_PROCESSING; /* poll thread */ ret = pthread_create(&pollid[i], NULL, zip_wd_poll, &threads_args[i]); if (ret) { @@ -1122,6 +1128,7 @@ static int zip_wd_async_threads(struct acc_option *options) threads_args[i].comp_lv = threads_option.comp_lv; threads_args[i].win_size = threads_option.win_size; threads_args[i].td_id = i; + g_send_cnt[i] = 0; ret = pthread_create(&tdid[i], NULL, wd_zip_async_run, &threads_args[i]); if (ret) { ZIP_TST_PRT("Create async thread fail!\n");