#!/usr/bin/env python3
"""Aggregate per-host CSV inventories + anomaly text into reports/SUMMARY.md."""
from __future__ import annotations

import csv
import glob
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
INV_DIR = ROOT / "logs" / "inventory"
ANOM_DIR = ROOT / "anomalies"
OUT = ROOT / "reports" / "SUMMARY.md"

# Matches one per-file finding line: "<path> errors=N warns=N size=N".
ANOM_RE = re.compile(r"^(\S+)\s+errors=(\d+)\s+warns=(\d+)\s+size=(\d+)")


def human(n: int) -> str:
    """Format a byte count with a 1024-based unit suffix (B, K, M, G, T, P).

    Plain bytes are printed without decimals; every larger unit gets one.
    """
    size = float(n)
    for unit in ("B", "K", "M", "G", "T"):
        if size < 1024:
            return f"{size:.0f}{unit}" if unit == "B" else f"{size:.1f}{unit}"
        size /= 1024
    return f"{size:.1f}P"


def load_inventory(host: str, csvpath: Path) -> list[tuple[str, int, str, str]]:
    """Load one host's inventory CSV as (path, size, mtime, service) tuples.

    A missing or zero-length file yields an empty list.  Rows with fewer than
    four columns, or whose second column is not an integer, are skipped.
    ``host`` is accepted for call-site symmetry and is not used.
    """
    rows: list[tuple[str, int, str, str]] = []
    if not csvpath.exists() or csvpath.stat().st_size == 0:
        return rows
    # Explicit UTF-8 so parsing does not depend on the machine's locale;
    # undecodable bytes are replaced rather than aborting the whole report.
    with csvpath.open(newline="", encoding="utf-8", errors="replace") as f:
        for r in csv.reader(f):
            if len(r) < 4:
                continue
            try:
                rows.append((r[0], int(r[1]), r[2], r[3]))
            except ValueError:
                continue
    return rows


def parse_anomaly(host: str, txt: Path):
    """Parse one host's anomaly text file.

    Returns a ``(findings, journal_err, status)`` tuple where

    * ``findings`` is a list of ``(path, errors, warns, size)`` taken from
      every line matching ``ANOM_RE``;
    * ``journal_err`` is the number of entry lines inside the section that
      starts at a ``--- journalctl`` header and ends at the next ``---`` line;
    * ``status`` is ``"missing"``, ``"empty (host unreachable?)"`` or ``"ok"``.

    ``host`` is accepted for call-site symmetry and is not used.
    """
    findings = []
    journal_err = 0
    if not txt.exists():
        return findings, journal_err, "missing"
    body = txt.read_text(encoding="utf-8", errors="replace")
    if not body.strip():
        return findings, journal_err, "empty (host unreachable?)"

    lines = body.splitlines()
    for line in lines:
        m = ANOM_RE.match(line)
        if m:
            findings.append(
                (m.group(1), int(m.group(2)), int(m.group(3)), int(m.group(4)))
            )

    # Crude journal error tally: count non-blank lines in the journalctl section.
    in_journal = False
    for line in lines:
        if line.startswith("--- journalctl"):
            in_journal = True
            continue
        if not in_journal:
            continue
        if line.startswith("---"):
            break
        # journalctl emits its own "-- ... --" decoration lines (for example
        # "-- No entries --" or "-- Boot <id> --"); they are not log entries
        # and must not inflate the error count.
        if line.strip() and not line.startswith("-- "):
            journal_err += 1
    return findings, journal_err, "ok"


def severity(errors: int, warns: int) -> str:
    """Classify error/warning counts as "HIGH", "MED", "LOW" or "-"."""
    if errors >= 50 or warns >= 1000:
        return "HIGH"
    if errors >= 10 or warns >= 200:
        return "MED"
    if errors > 0 or warns > 50:
        return "LOW"
    return "-"


def _top_dir(path: str, depth: int = 3) -> str:
    """Return the directory containing ``path``, truncated to ``depth`` components.

    ``/var/log/nginx/access.log`` -> ``/var/log/nginx`` and
    ``/var/log/syslog`` -> ``/var/log``.  A path without any ``/`` is returned
    unchanged because it has no directory part to report.
    """
    parent, sep, _ = path.rpartition("/")
    if not sep:
        return path
    # The leading "" produced by an absolute path counts as one split element,
    # hence depth + 1.
    return "/".join(parent.split("/")[: depth + 1]) or "/"


def main():
    """Build the Markdown summary from INV_DIR / ANOM_DIR and write it to OUT."""
    hosts = sorted(
        {p.stem for p in INV_DIR.glob("*.csv")}
        | {p.stem for p in ANOM_DIR.glob("*.txt")}
    )
    out = []
    out.append("# Cross-Server Log Inspection — Summary")
    out.append("")
    out.append(f"_Generated: {datetime.now(timezone.utc).isoformat(timespec='seconds')}_")
    out.append("")

    # Coverage: one row per host seen in either input directory.
    out.append("## Coverage")
    out.append("")
    out.append("| Host | Inventory entries | Status | Top log dirs |")
    out.append("|------|-------------------:|--------|--------------|")
    per_host_findings = {}
    per_host_inv = {}
    for h in hosts:
        inv = load_inventory(h, INV_DIR / f"{h}.csv")
        per_host_inv[h] = inv
        findings, jerr, status = parse_anomaly(h, ANOM_DIR / f"{h}.txt")
        per_host_findings[h] = (findings, jerr, status)
        # Top dirs by total size.
        dirs = {}
        for path, sz, _, _ in inv:
            d = _top_dir(path)
            dirs[d] = dirs.get(d, 0) + sz
        topdirs = ", ".join(
            f"{d} ({human(s)})"
            for d, s in sorted(dirs.items(), key=lambda x: -x[1])[:3]
        )
        out.append(f"| {h} | {len(inv)} | {status} | {topdirs or '-'} |")
    out.append("")

    # Largest individual log files across all hosts.
    out.append("## Top 25 largest log files (cluster-wide)")
    out.append("")
    out.append("| Host | Path | Size | Mtime | Service |")
    out.append("|------|------|-----:|-------|---------|")
    flat = []
    for h, rows in per_host_inv.items():
        for path, sz, mt, svc in rows:
            flat.append((h, path, sz, mt, svc))
    flat.sort(key=lambda x: -x[2])
    for h, p, sz, mt, svc in flat[:25]:
        out.append(f"| {h} | `{p}` | {human(sz)} | {mt} | {svc} |")
    out.append("")

    # Anomaly findings table, most severe first.
    out.append("## Anomalies — files with errors or excessive warnings")
    out.append("")
    out.append("| Host | Severity | Errors | Warns | Size | Path |")
    out.append("|------|----------|-------:|------:|-----:|------|")
    rows_sev = []
    for h, (findings, _, _) in per_host_findings.items():
        for path, e, w, sz in findings:
            rows_sev.append((severity(e, w), h, e, w, sz, path))
    sev_rank = {"HIGH": 0, "MED": 1, "LOW": 2, "-": 3}
    rows_sev.sort(key=lambda r: (sev_rank[r[0]], -r[2], -r[3]))
    for sev, h, e, w, sz, p in rows_sev:
        out.append(f"| {h} | **{sev}** | {e} | {w} | {human(sz)} | `{p}` |")
    if not rows_sev:
        out.append("| - | - | - | - | - | _no error patterns detected in 7-day window_ |")
    out.append("")

    # Journal error summary.
    out.append("## systemd journal error volume (24h)")
    out.append("")
    out.append("| Host | journalctl -p err lines |")
    out.append("|------|------------------------:|")
    for h, (_, jerr, _) in per_host_findings.items():
        out.append(f"| {h} | {jerr} |")
    out.append("")

    # Recommendations.
    out.append("## Recommendations")
    out.append("")
    recs = []
    # 1. Severity-based
    high = [r for r in rows_sev if r[0] == "HIGH"]
    if high:
        recs.append(
            f"- **Investigate {len(high)} HIGH-severity log file(s) immediately** — see table above. "
            "These have either ≥50 error lines or ≥1000 warning lines in the last 7 days."
        )
    # 2. Big files (flat is sorted by size, so bigfiles[0] is the largest)
    bigfiles = [r for r in flat if r[2] > 100 * 1024 * 1024]
    if bigfiles:
        recs.append(
            f"- **{len(bigfiles)} log file(s) exceed 100 MB** — consider tightening logrotate "
            "(e.g. `/etc/logrotate.d/`) and/or using zstd compression. Largest: "
            f"`{bigfiles[0][1]}` on {bigfiles[0][0]} at {human(bigfiles[0][2])}."
        )
    # 3. Hosts with no inventory (likely unprivileged)
    empty = [h for h, inv in per_host_inv.items() if len(inv) < 30]
    if empty:
        recs.append(
            f"- **Sparse inventories on {', '.join(empty)}** — these likely require sudo to enumerate "
            "/var/log fully. Re-run discovery as root for a complete picture (the runner can be "
            "extended to use `sudo -n` on Linux hosts as it already does on FreeBSD)."
        )
    # 4. journal noise
    noisy = sorted(
        ((h, j) for h, (_, j, _) in per_host_findings.items() if j > 100),
        key=lambda x: -x[1],
    )
    if noisy:
        h, j = noisy[0]
        recs.append(
            f"- **journald noisiest on {h}** ({j} error lines/24h). Top drivers worth triaging: "
            "check `journalctl -p err -b` for repeating units (mbsync, sudo PAM failures, etc.)."
        )
    # 5. Generic
    recs.append(
        "- Re-run `./scripts/run-all.sh` on a schedule (cron / systemd timer) and commit the diff "
        "to track regressions over time."
    )
    recs.append(
        "- Consider centralising logs (Loki / Vector → VictoriaLogs on mo1) so this scan becomes "
        "a single query rather than 10 SSH fan-outs."
    )
    out.extend(recs)
    out.append("")

    OUT.parent.mkdir(parents=True, exist_ok=True)
    # The report contains non-ASCII characters (—, ≥, →); write UTF-8 explicitly
    # so a non-UTF-8 locale cannot raise UnicodeEncodeError.  End with a newline
    # so the file is a well-formed text file.
    OUT.write_text("\n".join(out) + "\n", encoding="utf-8")
    print(f"wrote {OUT} ({len(out)} lines)")


if __name__ == "__main__":
    main()