97 lines
3.0 KiB
Python

from mcp.server.fastmcp import FastMCP
from typing import Any
from lib.data import MOCK_WORKERS
mcp = FastMCP("Workday-Sync") #Server name for MCP registration, e.g. "Workday-Sync"
@mcp.tool()
def get_worker_status(employee_id: str) -> dict[str, Any]:
"""
WIS-009: Fetch structured worker status.
This replaces the previous string-based version.
"""
worker_id = employee_id.upper()
worker = MOCK_WORKERS.get(worker_id)
# Structured Return (WIS-004 Allowlist)
if worker:
return {
"employee_id": worker_id,
"full_name": worker["name"],
"status": worker["status"],
"department": worker["dept"],
"work_email": worker["email"],
"manager_id": worker["manager_id"],
}
return {"error": f"Worker {employee_id} not found."}
@mcp.tool()
def get_worker_manager(employee_id: str) -> dict[str, Any]:
"""WIS-017 prep: resolve a worker's manager relationship from mock data."""
worker_id = employee_id.upper()
worker = MOCK_WORKERS.get(worker_id)
if not worker:
return {"error": f"Worker {employee_id} not found."}
manager_id = worker.get("manager_id", "")
if not manager_id:
return {
"employee_id": worker_id,
"employee_name": worker["name"],
"manager_id": "",
"manager_name": None,
"relationship_status": "no_manager_assigned",
}
manager = MOCK_WORKERS.get(manager_id)
if not manager:
return {
"employee_id": worker_id,
"employee_name": worker["name"],
"manager_id": manager_id,
"manager_name": None,
"relationship_status": "manager_not_found",
}
return {
"employee_id": worker_id,
"employee_name": worker["name"],
"manager_id": manager_id,
"manager_name": manager["name"],
"relationship_status": "ok",
}
@mcp.tool()
def scan_manager_mismatches() -> dict[str, Any]:
"""
WIS-017: Scan all workers and return a report of manager relationship errors.
This provides the data for WIS-020 weekly drift reporting.
"""
mismatches: list[dict[str, Any]] = []
total_scanned = 0
for emp_id, details in MOCK_WORKERS.items():
total_scanned += 1
manager_id = details.get("manager_id")
# We only care about cases where a manager is assigned but cannot be found
if manager_id and manager_id not in MOCK_WORKERS:
mismatches.append({
"employee_id": emp_id,
"employee_name": details["name"],
"invalid_manager_id": manager_id,
"error_type": "manager_not_found"
})
return {
"scan_summary": {
"total_records_checked": total_scanned,
"mismatches_found": len(mismatches),
"status": "action_required" if mismatches else "clean"
},
"mismatches": mismatches
}
if __name__ == "__main__":
mcp.run()