Signed-off-by: Cao Xueliang <caoxl78320@163.com> --- src/scheduler/find_job_boot.cr | 113 +++++++++++++++++++++++++++------ 1 file changed, 92 insertions(+), 21 deletions(-)
diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr index 78b10ad..2596f83 100644 --- a/src/scheduler/find_job_boot.cr +++ b/src/scheduler/find_job_boot.cr @@ -69,42 +69,113 @@ class Sched
default_queues = [] of String queues_str.split(',', remove_empty: true) do |item| - default_queues << item.strip + default_queues << "sched/#{item.strip}/ready" end
- sub_queues = [] of String - default_queues.each do |queue| - # TODO: this could be high cost and should be improved in future : keys(pattern). - matched_queues = @redis.keys("#{QUEUE_NAME_BASE}/sched/#{queue}/*/ready") - next if matched_queues.empty? + return default_queues.uniq + end
- matched_queues.each do |mq| - match_data = "#{mq}".match(%r(^#{QUEUE_NAME_BASE}/sched/(#{queue}/.+)/ready$)) - sub_queues << match_data[1] if match_data + def get_job_from_queues(queues, testbox) + job = nil + etcd_job = consume_job(queues) + job_id = etcd_job.key.split("/")[-1] + puts "#{testbox} got the job #{job_id}" + if job_id + begin + job = @es.get_job(job_id.to_s) + rescue ex + @log.warn("Invalid job (id=#{job_id}) in es. Info: #{ex}") + @log.warn(ex.inspect_with_backtrace) end end
- idle_queues = [] of String - delimiter_queues = [] of String - default_queues.each do |queue| - idle_queues << "#{queue}/idle" - delimiter_queues << "#{queue}/delimiter@localhost" + if job + job.update({"testbox" => testbox}) + job.set_result_root + @log.info(%({"job_id": "#{job_id}", "result_root": "/srv#{job.result_root}", "job_state" +: "set result root"})) + set_id2job(job) end
- all_queues = delimiter_queues + rand_queues(sub_queues) + idle_queues + return job + end + + def consume_job(queues) + job, revision = consume_by_list(queues) + return job if job
- return all_queues.uniq + consume_by_watch(queues, revision) end
- def get_job_from_queues(queues, testbox) - job = nil + def consume_by_list(queues) + jobs, revision = get_history_jobs(queues) + jobs.shuffle! + while true + return nil, revision if jobs.empty? + + job = jobs.delete_at(0) + return job, revision if ready2process(job) + end + end
+ def get_history_jobs(queues) + revisions = [] of Int64 + ec = EtcdClient.new + jobs = [] of Etcd::Model::Kv queues.each do |queue| - job = prepare_job("sched/#{queue}", testbox) - return job if job + job = ec.range_prefix(queue) + revisions << job.header.not_nil!.revision + jobs += job.kvs end
- return job + return jobs, revisions.min + end + + def consume_by_watch(queues, revision) + ready_queues = split_ready_queues(queues) + channel = watch_queues(ready_queues, revision) + loop_handle_event(channel) + end + + def split_ready_queues(queues) + ready_queues = [] of String + queues.each do |queue| + tmp = queue.split("/ready")[0] + ready_queues << "#{tmp}/ready" + end + + ready_queues.uniq + end + + def watch_queues(queues, revision) + puts "watch #{queues}, revision #{revision}" + channel = Channel(Array(Etcd::Model::WatchEvent)).new + queues.each do |queue| + watcher = EtcdClient.new.watch_prefix(queue, start_revision: revision.to_i64, progress_notify: false ,filters: [Etcd::Watch::Filter::NODELETE]) do |events| + channel.send(events) + end + spawn { watcher.start } + Fiber.yield + end + + return channel + end + + def loop_handle_event(channel) + while true + events = channel.receive + events.each do |event| + return event.kv if ready2process(event.kv) + end + end + end + + def ready2process(job) + ec = EtcdClient.new + f_queue = job.key + t_queue = f_queue.gsub("/ready/", "/in_process/") + value = job.value + ec.move(f_queue, t_queue, value) end
def get_job_boot(host, boot_type)