Previously, we used taskqueue to handle the extract-stats queue; now we use etcd list-watch to handle it instead.
Signed-off-by: Cao Xueliang <caoxl78320@163.com> --- src/extract-stats.cr | 2 +- src/extract-stats/extract_stats.cr | 67 +++++++++++++++++++------ src/extract-stats/stats_worker.cr | 78 +++++++++--------------------- 3 files changed, 75 insertions(+), 72 deletions(-)
diff --git a/src/extract-stats.cr b/src/extract-stats.cr index 2f4b97d..0908a91 100644 --- a/src/extract-stats.cr +++ b/src/extract-stats.cr @@ -5,4 +5,4 @@ require "./extract-stats/extract_stats"
# results data post processing
-ExtractStats.in_extract_stats +ExtractStats.consume_tasks diff --git a/src/extract-stats/extract_stats.cr b/src/extract-stats/extract_stats.cr index 45310ad..4ae3e71 100644 --- a/src/extract-stats/extract_stats.cr +++ b/src/extract-stats/extract_stats.cr @@ -1,30 +1,67 @@ # SPDX-License-Identifier: MulanPSL-2.0+ # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+require "../lib/etcd_client" require "./constants" require "./stats_worker"
module ExtractStats - # Consume scheduler queue - def self.in_extract_stats - back_fill_task - STATS_WORKER_COUNT.times do - Process.fork do - consume_task + def self.consume_tasks + channel = Channel(String).new + revision = self.consume_by_list(EXTRACT_STATS_QUEUE_PATH, channel) + self.consume_by_watch(EXTRACT_STATS_QUEUE_PATH, revision, channel) + end + + def self.consume_by_list(queue, channel) + tasks, revision = self.get_history_tasks(queue) + self.handle_history_tasks(tasks, channel) + + return revision + end + + def self.consume_by_watch(queue, revision, channel) + self.watch_queue(queue, revision, channel) + self.handle_events(channel) + end + + def self.get_history_tasks(queue) + tasks = [] of Etcd::Model::Kv + range = EtcdClient.new.range_prefix(queue) + revision = range.header.not_nil!.revision + tasks += range.kvs + + return tasks, revision + end + + def self.handle_history_tasks(tasks, channel) + while true + return if tasks.empty? + + task = tasks.delete_at(0) + spawn { StatsWorker.new.handle(task.key, channel) } + + sleep 1 + end + end + + def self.watch_queue(queue, revision, channel) + watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64, filters: [Etcd::Watch::Filter::NODELETE]) do |events| + events.each do |event| + puts event + channel.send(event.kv.key) end end
- # keep main-process alive - sleep + spawn { watcher.start } + Fiber.yield end
- def self.consume_task - worker = StatsWorker.new - worker.consume_sched_queue(EXTRACT_STATS_QUEUE_PATH) - end + def self.handle_events(channel) + while true + key = channel.receive + spawn { StatsWorker.new.handle(key, channel) }
- def self.back_fill_task - worker = StatsWorker.new - worker.back_fill_task(EXTRACT_STATS_QUEUE_PATH) + sleep 1 + end end end diff --git a/src/extract-stats/stats_worker.cr b/src/extract-stats/stats_worker.cr index 44c327d..5e0100e 100644 --- a/src/extract-stats/stats_worker.cr +++ b/src/extract-stats/stats_worker.cr @@ -1,7 +1,7 @@ # SPDX-License-Identifier: MulanPSL-2.0+ # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
-require "../lib/taskqueue_api" +require "../lib/etcd_client" require "../scheduler/elasticsearch_client" require "../scheduler/redis_client" require "../scheduler/constants" @@ -11,40 +11,25 @@ require "./constants.cr" class StatsWorker def initialize @es = Elasticsearch::Client.new - @tq = TaskQueueAPI.new + @etcd = EtcdClient.new @rc = RegressionClient.new end
- def consume_sched_queue(queue_path : String) - loop do - begin - job_id = get_job_id(queue_path) - if job_id # will consume the job by post-processing - job = @es.get_job_content(job_id) - result_root = job["result_root"]? - result_post_processing(job_id, result_root.to_s, queue_path) - @tq.delete_task(queue_path + "/in_process", "#{job_id}") - end - rescue e - STDERR.puts e.message - error_message = e.message - - # incase of many error message when task-queue, ES does not work - sleep(10) - @tq = TaskQueueAPI.new if error_message && error_message.includes?("3060': Connection refused") - end - end - end - - # get job_id from task-queue - def get_job_id(queue_path : String) - response = @tq.consume_task(queue_path) - if response[0] == 200 - JSON.parse(response[1].to_json)["id"].to_s - else - # will sleep 60s if no task in task-queue - sleep(60) - nil + def handle(queue_path, channel) + begin + res = @etcd.range(queue_path) + return nil if res.count == 0 + + job_id = queue_path.split("/")[-1] + job = @es.get_job_content(job_id) + result_root = job["result_root"]? + result_post_processing(job_id, result_root.to_s, queue_path) + @etcd.delete(queue_path) + rescue e + channel.send(queue_path) + STDERR.puts e.message + # incase of many error message when task-queue, ES does not work + sleep(10) end end
@@ -61,10 +46,7 @@ class StatsWorker
def store_stats_es(result_root : String, job_id : String, queue_path : String) stats_path = "#{result_root}/stats.json" - unless File.exists?(stats_path) - @tq.delete_task(queue_path + "/in_process", "#{job_id}") - raise "#{stats_path} file not exists." - end + return unless File.exists?(stats_path)
stats = File.open(stats_path) do |file| JSON.parse(file) @@ -81,7 +63,6 @@ class StatsWorker { :index => "jobs", :type => "_doc", :id => job_id, - :refresh => "wait_for", :body => {:doc => update_content}, } ) @@ -90,9 +71,10 @@ class StatsWorker unless new_error_ids.empty? sample_error_id = new_error_ids.sample STDOUT.puts "send a delimiter task: job_id is #{job_id}" - @tq.add_task(DELIMITER_TASK_QUEUE, JSON.parse({"error_id" => sample_error_id, - "job_id" => job_id, - "lab" => LAB}.to_json)) + queue = "#{DELIMITER_TASK_QUEUE}/#{job_id}" + value = {"error_id" => sample_error_id} + @etcd.put(queue, value) + msg = %({"job_id": "#{job_id}", "new_error_id": "#{sample_error_id}"}) system "echo '#{msg}'" end @@ -130,20 +112,4 @@ class StatsWorker end error_ids end - - def back_fill_task(queue_path) - redis_client = Redis::Client.new - # this queue may have leftover task_ids - queue_name = "queues/#{queue_path}/in_process" - begin - job_ids = redis_client.@client.zrange(queue_name, 0, -1) - return if job_ids.empty? - - job_ids.each do |job_id| - @tq.hand_over_task(queue_path, queue_path, job_id.to_s) - end - rescue e - STDERR.puts e.message - end - end end