Signed-off-by: Cao Xueliang caoxl78320@163.com --- src/scheduler/find_job_boot.cr | 43 +++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 14 deletions(-)
diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr index 6e8d5d1..74a27af 100644 --- a/src/scheduler/find_job_boot.cr +++ b/src/scheduler/find_job_boot.cr @@ -129,13 +129,26 @@ class Sched jobs += job.kvs end
+ ec.close + return jobs, revisions.min end
def consume_by_watch(queues, revision) ready_queues = split_ready_queues(queues) - channel = watch_queues(ready_queues, revision) - loop_handle_event(channel) + + channel = Channel(Array(Etcd::Model::WatchEvent)).new + ech = Hash(EtcdClient, Etcd::Watch::Watcher).new + ready_queues.each do |queue| + ec = EtcdClient.new + watcher = ec.watch_prefix(queue, start_revision: revision.to_i64, progress_notify: false, filters: [Etcd::Watch::Filter::NODELETE]) do |events| + channel.send(events) + end + ech[ec] = watcher + end + + watchers = start_watcher(ech) + loop_handle_event(channel, ech) end
def split_ready_queues(queues) @@ -148,25 +161,25 @@ class Sched ready_queues.uniq end
- def watch_queues(queues, revision) - puts "watch #{queues}, revision #{revision}" - channel = Channel(Array(Etcd::Model::WatchEvent)).new - queues.each do |queue| - watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64, progress_notify: false ,filters: [Etcd::Watch::Filter::NODELETE]) do |events| - channel.send(events) - end + def start_watcher(ech) + ech.each do |ec, watcher| spawn { watcher.start } Fiber.yield end - - return channel end
- def loop_handle_event(channel) + def loop_handle_event(channel, ech) while true events = channel.receive events.each do |event| - return event.kv if ready2process(event.kv) + if ready2process(event.kv) + ech.each do |ec, watcher| + watcher.stop + ec.close + end + + return event.kv + end end end end @@ -176,7 +189,9 @@ class Sched f_queue = job.key t_queue = f_queue.gsub("/ready/", "/in_process/") value = job.value - ec.move(f_queue, t_queue, value) + res = ec.move(f_queue, t_queue, value) + ec.close + return res end
def get_job_boot(host, boot_type)