background:
extract-stats process is at risk of being interrupted, because
other dependent services (task-queue, es_client) may not be working properly
solution:
1. use begin ... rescue to handle potential exceptions
- when ES and task-queue are not working properly, sleep 10s and retry
- when "result_root" does not exist, skip the job and remove its task from the task-queue
2. refactor for consume_sched_queue() of stats_worker.cr
Signed-off-by: Lu Weitao <luweitaobe(a)163.com>
---
src/extract-stats/stats_worker.cr | 60 ++++++++++++++++++-------------
1 file changed, 35 insertions(+), 25 deletions(-)
diff --git a/src/extract-stats/stats_worker.cr b/src/extract-stats/stats_worker.cr
index fcba6af..2af07d4 100644
--- a/src/extract-stats/stats_worker.cr
+++ b/src/extract-stats/stats_worker.cr
@@ -18,40 +18,50 @@ class StatsWorker
def consume_sched_queue(queue_path : String)
loop do
begin
- response = @tq.consume_task(queue_path)
+ job_id = get_job_id(queue_path)
+ if job_id # will consume the job by post-processing
+ job = @es.get_job_content(job_id)
+ result_root = job["result_root"]?
+ result_post_processing(job_id, result_root.to_s, queue_path)
+ @tq.delete_task(queue_path + "/in_process", "#{job_id}")
+ end
rescue e
STDERR.puts e.message
- # incase of many error message when task-queue is busy
+      # in case of many error messages when task-queue or ES does not work
sleep(10)
- next
end
+ end
+ end
- if response[0] == 200
- job_id = JSON.parse(response[1].to_json)["id"]
- job = @es.get_job_content(job_id.to_s)
- result_root = job["result_root"]
- begin
- # extract stats.json
- system "#{ENV["CCI_SRC"]}/sbin/result2stats #{result_root}"
- # storage stats to job in es
- store_stats_es(result_root.to_s, job_id.to_s) if result_root
- # send mail to submitter for job results
- system "#{ENV["CCI_SRC"]}/sbin/mail-job #{job_id}"
- rescue e
- STDERR.puts e.message
- next
- end
-
- @tq.delete_task(queue_path + "/in_process", "#{job_id}")
- else
- sleep(60)
- end
+ # get job_id from task-queue
+ def get_job_id(queue_path : String)
+ response = @tq.consume_task(queue_path)
+ if response[0] == 200
+ JSON.parse(response[1].to_json)["id"].to_s
+ else
+ # will sleep 60s if no task in task-queue
+ sleep(60)
+ nil
end
end
- def store_stats_es(result_root : String, job_id : String)
+ def result_post_processing(job_id : String, result_root : String, queue_path : String)
+ return nil unless result_root && File.exists?(result_root)
+
+ # extract stats.json
+ system "#{ENV["CCI_SRC"]}/sbin/result2stats #{result_root}"
+    # store stats to the job in ES
+ store_stats_es(result_root, job_id, queue_path)
+ # send mail to submitter for job results
+ system "#{ENV["CCI_SRC"]}/sbin/mail-job #{job_id}"
+ end
+
+ def store_stats_es(result_root : String, job_id : String, queue_path : String)
stats_path = "#{result_root}/stats.json"
- raise "#{stats_path} file not exists." unless File.exists?(stats_path)
+ unless File.exists?(stats_path)
+ @tq.delete_task(queue_path + "/in_process", "#{job_id}")
+ raise "#{stats_path} file not exists."
+ end
stats = File.open(stats_path) do |file|
JSON.parse(file)
--
2.23.0