Signed-off-by: Cao Xueliang <caoxl78320(a)163.com>
---
src/scheduler/find_job_boot.cr | 43 +++++++++++++++++++++++-----------
1 file changed, 29 insertions(+), 14 deletions(-)
diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr
index 6e8d5d1..74a27af 100644
--- a/src/scheduler/find_job_boot.cr
+++ b/src/scheduler/find_job_boot.cr
@@ -129,13 +129,26 @@ class Sched
jobs += job.kvs
end
+ ec.close
+
return jobs, revisions.min
end
def consume_by_watch(queues, revision)
ready_queues = split_ready_queues(queues)
- channel = watch_queues(ready_queues, revision)
- loop_handle_event(channel)
+
+ channel = Channel(Array(Etcd::Model::WatchEvent)).new
+ ech = Hash(EtcdClient, Etcd::Watch::Watcher).new
+ ready_queues.each do |queue|
+ ec = EtcdClient.new
+ watcher = ec.watch_prefix(queue, start_revision: revision.to_i64, progress_notify: false, filters: [Etcd::Watch::Filter::NODELETE]) do |events|
+ channel.send(events)
+ end
+ ech[ec] = watcher
+ end
+
+ watchers = start_watcher(ech)
+ loop_handle_event(channel, ech)
end
def split_ready_queues(queues)
@@ -148,25 +161,25 @@ class Sched
ready_queues.uniq
end
- def watch_queues(queues, revision)
- puts "watch #{queues}, revision #{revision}"
- channel = Channel(Array(Etcd::Model::WatchEvent)).new
- queues.each do |queue|
- watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64, progress_notify: false ,filters: [Etcd::Watch::Filter::NODELETE]) do |events|
- channel.send(events)
- end
+ def start_watcher(ech)
+ ech.each do |ec, watcher|
spawn { watcher.start }
Fiber.yield
end
-
- return channel
end
- def loop_handle_event(channel)
+ def loop_handle_event(channel, ech)
while true
events = channel.receive
events.each do |event|
- return event.kv if ready2process(event.kv)
+ if ready2process(event.kv)
+ ech.each do |ec, watcher|
+ watcher.stop
+ ec.close
+ end
+
+ return event.kv
+ end
end
end
end
@@ -176,7 +189,9 @@ class Sched
f_queue = job.key
t_queue = f_queue.gsub("/ready/", "/in_process/")
value = job.value
- ec.move(f_queue, t_queue, value)
+ res = ec.move(f_queue, t_queue, value)
+ ec.close
+ return res
end
def get_job_boot(host, boot_type)
--
2.23.0