Before, we used taskqueue to consume the extract-stats queue;
now we use etcd list-watch to consume the extract-stats queue.
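
The worker now lists the tasks already present under the queue prefix,
then watches the prefix starting from the snapshot revision, so tasks
added in between are not missed. Condensed sketch of the new flow
(names as in this patch; worker error handling omitted):

    channel = Channel(String).new

    # list: pick up queued tasks and remember the snapshot revision
    range = EtcdClient.new.range_prefix(EXTRACT_STATS_QUEUE_PATH)
    revision = range.header.not_nil!.revision
    range.kvs.each { |kv| spawn { StatsWorker.new.handle(kv.key, channel) } }

    # watch: stream keys added after the snapshot, from that revision on
    watcher = EtcdClient.new.watch_prefix(EXTRACT_STATS_QUEUE_PATH,
      start_revision: revision.to_i64,
      filters: [Etcd::Watch::Filter::NODELETE]) do |events|
      events.each { |event| channel.send(event.kv.key) }
    end
    spawn { watcher.start }

    # handle: each key names one job; StatsWorker extracts its stats,
    # stores them in ES and deletes the key from etcd
    loop do
      key = channel.receive
      spawn { StatsWorker.new.handle(key, channel) }
    end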
Signed-off-by: Cao Xueliang <caoxl78320@163.com>
---
src/extract-stats.cr | 2 +-
src/extract-stats/extract_stats.cr | 67 +++++++++++++++++++------
src/extract-stats/stats_worker.cr | 78 +++++++++---------------------
3 files changed, 75 insertions(+), 72 deletions(-)
diff --git a/src/extract-stats.cr b/src/extract-stats.cr
index 2f4b97d..0908a91 100644
--- a/src/extract-stats.cr
+++ b/src/extract-stats.cr
@@ -5,4 +5,4 @@ require "./extract-stats/extract_stats"
# results data post processing
-ExtractStats.in_extract_stats
+ExtractStats.consume_tasks
diff --git a/src/extract-stats/extract_stats.cr b/src/extract-stats/extract_stats.cr
index 45310ad..4ae3e71 100644
--- a/src/extract-stats/extract_stats.cr
+++ b/src/extract-stats/extract_stats.cr
@@ -1,30 +1,67 @@
# SPDX-License-Identifier: MulanPSL-2.0+
# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
+require "../lib/etcd_client"
require "./constants"
require "./stats_worker"
module ExtractStats
- # Consume scheduler queue
- def self.in_extract_stats
- back_fill_task
- STATS_WORKER_COUNT.times do
- Process.fork do
- consume_task
+ def self.consume_tasks
+ channel = Channel(String).new
+ revision = self.consume_by_list(EXTRACT_STATS_QUEUE_PATH, channel)
+ self.consume_by_watch(EXTRACT_STATS_QUEUE_PATH, revision, channel)
+ end
+
+ def self.consume_by_list(queue, channel)
+ tasks, revision = self.get_history_tasks(queue)
+ self.handle_history_tasks(tasks, channel)
+
+ return revision
+ end
+
+ def self.consume_by_watch(queue, revision, channel)
+ self.watch_queue(queue, revision, channel)
+ self.handle_events(channel)
+ end
+
+ def self.get_history_tasks(queue)
+ tasks = [] of Etcd::Model::Kv
+ range = EtcdClient.new.range_prefix(queue)
+ revision = range.header.not_nil!.revision
+ tasks += range.kvs
+
+ return tasks, revision
+ end
+
+ def self.handle_history_tasks(tasks, channel)
+ while true
+ return if tasks.empty?
+
+ task = tasks.delete_at(0)
+ spawn { StatsWorker.new.handle(task.key, channel) }
+
+ sleep 1
+ end
+ end
+
+ def self.watch_queue(queue, revision, channel)
+ watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64, filters: [Etcd::Watch::Filter::NODELETE]) do |events|
+ events.each do |event|
+ puts event
+ channel.send(event.kv.key)
end
end
- # keep main-process alive
- sleep
+ spawn { watcher.start }
+ Fiber.yield
end
- def self.consume_task
- worker = StatsWorker.new
- worker.consume_sched_queue(EXTRACT_STATS_QUEUE_PATH)
- end
+ def self.handle_events(channel)
+ while true
+ key = channel.receive
+ spawn { StatsWorker.new.handle(key, channel) }
- def self.back_fill_task
- worker = StatsWorker.new
- worker.back_fill_task(EXTRACT_STATS_QUEUE_PATH)
+ sleep 1
+ end
end
end
diff --git a/src/extract-stats/stats_worker.cr b/src/extract-stats/stats_worker.cr
index 44c327d..5e0100e 100644
--- a/src/extract-stats/stats_worker.cr
+++ b/src/extract-stats/stats_worker.cr
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: MulanPSL-2.0+
# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
-require "../lib/taskqueue_api"
+require "../lib/etcd_client"
require "../scheduler/elasticsearch_client"
require "../scheduler/redis_client"
require "../scheduler/constants"
@@ -11,40 +11,25 @@ require "./constants.cr"
class StatsWorker
def initialize
@es = Elasticsearch::Client.new
- @tq = TaskQueueAPI.new
+ @etcd = EtcdClient.new
@rc = RegressionClient.new
end
- def consume_sched_queue(queue_path : String)
- loop do
- begin
- job_id = get_job_id(queue_path)
- if job_id # will consume the job by post-processing
- job = @es.get_job_content(job_id)
- result_root = job["result_root"]?
- result_post_processing(job_id, result_root.to_s, queue_path)
- @tq.delete_task(queue_path + "/in_process", "#{job_id}")
- end
- rescue e
- STDERR.puts e.message
- error_message = e.message
-
- # incase of many error message when task-queue, ES does not work
- sleep(10)
- @tq = TaskQueueAPI.new if error_message && error_message.includes?("3060': Connection refused")
- end
- end
- end
-
- # get job_id from task-queue
- def get_job_id(queue_path : String)
- response = @tq.consume_task(queue_path)
- if response[0] == 200
- JSON.parse(response[1].to_json)["id"].to_s
- else
- # will sleep 60s if no task in task-queue
- sleep(60)
- nil
+ def handle(queue_path, channel)
+ begin
+ res = @etcd.range(queue_path)
+ return nil if res.count == 0
+
+ job_id = queue_path.split("/")[-1]
+ job = @es.get_job_content(job_id)
+ result_root = job["result_root"]?
+ result_post_processing(job_id, result_root.to_s, queue_path)
+ @etcd.delete(queue_path)
+ rescue e
+ channel.send(queue_path)
+ STDERR.puts e.message
+      # in case etcd or ES does not work, avoid flooding error messages
+ sleep(10)
end
end
@@ -61,10 +46,7 @@ class StatsWorker
def store_stats_es(result_root : String, job_id : String, queue_path : String)
stats_path = "#{result_root}/stats.json"
- unless File.exists?(stats_path)
- @tq.delete_task(queue_path + "/in_process", "#{job_id}")
- raise "#{stats_path} file not exists."
- end
+ return unless File.exists?(stats_path)
stats = File.open(stats_path) do |file|
JSON.parse(file)
@@ -81,7 +63,6 @@ class StatsWorker
{
:index => "jobs", :type => "_doc",
:id => job_id,
- :refresh => "wait_for",
:body => {:doc => update_content},
}
)
@@ -90,9 +71,10 @@ class StatsWorker
unless new_error_ids.empty?
sample_error_id = new_error_ids.sample
STDOUT.puts "send a delimiter task: job_id is #{job_id}"
- @tq.add_task(DELIMITER_TASK_QUEUE, JSON.parse({"error_id" => sample_error_id,
- "job_id" => job_id,
- "lab" => LAB}.to_json))
+ queue = "#{DELIMITER_TASK_QUEUE}/#{job_id}"
+ value = {"error_id" => sample_error_id}
+ @etcd.put(queue, value)
+
msg = %({"job_id": "#{job_id}", "new_error_id": "#{sample_error_id}"})
system "echo '#{msg}'"
end
@@ -130,20 +112,4 @@ class StatsWorker
end
error_ids
end
-
- def back_fill_task(queue_path)
- redis_client = Redis::Client.new
- # this queue may have leftover task_ids
- queue_name = "queues/#{queue_path}/in_process"
- begin
-      job_ids = redis_client.@client.zrange(queue_name, 0, -1)
- return if job_ids.empty?
-
- job_ids.each do |job_id|
- @tq.hand_over_task(queue_path, queue_path, job_id.to_s)
- end
- rescue e
- STDERR.puts e.message
- end
- end
end
--
2.23.0