feat: implement AD backend aliases and fix identity shard async calls (#3)

* docs: add comprehensive Nexus MCP test cases for identity shards

* fix: enhance Active Directory user retrieval methods and logging
This commit is contained in:
Nathan Castaldi 2026-04-15 10:44:58 -04:00 committed by GitHub
parent 0d5c921a94
commit f4ec8b1d9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 190 additions and 8 deletions

165
TEST_CASES.md Normal file
View File

@ -0,0 +1,165 @@
# Nexus MCP Test Cases
## 🟢 Identity Shard — Active Directory
Test the core lookup tools first since those are the ones we just fixed.
### User lookups
Look up the AD user "jsmith"
<!---->
Find the Active Directory account for email "john.smith@wheels.com"
<!---->
Search for AD users matching "martinez"
### Group tools
List all Active Directory groups
<!---->
Get the members of the AD group "CN=IT-Admins,OU=Groups,DC=wheels,DC=com"
### Account hygiene (the ones we just fixed)
Show me all disabled accounts in Active Directory
<!---->
Find AD accounts that haven't logged in for 90 days
<!---->
Show stale accounts inactive for more than 60 days
***
## 🟢 Identity Shard — Microsoft Entra ID
List users in Entra ID
<!---->
Get the Entra ID user for "john.smith@wheels.com"
<!---->
List all Entra ID groups
<!---->
Show me all Entra ID service principals
<!---->
Get the Conditional Access policies from Entra ID
<!---->
Show recent sign-in logs from Entra ID
<!---->
List users flagged as risky in Entra ID Identity Protection
***
## 🟡 Workday Shard
List workers in Workday
<!---->
Get the Workday worker record for employee ID "EMP001"
<!---->
Find the Workday worker with email "john.smith@wheels.com"
<!---->
List all supervisory organizations in Workday
<!---->
Show open positions in Workday
***
## 🟡 Audit Shard (the most interesting ones)
These are your cross-system drift tools — great for confirming the full pipeline works end-to-end.
Scan for terminated workers who still have active AD accounts
<!---->
Run a job title drift scan between Workday and Active Directory
<!---->
Check for department mismatches between Workday and AD
<!---->
Scan for name variance mismatches between Workday and AD
<!---->
Show me the last 20 Nexus audit log entries
<!---->
Give me Nexus audit statistics
***
## 🔴 Stub Shards (these should return empty or stub responses, not errors)
These confirm your feature flag / holding pattern works correctly — the server should accept the call and return gracefully.
List incidents from BMC Helix
<!---->
Track FedEx shipment "449044304137821"
<!---->
List assets from Lansweeper
<!---->
Show me Intune managed devices
***
## 🧪 Suggested Test Order (most value, least noise)
Run them in this order for a clean "smoke test" progression:
| # | Command | What it validates |
| - | ----------------------------------------------------- | ---------------------------------------------- |
| 1 | `Show me all disabled accounts in Active Directory` | Fixed `query_users` path ✅ |
| 2 | `Find stale AD accounts inactive for 90 days` | Fixed `find_stale_users` rename ✅ |
| 3 | `Search for AD users matching "smith"` | Fixed `search_users_by_name` rename ✅ |
| 4 | `Find the AD user with email "john.smith@wheels.com"` | Fixed `ad_get_user_by_email` path ✅ |
| 5 | `List all Active Directory groups` | Confirms mock path + WIS-018 holding pattern ✅ |
| 6 | `Scan for terminated workers still active in AD` | Confirms cross-shard audit works ✅ |
| 7 | `Show me the last 20 Nexus audit log entries` | Confirms SOC 2 logging is active ✅ |
| 8 | `List incidents from BMC Helix` | Confirms stub shards fail gracefully ✅ |
***
## One thing to watch for
If any tool returns an **empty list `[]` that you didn't expect**, check:
* Is `USE_MOCK=true` confirmed in the MCP server output?
* Does the mock data in `mock_data.py` have entries for that tool?
If a tool **errors** instead of returning empty, that's a real bug worth capturing — paste the error here and we'll triage it.

View File

@ -6,6 +6,7 @@ Mock: Set USE_MOCK=true to use built-in sample data (no credentials needed).
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import os import os
import sys import sys
@ -16,6 +17,8 @@ import mock_data as M
from adapters import ADUserAdapter, EntraUserAdapter from adapters import ADUserAdapter, EntraUserAdapter
from schemas import CanonicalUser from schemas import CanonicalUser
logger = logging.getLogger(__name__)
_USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true" _USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true"
_ad = None _ad = None
@ -54,7 +57,7 @@ def register(mcp: FastMCP) -> None:
canonical = ADUserAdapter.to_canonical(ad_dict) canonical = ADUserAdapter.to_canonical(ad_dict)
return canonical.model_dump(mode='json', exclude_none=True) return canonical.model_dump(mode='json', exclude_none=True)
ad_dict = await asyncio.to_thread(_get_ad().get_user, sam_account_name) ad_dict = await _get_ad().get_user(sam_account_name)
if not ad_dict: if not ad_dict:
return None return None
canonical = ADUserAdapter.to_canonical(ad_dict) canonical = ADUserAdapter.to_canonical(ad_dict)
@ -73,7 +76,17 @@ def register(mcp: FastMCP) -> None:
canonical = ADUserAdapter.to_canonical(ad_dict) canonical = ADUserAdapter.to_canonical(ad_dict)
return canonical.model_dump(mode='json', exclude_none=True) return canonical.model_dump(mode='json', exclude_none=True)
ad_dict = await asyncio.to_thread(_get_ad().get_user_by_email, email) # No dedicated email lookup in backend — bounded paginated scan via query_users.
resp = await _get_ad().query_users(
fields=["username", "display_name", "first_name", "last_name",
"enabled", "ou", "description", "last_logon_utc", "email"],
page_size=500,
)
target = email.lower()
ad_dict = next(
(u for u in resp.get("items", []) if (u.get("email") or "").lower() == target),
None,
)
if not ad_dict: if not ad_dict:
return None return None
canonical = ADUserAdapter.to_canonical(ad_dict) canonical = ADUserAdapter.to_canonical(ad_dict)
@ -95,7 +108,7 @@ def register(mcp: FastMCP) -> None:
canonical_users = [ADUserAdapter.to_canonical(u) for u in matches[:limit]] canonical_users = [ADUserAdapter.to_canonical(u) for u in matches[:limit]]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
ad_dicts = await asyncio.to_thread(_get_ad().search_users, query, limit) ad_dicts = await _get_ad().search_users_by_name(query, limit)
canonical_users = [ADUserAdapter.to_canonical(u) for u in ad_dicts] canonical_users = [ADUserAdapter.to_canonical(u) for u in ad_dicts]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
@ -104,7 +117,9 @@ def register(mcp: FastMCP) -> None:
"""List all security and distribution groups in Active Directory.""" """List all security and distribution groups in Active Directory."""
if _USE_MOCK: if _USE_MOCK:
return M.AD_GROUPS[:limit] return M.AD_GROUPS[:limit]
return await asyncio.to_thread(_get_ad().get_groups, limit) # TODO: AD backend does not yet expose group enumeration (see WIS-018).
logger.warning("ad_list_groups: group enumeration not yet implemented in AD backend")
return []
@mcp.tool() @mcp.tool()
async def ad_get_group_members(group_dn: str) -> list[dict]: async def ad_get_group_members(group_dn: str) -> list[dict]:
@ -117,7 +132,8 @@ def register(mcp: FastMCP) -> None:
for u in M.AD_USERS for u in M.AD_USERS
if group_cn in (u.get("memberOf") or "").lower() if group_cn in (u.get("memberOf") or "").lower()
] ]
return await asyncio.to_thread(_get_ad().get_group_members, group_dn) usernames = await _get_ad().get_group_members(group_dn)
return [{"sAMAccountName": u} for u in usernames]
@mcp.tool() @mcp.tool()
async def ad_get_disabled_accounts() -> list[dict]: async def ad_get_disabled_accounts() -> list[dict]:
@ -127,7 +143,8 @@ def register(mcp: FastMCP) -> None:
""" """
if _USE_MOCK: if _USE_MOCK:
return [u for u in M.AD_USERS if u.get("userAccountControl") == "514"] return [u for u in M.AD_USERS if u.get("userAccountControl") == "514"]
return await asyncio.to_thread(_get_ad().get_disabled_accounts) resp = await _get_ad().query_users(filter_params={"enabled": False}, page_size=200)
return resp.get("items", [])
@mcp.tool() @mcp.tool()
async def ad_get_stale_accounts(days_inactive: int = 90) -> list[dict]: async def ad_get_stale_accounts(days_inactive: int = 90) -> list[dict]:
@ -146,7 +163,7 @@ def register(mcp: FastMCP) -> None:
if u.get("userAccountControl") != "514" if u.get("userAccountControl") != "514"
and (u.get("lastLogonTimestamp") or "9999") < cutoff and (u.get("lastLogonTimestamp") or "9999") < cutoff
] ]
return await asyncio.to_thread(_get_ad().get_stale_accounts, days_inactive) return await _get_ad().find_stale_users(days_inactive)
# ── Microsoft Entra ID ──────────────────────────────────────────────────── # ── Microsoft Entra ID ────────────────────────────────────────────────────