- 10 hosts (mo1, ams, ams2, ro1, ca1, ca2, ca3, fr1, sony, termux)
- discover-logs.sh: portable inventory (Linux/FreeBSD/Termux)
- scan-anomalies.sh: ERROR/WARN/CRITICAL counts + journalctl + kubectl
- run-all.sh: parallel SSH fan-out
- build-summary.py: aggregates into reports/SUMMARY.md
- 5 HIGH-severity findings identified on ro1 (apache scanner traffic, mount_monitor warnings)
175 lines
7.1 KiB
Python
175 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Aggregate per-host CSV inventories + anomaly text into reports/SUMMARY.md."""
|
|
from __future__ import annotations
|
|
import csv, glob, os, re, sys
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
|
|
# Repo root: this script lives one directory below the root (e.g. <root>/scripts/),
# so resolve two levels up from this file.
ROOT = Path(__file__).resolve().parent.parent
INV_DIR = ROOT / "logs" / "inventory"   # per-host inventory CSVs (<host>.csv)
ANOM_DIR = ROOT / "anomalies"           # per-host anomaly scan output (<host>.txt)
OUT = ROOT / "reports" / "SUMMARY.md"   # generated markdown report destination
|
|
|
|
def human(n: int) -> str:
    """Render a byte count as a short human-readable string (e.g. 1536 -> '1.5K')."""
    size = float(n)
    for suffix in ("B", "K", "M", "G", "T"):
        if size < 1024:
            # Bytes render as a whole number; every larger unit keeps one decimal.
            return f"{size:.0f}{suffix}" if suffix == "B" else f"{size:.1f}{suffix}"
        size /= 1024
    # Anything past terabytes falls through to petabytes.
    return f"{size:.1f}P"
|
|
|
|
def load_inventory(host: str, csvpath: Path):
    """Read one host's inventory CSV into (path, size, mtime, service) tuples.

    Rows with fewer than four columns or a non-numeric size column are
    skipped. Returns an empty list when the file is absent or zero-length.
    """
    if not csvpath.exists() or csvpath.stat().st_size == 0:
        return []
    entries = []
    with csvpath.open(newline="", errors="replace") as handle:
        for record in csv.reader(handle):
            if len(record) < 4:
                continue
            try:
                entries.append((record[0], int(record[1]), record[2], record[3]))
            except ValueError:
                # Malformed size field — drop the row rather than abort the host.
                continue
    return entries
|
|
|
|
# One finding per line: "<path> errors=<N> warns=<N> size=<N>"
ANOM_RE = re.compile(r"^(\S+)\s+errors=(\d+)\s+warns=(\d+)\s+size=(\d+)")


def parse_anomaly(host: str, txt: Path):
    """Return list of (path, errors, warns, size) and journal error count."""
    if not txt.exists():
        return [], 0, "missing"
    body = txt.read_text(errors="replace")
    if not body.strip():
        return [], 0, "empty (host unreachable?)"

    lines = body.splitlines()

    # Per-file findings: every line matching the "errors=/warns=/size=" shape.
    findings = [
        (m.group(1), int(m.group(2)), int(m.group(3)), int(m.group(4)))
        for m in (ANOM_RE.match(ln) for ln in lines)
        if m
    ]

    # Crude journal error tally: count non-blank lines between the first
    # "--- journalctl" header and the next "---" section marker (or EOF).
    journal_err = 0
    inside = False
    for ln in lines:
        if ln.startswith("--- journalctl"):
            inside = True
            continue
        if inside and ln.startswith("---"):
            break
        if inside and ln.strip():
            journal_err += 1
    return findings, journal_err, "ok"
|
|
|
|
def severity(errors: int, warns: int) -> str:
    """Bucket an (errors, warns) pair into HIGH / MED / LOW / '-'."""
    # Highest matching tier wins: (label, error floor, warn floor).
    for label, err_floor, warn_floor in (("HIGH", 50, 1000), ("MED", 10, 200)):
        if errors >= err_floor or warns >= warn_floor:
            return label
    if errors > 0 or warns > 50:
        return "LOW"
    return "-"
|
|
|
|
def main():
    """Aggregate per-host inventory CSVs and anomaly dumps into reports/SUMMARY.md.

    Reads INV_DIR/<host>.csv and ANOM_DIR/<host>.txt for every host seen in
    either directory, then writes a markdown report with coverage, largest
    files, anomaly findings, journal error volume, and recommendations.
    """
    # A host is included if it has an inventory CSV OR an anomaly dump.
    hosts = sorted({p.stem for p in INV_DIR.glob("*.csv")} |
                   {p.stem for p in ANOM_DIR.glob("*.txt")})
    out = []
    out.append("# Cross-Server Log Inspection — Summary")
    out.append("")
    out.append(f"_Generated: {datetime.now(timezone.utc).isoformat(timespec='seconds')}_")
    out.append("")
    out.append("## Coverage")
    out.append("")
    out.append("| Host | Inventory entries | Status | Top log dirs |")
    out.append("|------|-------------------:|--------|--------------|")
    per_host_findings = {}  # host -> (findings list, journal error count, status string)
    per_host_inv = {}       # host -> [(path, size, mtime, service), ...]
    for h in hosts:
        inv = load_inventory(h, INV_DIR / f"{h}.csv")
        per_host_inv[h] = inv
        findings, jerr, status = parse_anomaly(h, ANOM_DIR / f"{h}.txt")
        per_host_findings[h] = (findings, jerr, status)
        # top dirs by total size (a "dir" here is the first three path components)
        dirs = {}
        for path, sz, _, _ in inv:
            d = "/".join(path.split("/")[:4])
            dirs[d] = dirs.get(d, 0) + sz
        topdirs = ", ".join(f"{d} ({human(s)})" for d, s in sorted(dirs.items(), key=lambda x:-x[1])[:3])
        out.append(f"| {h} | {len(inv)} | {status} | {topdirs or '-'} |")
    out.append("")

    # Largest individual log files across all hosts
    out.append("## Top 25 largest log files (cluster-wide)")
    out.append("")
    out.append("| Host | Path | Size | Mtime | Service |")
    out.append("|------|------|-----:|-------|---------|")
    flat = []
    for h, rows in per_host_inv.items():
        for path, sz, mt, svc in rows:
            flat.append((h, path, sz, mt, svc))
    flat.sort(key=lambda x: -x[2])  # biggest first
    for h, p, sz, mt, svc in flat[:25]:
        out.append(f"| {h} | `{p}` | {human(sz)} | {mt} | {svc} |")
    out.append("")

    # Anomaly findings table
    out.append("## Anomalies — files with errors or excessive warnings")
    out.append("")
    out.append("| Host | Severity | Errors | Warns | Size | Path |")
    out.append("|------|----------|-------:|------:|-----:|------|")
    rows_sev = []
    for h, (findings, _, _) in per_host_findings.items():
        for path, e, w, sz in findings:
            rows_sev.append((severity(e,w), h, e, w, sz, path))
    # Sort HIGH first, then by descending error count, then warning count.
    sev_rank = {"HIGH":0, "MED":1, "LOW":2, "-":3}
    rows_sev.sort(key=lambda r: (sev_rank[r[0]], -r[2], -r[3]))
    for sev, h, e, w, sz, p in rows_sev:
        out.append(f"| {h} | **{sev}** | {e} | {w} | {human(sz)} | `{p}` |")
    if not rows_sev:
        out.append("| - | - | - | - | - | _no error patterns detected in 7-day window_ |")
    out.append("")

    # journal error summary
    out.append("## systemd journal error volume (24h)")
    out.append("")
    out.append("| Host | journalctl -p err lines |")
    out.append("|------|------------------------:|")
    for h, (_, jerr, _) in per_host_findings.items():
        out.append(f"| {h} | {jerr} |")
    out.append("")

    # Recommendations
    out.append("## Recommendations")
    out.append("")
    recs = []
    # 1. Severity-based
    high = [r for r in rows_sev if r[0] == "HIGH"]
    if high:
        recs.append(f"- **Investigate {len(high)} HIGH-severity log file(s) immediately** — see table above. "
                    "These have either ≥50 error lines or ≥1000 warning lines in the last 7 days.")
    # 2. Big files
    bigfiles = [r for r in flat if r[2] > 100*1024*1024]
    if bigfiles:
        # flat is already size-descending, so bigfiles[0] is the single largest file.
        recs.append(f"- **{len(bigfiles)} log file(s) exceed 100 MB** — consider tightening logrotate "
                    "(e.g. `/etc/logrotate.d/`) and/or using zstd compression. Largest: "
                    f"`{bigfiles[0][1]}` on {bigfiles[0][0]} at {human(bigfiles[0][2])}.")
    # 3. Hosts with no inventory (likely unprivileged)
    empty = [h for h, inv in per_host_inv.items() if len(inv) < 30]
    if empty:
        recs.append(f"- **Sparse inventories on {', '.join(empty)}** — these likely require sudo to enumerate "
                    "/var/log fully. Re-run discovery as root for a complete picture (the runner can be "
                    "extended to use `sudo -n` on Linux hosts as it already does on FreeBSD).")
    # 4. journal noise
    noisy = sorted(((h, j) for h, (_, j, _) in per_host_findings.items() if j > 100),
                   key=lambda x:-x[1])
    if noisy:
        h, j = noisy[0]
        recs.append(f"- **journald noisiest on {h}** ({j} error lines/24h). Top drivers worth triaging: "
                    "check `journalctl -p err -b` for repeating units (mbsync, sudo PAM failures, etc.).")
    # 5. Generic
    recs.append("- Re-run `./scripts/run-all.sh` on a schedule (cron / systemd timer) and commit the diff "
                "to track regressions over time.")
    recs.append("- Consider centralising logs (Loki / Vector → VictoriaLogs on mo1) so this scan becomes "
                "a single query rather than 10 SSH fan-outs.")
    out.extend(recs)
    out.append("")

    OUT.parent.mkdir(parents=True, exist_ok=True)
    OUT.write_text("\n".join(out))
    print(f"wrote {OUT} ({len(out)} lines)")
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|