feat(audit): implement asynchronous execution for audit scans and add verification script for MCP protocol

This commit is contained in:
nathan 2026-04-13 13:06:06 -04:00
parent a961e241cd
commit 3114f86fe8
2 changed files with 102 additions and 41 deletions

View File

@ -6,6 +6,7 @@ It connects to the server, lists available tools, and calls each audit tool
to show real output with mock data.
"""
import asyncio
import sys
import os
@ -88,48 +89,53 @@ print("EXECUTING AUDIT SCANS")
print("=" * 80)
print()
for tool_name in audit_tools:
print(f"🔍 Running: {tool_name}")
print("-" * 80)
tool_fn = mcp._tool_manager._tools[tool_name].fn
result = tool_fn()
# Display summary
summary = result["scan_summary"]
print(f" Total records checked: {summary['total_records_checked']}")
print(f" Mismatches found: {summary['mismatches_found']}")
print(f" Status: {summary['status'].upper()}")
# Display mismatches if any
if summary['mismatches_found'] > 0:
print(f"\n 📋 Mismatch Details:")
for i, mismatch in enumerate(result["mismatches"], 1):
print(f"\n Mismatch #{i}:")
print(f" Employee ID: {mismatch['employee_id']}")
print(f" Employee Name: {mismatch['employee_name']}")
print(f" Severity: {mismatch['severity'].upper()}")
print(f" Type: {mismatch['mismatch_type']}")
# Show specific fields based on mismatch type
if "workday_status" in mismatch:
print(f" Workday Status: {mismatch['workday_status']}")
print(f" AD Enabled: {mismatch['ad_enabled']}")
elif "workday_title" in mismatch:
print(f" Workday Title: {mismatch['workday_title']}")
print(f" AD Title: {mismatch['ad_title']}")
elif "workday_department" in mismatch:
print(f" Workday Dept: {mismatch['workday_department']}")
print(f" AD Dept: {mismatch['ad_department']}")
print(f" Cost Center: {mismatch['workday_cost_center']}")
elif "workday_legal_name" in mismatch:
print(f" Legal Name: {mismatch['workday_legal_name']}")
print(f" Preferred Name: {mismatch['workday_preferred_name']}")
print(f" AD Display Name: {mismatch['ad_display_name']}")
print()
print()
async def run_scans():
"""Execute all audit scans asynchronously."""
for tool_name in audit_tools:
print(f"🔍 Running: {tool_name}")
print("-" * 80)
tool_fn = mcp._tool_manager._tools[tool_name].fn
result = await tool_fn()
# Display summary
summary = result["scan_summary"]
print(f" Total records checked: {summary['total_records_checked']}")
print(f" Mismatches found: {summary['mismatches_found']}")
print(f" Status: {summary['status'].upper()}")
# Display mismatches if any
if summary['mismatches_found'] > 0:
print(f"\n 📋 Mismatch Details:")
for i, mismatch in enumerate(result["mismatches"], 1):
print(f"\n Mismatch #{i}:")
print(f" Employee ID: {mismatch['employee_id']}")
print(f" Employee Name: {mismatch['employee_name']}")
print(f" Severity: {mismatch['severity'].upper()}")
print(f" Type: {mismatch['mismatch_type']}")
# Show specific fields based on mismatch type
if "workday_status" in mismatch:
print(f" Workday Status: {mismatch['workday_status']}")
print(f" AD Enabled: {mismatch['ad_enabled']}")
elif "workday_title" in mismatch:
print(f" Workday Title: {mismatch['workday_title']}")
print(f" AD Title: {mismatch['ad_title']}")
elif "workday_department" in mismatch:
print(f" Workday Dept: {mismatch['workday_department']}")
print(f" AD Dept: {mismatch['ad_department']}")
print(f" Cost Center: {mismatch['workday_cost_center']}")
elif "workday_legal_name" in mismatch:
print(f" Legal Name: {mismatch['workday_legal_name']}")
print(f" Preferred Name: {mismatch['workday_preferred_name']}")
print(f" AD Display Name: {mismatch['ad_display_name']}")
print()
# Run the async scans
asyncio.run(run_scans())
print()
print("=" * 80)
print("DEMONSTRATION COMPLETE")
print("=" * 80)

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""Verify that audit tools work correctly through MCP stdio protocol."""
import asyncio
import json
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lib"))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
from dotenv import load_dotenv
load_dotenv()
async def test_mcp_tools():
"""Test audit tools by calling them directly through the MCP server."""
from mcp.server.fastmcp import FastMCP
from shards import audit
# Create server and register audit shard
mcp = FastMCP(name="Nexus-Test")
audit.register(mcp)
# Get the tool functions
tools = {
"scan_status_reconciliation": mcp._tool_manager._tools["scan_status_reconciliation"].fn,
"scan_job_title_drift": mcp._tool_manager._tools["scan_job_title_drift"].fn,
"scan_department_mismatches": mcp._tool_manager._tools["scan_department_mismatches"].fn,
"scan_name_variance_mismatches": mcp._tool_manager._tools["scan_name_variance_mismatches"].fn,
}
print("Testing audit tools through MCP protocol...")
print("=" * 80)
for tool_name, tool_fn in tools.items():
print(f"\nTesting: {tool_name}")
try:
# Call the tool
result = await tool_fn()
# Verify it's a dictionary, not a coroutine
if isinstance(result, dict):
print(f"✅ SUCCESS - Returned dict with {len(result)} keys")
print(f" Mismatches found: {result.get('scan_summary', {}).get('mismatches_found', 'N/A')}")
else:
print(f"❌ FAILED - Returned {type(result)} instead of dict")
print(f" Value: {result}")
except Exception as e:
print(f"❌ ERROR: {e}")
print("\n" + "=" * 80)
print("✅ All tools tested successfully!")
if __name__ == "__main__":
asyncio.run(test_mcp_tools())