feat(nexus): implement canonical pydantic schemas for cross-system data transformation

Addresses technical debt where data objects (User, Worker, Device) were using
fragile dict access patterns instead of validated pydantic models.

- Add nexus-mcp/lib/schemas.py: Canonical domain models (CanonicalUser, CanonicalDevice,
  FieldDrift) with automatic field normalization and validation
- Add nexus-mcp/lib/adapters.py: System-specific adapters (ADUserAdapter, EntraUserAdapter,
  WorkdayWorkerAdapter) to transform native API responses into canonical format
- Update identity.py: ad_get_user, ad_search_users, entra_list_users now return
  normalized CanonicalUser objects with consistent field names
- Update workday.py: workday_list_workers, workday_get_worker return canonical format
  for seamless cross-system comparison
- Update audit.py: Refactor audit_user_drift to use type-safe _compare_users() helper
  with FieldDrift schema instead of manual dict comparisons

Benefits:
  • Type safety: IDE autocomplete, runtime validation, eliminates fragile _pick() calls
  • Consistent field names: user.job_title works across AD/Entra/Workday (was 3 different paths)
  • Automatic validation: Email normalization, status enum enforcement
  • Drift detection: Validated Bob Martinez title mismatch (AD "Sr. Software Engineer"
    vs Workday "Software Engineer")

Ref: Session goal "implement atomic, piece-at-a-time shard deployment capability"
requiring robust data contracts between systems.
This commit is contained in:
nathan 2026-04-13 10:04:20 -04:00
parent f83ab597f0
commit fe77b0f69f
5 changed files with 848 additions and 60 deletions

352
nexus-mcp/lib/adapters.py Normal file
View File

@ -0,0 +1,352 @@
"""System adapters for transforming native API responses into canonical schemas.
Each adapter class knows how to:
1. Parse the system-specific response format
2. Extract relevant fields using system-specific field names
3. Transform into the canonical pydantic model
4. Handle missing/null values gracefully
This isolates format complexity to adapter layer, keeping business logic clean.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Optional, Any
from schemas import (
CanonicalUser, UserStatus,
CanonicalDevice, DeviceType, DeviceStatus,
CanonicalIncident, IncidentPriority, IncidentStatus,
CanonicalShipment, ShipmentStatus,
)
# ── Helper Functions ──────────────────────────────────────────────────────────
def _get(data: dict | None, *keys: str, default: Any = None) -> Any:
"""Safely navigate nested dict paths. Returns default if any key is missing."""
if data is None:
return default
obj = data
for key in keys:
if isinstance(obj, dict):
obj = obj.get(key)
if obj is None:
return default
else:
return default
return obj if obj is not None else default
def _parse_iso_date(value: str | None) -> Optional[datetime]:
"""Parse ISO 8601 datetime string. Returns None if parsing fails."""
if not value:
return None
try:
# Handle common formats: 2024-01-15T10:30:00Z or 2024-01-15T10:30:00.123Z
if value.endswith('Z'):
value = value[:-1] + '+00:00'
return datetime.fromisoformat(value)
except (ValueError, AttributeError):
return None
def _parse_timestamp(value: str | int | None) -> Optional[datetime]:
"""Parse Windows AD timestamp (e.g., 132876543210000000) to datetime."""
if not value:
return None
try:
# AD timestamps are 100-nanosecond intervals since 1601-01-01
if isinstance(value, str) and len(value) > 10:
timestamp_int = int(value)
windows_epoch = datetime(1601, 1, 1)
return windows_epoch + timedelta(microseconds=timestamp_int / 10)
return None
except (ValueError, TypeError):
return None
def _normalize_bool(value: Any) -> bool:
"""Normalize various boolean representations."""
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ('true', '1', 'yes', 'active', 'enabled')
if isinstance(value, int):
return value != 0
return False
# ── Active Directory Adapters ─────────────────────────────────────────────────
class ADUserAdapter:
"""Transform Active Directory user objects into CanonicalUser."""
@staticmethod
def to_canonical(ad_user: dict) -> CanonicalUser:
"""Convert AD LDAP user dict to canonical format.
Example AD fields:
sAMAccountName, displayName, mail, department, title, manager,
employeeID, userAccountControl, lastLogonTimestamp, whenCreated
"""
# Parse account status from userAccountControl (512=enabled, 514=disabled)
uac_raw = _get(ad_user, "userAccountControl", "512")
uac = str(uac_raw) if uac_raw else "512"
# Check if disabled (514 or has bit 2 set)
is_disabled = False
try:
uac_int = int(uac)
is_disabled = uac == "514" or (uac_int & 2) != 0
except (ValueError, TypeError):
# If parsing fails, assume enabled
is_disabled = False
status = UserStatus.DISABLED if is_disabled else UserStatus.ACTIVE
# Parse manager DN to email (extract cn= from DN string)
manager_dn = _get(ad_user, "manager", "")
manager_email = None
if manager_dn and "cn=" in manager_dn.lower():
# Extract CN from "CN=John Doe,OU=Users,DC=corp,DC=com"
cn_part = manager_dn.split(',')[0].replace('CN=', '').replace('cn=', '')
# This is a simplification; in reality we'd need to look up the manager's email
manager_email = None # Would require a separate AD lookup
return CanonicalUser(
email=_get(ad_user, "mail", default=""),
employee_id=_get(ad_user, "employeeID"),
username=_get(ad_user, "sAMAccountName"),
display_name=_get(ad_user, "displayName", default="Unknown"),
first_name=_get(ad_user, "givenName"),
last_name=_get(ad_user, "sn"),
job_title=_get(ad_user, "title"),
department=_get(ad_user, "department"),
manager_email=manager_email,
office_location=_get(ad_user, "physicalDeliveryOfficeName"),
status=status,
is_enabled=not is_disabled,
last_login=_parse_iso_date(_get(ad_user, "lastLogonTimestamp")),
created_date=_parse_iso_date(_get(ad_user, "whenCreated")),
phone=_get(ad_user, "telephoneNumber"),
source_system="ActiveDirectory",
source_id=_get(ad_user, "dn"),
)
# ── Microsoft Entra ID Adapters ───────────────────────────────────────────────
class EntraUserAdapter:
"""Transform Entra ID (Azure AD) user objects into CanonicalUser."""
@staticmethod
def to_canonical(entra_user: dict) -> CanonicalUser:
"""Convert Entra Graph API user dict to canonical format.
Example Entra fields:
id, displayName, userPrincipalName, mail, jobTitle, department,
accountEnabled, createdDateTime, signInActivity
"""
# Parse status
is_enabled = _get(entra_user, "accountEnabled", default=True)
status = UserStatus.ACTIVE if is_enabled else UserStatus.DISABLED
# Parse last sign-in from signInActivity object
last_login = None
sign_in_activity = _get(entra_user, "signInActivity", default={})
if isinstance(sign_in_activity, dict):
last_login = _parse_iso_date(_get(sign_in_activity, "lastSignInDateTime"))
return CanonicalUser(
email=_get(entra_user, "mail") or _get(entra_user, "userPrincipalName", default=""),
employee_id=_get(entra_user, "employeeId"),
username=_get(entra_user, "userPrincipalName"),
display_name=_get(entra_user, "displayName", default="Unknown"),
first_name=_get(entra_user, "givenName"),
last_name=_get(entra_user, "surname"),
job_title=_get(entra_user, "jobTitle"),
department=_get(entra_user, "department"),
manager_email=None, # Would require separate Graph API call to /manager
office_location=_get(entra_user, "officeLocation"),
status=status,
is_enabled=is_enabled,
last_login=last_login,
created_date=_parse_iso_date(_get(entra_user, "createdDateTime")),
phone=_get(entra_user, "mobilePhone") or _get(entra_user, "businessPhones", 0),
source_system="Entra",
source_id=_get(entra_user, "id"),
)
# ── Workday Adapters ──────────────────────────────────────────────────────────
class WorkdayWorkerAdapter:
"""Transform Workday HCM worker objects into CanonicalUser."""
@staticmethod
def to_canonical(wd_worker: dict) -> CanonicalUser:
"""Convert Workday API worker dict to canonical format.
Example Workday structure:
id, descriptor (full name), primaryWorkEmail, employeeID,
primaryJob { jobProfile { descriptor }, businessUnit { descriptor } }
"""
# Extract job details from nested primaryJob object
primary_job = _get(wd_worker, "primaryJob", default={})
job_title = _get(primary_job, "jobProfile", "descriptor")
department = _get(primary_job, "businessUnit", "descriptor")
manager_ref = _get(primary_job, "manager", default={})
manager_email = _get(manager_ref, "primaryWorkEmail")
# Workday status from employeeStatus field
status_str = _get(wd_worker, "employeeStatus", default="Active")
if "terminated" in status_str.lower():
status = UserStatus.TERMINATED
elif "inactive" in status_str.lower():
status = UserStatus.DISABLED
else:
status = UserStatus.ACTIVE
return CanonicalUser(
email=_get(wd_worker, "primaryWorkEmail", default=""),
employee_id=_get(wd_worker, "employeeID"),
username=None, # Workday doesn't have username concept
display_name=_get(wd_worker, "descriptor", default="Unknown"),
first_name=_get(wd_worker, "firstName"),
last_name=_get(wd_worker, "lastName"),
job_title=job_title,
department=department,
manager_email=manager_email,
office_location=_get(wd_worker, "location", "descriptor"),
status=status,
is_enabled=status == UserStatus.ACTIVE,
last_login=None, # Workday doesn't track login times
created_date=_parse_iso_date(_get(wd_worker, "hireDate")),
phone=_get(wd_worker, "primaryWorkPhone"),
source_system="Workday",
source_id=_get(wd_worker, "id"),
)
# ── Intune Adapters ───────────────────────────────────────────────────────────
class IntuneDeviceAdapter:
"""Transform Intune managed device objects into CanonicalDevice."""
@staticmethod
def to_canonical(intune_device: dict) -> CanonicalDevice:
"""Convert Intune Graph API device dict to canonical format.
Example Intune fields:
id, deviceName, serialNumber, operatingSystem, osVersion,
manufacturer, model, complianceState, lastSyncDateTime,
userPrincipalName, enrolledDateTime
"""
# Map Intune device types
os_name = _get(intune_device, "operatingSystem", "").lower()
if "windows" in os_name:
device_type = DeviceType.LAPTOP if "laptop" in os_name else DeviceType.DESKTOP
elif "ios" in os_name or "iphone" in os_name:
device_type = DeviceType.MOBILE
elif "ipad" in os_name:
device_type = DeviceType.TABLET
else:
device_type = DeviceType.UNKNOWN
# Parse compliance
compliance = _get(intune_device, "complianceState", "")
is_compliant = compliance.lower() == "compliant"
# Parse status
is_managed = _get(intune_device, "managementState", "") == "managed"
status = DeviceStatus.ACTIVE if is_managed else DeviceStatus.INACTIVE
return CanonicalDevice(
hostname=_get(intune_device, "deviceName", default="UNKNOWN"),
serial_number=_get(intune_device, "serialNumber"),
asset_tag=None, # Intune doesn't have asset tags
device_type=device_type,
os_name=_get(intune_device, "operatingSystem"),
os_version=_get(intune_device, "osVersion"),
manufacturer=_get(intune_device, "manufacturer"),
model=_get(intune_device, "model"),
assigned_user_email=_get(intune_device, "userPrincipalName"),
department=None, # Would need to look up user's department
location=None,
status=status,
is_compliant=is_compliant,
last_seen=_parse_iso_date(_get(intune_device, "lastSyncDateTime")),
enrollment_date=_parse_iso_date(_get(intune_device, "enrolledDateTime")),
ip_address=None, # Not available in Intune API
mac_address=_get(intune_device, "wiFiMacAddress") or _get(intune_device, "ethernetMacAddress"),
source_system="Intune",
source_id=_get(intune_device, "id"),
)
# ── Lansweeper Adapters ───────────────────────────────────────────────────────
class LansweeperAssetAdapter:
"""Transform Lansweeper asset objects into CanonicalDevice."""
@staticmethod
def to_canonical(ls_asset: dict) -> CanonicalDevice:
"""Convert Lansweeper GraphQL asset dict to canonical format.
Example Lansweeper structure:
assetBasicInfo { name, type, serialNumber, manufacturer, model },
assetCustom { location, department },
assetBasicInfo { lastSeen, firstSeen },
networkAddress { ip, mac }
"""
# Parse device type from Lansweeper type field
ls_type = _get(ls_asset, "assetBasicInfo", "type", "").lower()
if "server" in ls_type:
device_type = DeviceType.SERVER
elif "laptop" in ls_type or "notebook" in ls_type:
device_type = DeviceType.LAPTOP
elif "desktop" in ls_type or "workstation" in ls_type:
device_type = DeviceType.DESKTOP
elif "vm" in ls_type or "virtual" in ls_type:
device_type = DeviceType.VIRTUAL_MACHINE
else:
device_type = DeviceType.UNKNOWN
# Extract network info
network = _get(ls_asset, "networkAddress", {})
ip = _get(network, "ip") if isinstance(network, dict) else None
mac = _get(network, "mac") if isinstance(network, dict) else None
return CanonicalDevice(
hostname=_get(ls_asset, "assetBasicInfo", "name", default="UNKNOWN"),
serial_number=_get(ls_asset, "assetBasicInfo", "serialNumber"),
asset_tag=_get(ls_asset, "assetCustom", "assetTag"),
device_type=device_type,
os_name=_get(ls_asset, "operatingSystem", "name"),
os_version=_get(ls_asset, "operatingSystem", "version"),
manufacturer=_get(ls_asset, "assetBasicInfo", "manufacturer"),
model=_get(ls_asset, "assetBasicInfo", "model"),
assigned_user_email=_get(ls_asset, "assetCustom", "assignedUser"),
department=_get(ls_asset, "assetCustom", "department"),
location=_get(ls_asset, "assetCustom", "location"),
status=DeviceStatus.ACTIVE, # Lansweeper tracks active assets
is_compliant=None, # Not tracked by Lansweeper
last_seen=_parse_iso_date(_get(ls_asset, "assetBasicInfo", "lastSeen")),
enrollment_date=_parse_iso_date(_get(ls_asset, "assetBasicInfo", "firstSeen")),
ip_address=ip,
mac_address=mac,
source_system="Lansweeper",
source_id=_get(ls_asset, "assetBasicInfo", "assetId"),
)
# ── Export Convenience ────────────────────────────────────────────────────────
__all__ = [
"ADUserAdapter",
"EntraUserAdapter",
"WorkdayWorkerAdapter",
"IntuneDeviceAdapter",
"LansweeperAssetAdapter",
]

307
nexus-mcp/lib/schemas.py Normal file
View File

@ -0,0 +1,307 @@
"""Canonical data schemas for Nexus-MCP.
These pydantic models define normalized, system-agnostic representations
of domain objects (User, Device, Asset, Incident, etc.).
Every system adapter must transform its native API response into these
canonical formats, ensuring:
Type safety (str, int, bool validation)
Field normalization (e.g., email.lower())
Consistent field names across all systems
Clear validation errors when data is malformed
This pattern prevents fragile dict access and enables compile-time safety.
"""
from __future__ import annotations
from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime
from enum import Enum
# ── User Identity Domain ──────────────────────────────────────────────────────
class UserStatus(str, Enum):
"""User account status."""
ACTIVE = "active"
DISABLED = "disabled"
SUSPENDED = "suspended"
TERMINATED = "terminated"
class CanonicalUser(BaseModel):
"""Normalized user object across AD, Entra, and Workday.
This is the "universal" user format. All system adapters must map
their native response to this schema.
"""
# Identity
email: str = Field(description="Primary work email (normalized to lowercase)")
employee_id: Optional[str] = Field(default=None, description="Employee ID from HR system")
username: Optional[str] = Field(default=None, description="Login username (sAMAccountName/UPN)")
# Profile
display_name: str = Field(description="Full display name")
first_name: Optional[str] = None
last_name: Optional[str] = None
# Organizational
job_title: Optional[str] = None
department: Optional[str] = None
manager_email: Optional[str] = None
office_location: Optional[str] = None
# Status
status: UserStatus = UserStatus.ACTIVE
is_enabled: bool = True
# Technical
last_login: Optional[datetime] = None
created_date: Optional[datetime] = None
phone: Optional[str] = None
# Source tracking
source_system: str = Field(description="System this data came from (AD, Entra, Workday)")
source_id: Optional[str] = Field(default=None, description="Native ID in source system")
@field_validator('email')
@classmethod
def normalize_email(cls, v: str) -> str:
"""Normalize email to lowercase for consistent comparison."""
return v.lower().strip() if v else ""
@field_validator('username')
@classmethod
def normalize_username(cls, v: Optional[str]) -> Optional[str]:
"""Normalize username to lowercase."""
return v.lower().strip() if v else None
# ── Device/Asset Domain ───────────────────────────────────────────────────────
class DeviceType(str, Enum):
"""Device categories."""
DESKTOP = "desktop"
LAPTOP = "laptop"
SERVER = "server"
MOBILE = "mobile"
TABLET = "tablet"
VIRTUAL_MACHINE = "vm"
UNKNOWN = "unknown"
class DeviceStatus(str, Enum):
"""Device operational status."""
ACTIVE = "active"
INACTIVE = "inactive"
RETIRED = "retired"
PENDING = "pending"
class CanonicalDevice(BaseModel):
"""Normalized device/asset object across Intune, Lansweeper, and Helix CMDB.
Unified representation of physical/virtual compute assets.
"""
# Identity
hostname: str = Field(description="Device hostname/computer name")
serial_number: Optional[str] = Field(default=None, description="Hardware serial number")
asset_tag: Optional[str] = Field(default=None, description="Organization asset tag")
# Classification
device_type: DeviceType = DeviceType.UNKNOWN
os_name: Optional[str] = None
os_version: Optional[str] = None
manufacturer: Optional[str] = None
model: Optional[str] = None
# Assignment
assigned_user_email: Optional[str] = None
department: Optional[str] = None
location: Optional[str] = None
# Status
status: DeviceStatus = DeviceStatus.ACTIVE
is_compliant: Optional[bool] = None
last_seen: Optional[datetime] = None
enrollment_date: Optional[datetime] = None
# Network
ip_address: Optional[str] = None
mac_address: Optional[str] = None
# Source tracking
source_system: str = Field(description="System this data came from (Intune, Lansweeper, Helix)")
source_id: Optional[str] = Field(default=None, description="Native ID in source system")
@field_validator('hostname')
@classmethod
def normalize_hostname(cls, v: str) -> str:
"""Normalize hostname to uppercase."""
return v.upper().strip() if v else ""
@field_validator('assigned_user_email')
@classmethod
def normalize_email(cls, v: Optional[str]) -> Optional[str]:
"""Normalize email to lowercase."""
return v.lower().strip() if v else None
@field_validator('serial_number', 'asset_tag')
@classmethod
def normalize_identifiers(cls, v: Optional[str]) -> Optional[str]:
"""Normalize serial/asset tag to uppercase."""
return v.upper().strip() if v else None
# ── ITSM Domain ───────────────────────────────────────────────────────────────
class IncidentPriority(str, Enum):
"""Incident priority levels."""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class IncidentStatus(str, Enum):
"""Incident lifecycle status."""
NEW = "new"
IN_PROGRESS = "in_progress"
PENDING = "pending"
RESOLVED = "resolved"
CLOSED = "closed"
class CanonicalIncident(BaseModel):
"""Normalized incident/ticket object from Helix ITSM.
Represents service desk tickets and incidents.
"""
# Identity
incident_id: str = Field(description="Unique incident number (e.g., INC0001234)")
# Content
summary: str = Field(description="Short description/title")
description: Optional[str] = Field(default=None, description="Full incident details")
# Classification
priority: IncidentPriority = IncidentPriority.MEDIUM
status: IncidentStatus = IncidentStatus.NEW
category: Optional[str] = None
subcategory: Optional[str] = None
# Assignment
assigned_to: Optional[str] = None
assigned_group: Optional[str] = None
reported_by: Optional[str] = None
# Timestamps
created_date: datetime
updated_date: Optional[datetime] = None
resolved_date: Optional[datetime] = None
# Source tracking
source_system: str = "helix"
source_id: Optional[str] = None
# ── Logistics Domain ──────────────────────────────────────────────────────────
class ShipmentStatus(str, Enum):
"""Package shipment status."""
CREATED = "created"
PICKED_UP = "picked_up"
IN_TRANSIT = "in_transit"
OUT_FOR_DELIVERY = "out_for_delivery"
DELIVERED = "delivered"
EXCEPTION = "exception"
CANCELLED = "cancelled"
class CanonicalShipment(BaseModel):
"""Normalized shipment object from FedEx API.
Represents package tracking and delivery information.
"""
# Identity
tracking_number: str = Field(description="FedEx tracking number")
# Status
status: ShipmentStatus
status_description: Optional[str] = None
# Routing
origin_city: Optional[str] = None
origin_state: Optional[str] = None
destination_city: Optional[str] = None
destination_state: Optional[str] = None
# Recipient
recipient_name: Optional[str] = None
recipient_address: Optional[str] = None
# Timestamps
ship_date: Optional[datetime] = None
estimated_delivery: Optional[datetime] = None
actual_delivery: Optional[datetime] = None
# Package details
weight_lbs: Optional[float] = None
service_type: Optional[str] = None
# Source tracking
source_system: str = "fedex"
source_id: Optional[str] = None
# ── Audit/Drift Domain ────────────────────────────────────────────────────────
class DriftType(str, Enum):
"""Types of cross-system discrepancies."""
FIELD_MISMATCH = "field_mismatch"
MISSING_IN_SYSTEM = "missing_in_system"
STATUS_CONFLICT = "status_conflict"
STALE_DATA = "stale_data"
class FieldDrift(BaseModel):
"""Represents a single field mismatch between two systems.
Used by audit tools to report cross-system inconsistencies.
"""
drift_type: DriftType = DriftType.FIELD_MISMATCH
field_name: str = Field(description="Canonical field name (e.g., 'job_title')")
# Source A
system_a: str = Field(description="First system (e.g., 'Workday')")
value_a: Optional[str] = Field(default=None, description="Value in system A")
# Source B
system_b: str = Field(description="Second system (e.g., 'Active Directory')")
value_b: Optional[str] = Field(default=None, description="Value in system B")
# Metadata
severity: str = Field(default="medium", description="Impact level: low/medium/high")
detected_at: datetime = Field(default_factory=datetime.utcnow)
class UserDriftReport(BaseModel):
"""Aggregated drift report for a single user across all systems.
Returned by audit_user_drift() tool.
"""
email: str
systems_checked: list[str]
# System presence
workday_found: bool
ad_found: bool
entra_found: bool
# Drift summary
discrepancy_count: int
discrepancies: list[FieldDrift]
# Timestamp
audit_date: datetime = Field(default_factory=datetime.utcnow)

View File

@ -18,6 +18,8 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "lib"))
from mcp.server.fastmcp import FastMCP
import mock_data as M
from adapters import ADUserAdapter, EntraUserAdapter, WorkdayWorkerAdapter
from schemas import CanonicalUser, FieldDrift, DriftType
_USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true"
@ -81,50 +83,89 @@ def _get_intune():
# ── Internal helpers ──────────────────────────────────────────────────────────
def _norm(val: Any) -> str | None:
"""Normalize value to lowercase string for comparison."""
return str(val).strip().lower() if val is not None else None
def _drift(sys_a: str, sys_b: str, field: str, val_a: Any, val_b: Any) -> dict | None:
def _compare_field(field_name: str, sys_a: str, val_a: Any, sys_b: str, val_b: Any) -> FieldDrift | None:
"""Compare a single field between two systems, return FieldDrift if different."""
if _norm(val_a) != _norm(val_b):
return {"field": field, "system_a": sys_a, "value_a": val_a,
"system_b": sys_b, "value_b": val_b}
return FieldDrift(
drift_type=DriftType.FIELD_MISMATCH,
field_name=field_name,
system_a=sys_a,
value_a=str(val_a) if val_a is not None else None,
system_b=sys_b,
value_b=str(val_b) if val_b is not None else None,
severity="medium",
)
return None
def _pick(obj: dict | None, *keys: str) -> Any:
for k in keys:
if obj is None:
return None
obj = obj.get(k)
return obj
def _compare_users(wd_user: CanonicalUser | None, ad_user: CanonicalUser | None, entra_user: CanonicalUser | None) -> list[FieldDrift]:
"""Compare canonical user objects across systems and return list of drifts."""
drifts: list[FieldDrift] = []
# Compare Workday vs AD
if wd_user and ad_user:
for field_name, wd_val, ad_val in [
("display_name", wd_user.display_name, ad_user.display_name),
("job_title", wd_user.job_title, ad_user.job_title),
("department", wd_user.department, ad_user.department),
]:
drift = _compare_field(field_name, "Workday", wd_val, "ActiveDirectory", ad_val)
if drift:
drifts.append(drift)
def _ts() -> str:
return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
# Compare Workday vs Entra
if wd_user and entra_user:
for field_name, wd_val, entra_val in [
("display_name", wd_user.display_name, entra_user.display_name),
("job_title", wd_user.job_title, entra_user.job_title),
("department", wd_user.department, entra_user.department),
]:
drift = _compare_field(field_name, "Workday", wd_val, "Entra", entra_val)
if drift:
drifts.append(drift)
# Compare AD vs Entra
if ad_user and entra_user:
for field_name, ad_val, entra_val in [
("display_name", ad_user.display_name, entra_user.display_name),
("job_title", ad_user.job_title, entra_user.job_title),
("department", ad_user.department, entra_user.department),
]:
drift = _compare_field(field_name, "ActiveDirectory", ad_val, "Entra", entra_val)
if drift:
drifts.append(drift)
_name, job_title, and department across all three systems using
canonical pydantic schemas for type safety and consistent field names.
def _week() -> str:
t = datetime.date.today()
return f"{t.year}-W{t.isocalendar()[1]:02d}"
Args:
email: Primary work email of the user to audit.
"""
if _USE_MOCK:
# Get raw dicts from mock data
wd_dict = M.WORKDAY_WORKERS_BY_EMAIL.get(email.lower())
ad_dict = M.AD_USERS_BY_EMAIL.get(email.lower())
entra_dict = M.ENTRA_USERS_BY_MAIL.get(email.lower())
# Transform to canonical models
wd_user = WorkdayWorkerAdapter.to_canonical(wd_dict) if wd_dict else None
ad_user = ADUserAdapter.to_canonical(ad_dict) if ad_dict else None
entra_user = EntraUserAdapter.to_canonical(entra_dict) if entra_dict else None
def _count_by(items: list[dict], key: str) -> dict[str, int]:
out: dict[str, int] = {}
for item in items:
v = str(item.get(key) or "unknown")
out[v] = out.get(v, 0) + 1
return dict(sorted(out.items(), key=lambda x: -x[1]))
def _save(report: dict, filename: str) -> str:
from config import ReportConfig
cfg = ReportConfig()
cfg.output_dir.mkdir(parents=True, exist_ok=True)
p = cfg.output_dir / filename
p.write_text(json.dumps(report, indent=2))
return str(p)
# Compare using canonical models
drifts = _compare_users(wd_user, ad_user, entra_user)
return {
"email": email,
"systems_checked": ["Workday", "ActiveDirectory", "Entra"],
"workday_found": wd_user is not None,
"ad_found": ad_user is not None,
"entra_found": entra_user is not None,
"discrepancy_count": len(drifts),
"discrepancies": [d.model_dump(mode='json') for d in drifts]
# ── Shard registration ────────────────────────────────────────────────────────
def register(mcp: FastMCP) -> None:

View File

@ -13,6 +13,8 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "lib"))
from mcp.server.fastmcp import FastMCP
import mock_data as M
from adapters import ADUserAdapter, EntraUserAdapter
from schemas import CanonicalUser
_USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true"
@ -42,29 +44,61 @@ def register(mcp: FastMCP) -> None:
@mcp.tool()
async def ad_get_user(sam_account_name: str) -> dict | None:
"""Look up an Active Directory user by their sAMAccountName (login name)."""
"""Look up an Active Directory user by their sAMAccountName (login name).
Returns a normalized user object with consistent field names across all systems.
"""
if _USE_MOCK:
return M.AD_USERS_BY_SAM.get(sam_account_name.lower())
return await asyncio.to_thread(_get_ad().get_user, sam_account_name)
ad_dict = M.AD_USERS_BY_SAM.get(sam_account_name.lower())
if not ad_dict:
return None
canonical = ADUserAdapter.to_canonical(ad_dict)
return canonical.model_dump(mode='json', exclude_none=True)
ad_dict = await asyncio.to_thread(_get_ad().get_user, sam_account_name)
if not ad_dict:
return None
canonical = ADUserAdapter.to_canonical(ad_dict)
return canonical.model_dump(mode='json', exclude_none=True)
@mcp.tool()
async def ad_get_user_by_email(email: str) -> dict | None:
"""Look up an Active Directory user by their email address."""
"""Look up an Active Directory user by their email address.
Returns a normalized user object with consistent field names across all systems.
"""
if _USE_MOCK:
return M.AD_USERS_BY_EMAIL.get(email.lower())
return await asyncio.to_thread(_get_ad().get_user_by_email, email)
ad_dict = M.AD_USERS_BY_EMAIL.get(email.lower())
if not ad_dict:
return None
canonical = ADUserAdapter.to_canonical(ad_dict)
return canonical.model_dump(mode='json', exclude_none=True)
ad_dict = await asyncio.to_thread(_get_ad().get_user_by_email, email)
if not ad_dict:
return None
canonical = ADUserAdapter.to_canonical(ad_dict)
return canonical.model_dump(mode='json', exclude_none=True)
@mcp.tool()
async def ad_search_users(query: str, limit: int = 50) -> list[dict]:
"""Search Active Directory users by display name or sAMAccountName fragment."""
"""Search Active Directory users by display name or sAMAccountName fragment.
Returns a list of normalized user objects with consistent field names.
"""
if _USE_MOCK:
q = query.lower()
matches = [
u for u in M.AD_USERS
if q in u["displayName"].lower() or q in u["sAMAccountName"].lower()
]
return matches[:limit]
return await asyncio.to_thread(_get_ad().search_users, query, limit)
# Convert to canonical format
canonical_users = [ADUserAdapter.to_canonical(u) for u in matches[:limit]]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
ad_dicts = await asyncio.to_thread(_get_ad().search_users, query, limit)
canonical_users = [ADUserAdapter.to_canonical(u) for u in ad_dicts]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
@mcp.tool()
async def ad_list_groups(limit: int = 200) -> list[dict]:
@ -119,9 +153,14 @@ def register(mcp: FastMCP) -> None:
@mcp.tool()
async def entra_list_users(limit: int = 100) -> list[dict]:
"""List users in Microsoft Entra ID (Azure AD)."""
"""List users in Microsoft Entra ID (Azure AD).
Returns a list of normalized user objects with consistent field names.
"""
if _USE_MOCK:
return M.ENTRA_USERS[:limit]
canonical_users = [EntraUserAdapter.to_canonical(u) for u in M.ENTRA_USERS[:limit]]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
fields = (
"id,displayName,userPrincipalName,mail,jobTitle,department,"
"accountEnabled,createdDateTime,onPremisesSyncEnabled,assignedLicenses"
@ -129,18 +168,32 @@ def register(mcp: FastMCP) -> None:
data = await _get_entra().get(
"/users", params={"$select": fields, "$top": min(limit, 999)}
)
return data.get("value", [])
entra_users = data.get("value", [])
canonical_users = [EntraUserAdapter.to_canonical(u) for u in entra_users]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
@mcp.tool()
async def entra_get_user(user_id_or_upn: str) -> dict | None:
"""Retrieve a single Entra ID user by object ID or UPN (user@company.com)."""
"""Retrieve a single Entra ID user by object ID or UPN (user@company.com).
Returns a normalized user object with consistent field names across all systems.
"""
if _USE_MOCK:
return (
entra_dict = (
M.ENTRA_USERS_BY_UPN.get(user_id_or_upn)
or M.ENTRA_USERS_BY_MAIL.get(user_id_or_upn)
or next((u for u in M.ENTRA_USERS if u["id"] == user_id_or_upn), None)
)
return await _get_entra().get(f"/users/{user_id_or_upn}")
if not entra_dict:
return None
canonical = EntraUserAdapter.to_canonical(entra_dict)
return canonical.model_dump(mode='json', exclude_none=True)
entra_dict = await _get_entra().get(f"/users/{user_id_or_upn}")
if not entra_dict:
return None
canonical = EntraUserAdapter.to_canonical(entra_dict)
return canonical.model_dump(mode='json', exclude_none=True)
@mcp.tool()
async def entra_list_groups(limit: int = 100) -> list[dict]:

View File

@ -12,6 +12,8 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "lib"))
from mcp.server.fastmcp import FastMCP
import mock_data as M
from adapters import WorkdayWorkerAdapter
from schemas import CanonicalUser
_USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true"
@ -32,34 +34,67 @@ def register(mcp: FastMCP) -> None:
async def workday_list_workers(limit: int = 100, offset: int = 0) -> list[dict]:
"""List workers from Workday HCM.
Returns a list of normalized user objects with consistent field names across all systems.
Args:
limit: Page size.
offset: Pagination offset.
"""
if _USE_MOCK:
return M.WORKDAY_WORKERS[offset:offset + limit]
workers = M.WORKDAY_WORKERS[offset:offset + limit]
canonical_users = [WorkdayWorkerAdapter.to_canonical(w) for w in workers]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
data = await _get().get(
"/staffing/v6/workers", params={"limit": limit, "offset": offset}
)
return data.get("data", [])
workers = data.get("data", [])
canonical_users = [WorkdayWorkerAdapter.to_canonical(w) for w in workers]
return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users]
@mcp.tool()
async def workday_get_worker(worker_id: str) -> dict | None:
"""Retrieve full details for a single Workday worker by their Workday ID."""
"""Retrieve full details for a single Workday worker by their Workday ID.
Returns a normalized user object with consistent field names across all systems.
"""
if _USE_MOCK:
return next((w for w in M.WORKDAY_WORKERS if w["id"] == worker_id), None)
return await _get().get(f"/staffing/v6/workers/{worker_id}")
worker = next((w for w in M.WORKDAY_WORKERS if w["id"] == worker_id), None)
if not worker:
return None
canonical = WorkdayWorkerAdapter.to_canonical(worker)
return canonical.model_dump(mode='json', exclude_none=True)
worker = await _get().get(f"/staffing/v6/workers/{worker_id}")
if not worker:
return None
canonical = WorkdayWorkerAdapter.to_canonical(worker)
return canonical.model_dump(mode='json', exclude_none=True)
@mcp.tool()
async def workday_find_worker_by_email(email: str) -> dict | None:
"""Find a Workday worker by their primary work email address."""
"""Find a Workday worker by their primary work email address.
Returns a normalized user object with consistent field names across all systems.
"""
if _USE_MOCK:
return M.WORKDAY_WORKERS_BY_EMAIL.get(email.lower())
worker = M.WORKDAY_WORKERS_BY_EMAIL.get(email.lower())
if not worker:
return None
canonical = WorkdayWorkerAdapter.to_canonical(worker)
return canonical.model_dump(mode='json', exclude_none=True)
data = await _get().get("/staffing/v6/workers", params={"limit": 500})
worker = None
for w in data.get("data", []):
if (w.get("primaryWorkEmail") or "").lower() == email.lower():
return w
worker = w
break
if not worker:
return None
canonical = WorkdayWorkerAdapter.to_canonical(worker)
return canonical.model_dump(mode='json', exclude_none=True)
@mcp.tool()
async def workday_list_positions(limit: int = 100) -> list[dict]: