use etcd list watch to submit bisect job
Signed-off-by: Cao Xueliang caoxl78320@163.com --- src/delimiter.cr | 22 ++++++++++ src/delimiter/delimiter.cr | 83 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 src/delimiter.cr create mode 100644 src/delimiter/delimiter.cr
diff --git a/src/delimiter.cr b/src/delimiter.cr new file mode 100644 index 0000000..e8de306 --- /dev/null +++ b/src/delimiter.cr @@ -0,0 +1,22 @@ +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. +# frozen_string_literal: true + +require "yaml" +require "./delimiter/delimiter" + +def config_secrets_yaml + %x(#{ENV["CCI_SRC"]}/sbin/config_account_yaml.rb delimiter) + account = YAML.parse(File.read("#{ENV["HOME"]}/.config/compass-ci/defaults/account.yaml")) + lab = YAML.parse(File.read("#{ENV["HOME"]}/.config/compass-ci/include/lab/#{account["lab"]}.yaml")) + secrets = {"secrets" => lab} + File.open("#{ENV["HOME"]}/.config/compass-ci/defaults/secrets.yaml", "w") { |f| YAML.dump(secrets, f) } +end + +begin + config_secrets_yaml + delimiter = Delimiter.new + delimiter.consume_delimiter("delimiter") +rescue ex + puts ex +end diff --git a/src/delimiter/delimiter.cr b/src/delimiter/delimiter.cr new file mode 100644 index 0000000..c98ded9 --- /dev/null +++ b/src/delimiter/delimiter.cr @@ -0,0 +1,83 @@ +# SPDX-License-Identifier: MulanPSL-2.0+ +# Copyright (c) 2020 Huawei Technologies Co., Ltd. All rights reserved. +# frozen_string_literal: true + +require "../lib/etcd_client" + +class Delimiter + def initialize + @ec = EtcdClient.new + end + + def consume_delimiter(queue) + channel = Channel(Etcd::Model::Kv).new + revision = consume_by_list(queue, channel) + consume_by_watch(queue, revision, channel) + end + + def consume_by_list(queue, channel) + tasks, revision = get_history_tasks(queue) + handle_history_tasks(tasks, channel) + + return revision + end + + def consume_by_watch(queue, revision, channel) + watch_queue(queue, revision, channel) + handle_events(channel) + end + + def get_history_tasks(queue) + tasks = [] of Etcd::Model::Kv + range = @ec.range_prefix(queue) + revision = range.header.not_nil!.revision + tasks += range.kvs + + return tasks, revision + end + + def handle_history_tasks(tasks, channel) + loop do + return if tasks.empty? + + task = tasks.delete_at(0) + spawn { submit_bisect_job(channel, task) } + end + end + + def watch_queue(queue, revision, channel) + watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64 + 1, filters: [Etcd::Watch::Filter::NODELETE]) do |events| + events.each do |event| + channel.send(event.kv) + end + end + + spawn { watcher.start } + Fiber.yield + end + + def handle_events(channel) + loop do + task = channel.receive + spawn { submit_bisect_job(channel, task) } + end + end + + def submit_bisect_job(channel, task) + key = task.key + value = Hash(String, String).from_json(task.value.not_nil!) + begin + response = %x(#{ENV["LKP_SRC"]}/sbin/submit bad_job_id=#{value["job_id"]} error_id=#{value["error_id"].inspect} bisect.yaml queue=dc-bisect) + puts response + if /id=0/ =~ response + channel.send(task) + return + end + + @ec.delete(key) + rescue ex + puts ex + channel.send(task) + end + end +end