background: the extract-stats process is at risk of being interrupted, because other dependent services (task-queue, es_client) may not be working properly solution: 1. use begin ... rescue to handle potential exceptions - when ES and task-queue are not working properly, sleep 10s and retry - when "result_root" does not exist, skip it and remove the task from task-queue 2. refactor consume_sched_queue() of stats_worker.cr
Signed-off-by: Lu Weitao luweitaobe@163.com --- src/extract-stats/stats_worker.cr | 60 ++++++++++++++++++------------- 1 file changed, 35 insertions(+), 25 deletions(-)
diff --git a/src/extract-stats/stats_worker.cr b/src/extract-stats/stats_worker.cr index fcba6af..2af07d4 100644 --- a/src/extract-stats/stats_worker.cr +++ b/src/extract-stats/stats_worker.cr @@ -18,40 +18,50 @@ class StatsWorker def consume_sched_queue(queue_path : String) loop do begin - response = @tq.consume_task(queue_path) + job_id = get_job_id(queue_path) + if job_id # will consume the job by post-processing + job = @es.get_job_content(job_id) + result_root = job["result_root"]? + result_post_processing(job_id, result_root.to_s, queue_path) + @tq.delete_task(queue_path + "/in_process", "#{job_id}") + end rescue e STDERR.puts e.message - # incase of many error message when task-queue is busy + # incase of many error message when task-queue, ES does not work sleep(10) - next end + end + end
- if response[0] == 200 - job_id = JSON.parse(response[1].to_json)["id"] - job = @es.get_job_content(job_id.to_s) - result_root = job["result_root"] - begin - # extract stats.json - system "#{ENV["CCI_SRC"]}/sbin/result2stats #{result_root}" - # storage stats to job in es - store_stats_es(result_root.to_s, job_id.to_s) if result_root - # send mail to submitter for job results - system "#{ENV["CCI_SRC"]}/sbin/mail-job #{job_id}" - rescue e - STDERR.puts e.message - next - end - - @tq.delete_task(queue_path + "/in_process", "#{job_id}") - else - sleep(60) - end + # get job_id from task-queue + def get_job_id(queue_path : String) + response = @tq.consume_task(queue_path) + if response[0] == 200 + JSON.parse(response[1].to_json)["id"].to_s + else + # will sleep 60s if no task in task-queue + sleep(60) + nil end end
- def store_stats_es(result_root : String, job_id : String) + def result_post_processing(job_id : String, result_root : String, queue_path : String) + return nil unless result_root && File.exists?(result_root) + + # extract stats.json + system "#{ENV["CCI_SRC"]}/sbin/result2stats #{result_root}" + # storage stats to job in es + store_stats_es(result_root, job_id, queue_path) + # send mail to submitter for job results + system "#{ENV["CCI_SRC"]}/sbin/mail-job #{job_id}" + end + + def store_stats_es(result_root : String, job_id : String, queue_path : String) stats_path = "#{result_root}/stats.json" - raise "#{stats_path} file not exists." unless File.exists?(stats_path) + unless File.exists?(stats_path) + @tq.delete_task(queue_path + "/in_process", "#{job_id}") + raise "#{stats_path} file not exists." + end
stats = File.open(stats_path) do |file| JSON.parse(file)