Signed-off-by: Li Ping 1477412247@qq.com --- src/monitoring/filter.cr | 2 +- src/monitoring/parse_serial_logs.cr | 50 ++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/src/monitoring/filter.cr b/src/monitoring/filter.cr index 39cfbb6..b663f11 100644 --- a/src/monitoring/filter.cr +++ b/src/monitoring/filter.cr @@ -56,7 +56,7 @@ class Filter msg = JSON.parse(msg.to_s).as_h? return unless msg
- @sp.save_dmesg_to_result_root(msg) + @sp.deal_serial_log(msg) @hash.keys.each do |query| if match_query(query.as_h, msg) send_msg(query, msg) diff --git a/src/monitoring/parse_serial_logs.cr b/src/monitoring/parse_serial_logs.cr index 0589523..9430d8c 100644 --- a/src/monitoring/parse_serial_logs.cr +++ b/src/monitoring/parse_serial_logs.cr @@ -4,6 +4,7 @@ require "../scheduler/elasticsearch_client" require "set" require "json" +require "../lib/mq"
# This parses dmesg in a stream of serial log, finding a number of patterns # in various places of the dmesg and take actions accordingly. @@ -40,9 +41,15 @@ class SerialParser "Restarting system", ]
+ CRASH_PATTERNS = [ + "mount.nfs: Connection timed out", + "No space left on device", + ] + def initialize @host2head = Hash(String, Array(String)).new @host2rt = Hash(String, String).new + @mq = MQClient.instance end
def host_in_msg(msg) @@ -59,6 +66,14 @@ class SerialParser end end
+ def detect_crash(msg, host) + message = msg["message"].to_s + CRASH_PATTERNS.each do |pattern| + matched = message.match(/.*(?<crash>#{pattern})/) + return matched.named_captures["crash"] unless matched.nil? + end + end + def delete_host(msg, host, signal) boundary_signal = detect_start_or_end(msg, host, signal) return unless boundary_signal @@ -67,10 +82,43 @@ class SerialParser @host2rt.delete(host) end
- def save_dmesg_to_result_root(msg) + def mq_publish(msg, host) + crash_signal = detect_crash(msg, host) + return unless crash_signal + + job_id = "" + if @host2rt.has_key?(host) + job_id = File.basename(@host2rt[host]) + end + + mq_msg = { + "job_id" => job_id, + "testbox" => host, + "time" => msg["time"]? || Time.local.to_s("%Y-%m-%dT%H:%M:%S+0800"), + "job_state" => "crash" + } + spawn mq_publish_check("job_mq", mq_msg.to_json) + end + + def mq_publish_check(queue, msg) + 3.times do + @mq.publish_confirm(queue, msg) + break + rescue e + res = @mq.reconnect + sleep 5 + end + end + + def deal_serial_log(msg) host = host_in_msg(msg) return unless host
+ mq_publish(msg, host) + save_dmesg_to_result_root(msg, host) + end + + def save_dmesg_to_result_root(msg, host) delete_host(msg, host, START_PATTERNS)
check_save = check_save_dmesg(msg, host)