diff --git a/nexus-mcp/lib/adapters.py b/nexus-mcp/lib/adapters.py new file mode 100644 index 0000000..ffd4e27 --- /dev/null +++ b/nexus-mcp/lib/adapters.py @@ -0,0 +1,352 @@ +"""System adapters for transforming native API responses into canonical schemas. + +Each adapter class knows how to: + 1. Parse the system-specific response format + 2. Extract relevant fields using system-specific field names + 3. Transform into the canonical pydantic model + 4. Handle missing/null values gracefully + +This isolates format complexity to adapter layer, keeping business logic clean. +""" + +from __future__ import annotations +from datetime import datetime, timedelta +from typing import Optional, Any +from schemas import ( + CanonicalUser, UserStatus, + CanonicalDevice, DeviceType, DeviceStatus, + CanonicalIncident, IncidentPriority, IncidentStatus, + CanonicalShipment, ShipmentStatus, +) + + +# ── Helper Functions ────────────────────────────────────────────────────────── + +def _get(data: dict | None, *keys: str, default: Any = None) -> Any: + """Safely navigate nested dict paths. Returns default if any key is missing.""" + if data is None: + return default + + obj = data + for key in keys: + if isinstance(obj, dict): + obj = obj.get(key) + if obj is None: + return default + else: + return default + return obj if obj is not None else default + + +def _parse_iso_date(value: str | None) -> Optional[datetime]: + """Parse ISO 8601 datetime string. Returns None if parsing fails.""" + if not value: + return None + try: + # Handle common formats: 2024-01-15T10:30:00Z or 2024-01-15T10:30:00.123Z + if value.endswith('Z'): + value = value[:-1] + '+00:00' + return datetime.fromisoformat(value) + except (ValueError, AttributeError): + return None + + +def _parse_timestamp(value: str | int | None) -> Optional[datetime]: + """Parse Windows AD timestamp (e.g., 132876543210000000) to datetime.""" + if not value: + return None + try: + # AD timestamps are 100-nanosecond intervals since 1601-01-01 + if isinstance(value, str) and len(value) > 10: + timestamp_int = int(value) + windows_epoch = datetime(1601, 1, 1) + return windows_epoch + timedelta(microseconds=timestamp_int / 10) + return None + except (ValueError, TypeError): + return None + + +def _normalize_bool(value: Any) -> bool: + """Normalize various boolean representations.""" + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.lower() in ('true', '1', 'yes', 'active', 'enabled') + if isinstance(value, int): + return value != 0 + return False + + +# ── Active Directory Adapters ───────────────────────────────────────────────── + +class ADUserAdapter: + """Transform Active Directory user objects into CanonicalUser.""" + + @staticmethod + def to_canonical(ad_user: dict) -> CanonicalUser: + """Convert AD LDAP user dict to canonical format. + + Example AD fields: + sAMAccountName, displayName, mail, department, title, manager, + employeeID, userAccountControl, lastLogonTimestamp, whenCreated + """ + # Parse account status from userAccountControl (512=enabled, 514=disabled) + uac_raw = _get(ad_user, "userAccountControl", "512") + uac = str(uac_raw) if uac_raw else "512" + + # Check if disabled (514 or has bit 2 set) + is_disabled = False + try: + uac_int = int(uac) + is_disabled = uac == "514" or (uac_int & 2) != 0 + except (ValueError, TypeError): + # If parsing fails, assume enabled + is_disabled = False + + status = UserStatus.DISABLED if is_disabled else UserStatus.ACTIVE + + # Parse manager DN to email (extract cn= from DN string) + manager_dn = _get(ad_user, "manager", "") + manager_email = None + if manager_dn and "cn=" in manager_dn.lower(): + # Extract CN from "CN=John Doe,OU=Users,DC=corp,DC=com" + cn_part = manager_dn.split(',')[0].replace('CN=', '').replace('cn=', '') + # This is a simplification; in reality we'd need to look up the manager's email + manager_email = None # Would require a separate AD lookup + + return CanonicalUser( + email=_get(ad_user, "mail", default=""), + employee_id=_get(ad_user, "employeeID"), + username=_get(ad_user, "sAMAccountName"), + display_name=_get(ad_user, "displayName", default="Unknown"), + first_name=_get(ad_user, "givenName"), + last_name=_get(ad_user, "sn"), + job_title=_get(ad_user, "title"), + department=_get(ad_user, "department"), + manager_email=manager_email, + office_location=_get(ad_user, "physicalDeliveryOfficeName"), + status=status, + is_enabled=not is_disabled, + last_login=_parse_iso_date(_get(ad_user, "lastLogonTimestamp")), + created_date=_parse_iso_date(_get(ad_user, "whenCreated")), + phone=_get(ad_user, "telephoneNumber"), + source_system="ActiveDirectory", + source_id=_get(ad_user, "dn"), + ) + + +# ── Microsoft Entra ID Adapters ─────────────────────────────────────────────── + +class EntraUserAdapter: + """Transform Entra ID (Azure AD) user objects into CanonicalUser.""" + + @staticmethod + def to_canonical(entra_user: dict) -> CanonicalUser: + """Convert Entra Graph API user dict to canonical format. + + Example Entra fields: + id, displayName, userPrincipalName, mail, jobTitle, department, + accountEnabled, createdDateTime, signInActivity + """ + # Parse status + is_enabled = _get(entra_user, "accountEnabled", default=True) + status = UserStatus.ACTIVE if is_enabled else UserStatus.DISABLED + + # Parse last sign-in from signInActivity object + last_login = None + sign_in_activity = _get(entra_user, "signInActivity", default={}) + if isinstance(sign_in_activity, dict): + last_login = _parse_iso_date(_get(sign_in_activity, "lastSignInDateTime")) + + return CanonicalUser( + email=_get(entra_user, "mail") or _get(entra_user, "userPrincipalName", default=""), + employee_id=_get(entra_user, "employeeId"), + username=_get(entra_user, "userPrincipalName"), + display_name=_get(entra_user, "displayName", default="Unknown"), + first_name=_get(entra_user, "givenName"), + last_name=_get(entra_user, "surname"), + job_title=_get(entra_user, "jobTitle"), + department=_get(entra_user, "department"), + manager_email=None, # Would require separate Graph API call to /manager + office_location=_get(entra_user, "officeLocation"), + status=status, + is_enabled=is_enabled, + last_login=last_login, + created_date=_parse_iso_date(_get(entra_user, "createdDateTime")), + phone=_get(entra_user, "mobilePhone") or _get(entra_user, "businessPhones", 0), + source_system="Entra", + source_id=_get(entra_user, "id"), + ) + + +# ── Workday Adapters ────────────────────────────────────────────────────────── + +class WorkdayWorkerAdapter: + """Transform Workday HCM worker objects into CanonicalUser.""" + + @staticmethod + def to_canonical(wd_worker: dict) -> CanonicalUser: + """Convert Workday API worker dict to canonical format. + + Example Workday structure: + id, descriptor (full name), primaryWorkEmail, employeeID, + primaryJob { jobProfile { descriptor }, businessUnit { descriptor } } + """ + # Extract job details from nested primaryJob object + primary_job = _get(wd_worker, "primaryJob", default={}) + job_title = _get(primary_job, "jobProfile", "descriptor") + department = _get(primary_job, "businessUnit", "descriptor") + manager_ref = _get(primary_job, "manager", default={}) + manager_email = _get(manager_ref, "primaryWorkEmail") + + # Workday status from employeeStatus field + status_str = _get(wd_worker, "employeeStatus", default="Active") + if "terminated" in status_str.lower(): + status = UserStatus.TERMINATED + elif "inactive" in status_str.lower(): + status = UserStatus.DISABLED + else: + status = UserStatus.ACTIVE + + return CanonicalUser( + email=_get(wd_worker, "primaryWorkEmail", default=""), + employee_id=_get(wd_worker, "employeeID"), + username=None, # Workday doesn't have username concept + display_name=_get(wd_worker, "descriptor", default="Unknown"), + first_name=_get(wd_worker, "firstName"), + last_name=_get(wd_worker, "lastName"), + job_title=job_title, + department=department, + manager_email=manager_email, + office_location=_get(wd_worker, "location", "descriptor"), + status=status, + is_enabled=status == UserStatus.ACTIVE, + last_login=None, # Workday doesn't track login times + created_date=_parse_iso_date(_get(wd_worker, "hireDate")), + phone=_get(wd_worker, "primaryWorkPhone"), + source_system="Workday", + source_id=_get(wd_worker, "id"), + ) + + +# ── Intune Adapters ─────────────────────────────────────────────────────────── + +class IntuneDeviceAdapter: + """Transform Intune managed device objects into CanonicalDevice.""" + + @staticmethod + def to_canonical(intune_device: dict) -> CanonicalDevice: + """Convert Intune Graph API device dict to canonical format. + + Example Intune fields: + id, deviceName, serialNumber, operatingSystem, osVersion, + manufacturer, model, complianceState, lastSyncDateTime, + userPrincipalName, enrolledDateTime + """ + # Map Intune device types + os_name = _get(intune_device, "operatingSystem", "").lower() + if "windows" in os_name: + device_type = DeviceType.LAPTOP if "laptop" in os_name else DeviceType.DESKTOP + elif "ios" in os_name or "iphone" in os_name: + device_type = DeviceType.MOBILE + elif "ipad" in os_name: + device_type = DeviceType.TABLET + else: + device_type = DeviceType.UNKNOWN + + # Parse compliance + compliance = _get(intune_device, "complianceState", "") + is_compliant = compliance.lower() == "compliant" + + # Parse status + is_managed = _get(intune_device, "managementState", "") == "managed" + status = DeviceStatus.ACTIVE if is_managed else DeviceStatus.INACTIVE + + return CanonicalDevice( + hostname=_get(intune_device, "deviceName", default="UNKNOWN"), + serial_number=_get(intune_device, "serialNumber"), + asset_tag=None, # Intune doesn't have asset tags + device_type=device_type, + os_name=_get(intune_device, "operatingSystem"), + os_version=_get(intune_device, "osVersion"), + manufacturer=_get(intune_device, "manufacturer"), + model=_get(intune_device, "model"), + assigned_user_email=_get(intune_device, "userPrincipalName"), + department=None, # Would need to look up user's department + location=None, + status=status, + is_compliant=is_compliant, + last_seen=_parse_iso_date(_get(intune_device, "lastSyncDateTime")), + enrollment_date=_parse_iso_date(_get(intune_device, "enrolledDateTime")), + ip_address=None, # Not available in Intune API + mac_address=_get(intune_device, "wiFiMacAddress") or _get(intune_device, "ethernetMacAddress"), + source_system="Intune", + source_id=_get(intune_device, "id"), + ) + + +# ── Lansweeper Adapters ─────────────────────────────────────────────────────── + +class LansweeperAssetAdapter: + """Transform Lansweeper asset objects into CanonicalDevice.""" + + @staticmethod + def to_canonical(ls_asset: dict) -> CanonicalDevice: + """Convert Lansweeper GraphQL asset dict to canonical format. + + Example Lansweeper structure: + assetBasicInfo { name, type, serialNumber, manufacturer, model }, + assetCustom { location, department }, + assetBasicInfo { lastSeen, firstSeen }, + networkAddress { ip, mac } + """ + # Parse device type from Lansweeper type field + ls_type = _get(ls_asset, "assetBasicInfo", "type", "").lower() + if "server" in ls_type: + device_type = DeviceType.SERVER + elif "laptop" in ls_type or "notebook" in ls_type: + device_type = DeviceType.LAPTOP + elif "desktop" in ls_type or "workstation" in ls_type: + device_type = DeviceType.DESKTOP + elif "vm" in ls_type or "virtual" in ls_type: + device_type = DeviceType.VIRTUAL_MACHINE + else: + device_type = DeviceType.UNKNOWN + + # Extract network info + network = _get(ls_asset, "networkAddress", {}) + ip = _get(network, "ip") if isinstance(network, dict) else None + mac = _get(network, "mac") if isinstance(network, dict) else None + + return CanonicalDevice( + hostname=_get(ls_asset, "assetBasicInfo", "name", default="UNKNOWN"), + serial_number=_get(ls_asset, "assetBasicInfo", "serialNumber"), + asset_tag=_get(ls_asset, "assetCustom", "assetTag"), + device_type=device_type, + os_name=_get(ls_asset, "operatingSystem", "name"), + os_version=_get(ls_asset, "operatingSystem", "version"), + manufacturer=_get(ls_asset, "assetBasicInfo", "manufacturer"), + model=_get(ls_asset, "assetBasicInfo", "model"), + assigned_user_email=_get(ls_asset, "assetCustom", "assignedUser"), + department=_get(ls_asset, "assetCustom", "department"), + location=_get(ls_asset, "assetCustom", "location"), + status=DeviceStatus.ACTIVE, # Lansweeper tracks active assets + is_compliant=None, # Not tracked by Lansweeper + last_seen=_parse_iso_date(_get(ls_asset, "assetBasicInfo", "lastSeen")), + enrollment_date=_parse_iso_date(_get(ls_asset, "assetBasicInfo", "firstSeen")), + ip_address=ip, + mac_address=mac, + source_system="Lansweeper", + source_id=_get(ls_asset, "assetBasicInfo", "assetId"), + ) + + +# ── Export Convenience ──────────────────────────────────────────────────────── + +__all__ = [ + "ADUserAdapter", + "EntraUserAdapter", + "WorkdayWorkerAdapter", + "IntuneDeviceAdapter", + "LansweeperAssetAdapter", +] diff --git a/nexus-mcp/lib/schemas.py b/nexus-mcp/lib/schemas.py new file mode 100644 index 0000000..5e2d70f --- /dev/null +++ b/nexus-mcp/lib/schemas.py @@ -0,0 +1,307 @@ +"""Canonical data schemas for Nexus-MCP. + +These pydantic models define normalized, system-agnostic representations +of domain objects (User, Device, Asset, Incident, etc.). + +Every system adapter must transform its native API response into these +canonical formats, ensuring: + • Type safety (str, int, bool validation) + • Field normalization (e.g., email.lower()) + • Consistent field names across all systems + • Clear validation errors when data is malformed + +This pattern prevents fragile dict access and enables compile-time safety. +""" + +from __future__ import annotations +from pydantic import BaseModel, Field, field_validator +from typing import Optional +from datetime import datetime +from enum import Enum + + +# ── User Identity Domain ────────────────────────────────────────────────────── + +class UserStatus(str, Enum): + """User account status.""" + ACTIVE = "active" + DISABLED = "disabled" + SUSPENDED = "suspended" + TERMINATED = "terminated" + + +class CanonicalUser(BaseModel): + """Normalized user object across AD, Entra, and Workday. + + This is the "universal" user format. All system adapters must map + their native response to this schema. + """ + # Identity + email: str = Field(description="Primary work email (normalized to lowercase)") + employee_id: Optional[str] = Field(default=None, description="Employee ID from HR system") + username: Optional[str] = Field(default=None, description="Login username (sAMAccountName/UPN)") + + # Profile + display_name: str = Field(description="Full display name") + first_name: Optional[str] = None + last_name: Optional[str] = None + + # Organizational + job_title: Optional[str] = None + department: Optional[str] = None + manager_email: Optional[str] = None + office_location: Optional[str] = None + + # Status + status: UserStatus = UserStatus.ACTIVE + is_enabled: bool = True + + # Technical + last_login: Optional[datetime] = None + created_date: Optional[datetime] = None + phone: Optional[str] = None + + # Source tracking + source_system: str = Field(description="System this data came from (AD, Entra, Workday)") + source_id: Optional[str] = Field(default=None, description="Native ID in source system") + + @field_validator('email') + @classmethod + def normalize_email(cls, v: str) -> str: + """Normalize email to lowercase for consistent comparison.""" + return v.lower().strip() if v else "" + + @field_validator('username') + @classmethod + def normalize_username(cls, v: Optional[str]) -> Optional[str]: + """Normalize username to lowercase.""" + return v.lower().strip() if v else None + + +# ── Device/Asset Domain ─────────────────────────────────────────────────────── + +class DeviceType(str, Enum): + """Device categories.""" + DESKTOP = "desktop" + LAPTOP = "laptop" + SERVER = "server" + MOBILE = "mobile" + TABLET = "tablet" + VIRTUAL_MACHINE = "vm" + UNKNOWN = "unknown" + + +class DeviceStatus(str, Enum): + """Device operational status.""" + ACTIVE = "active" + INACTIVE = "inactive" + RETIRED = "retired" + PENDING = "pending" + + +class CanonicalDevice(BaseModel): + """Normalized device/asset object across Intune, Lansweeper, and Helix CMDB. + + Unified representation of physical/virtual compute assets. + """ + # Identity + hostname: str = Field(description="Device hostname/computer name") + serial_number: Optional[str] = Field(default=None, description="Hardware serial number") + asset_tag: Optional[str] = Field(default=None, description="Organization asset tag") + + # Classification + device_type: DeviceType = DeviceType.UNKNOWN + os_name: Optional[str] = None + os_version: Optional[str] = None + manufacturer: Optional[str] = None + model: Optional[str] = None + + # Assignment + assigned_user_email: Optional[str] = None + department: Optional[str] = None + location: Optional[str] = None + + # Status + status: DeviceStatus = DeviceStatus.ACTIVE + is_compliant: Optional[bool] = None + last_seen: Optional[datetime] = None + enrollment_date: Optional[datetime] = None + + # Network + ip_address: Optional[str] = None + mac_address: Optional[str] = None + + # Source tracking + source_system: str = Field(description="System this data came from (Intune, Lansweeper, Helix)") + source_id: Optional[str] = Field(default=None, description="Native ID in source system") + + @field_validator('hostname') + @classmethod + def normalize_hostname(cls, v: str) -> str: + """Normalize hostname to uppercase.""" + return v.upper().strip() if v else "" + + @field_validator('assigned_user_email') + @classmethod + def normalize_email(cls, v: Optional[str]) -> Optional[str]: + """Normalize email to lowercase.""" + return v.lower().strip() if v else None + + @field_validator('serial_number', 'asset_tag') + @classmethod + def normalize_identifiers(cls, v: Optional[str]) -> Optional[str]: + """Normalize serial/asset tag to uppercase.""" + return v.upper().strip() if v else None + + +# ── ITSM Domain ─────────────────────────────────────────────────────────────── + +class IncidentPriority(str, Enum): + """Incident priority levels.""" + CRITICAL = "critical" + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + + +class IncidentStatus(str, Enum): + """Incident lifecycle status.""" + NEW = "new" + IN_PROGRESS = "in_progress" + PENDING = "pending" + RESOLVED = "resolved" + CLOSED = "closed" + + +class CanonicalIncident(BaseModel): + """Normalized incident/ticket object from Helix ITSM. + + Represents service desk tickets and incidents. + """ + # Identity + incident_id: str = Field(description="Unique incident number (e.g., INC0001234)") + + # Content + summary: str = Field(description="Short description/title") + description: Optional[str] = Field(default=None, description="Full incident details") + + # Classification + priority: IncidentPriority = IncidentPriority.MEDIUM + status: IncidentStatus = IncidentStatus.NEW + category: Optional[str] = None + subcategory: Optional[str] = None + + # Assignment + assigned_to: Optional[str] = None + assigned_group: Optional[str] = None + reported_by: Optional[str] = None + + # Timestamps + created_date: datetime + updated_date: Optional[datetime] = None + resolved_date: Optional[datetime] = None + + # Source tracking + source_system: str = "helix" + source_id: Optional[str] = None + + +# ── Logistics Domain ────────────────────────────────────────────────────────── + +class ShipmentStatus(str, Enum): + """Package shipment status.""" + CREATED = "created" + PICKED_UP = "picked_up" + IN_TRANSIT = "in_transit" + OUT_FOR_DELIVERY = "out_for_delivery" + DELIVERED = "delivered" + EXCEPTION = "exception" + CANCELLED = "cancelled" + + +class CanonicalShipment(BaseModel): + """Normalized shipment object from FedEx API. + + Represents package tracking and delivery information. + """ + # Identity + tracking_number: str = Field(description="FedEx tracking number") + + # Status + status: ShipmentStatus + status_description: Optional[str] = None + + # Routing + origin_city: Optional[str] = None + origin_state: Optional[str] = None + destination_city: Optional[str] = None + destination_state: Optional[str] = None + + # Recipient + recipient_name: Optional[str] = None + recipient_address: Optional[str] = None + + # Timestamps + ship_date: Optional[datetime] = None + estimated_delivery: Optional[datetime] = None + actual_delivery: Optional[datetime] = None + + # Package details + weight_lbs: Optional[float] = None + service_type: Optional[str] = None + + # Source tracking + source_system: str = "fedex" + source_id: Optional[str] = None + + +# ── Audit/Drift Domain ──────────────────────────────────────────────────────── + +class DriftType(str, Enum): + """Types of cross-system discrepancies.""" + FIELD_MISMATCH = "field_mismatch" + MISSING_IN_SYSTEM = "missing_in_system" + STATUS_CONFLICT = "status_conflict" + STALE_DATA = "stale_data" + + +class FieldDrift(BaseModel): + """Represents a single field mismatch between two systems. + + Used by audit tools to report cross-system inconsistencies. + """ + drift_type: DriftType = DriftType.FIELD_MISMATCH + field_name: str = Field(description="Canonical field name (e.g., 'job_title')") + + # Source A + system_a: str = Field(description="First system (e.g., 'Workday')") + value_a: Optional[str] = Field(default=None, description="Value in system A") + + # Source B + system_b: str = Field(description="Second system (e.g., 'Active Directory')") + value_b: Optional[str] = Field(default=None, description="Value in system B") + + # Metadata + severity: str = Field(default="medium", description="Impact level: low/medium/high") + detected_at: datetime = Field(default_factory=datetime.utcnow) + + +class UserDriftReport(BaseModel): + """Aggregated drift report for a single user across all systems. + + Returned by audit_user_drift() tool. + """ + email: str + systems_checked: list[str] + + # System presence + workday_found: bool + ad_found: bool + entra_found: bool + + # Drift summary + discrepancy_count: int + discrepancies: list[FieldDrift] + + # Timestamp + audit_date: datetime = Field(default_factory=datetime.utcnow) diff --git a/nexus-mcp/src/shards/audit.py b/nexus-mcp/src/shards/audit.py index 7bbc850..85c39a3 100644 --- a/nexus-mcp/src/shards/audit.py +++ b/nexus-mcp/src/shards/audit.py @@ -18,6 +18,8 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "lib")) from mcp.server.fastmcp import FastMCP import mock_data as M +from adapters import ADUserAdapter, EntraUserAdapter, WorkdayWorkerAdapter +from schemas import CanonicalUser, FieldDrift, DriftType _USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true" @@ -81,50 +83,89 @@ def _get_intune(): # ── Internal helpers ────────────────────────────────────────────────────────── def _norm(val: Any) -> str | None: + """Normalize value to lowercase string for comparison.""" return str(val).strip().lower() if val is not None else None -def _drift(sys_a: str, sys_b: str, field: str, val_a: Any, val_b: Any) -> dict | None: +def _compare_field(field_name: str, sys_a: str, val_a: Any, sys_b: str, val_b: Any) -> FieldDrift | None: + """Compare a single field between two systems, return FieldDrift if different.""" if _norm(val_a) != _norm(val_b): - return {"field": field, "system_a": sys_a, "value_a": val_a, - "system_b": sys_b, "value_b": val_b} + return FieldDrift( + drift_type=DriftType.FIELD_MISMATCH, + field_name=field_name, + system_a=sys_a, + value_a=str(val_a) if val_a is not None else None, + system_b=sys_b, + value_b=str(val_b) if val_b is not None else None, + severity="medium", + ) return None -def _pick(obj: dict | None, *keys: str) -> Any: - for k in keys: - if obj is None: - return None - obj = obj.get(k) - return obj - - -def _ts() -> str: - return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") - - -def _week() -> str: - t = datetime.date.today() - return f"{t.year}-W{t.isocalendar()[1]:02d}" - - -def _count_by(items: list[dict], key: str) -> dict[str, int]: - out: dict[str, int] = {} - for item in items: - v = str(item.get(key) or "unknown") - out[v] = out.get(v, 0) + 1 - return dict(sorted(out.items(), key=lambda x: -x[1])) - - -def _save(report: dict, filename: str) -> str: - from config import ReportConfig - cfg = ReportConfig() - cfg.output_dir.mkdir(parents=True, exist_ok=True) - p = cfg.output_dir / filename - p.write_text(json.dumps(report, indent=2)) - return str(p) - +def _compare_users(wd_user: CanonicalUser | None, ad_user: CanonicalUser | None, entra_user: CanonicalUser | None) -> list[FieldDrift]: + """Compare canonical user objects across systems and return list of drifts.""" + drifts: list[FieldDrift] = [] + + # Compare Workday vs AD + if wd_user and ad_user: + for field_name, wd_val, ad_val in [ + ("display_name", wd_user.display_name, ad_user.display_name), + ("job_title", wd_user.job_title, ad_user.job_title), + ("department", wd_user.department, ad_user.department), + ]: + drift = _compare_field(field_name, "Workday", wd_val, "ActiveDirectory", ad_val) + if drift: + drifts.append(drift) + + # Compare Workday vs Entra + if wd_user and entra_user: + for field_name, wd_val, entra_val in [ + ("display_name", wd_user.display_name, entra_user.display_name), + ("job_title", wd_user.job_title, entra_user.job_title), + ("department", wd_user.department, entra_user.department), + ]: + drift = _compare_field(field_name, "Workday", wd_val, "Entra", entra_val) + if drift: + drifts.append(drift) + + # Compare AD vs Entra + if ad_user and entra_user: + for field_name, ad_val, entra_val in [ + ("display_name", ad_user.display_name, entra_user.display_name), + ("job_title", ad_user.job_title, entra_user.job_title), + ("department", ad_user.department, entra_user.department), + ]: + drift = _compare_field(field_name, "ActiveDirectory", ad_val, "Entra", entra_val) + if drift: + drifts.append(drift) + _name, job_title, and department across all three systems using + canonical pydantic schemas for type safety and consistent field names. + Args: + email: Primary work email of the user to audit. + """ + if _USE_MOCK: + # Get raw dicts from mock data + wd_dict = M.WORKDAY_WORKERS_BY_EMAIL.get(email.lower()) + ad_dict = M.AD_USERS_BY_EMAIL.get(email.lower()) + entra_dict = M.ENTRA_USERS_BY_MAIL.get(email.lower()) + + # Transform to canonical models + wd_user = WorkdayWorkerAdapter.to_canonical(wd_dict) if wd_dict else None + ad_user = ADUserAdapter.to_canonical(ad_dict) if ad_dict else None + entra_user = EntraUserAdapter.to_canonical(entra_dict) if entra_dict else None + + # Compare using canonical models + drifts = _compare_users(wd_user, ad_user, entra_user) + + return { + "email": email, + "systems_checked": ["Workday", "ActiveDirectory", "Entra"], + "workday_found": wd_user is not None, + "ad_found": ad_user is not None, + "entra_found": entra_user is not None, + "discrepancy_count": len(drifts), + "discrepancies": [d.model_dump(mode='json') for d in drifts] # ── Shard registration ──────────────────────────────────────────────────────── def register(mcp: FastMCP) -> None: diff --git a/nexus-mcp/src/shards/identity.py b/nexus-mcp/src/shards/identity.py index 3cfa30b..1ac0c8d 100644 --- a/nexus-mcp/src/shards/identity.py +++ b/nexus-mcp/src/shards/identity.py @@ -13,6 +13,8 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "lib")) from mcp.server.fastmcp import FastMCP import mock_data as M +from adapters import ADUserAdapter, EntraUserAdapter +from schemas import CanonicalUser _USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true" @@ -42,29 +44,61 @@ def register(mcp: FastMCP) -> None: @mcp.tool() async def ad_get_user(sam_account_name: str) -> dict | None: - """Look up an Active Directory user by their sAMAccountName (login name).""" + """Look up an Active Directory user by their sAMAccountName (login name). + + Returns a normalized user object with consistent field names across all systems. + """ if _USE_MOCK: - return M.AD_USERS_BY_SAM.get(sam_account_name.lower()) - return await asyncio.to_thread(_get_ad().get_user, sam_account_name) + ad_dict = M.AD_USERS_BY_SAM.get(sam_account_name.lower()) + if not ad_dict: + return None + canonical = ADUserAdapter.to_canonical(ad_dict) + return canonical.model_dump(mode='json', exclude_none=True) + + ad_dict = await asyncio.to_thread(_get_ad().get_user, sam_account_name) + if not ad_dict: + return None + canonical = ADUserAdapter.to_canonical(ad_dict) + return canonical.model_dump(mode='json', exclude_none=True) @mcp.tool() async def ad_get_user_by_email(email: str) -> dict | None: - """Look up an Active Directory user by their email address.""" + """Look up an Active Directory user by their email address. + + Returns a normalized user object with consistent field names across all systems. + """ if _USE_MOCK: - return M.AD_USERS_BY_EMAIL.get(email.lower()) - return await asyncio.to_thread(_get_ad().get_user_by_email, email) + ad_dict = M.AD_USERS_BY_EMAIL.get(email.lower()) + if not ad_dict: + return None + canonical = ADUserAdapter.to_canonical(ad_dict) + return canonical.model_dump(mode='json', exclude_none=True) + + ad_dict = await asyncio.to_thread(_get_ad().get_user_by_email, email) + if not ad_dict: + return None + canonical = ADUserAdapter.to_canonical(ad_dict) + return canonical.model_dump(mode='json', exclude_none=True) @mcp.tool() async def ad_search_users(query: str, limit: int = 50) -> list[dict]: - """Search Active Directory users by display name or sAMAccountName fragment.""" + """Search Active Directory users by display name or sAMAccountName fragment. + + Returns a list of normalized user objects with consistent field names. + """ if _USE_MOCK: q = query.lower() matches = [ u for u in M.AD_USERS if q in u["displayName"].lower() or q in u["sAMAccountName"].lower() ] - return matches[:limit] - return await asyncio.to_thread(_get_ad().search_users, query, limit) + # Convert to canonical format + canonical_users = [ADUserAdapter.to_canonical(u) for u in matches[:limit]] + return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] + + ad_dicts = await asyncio.to_thread(_get_ad().search_users, query, limit) + canonical_users = [ADUserAdapter.to_canonical(u) for u in ad_dicts] + return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] @mcp.tool() async def ad_list_groups(limit: int = 200) -> list[dict]: @@ -119,9 +153,14 @@ def register(mcp: FastMCP) -> None: @mcp.tool() async def entra_list_users(limit: int = 100) -> list[dict]: - """List users in Microsoft Entra ID (Azure AD).""" + """List users in Microsoft Entra ID (Azure AD). + + Returns a list of normalized user objects with consistent field names. + """ if _USE_MOCK: - return M.ENTRA_USERS[:limit] + canonical_users = [EntraUserAdapter.to_canonical(u) for u in M.ENTRA_USERS[:limit]] + return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] + fields = ( "id,displayName,userPrincipalName,mail,jobTitle,department," "accountEnabled,createdDateTime,onPremisesSyncEnabled,assignedLicenses" @@ -129,18 +168,32 @@ def register(mcp: FastMCP) -> None: data = await _get_entra().get( "/users", params={"$select": fields, "$top": min(limit, 999)} ) - return data.get("value", []) + entra_users = data.get("value", []) + canonical_users = [EntraUserAdapter.to_canonical(u) for u in entra_users] + return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] @mcp.tool() async def entra_get_user(user_id_or_upn: str) -> dict | None: - """Retrieve a single Entra ID user by object ID or UPN (user@company.com).""" + """Retrieve a single Entra ID user by object ID or UPN (user@company.com). + + Returns a normalized user object with consistent field names across all systems. + """ if _USE_MOCK: - return ( + entra_dict = ( M.ENTRA_USERS_BY_UPN.get(user_id_or_upn) or M.ENTRA_USERS_BY_MAIL.get(user_id_or_upn) or next((u for u in M.ENTRA_USERS if u["id"] == user_id_or_upn), None) ) - return await _get_entra().get(f"/users/{user_id_or_upn}") + if not entra_dict: + return None + canonical = EntraUserAdapter.to_canonical(entra_dict) + return canonical.model_dump(mode='json', exclude_none=True) + + entra_dict = await _get_entra().get(f"/users/{user_id_or_upn}") + if not entra_dict: + return None + canonical = EntraUserAdapter.to_canonical(entra_dict) + return canonical.model_dump(mode='json', exclude_none=True) @mcp.tool() async def entra_list_groups(limit: int = 100) -> list[dict]: diff --git a/nexus-mcp/src/shards/workday.py b/nexus-mcp/src/shards/workday.py index adc3fc1..c0d511c 100644 --- a/nexus-mcp/src/shards/workday.py +++ b/nexus-mcp/src/shards/workday.py @@ -12,6 +12,8 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "lib")) from mcp.server.fastmcp import FastMCP import mock_data as M +from adapters import WorkdayWorkerAdapter +from schemas import CanonicalUser _USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true" @@ -31,35 +33,68 @@ def register(mcp: FastMCP) -> None: @mcp.tool() async def workday_list_workers(limit: int = 100, offset: int = 0) -> list[dict]: """List workers from Workday HCM. + + Returns a list of normalized user objects with consistent field names across all systems. Args: limit: Page size. offset: Pagination offset. """ if _USE_MOCK: - return M.WORKDAY_WORKERS[offset:offset + limit] + workers = M.WORKDAY_WORKERS[offset:offset + limit] + canonical_users = [WorkdayWorkerAdapter.to_canonical(w) for w in workers] + return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] + data = await _get().get( "/staffing/v6/workers", params={"limit": limit, "offset": offset} ) - return data.get("data", []) + workers = data.get("data", []) + canonical_users = [WorkdayWorkerAdapter.to_canonical(w) for w in workers] + return [u.model_dump(mode='json', exclude_none=True) for u in canonical_users] @mcp.tool() async def workday_get_worker(worker_id: str) -> dict | None: - """Retrieve full details for a single Workday worker by their Workday ID.""" + """Retrieve full details for a single Workday worker by their Workday ID. + + Returns a normalized user object with consistent field names across all systems. + """ if _USE_MOCK: - return next((w for w in M.WORKDAY_WORKERS if w["id"] == worker_id), None) - return await _get().get(f"/staffing/v6/workers/{worker_id}") + worker = next((w for w in M.WORKDAY_WORKERS if w["id"] == worker_id), None) + if not worker: + return None + canonical = WorkdayWorkerAdapter.to_canonical(worker) + return canonical.model_dump(mode='json', exclude_none=True) + + worker = await _get().get(f"/staffing/v6/workers/{worker_id}") + if not worker: + return None + canonical = WorkdayWorkerAdapter.to_canonical(worker) + return canonical.model_dump(mode='json', exclude_none=True) @mcp.tool() async def workday_find_worker_by_email(email: str) -> dict | None: - """Find a Workday worker by their primary work email address.""" + """Find a Workday worker by their primary work email address. + + Returns a normalized user object with consistent field names across all systems. + """ if _USE_MOCK: - return M.WORKDAY_WORKERS_BY_EMAIL.get(email.lower()) + worker = M.WORKDAY_WORKERS_BY_EMAIL.get(email.lower()) + if not worker: + return None + canonical = WorkdayWorkerAdapter.to_canonical(worker) + return canonical.model_dump(mode='json', exclude_none=True) + data = await _get().get("/staffing/v6/workers", params={"limit": 500}) + worker = None for w in data.get("data", []): if (w.get("primaryWorkEmail") or "").lower() == email.lower(): - return w - return None + worker = w + break + + if not worker: + return None + canonical = WorkdayWorkerAdapter.to_canonical(worker) + return canonical.model_dump(mode='json', exclude_none=True) @mcp.tool() async def workday_list_positions(limit: int = 100) -> list[dict]: