nexus-mcp/nexus-mcp/lib/audit_log.py
nathan 0c9aebf97a feat(nexus): implement sharded architecture
- Create nexus-mcp/ with 6-shard plugin model (identity, workday, audit, itsm, assets, logistics)
- Migrate 31 tools from legacy Identity + Workday servers into unified orchestrator
- Add feature flag control (ENABLE_*) for atomic shard deployment per Gemini design
- Implement SOC 2 audit logging with automatic PII redaction (CC7.2 / CC6.1)
- Create stub shards for ITSM, Assets, Logistics (Red status awaiting credentials)
- Add comprehensive mock data library with drift scenarios for credential-free testing
- Update README.md: reposition from Workday-MCP to Nexus-MCP as primary server
- Document installation, configuration, and shard toggling in Local-Setup.md

Architecture: Orchestrator (main.py) + Shards (src/shards/*.py) + Adapters (lib/)
enables piece-at-a-time deployment. Mock mode (USE_MOCK=true) supports full 53-tool
testing without credentials. Smoke test verified: 33 tools registered successfully.

BREAKING CHANGE: Legacy Identity/ and Workday/ servers deprecated. Users must update
Claude Desktop config to point to nexus-mcp/src/main.py. Legacy folders preserved
for reference pending verification.

Refs: WIS-006, WIS-009, WIS-014-018, Gemini conversation 2026-04-06
2026-04-13 09:20:35 -04:00

259 lines
10 KiB
Python

"""Nexus-MCP SOC 2 Audit Logger.

Implements:
    CC7.2 — System Monitoring (every tool call recorded)
    CC6.1 — Logical Access Controls (action_category per call)
    PI1.4 — Processing Integrity: completeness (pre/post logging)
    PI1.5 — Processing Integrity: accuracy (mock_mode flag)
    A1.1  — Availability monitoring (latency_ms per call)

NOTE(review): this module writes one record per call, after the call has
finished (see ``AuditLogger.record``). Any pre-call logging implied by the
PI1.4 "pre/post" wording must come from the calling middleware — confirm.

Log format: newline-delimited JSON (JSONL) — one event object per line.
The file is only ever opened in append mode, so this module never rewrites
existing lines. Append mode on its own is NOT tamper-evident: anyone with
write access to the file can still edit or truncate it. Tamper evidence for
SOC 2 auditors has to come from controls outside this module (file-system
ACLs, append-only/WORM storage, or forwarding the optional stderr copy of
each record to a SIEM).

Log entry schema v1:
    event_id        str   — supplied by the caller; expected to be a UUID v4
                            unique per invocation
    timestamp       str   — ISO 8601 UTC with microsecond precision
    schema_version  str   — "1" (increment on breaking schema changes)
    source          str   — always "nexus-mcp"
    tool            str   — MCP tool name, e.g. "ad_get_user"
    shard           str   — inferred from the tool-name prefix:
                            identity | workday | itsm | assets | logistics |
                            audit | unknown
    action_category str   — READ | AUDIT | REPORT (CC6.1 logical access label)
    args_summary    dict  — call arguments; values whose KEY NAME looks
                            sensitive are replaced by **REDACTED**
    mock_mode       bool  — True when USE_MOCK=true (no real system data accessed)
    status          str   — "success" | "error"
    latency_ms      int   — wall-clock duration in milliseconds
    error_type      str?  — exception class name; null on success
    error_message   str?  — sanitised, length-limited error text; null on success
"""
from __future__ import annotations

import json
import logging
import os
import re
import sys
import threading
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# ── Sensitive field redaction ─────────────────────────────────────────────────
# Any argument key whose name contains one of these substrings has its value
# replaced with **REDACTED** before the entry is written to disk.
_REDACT_SUBSTRINGS: frozenset[str] = frozenset({
"password", "passwd", "secret", "token",
"api_key", "apikey", "access_key", "private_key",
"credential", "credentials", "auth", "authorization",
"ssn", "dob", "date_of_birth", "birth",
"card", "cvv", "pin", "account_number",
})
def _should_redact(key: str) -> bool:
k = key.lower()
return any(s in k for s in _REDACT_SUBSTRINGS)
def _redact(args: dict[str, Any]) -> dict[str, Any]:
"""Return a copy of *args* with sensitive values replaced by **REDACTED**."""
out: dict[str, Any] = {}
for k, v in args.items():
if _should_redact(k):
out[k] = "**REDACTED**"
elif isinstance(v, dict):
out[k] = _redact(v)
elif isinstance(v, list) and v and isinstance(v[0], dict):
out[k] = [_redact(i) if isinstance(i, dict) else i for i in v]
else:
out[k] = v
return out
# ── Shard inference ───────────────────────────────────────────────────────────
_SHARD_PREFIXES: list[tuple[tuple[str, ...], str]] = [
(("ad_", "entra_"), "identity"),
(("workday_",), "workday"),
(("helix_",), "itsm"),
(("lansweeper_", "intune_"), "assets"),
(("fedex_",), "logistics"),
(("audit_", "generate_"), "audit"),
]
def _infer_shard(tool_name: str) -> str:
for prefixes, shard in _SHARD_PREFIXES:
if any(tool_name.startswith(p) for p in prefixes):
return shard
return "unknown"
# ── Action category inference (CC6.1) ─────────────────────────────────────────
# Logical access categories tell auditors what class of operation was performed.
_AUDIT_PREFIXES: tuple[str, ...] = ("audit_",)
_REPORT_PREFIXES: tuple[str, ...] = ("generate_",)
def _infer_action_category(tool_name: str) -> str:
if any(tool_name.startswith(p) for p in _AUDIT_PREFIXES):
return "AUDIT"
if any(tool_name.startswith(p) for p in _REPORT_PREFIXES):
return "REPORT"
return "READ"
# ── AuditLogger ───────────────────────────────────────────────────────────────
class AuditLogger:
    """Append-only SOC 2 audit logger.

    Writes one JSONL record per tool invocation to *log_file*.
    When *log_to_stderr* is True it also emits the same JSON to stderr
    so the record flows into systemd / CloudWatch / SIEM without a sidecar.

    Thread safety: a class-level lock serialises two things. File appends are
    locked, so concurrent tool calls in one process cannot interleave partial
    lines. Lazy creation of the process-wide singleton is also locked. Writers
    in *other processes* sharing the same file are not coordinated here.
    """

    _instance: AuditLogger | None = None
    # Shared by every instance because several loggers may append to the same
    # file. It also guards singleton creation in get().
    _lock = threading.Lock()

    def __init__(
        self,
        log_file: Path | str | None = None,
        log_to_stderr: bool | None = None,
    ) -> None:
        """Create a logger.

        Args:
            log_file: Destination JSONL file. ``None`` means use
                ``AuditConfig().log_file``. Missing parent directories are
                created.
            log_to_stderr: Also mirror each record to stderr. ``None`` means
                use ``AuditConfig().log_to_stderr``. This is resolved
                independently of *log_file*, so passing only *log_file* still
                honours the configured stderr setting.
        """
        if log_file is None or log_to_stderr is None:
            from config import AuditConfig  # lazy — avoids circular import at module level

            cfg = AuditConfig()
            if log_file is None:
                log_file = cfg.log_file
            if log_to_stderr is None:
                log_to_stderr = cfg.log_to_stderr
        self._log_file = Path(log_file)
        self._log_to_stderr = bool(log_to_stderr)
        # Read once at construction; later changes to USE_MOCK are not seen.
        self._mock_mode: bool = os.getenv("USE_MOCK", "false").lower() == "true"
        self._log_file.parent.mkdir(parents=True, exist_ok=True)

        self._stderr_logger = logging.getLogger("nexus.audit")
        # Configure the named logger only once per process. It emits the bare
        # JSON line, and it does not propagate, so root handlers do not
        # duplicate it.
        if not self._stderr_logger.handlers:
            handler = logging.StreamHandler(sys.stderr)
            handler.setFormatter(logging.Formatter("%(message)s"))
            self._stderr_logger.addHandler(handler)
            self._stderr_logger.propagate = False
            self._stderr_logger.setLevel(logging.INFO)

    # ── Public API ────────────────────────────────────────────────────────────
    def record(
        self,
        *,
        event_id: str,
        tool: str,
        args: dict[str, Any],
        status: str,
        latency_ms: int,
        error_type: str | None = None,
        error_message: str | None = None,
    ) -> None:
        """Write one audit record. Called by the middleware after every tool call.

        The entry is serialised first, with non-JSON values converted via
        ``str``. It is then appended to the log file while holding the class
        lock, and finally mirrored to stderr when enabled. ``args`` of
        ``None`` is logged as an empty ``args_summary``.
        """
        entry = {
            "event_id": event_id,
            "timestamp": datetime.now(timezone.utc).isoformat(timespec="microseconds"),
            "schema_version": "1",
            "source": "nexus-mcp",
            "tool": tool,
            "shard": _infer_shard(tool),
            "action_category": _infer_action_category(tool),
            "args_summary": _redact(args or {}),
            "mock_mode": self._mock_mode,
            "status": status,
            "latency_ms": latency_ms,
            "error_type": error_type,
            "error_message": error_message,
        }
        line = json.dumps(entry, default=str)
        with self._lock, open(self._log_file, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
        if self._log_to_stderr:
            self._stderr_logger.info(line)

    # ── Convenience helpers ────────────────────────────────────────────────────
    def record_success(self, event_id: str, tool: str, args: dict, latency_ms: int) -> None:
        """Record a successful tool call (no error fields)."""
        self.record(event_id=event_id, tool=tool, args=args,
                    status="success", latency_ms=latency_ms)

    def record_error(self, event_id: str, tool: str, args: dict, latency_ms: int,
                     exc: Exception) -> None:
        """Record a failed tool call.

        The exception's class name is logged, and its message is logged only
        after passing through ``_sanitise_error``.
        """
        self.record(
            event_id=event_id, tool=tool, args=args,
            status="error", latency_ms=latency_ms,
            error_type=type(exc).__name__,
            error_message=_sanitise_error(str(exc)),
        )

    # ── Singleton accessor ────────────────────────────────────────────────────
    @classmethod
    def get(cls) -> AuditLogger:
        """Return the process-wide singleton (created on first call).

        The unlocked first check keeps the common path cheap. The second
        check, made under the lock, ensures racing threads build only one
        instance.
        """
        instance = cls._instance
        if instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
                instance = cls._instance
        return instance
def _sanitise_error(msg: str) -> str:
"""Strip anything that looks like a credential or token from an error string."""
import re
msg = re.sub(r"(password|secret|token|key)\s*[=:]\s*\S+",
r"\1=**REDACTED**", msg, flags=re.IGNORECASE)
return msg[:500]
# ── Audit log reader (for compliance queries) ─────────────────────────────────
def tail_audit_log(n: int = 100, log_file: Path | None = None) -> list[dict]:
    """Return the last *n* audit log entries as parsed dicts, oldest first.

    Useful for building internal compliance dashboards or spot-checks.

    Blank lines are skipped before counting, so up to *n* real entries are
    returned even when the file ends with empty lines. The file is streamed,
    and only the last *n* lines are held in memory. ``n <= 0`` returns ``[]``,
    and so does a missing log file.

    Raises:
        json.JSONDecodeError: if one of the selected lines is not valid JSON.
    """
    from collections import deque

    if n <= 0:
        # Guard explicitly: a slice like lines[-0:] would return everything.
        return []
    if log_file is None:
        from config import AuditConfig
        log_file = AuditConfig().log_file
    try:
        with open(log_file, encoding="utf-8") as fh:
            tail = deque((line for line in fh if line.strip()), maxlen=n)
    except FileNotFoundError:
        return []
    return [json.loads(line) for line in tail]
def audit_log_stats(log_file: Path | None = None, max_entries: int = 100_000) -> dict:
    """Summarise the audit log — total calls, errors, tool frequency.

    Returns a dict suitable for feeding into a compliance dashboard.

    Args:
        log_file: Log to read. ``None`` uses the configured audit log.
        max_entries: Only the most recent *max_entries* entries are analysed
            (default 100 000). For larger logs ``total_entries`` is therefore
            the size of that window, not of the whole file. A value <= 0
            yields ``{"total_entries": 0}``.

    Entries missing a field (for example, written under an older schema) are
    counted under ``None``. They do not abort the summary with ``KeyError``.
    """
    if max_entries <= 0:
        return {"total_entries": 0}
    entries = tail_audit_log(n=max_entries, log_file=log_file)
    if not entries:
        return {"total_entries": 0}
    from collections import Counter
    tools = Counter(e.get("tool") for e in entries)
    statuses = Counter(e.get("status") for e in entries)
    shards = Counter(e.get("shard") for e in entries)
    categories = Counter(e.get("action_category") for e in entries)
    errors = [e for e in entries if e.get("status") == "error"]
    return {
        "total_entries": len(entries),
        "status_breakdown": dict(statuses),
        "shard_breakdown": dict(shards),
        "action_category_breakdown": dict(categories),
        "top_10_tools": tools.most_common(10),
        "error_count": len(errors),
        "recent_errors": [
            {"timestamp": e.get("timestamp"), "tool": e.get("tool"),
             "error_type": e.get("error_type"), "error_message": e.get("error_message")}
            for e in errors[-10:]
        ],
        "mock_mode_calls": sum(1 for e in entries if e.get("mock_mode")),
    }