Signed-off-by: Li Ping <1477412247(a)qq.com>
---
src/monitoring/filter.cr | 2 +-
src/monitoring/parse_serial_logs.cr | 50 ++++++++++++++++++++++++++++-
2 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/src/monitoring/filter.cr b/src/monitoring/filter.cr
index 39cfbb6..b663f11 100644
--- a/src/monitoring/filter.cr
+++ b/src/monitoring/filter.cr
@@ -56,7 +56,7 @@ class Filter
msg = JSON.parse(msg.to_s).as_h?
return unless msg
- @sp.save_dmesg_to_result_root(msg)
+ @sp.deal_serial_log(msg)
@hash.keys.each do |query|
if match_query(query.as_h, msg)
send_msg(query, msg)
diff --git a/src/monitoring/parse_serial_logs.cr b/src/monitoring/parse_serial_logs.cr
index 0589523..9430d8c 100644
--- a/src/monitoring/parse_serial_logs.cr
+++ b/src/monitoring/parse_serial_logs.cr
@@ -4,6 +4,7 @@
require "../scheduler/elasticsearch_client"
require "set"
require "json"
+require "../lib/mq"
# This parses dmesg in a stream of serial log, finding a number of patterns
# in various places of the dmesg and take actions accordingly.
@@ -40,9 +41,15 @@ class SerialParser
"Restarting system",
]
+ # Message fragments that detect_crash looks for in a serial-log line;
+ # a hit makes mq_publish report the job with job_state "crash".
+ CRASH_PATTERNS = [
+ "mount.nfs: Connection timed out",
+ "No space left on device",
+ ]
+
+ # Creates the two empty per-host tables (String => Array(String) and
+ # String => String) and fetches the MQClient instance that
+ # mq_publish_check publishes through.
def initialize
@host2head = Hash(String, Array(String)).new
@host2rt = Hash(String, String).new
+ @mq = MQClient.instance
end
def host_in_msg(msg)
@@ -59,6 +66,14 @@ class SerialParser
end
end
+  # Returns the first entry of CRASH_PATTERNS that occurs in msg["message"],
+  # or nil when none of them does.
+  #
+  # Patterns are compared as literal substrings. Interpolating them into a
+  # regex would turn the "." in "mount.nfs" into a wildcard and would
+  # misparse any pattern containing other regex metacharacters.
+  #
+  # msg  - parsed serial-log entry; only its "message" field is read.
+  # host - not used by the lookup; kept so existing callers keep working.
+  def detect_crash(msg, host)
+    message = msg["message"].to_s
+    CRASH_PATTERNS.find { |pattern| message.includes?(pattern) }
+  end
+
def delete_host(msg, host, signal)
boundary_signal = detect_start_or_end(msg, host, signal)
return unless boundary_signal
@@ -67,10 +82,43 @@ class SerialParser
@host2rt.delete(host)
end
- def save_dmesg_to_result_root(msg)
+  # Publishes a job_state "crash" message to the "job_mq" queue when the log
+  # line in msg matches one of the crash patterns; does nothing otherwise.
+  #
+  # msg  - parsed serial-log entry; its "time" field, when present, is
+  #        forwarded as the event time.
+  # host - name of the testbox the log line came from.
+  def mq_publish(msg, host)
+    return unless detect_crash(msg, host)
+
+    # The job id is the last path component of the host's @host2rt entry
+    # (presumably the job's result root -- confirm against the code that
+    # fills @host2rt). It stays empty when the host has no entry.
+    job_id = @host2rt[host]?.try { |result_root| File.basename(result_root) } || ""
+
+    # Fallback timestamp: %z emits the real UTC offset of Time.local, so the
+    # value stays correct on hosts that are not in UTC+8 (on UTC+8 hosts the
+    # output is the same "+0800" suffix as a hard-coded one).
+    fallback_time = Time.local.to_s("%Y-%m-%dT%H:%M:%S%z")
+
+    mq_msg = {
+      "job_id"    => job_id,
+      "testbox"   => host,
+      "time"      => msg["time"]? || fallback_time,
+      "job_state" => "crash",
+    }
+
+    # Publish from its own fiber: mq_publish_check may sleep between retries
+    # and must not stall serial-log processing.
+    spawn mq_publish_check("job_mq", mq_msg.to_json)
+  end
+
+  # Hands msg to @mq.publish_confirm for the given queue, trying at most
+  # three times. Each failed attempt (any rescued exception) is followed by
+  # @mq.reconnect and a 5 second sleep, including after the last one. When
+  # all three attempts fail the message is dropped without any report.
+  #
+  # queue - name of the target queue.
+  # msg   - payload to publish (already serialized by the caller).
+  def mq_publish_check(queue, msg)
+    attempts_left = 3
+    while attempts_left > 0
+      attempts_left -= 1
+      begin
+        @mq.publish_confirm(queue, msg)
+        return
+      rescue
+        @mq.reconnect
+        sleep 5
+      end
+    end
+  end
+
+ # Entry point for one parsed serial-log message: resolves the host via
+ # host_in_msg and, when one is found, first runs the crash check/publish
+ # step (mq_publish) and then the dmesg saving step
+ # (save_dmesg_to_result_root). Messages without a host are ignored.
+ def deal_serial_log(msg)
host = host_in_msg(msg)
return unless host
+ mq_publish(msg, host)
+ save_dmesg_to_result_root(msg, host)
+ end
+
+ def save_dmesg_to_result_root(msg, host)
delete_host(msg, host, START_PATTERNS)
check_save = check_save_dmesg(msg, host)
--
2.23.0