Signed-off-by: Cao Xueliang caoxl78320@163.com --- src/extract-stats/extract_stats.cr | 11 +++++++---- src/extract-stats/stats_worker.cr | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/src/extract-stats/extract_stats.cr b/src/extract-stats/extract_stats.cr index 4ae3e71..c35d3ec 100644 --- a/src/extract-stats/extract_stats.cr +++ b/src/extract-stats/extract_stats.cr @@ -6,10 +6,13 @@ require "./constants" require "./stats_worker"
module ExtractStats + @@ec = EtcdClient.new + def self.consume_tasks channel = Channel(String).new - revision = self.consume_by_list(EXTRACT_STATS_QUEUE_PATH, channel) - self.consume_by_watch(EXTRACT_STATS_QUEUE_PATH, revision, channel) + queue = EXTRACT_STATS_QUEUE_PATH + revision = self.consume_by_list(queue, channel) + self.consume_by_watch(queue, revision, channel) end
def self.consume_by_list(queue, channel) @@ -26,7 +29,7 @@ module ExtractStats
def self.get_history_tasks(queue) tasks = [] of Etcd::Model::Kv - range = EtcdClient.new.range_prefix(queue) + range = @@ec.range_prefix(queue) revision = range.header.not_nil!.revision tasks += range.kvs
@@ -45,7 +48,7 @@ module ExtractStats end
def self.watch_queue(queue, revision, channel) - watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64, filters: [Etcd::Watch::Filter::NODELETE]) do |events| + watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64 + 1, filters: [Etcd::Watch::Filter::NODELETE]) do |events| events.each do |event| puts event channel.send(event.kv.key) diff --git a/src/extract-stats/stats_worker.cr b/src/extract-stats/stats_worker.cr index 8910951..45b75a3 100644 --- a/src/extract-stats/stats_worker.cr +++ b/src/extract-stats/stats_worker.cr @@ -1,6 +1,5 @@ # SPDX-License-Identifier: MulanPSL-2.0+ # Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. - require "../lib/etcd_client" require "../scheduler/elasticsearch_client" require "../scheduler/redis_client" @@ -30,6 +29,8 @@ class StatsWorker STDERR.puts e.message # incase of many error message when task-queue, ES does not work sleep(10) + ensure + @etcd.close end end
@@ -71,7 +72,7 @@ class StatsWorker sample_error_id = new_error_ids.sample STDOUT.puts "send a delimiter task: job_id is #{job_id}" queue = "#{DELIMITER_TASK_QUEUE}/#{job_id}" - value = {"error_id" => sample_error_id} + value = {"job_id" => job_id, "error_id" => sample_error_id} @etcd.put(queue, value)
msg = %({"job_id": "#{job_id}", "new_error_id": "#{sample_error_id}"})