Signed-off-by: Cao Xueliang <caoxl78320(a)163.com>
---
src/extract-stats/extract_stats.cr | 11 +++++++----
src/extract-stats/stats_worker.cr | 5 +++--
2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/src/extract-stats/extract_stats.cr b/src/extract-stats/extract_stats.cr
index 4ae3e71..c35d3ec 100644
--- a/src/extract-stats/extract_stats.cr
+++ b/src/extract-stats/extract_stats.cr
@@ -6,10 +6,13 @@ require "./constants"
require "./stats_worker"
module ExtractStats
+ @@ec = EtcdClient.new
+
def self.consume_tasks
channel = Channel(String).new
- revision = self.consume_by_list(EXTRACT_STATS_QUEUE_PATH, channel)
- self.consume_by_watch(EXTRACT_STATS_QUEUE_PATH, revision, channel)
+ queue = EXTRACT_STATS_QUEUE_PATH
+ revision = self.consume_by_list(queue, channel)
+ self.consume_by_watch(queue, revision, channel)
end
def self.consume_by_list(queue, channel)
@@ -26,7 +29,7 @@ module ExtractStats
def self.get_history_tasks(queue)
tasks = [] of Etcd::Model::Kv
- range = EtcdClient.new.range_prefix(queue)
+ range = @@ec.range_prefix(queue)
revision = range.header.not_nil!.revision
tasks += range.kvs
@@ -45,7 +48,7 @@ module ExtractStats
end
def self.watch_queue(queue, revision, channel)
- watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64, filters: [Etcd::Watch::Filter::NODELETE]) do |events|
+ watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64 + 1, filters: [Etcd::Watch::Filter::NODELETE]) do |events|
events.each do |event|
puts event
channel.send(event.kv.key)
diff --git a/src/extract-stats/stats_worker.cr b/src/extract-stats/stats_worker.cr
index 8910951..45b75a3 100644
--- a/src/extract-stats/stats_worker.cr
+++ b/src/extract-stats/stats_worker.cr
@@ -1,6 +1,5 @@
# SPDX-License-Identifier: MulanPSL-2.0+
# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved.
-
require "../lib/etcd_client"
require "../scheduler/elasticsearch_client"
require "../scheduler/redis_client"
@@ -30,6 +29,8 @@ class StatsWorker
STDERR.puts e.message
# incase of many error message when task-queue, ES does not work
sleep(10)
+ ensure
+ @etcd.close
end
end
@@ -71,7 +72,7 @@ class StatsWorker
sample_error_id = new_error_ids.sample
STDOUT.puts "send a delimiter task: job_id is #{job_id}"
queue = "#{DELIMITER_TASK_QUEUE}/#{job_id}"
- value = {"error_id" => sample_error_id}
+ value = {"job_id" => job_id, "error_id" => sample_error_id}
@etcd.put(queue, value)
msg = %({"job_id": "#{job_id}", "new_error_id": "#{sample_error_id}"})
--
2.23.0