before: consume job by shuffle after: our priority is reflected in email account for now, consume job by priority and the delimiter@localhost has the highest priority
Signed-off-by: Cao Xueliang caoxl78320@163.com --- src/scheduler/find_job_boot.cr | 40 ++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-)
diff --git a/src/scheduler/find_job_boot.cr b/src/scheduler/find_job_boot.cr index 01b6f1d..a0e5220 100644 --- a/src/scheduler/find_job_boot.cr +++ b/src/scheduler/find_job_boot.cr @@ -110,15 +110,47 @@ class Sched
def consume_by_list(queues) jobs, revision = get_history_jobs(queues) - jobs.shuffle! - while true - return nil, revision if jobs.empty? + email_jobs = split_jobs_by_email(jobs) + loop do + return nil, revision if email_jobs.empty? + + job = pop_job_by_priority(email_jobs) + return nil, revision unless job
- job = jobs.delete_at(0) return job, revision if ready2process(job) end end
+ def split_jobs_by_email(jobs) + hash = Hash(String, Array(Etcd::Model::Kv)).new + jobs.each do |job| + key = job.key.split("/")[4] + if hash.has_key?(key) + hash[key] << job + else + hash[key] = [job] + end + end + + return hash + end + + def pop_job_by_priority(email_jobs) + delimiter = "delimiter@localhost" + if email_jobs.has_key?(delimiter) + return email_jobs[delimiter].delete_at(0) unless email_jobs[delimiter].empty? + + email_jobs.delete(delimiter) + end + + keys = rand_queues(email_jobs.keys) + keys.each do |key| + return email_jobs[key].delete_at(0) unless email_jobs[key].empty? + + email_jobs.delete(key) + end + end + def get_history_jobs(queues) revisions = [] of Int64 ec = EtcdClient.new