911 lines
36 KiB
Python
911 lines
36 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
logger = logging.getLogger("identity-mcp.ad-adapter")
|
|
|
|
|
|
class ActiveDirectoryIdentityBackend:
|
|
"""PowerShell-based Active Directory backend for read-only identity queries.
|
|
|
|
Uses subprocess calls to approved Get-AD* cmdlets with deterministic output
|
|
parsing. All methods maintain the same async contract and return shapes as
|
|
IdentityBackend interface.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
username: str | None = None,
|
|
password: str | None = None,
|
|
timeout_seconds: float = 30.0,
|
|
) -> None:
|
|
"""Initialize AD adapter with optional explicit credentials for testing.
|
|
|
|
Args:
|
|
username: Optional explicit username for test environments only
|
|
password: Optional explicit password for test environments only
|
|
timeout_seconds: Per-query timeout limit
|
|
"""
|
|
self.username = username
|
|
self.password = password
|
|
self.timeout = timeout_seconds
|
|
|
|
@staticmethod
|
|
def _escape_ps_single_quoted(value: str) -> str:
|
|
return value.replace("'", "''")
|
|
|
|
async def _run_powershell(
|
|
self, command: str, params: dict[str, Any] | None = None
|
|
) -> dict[str, Any]:
|
|
"""Execute PowerShell command and return parsed output or error.
|
|
|
|
Args:
|
|
command: PowerShell command string
|
|
params: Optional parameters for logging context
|
|
|
|
Returns:
|
|
dict with "success" (bool), "data" (Any), "error" (str | None)
|
|
"""
|
|
# Build credential block if explicit auth is configured
|
|
cred_block = ""
|
|
if self.username and self.password:
|
|
# Escape single quotes in password for PowerShell
|
|
escaped_password = self.password.replace("'", "''")
|
|
# Use single quotes to avoid variable expansion and special char issues
|
|
cred_block = f"$secpass = ConvertTo-SecureString '{escaped_password}' -AsPlainText -Force; $cred = New-Object System.Management.Automation.PSCredential('{self.username}', $secpass); "
|
|
|
|
full_command = cred_block + command
|
|
|
|
try:
|
|
# Use create_subprocess_exec to avoid shell $ interpretation
|
|
process = await asyncio.create_subprocess_exec(
|
|
'powershell',
|
|
'-NoProfile',
|
|
'-NonInteractive',
|
|
'-Command',
|
|
full_command,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
stdout, stderr = await asyncio.wait_for(
|
|
process.communicate(), timeout=self.timeout
|
|
)
|
|
|
|
stdout_text = stdout.decode("utf-8", errors="replace").strip()
|
|
stderr_text = stderr.decode("utf-8", errors="replace").strip()
|
|
|
|
if process.returncode != 0:
|
|
logger.warning(
|
|
"PowerShell command failed: returncode=%d stderr=%s",
|
|
process.returncode,
|
|
stderr_text,
|
|
)
|
|
return {
|
|
"success": False,
|
|
"data": None,
|
|
"error": f"Command failed: {stderr_text[:200]}",
|
|
}
|
|
|
|
if stderr_text:
|
|
logger.debug("PowerShell stderr: %s", stderr_text)
|
|
|
|
return {"success": True, "data": stdout_text, "error": None}
|
|
|
|
except asyncio.TimeoutError:
|
|
logger.error("PowerShell command timeout after %s seconds", self.timeout)
|
|
return {
|
|
"success": False,
|
|
"data": None,
|
|
"error": f"Query timeout after {self.timeout} seconds",
|
|
}
|
|
except Exception as e:
|
|
logger.error("PowerShell execution error: %s", str(e))
|
|
return {
|
|
"success": False,
|
|
"data": None,
|
|
"error": f"Execution error: {str(e)[:200]}",
|
|
}
|
|
|
|
async def get_user(self, username: str) -> dict[str, Any] | None:
|
|
"""Get user state for a username.
|
|
|
|
Returns enabled/disabled, OU, description, and last logon timestamp.
|
|
"""
|
|
# Build PowerShell command using JSON output for reliable parsing
|
|
escaped_username = self._escape_ps_single_quoted(username)
|
|
command = f"""
|
|
$user = Get-ADUser -Filter "samAccountName -eq '{escaped_username}'" -Properties GivenName,Surname,DisplayName,Enabled,DistinguishedName,Description,lastLogonTimestamp -ErrorAction Stop
|
|
if ($user) {{
|
|
$lastLogon = if ($user.lastLogonTimestamp) {{ [DateTime]::FromFileTime($user.lastLogonTimestamp).ToUniversalTime().ToString('o') }} else {{ $null }}
|
|
@{{
|
|
username = $user.SamAccountName
|
|
first_name = if ($user.GivenName) {{ $user.GivenName }} else {{ '' }}
|
|
last_name = if ($user.Surname) {{ $user.Surname }} else {{ '' }}
|
|
display_name = if ($user.DisplayName) {{ $user.DisplayName }} else {{ '' }}
|
|
enabled = $user.Enabled
|
|
ou = $user.DistinguishedName
|
|
description = if ($user.Description) {{ $user.Description }} else {{ '' }}
|
|
last_logon_utc = $lastLogon
|
|
}} | ConvertTo-Json -Compress
|
|
}}
|
|
"""
|
|
|
|
result = await self._run_powershell(command, {"username": username})
|
|
|
|
if not result["success"]:
|
|
logger.warning(
|
|
"get_user failed for username=%s: %s", username, result["error"]
|
|
)
|
|
return None
|
|
|
|
if not result["data"]:
|
|
return None
|
|
|
|
try:
|
|
user_data = json.loads(result["data"])
|
|
return {
|
|
"username": user_data["username"],
|
|
"first_name": user_data.get("first_name", "") or "",
|
|
"last_name": user_data.get("last_name", "") or "",
|
|
"display_name": user_data.get("display_name", "") or "",
|
|
"enabled": user_data["enabled"],
|
|
"ou": user_data["ou"],
|
|
"description": user_data["description"] or "",
|
|
"last_logon_utc": user_data["last_logon_utc"] or "",
|
|
}
|
|
except (json.JSONDecodeError, KeyError) as e:
|
|
logger.error("Failed to parse user data: %s", str(e))
|
|
return None
|
|
|
|
async def search_users_by_name(
|
|
self, name_query: str, limit: int = 20
|
|
) -> list[dict[str, Any]]:
|
|
"""Search users by first name, last name, or full display name."""
|
|
query = name_query.strip()
|
|
if not query:
|
|
return []
|
|
|
|
max_results = max(1, min(limit, 100))
|
|
escaped_query = self._escape_ps_single_quoted(query)
|
|
|
|
command = f"""
|
|
$query = '{escaped_query}'
|
|
$tokens = @($query.Split(' ', [System.StringSplitOptions]::RemoveEmptyEntries))
|
|
|
|
if ($tokens.Count -ge 2) {{
|
|
$first = $tokens[0]
|
|
$last = $tokens[1]
|
|
$adFilter = "(givenName -like '$($first)*' -and surname -like '$($last)*') -or (displayName -like '$query*')"
|
|
}} else {{
|
|
$adFilter = "givenName -like '$query*' -or surname -like '$query*' -or displayName -like '$query*'"
|
|
}}
|
|
|
|
$users = @(Get-ADUser -Filter $adFilter -Properties GivenName,Surname,DisplayName,Enabled,DistinguishedName -ErrorAction Stop |
|
|
Sort-Object DisplayName |
|
|
Select-Object -First {max_results} |
|
|
ForEach-Object {{
|
|
@{{
|
|
username = $_.SamAccountName
|
|
first_name = if ($_.GivenName) {{ $_.GivenName }} else {{ '' }}
|
|
last_name = if ($_.Surname) {{ $_.Surname }} else {{ '' }}
|
|
display_name = if ($_.DisplayName) {{ $_.DisplayName }} else {{ '' }}
|
|
enabled = $_.Enabled
|
|
ou = $_.DistinguishedName
|
|
}}
|
|
}})
|
|
|
|
$users | ConvertTo-Json -Compress
|
|
"""
|
|
|
|
result = await self._run_powershell(
|
|
command,
|
|
{"name_query": name_query, "limit": max_results},
|
|
)
|
|
|
|
if not result["success"]:
|
|
logger.warning(
|
|
"search_users_by_name failed for query=%s: %s",
|
|
name_query,
|
|
result["error"],
|
|
)
|
|
return []
|
|
|
|
if not result["data"]:
|
|
return []
|
|
|
|
try:
|
|
users = json.loads(result["data"])
|
|
if isinstance(users, dict):
|
|
users = [users]
|
|
if not isinstance(users, list):
|
|
return []
|
|
|
|
normalized: list[dict[str, Any]] = []
|
|
for user in users:
|
|
if not isinstance(user, dict):
|
|
continue
|
|
normalized.append(
|
|
{
|
|
"username": user.get("username", "") or "",
|
|
"first_name": user.get("first_name", "") or "",
|
|
"last_name": user.get("last_name", "") or "",
|
|
"display_name": user.get("display_name", "") or "",
|
|
"enabled": bool(user.get("enabled", False)),
|
|
"ou": user.get("ou", "") or "",
|
|
}
|
|
)
|
|
return normalized
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to parse search user data: %s", str(e))
|
|
return []
|
|
|
|
async def get_user_groups(self, username: str) -> list[str]:
|
|
"""Get all group memberships for a user."""
|
|
escaped_username = self._escape_ps_single_quoted(username)
|
|
command = f"""
|
|
$user = Get-ADUser -Filter "samAccountName -eq '{escaped_username}'" -Properties MemberOf -ErrorAction Stop
|
|
if ($user -and $user.MemberOf) {{
|
|
$groups = @($user.MemberOf | ForEach-Object {{
|
|
$group = Get-ADGroup $_ -ErrorAction SilentlyContinue
|
|
if ($group) {{ $group.Name }}
|
|
}})
|
|
$groups | ConvertTo-Json -Compress
|
|
}} else {{
|
|
@() | ConvertTo-Json -Compress
|
|
}}
|
|
"""
|
|
|
|
result = await self._run_powershell(command, {"username": username})
|
|
|
|
if not result["success"]:
|
|
logger.warning(
|
|
"get_user_groups failed for username=%s: %s",
|
|
username,
|
|
result["error"],
|
|
)
|
|
return []
|
|
|
|
if not result["data"]:
|
|
return []
|
|
|
|
try:
|
|
groups = json.loads(result["data"])
|
|
return sorted(groups) if isinstance(groups, list) else []
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to parse group data: %s", str(e))
|
|
return []
|
|
|
|
async def get_group_members(self, group_name: str) -> list[str]:
|
|
"""Get all members of a named group."""
|
|
escaped_group_name = self._escape_ps_single_quoted(group_name)
|
|
command = f"""
|
|
$group = Get-ADGroup -Filter "Name -eq '{escaped_group_name}'" -ErrorAction Stop
|
|
if ($group) {{
|
|
$members = @(Get-ADGroupMember $group -ErrorAction Stop | Where-Object {{ $_.objectClass -eq 'user' }} | ForEach-Object {{ $_.SamAccountName }})
|
|
$members | ConvertTo-Json -Compress
|
|
}} else {{
|
|
@() | ConvertTo-Json -Compress
|
|
}}
|
|
"""
|
|
|
|
result = await self._run_powershell(command, {"group_name": group_name})
|
|
|
|
if not result["success"]:
|
|
logger.warning(
|
|
"get_group_members failed for group=%s: %s",
|
|
group_name,
|
|
result["error"],
|
|
)
|
|
return []
|
|
|
|
if not result["data"]:
|
|
return []
|
|
|
|
try:
|
|
members = json.loads(result["data"])
|
|
return sorted(members) if isinstance(members, list) else []
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to parse member data: %s", str(e))
|
|
return []
|
|
|
|
async def find_stale_users(self, days: int) -> list[dict[str, Any]]:
|
|
"""Get users with no logon activity in N days using lastLogonTimestamp."""
|
|
if days < 0:
|
|
return []
|
|
|
|
command = f"""
|
|
$cutoff = (Get-Date).AddDays(-{days})
|
|
$cutoffFileTime = $cutoff.ToFileTime()
|
|
$users = @(Get-ADUser -Filter * -Properties Enabled,lastLogonTimestamp -ErrorAction Stop | Where-Object {{
|
|
(-not $_.lastLogonTimestamp) -or ($_.lastLogonTimestamp -lt $cutoffFileTime)
|
|
}} | Select-Object -First 100 | ForEach-Object {{
|
|
$lastLogon = if ($_.lastLogonTimestamp) {{ [DateTime]::FromFileTime($_.lastLogonTimestamp).ToUniversalTime().ToString('o') }} else {{ '' }}
|
|
@{{
|
|
username = $_.SamAccountName
|
|
enabled = $_.Enabled
|
|
last_logon_utc = $lastLogon
|
|
}}
|
|
}})
|
|
$users | ConvertTo-Json -Compress
|
|
"""
|
|
|
|
result = await self._run_powershell(command, {"days": days})
|
|
|
|
if not result["success"]:
|
|
logger.warning("find_stale_users failed: %s", result["error"])
|
|
return []
|
|
|
|
if not result["data"]:
|
|
return []
|
|
|
|
try:
|
|
users = json.loads(result["data"])
|
|
if not isinstance(users, list):
|
|
users = [users] if users else []
|
|
return sorted(users, key=lambda u: u.get("username", ""))
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to parse stale user data: %s", str(e))
|
|
return []
|
|
|
|
async def get_computer(self, computer_name: str) -> dict[str, Any] | None:
|
|
"""Get computer account details including OU placement.
|
|
|
|
Note: assigned_username returns null in this phase per requirements.
|
|
"""
|
|
escaped_computer_name = self._escape_ps_single_quoted(computer_name)
|
|
command = f"""
|
|
$computer = Get-ADComputer -Filter "Name -eq '{escaped_computer_name}'" -Properties DistinguishedName -ErrorAction Stop
|
|
if ($computer) {{
|
|
@{{
|
|
computer_name = $computer.Name
|
|
ou = $computer.DistinguishedName
|
|
assigned_username = $null
|
|
}} | ConvertTo-Json -Compress
|
|
}}
|
|
"""
|
|
|
|
result = await self._run_powershell(command, {"computer_name": computer_name})
|
|
|
|
if not result["success"]:
|
|
logger.warning(
|
|
"get_computer failed for computer=%s: %s",
|
|
computer_name,
|
|
result["error"],
|
|
)
|
|
return None
|
|
|
|
if not result["data"]:
|
|
return None
|
|
|
|
try:
|
|
computer_data = json.loads(result["data"])
|
|
return {
|
|
"computer_name": computer_data["computer_name"],
|
|
"ou": computer_data["ou"],
|
|
"assigned_username": None,
|
|
}
|
|
except (json.JSONDecodeError, KeyError) as e:
|
|
logger.error("Failed to parse computer data: %s", str(e))
|
|
return None
|
|
|
|
async def query_users(
|
|
self,
|
|
filter_params: dict[str, Any] | None = None,
|
|
fields: list[str] | None = None,
|
|
sort_by: str = "display_name",
|
|
sort_direction: str = "asc",
|
|
page_size: int = 50,
|
|
cursor: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Query users with flexible validated filters and pagination."""
|
|
from identity_backend import ALLOWED_USER_FIELDS
|
|
|
|
filter_params = filter_params or {}
|
|
fields = fields or list(ALLOWED_USER_FIELDS)
|
|
|
|
# Validate fields
|
|
invalid_fields = set(fields) - ALLOWED_USER_FIELDS
|
|
if invalid_fields:
|
|
return {
|
|
"items": [],
|
|
"next_cursor": None,
|
|
"page_size": page_size,
|
|
"total_estimate": 0,
|
|
"applied_filter": filter_params,
|
|
"warnings": [f"Invalid fields requested: {', '.join(invalid_fields)}"],
|
|
}
|
|
|
|
# Build AD filter from filter_params
|
|
ad_filter_parts = []
|
|
|
|
if "enabled" in filter_params:
|
|
enabled_val = "True" if filter_params["enabled"] else "False"
|
|
ad_filter_parts.append(f"Enabled -eq ${enabled_val}")
|
|
|
|
if "name_contains" in filter_params:
|
|
name_query = self._escape_ps_single_quoted(filter_params["name_contains"])
|
|
ad_filter_parts.append(f"DisplayName -like '*{name_query}*'")
|
|
|
|
if "username_prefix" in filter_params:
|
|
prefix = self._escape_ps_single_quoted(filter_params["username_prefix"])
|
|
ad_filter_parts.append(f"samAccountName -like '{prefix}*'")
|
|
|
|
if "ou_contains" in filter_params:
|
|
ou_part = self._escape_ps_single_quoted(filter_params["ou_contains"])
|
|
ad_filter_parts.append(f"DistinguishedName -like '*{ou_part}*'")
|
|
|
|
if "description_contains" in filter_params:
|
|
desc_part = self._escape_ps_single_quoted(filter_params["description_contains"])
|
|
ad_filter_parts.append(f"Description -like '*{desc_part}*'")
|
|
|
|
# Build final filter - default to all users if no filters specified
|
|
if ad_filter_parts:
|
|
ad_filter = " -and ".join(ad_filter_parts)
|
|
else:
|
|
ad_filter = "*"
|
|
|
|
# Determine which AD properties to fetch
|
|
ad_properties = [
|
|
"SamAccountName",
|
|
"GivenName",
|
|
"Surname",
|
|
"DisplayName",
|
|
"Enabled",
|
|
"DistinguishedName",
|
|
"Description",
|
|
"lastLogonTimestamp",
|
|
]
|
|
if "when_created_utc" in fields:
|
|
ad_properties.append("whenCreated")
|
|
if "department" in fields:
|
|
ad_properties.append("Department")
|
|
if "title" in fields:
|
|
ad_properties.append("Title")
|
|
if "email" in fields:
|
|
ad_properties.append("EmailAddress")
|
|
|
|
# Build sort expression
|
|
sort_field_map = {
|
|
"display_name": "DisplayName",
|
|
"username": "SamAccountName",
|
|
"last_logon_utc": "lastLogonTimestamp",
|
|
"when_created_utc": "whenCreated",
|
|
"department": "Department",
|
|
}
|
|
sort_field = sort_field_map.get(sort_by, "DisplayName")
|
|
|
|
# Clamp page size
|
|
clamped_size = min(max(1, page_size), 200)
|
|
|
|
# Note: Simple offset-based pagination for Phase 2
|
|
# For production, consider lastLogonTimestamp-based keyset pagination
|
|
start_index = 0
|
|
if cursor:
|
|
try:
|
|
start_index = int(cursor)
|
|
except ValueError:
|
|
pass
|
|
|
|
# Build PowerShell command
|
|
props_csv = ", ".join(ad_properties)
|
|
command = f"""
|
|
$filter = "{ad_filter}"
|
|
$users = Get-ADUser -Filter $filter -Properties {props_csv} -ErrorAction Stop | Sort-Object {sort_field}"""
|
|
|
|
if sort_direction == "desc":
|
|
command += " -Descending"
|
|
|
|
command += "\n"
|
|
|
|
# Handle last_logon_before_days post-filter (requires FileTime comparison)
|
|
if "last_logon_before_days" in filter_params:
|
|
days = filter_params["last_logon_before_days"]
|
|
command += f"$users = $users | Where-Object {{ if ($_.lastLogonTimestamp) {{ $cutoff = (Get-Date).AddDays(-{days}).ToFileTime(); $_.lastLogonTimestamp -lt $cutoff }} else {{ $true }} }}\n"
|
|
|
|
# Handle group_any post-filter
|
|
if "group_any" in filter_params:
|
|
groups_json = json.dumps(filter_params["group_any"])
|
|
escaped_groups_json = groups_json.replace("'", "''")
|
|
command += f"$users = $users | Where-Object {{ $targetGroups = '{escaped_groups_json}' | ConvertFrom-Json; $userGroups = @($_.MemberOf | ForEach-Object {{ $g = Get-ADGroup $_ -ErrorAction SilentlyContinue; if ($g) {{ $g.Name }} }}); $found = $false; foreach ($tg in $targetGroups) {{ if ($userGroups -contains $tg) {{ $found = $true; break }} }}; $found }}\n"
|
|
|
|
# Add pagination and field projection
|
|
command += f"""
|
|
$users = @($users)
|
|
$total = $users.Count
|
|
$pageUsers = $users | Select-Object -Skip {start_index} -First {clamped_size}
|
|
$hasMore = ($total -gt {start_index + clamped_size})
|
|
|
|
$items = @($pageUsers | ForEach-Object {{
|
|
$lastLogon = if ($_.lastLogonTimestamp) {{
|
|
[DateTime]::FromFileTime($_.lastLogonTimestamp).ToUniversalTime().ToString('o')
|
|
}} else {{ '' }}
|
|
$whenCreated = if ($_.whenCreated) {{
|
|
$_.whenCreated.ToUniversalTime().ToString('o')
|
|
}} else {{ '' }}
|
|
|
|
@{{
|
|
username = $_.SamAccountName
|
|
display_name = if ($_.DisplayName) {{ $_.DisplayName }} else {{ '' }}
|
|
first_name = if ($_.GivenName) {{ $_.GivenName }} else {{ '' }}
|
|
last_name = if ($_.Surname) {{ $_.Surname }} else {{ '' }}
|
|
enabled = $_.Enabled
|
|
ou = $_.DistinguishedName
|
|
description = if ($_.Description) {{ $_.Description }} else {{ '' }}
|
|
last_logon_utc = $lastLogon
|
|
when_created_utc = $whenCreated
|
|
department = if ($_.Department) {{ $_.Department }} else {{ '' }}
|
|
title = if ($_.Title) {{ $_.Title }} else {{ '' }}
|
|
email = if ($_.EmailAddress) {{ $_.EmailAddress }} else {{ '' }}
|
|
}}
|
|
}})
|
|
|
|
$nextCursor = if ($hasMore) {{ {start_index + clamped_size} }} else {{ $null }}
|
|
|
|
@{{
|
|
items = $items
|
|
next_cursor = $nextCursor
|
|
page_size = {clamped_size}
|
|
total_estimate = $total
|
|
}} | ConvertTo-Json -Depth 3 -Compress
|
|
"""
|
|
|
|
result = await self._run_powershell(
|
|
command,
|
|
{"filter_params": filter_params, "page_size": clamped_size},
|
|
)
|
|
|
|
if not result["success"]:
|
|
logger.warning("query_users failed: %s", result["error"])
|
|
return {
|
|
"items": [],
|
|
"next_cursor": None,
|
|
"page_size": clamped_size,
|
|
"total_estimate": 0,
|
|
"applied_filter": filter_params,
|
|
"warnings": [f"Query failed: {result['error'][:100]}"],
|
|
}
|
|
|
|
if not result["data"]:
|
|
return {
|
|
"items": [],
|
|
"next_cursor": None,
|
|
"page_size": clamped_size,
|
|
"total_estimate": 0,
|
|
"applied_filter": filter_params,
|
|
"warnings": [],
|
|
}
|
|
|
|
try:
|
|
response = json.loads(result["data"])
|
|
|
|
# Filter fields based on requested fields
|
|
filtered_items = []
|
|
for item in response.get("items", []):
|
|
filtered_item = {k: v for k, v in item.items() if k in fields}
|
|
filtered_items.append(filtered_item)
|
|
|
|
return {
|
|
"items": filtered_items,
|
|
"next_cursor": response.get("next_cursor"),
|
|
"page_size": clamped_size,
|
|
"total_estimate": response.get("total_estimate", 0),
|
|
"applied_filter": filter_params,
|
|
"warnings": [],
|
|
}
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to parse query_users data: %s", str(e))
|
|
return {
|
|
"items": [],
|
|
"next_cursor": None,
|
|
"page_size": clamped_size,
|
|
"total_estimate": 0,
|
|
"applied_filter": filter_params,
|
|
"warnings": ["Failed to parse query results"],
|
|
}
|
|
|
|
async def count_users(
|
|
self,
|
|
filter_params: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Count users matching filter without returning full records."""
|
|
filter_params = filter_params or {}
|
|
|
|
# Build AD filter from filter_params (same logic as query_users)
|
|
ad_filter_parts = []
|
|
|
|
if "enabled" in filter_params:
|
|
enabled_val = "True" if filter_params["enabled"] else "False"
|
|
ad_filter_parts.append(f"Enabled -eq ${enabled_val}")
|
|
|
|
if "name_contains" in filter_params:
|
|
name_query = self._escape_ps_single_quoted(filter_params["name_contains"])
|
|
ad_filter_parts.append(f"DisplayName -like '*{name_query}*'")
|
|
|
|
if "username_prefix" in filter_params:
|
|
prefix = self._escape_ps_single_quoted(filter_params["username_prefix"])
|
|
ad_filter_parts.append(f"samAccountName -like '{prefix}*'")
|
|
|
|
if "ou_contains" in filter_params:
|
|
ou_part = self._escape_ps_single_quoted(filter_params["ou_contains"])
|
|
ad_filter_parts.append(f"DistinguishedName -like '*{ou_part}*'")
|
|
|
|
if "description_contains" in filter_params:
|
|
desc_part = self._escape_ps_single_quoted(filter_params["description_contains"])
|
|
ad_filter_parts.append(f"Description -like '*{desc_part}*'")
|
|
|
|
if ad_filter_parts:
|
|
ad_filter = " -and ".join(ad_filter_parts)
|
|
else:
|
|
ad_filter = "*"
|
|
|
|
command = f"""
|
|
$filter = "{ad_filter}"
|
|
$users = Get-ADUser -Filter $filter -Properties lastLogonTimestamp,MemberOf -ErrorAction Stop
|
|
"""
|
|
|
|
# Handle last_logon_before_days post-filter
|
|
if "last_logon_before_days" in filter_params:
|
|
days = filter_params["last_logon_before_days"]
|
|
command += f"$users = $users | Where-Object {{ if ($_.lastLogonTimestamp) {{ $cutoff = (Get-Date).AddDays(-{days}).ToFileTime(); $_.lastLogonTimestamp -lt $cutoff }} else {{ $true }} }}\n"
|
|
|
|
# Handle group_any post-filter
|
|
if "group_any" in filter_params:
|
|
groups_json = json.dumps(filter_params["group_any"])
|
|
escaped_groups_json = groups_json.replace("'", "''")
|
|
command += f"$users = $users | Where-Object {{ $targetGroups = '{escaped_groups_json}' | ConvertFrom-Json; $userGroups = @($_.MemberOf | ForEach-Object {{ $g = Get-ADGroup $_ -ErrorAction SilentlyContinue; if ($g) {{ $g.Name }} }}); $found = $false; foreach ($tg in $targetGroups) {{ if ($userGroups -contains $tg) {{ $found = $true; break }} }}; $found }}\n"
|
|
|
|
command += """
|
|
$users = @($users)
|
|
@{
|
|
count = $users.Count
|
|
} | ConvertTo-Json -Compress
|
|
"""
|
|
|
|
result = await self._run_powershell(command, {"filter_params": filter_params})
|
|
|
|
if not result["success"]:
|
|
logger.warning("count_users failed: %s", result["error"])
|
|
return {
|
|
"count": 0,
|
|
"applied_filter": filter_params,
|
|
}
|
|
|
|
if not result["data"]:
|
|
return {
|
|
"count": 0,
|
|
"applied_filter": filter_params,
|
|
}
|
|
|
|
try:
|
|
response = json.loads(result["data"])
|
|
return {
|
|
"count": response.get("count", 0),
|
|
"applied_filter": filter_params,
|
|
}
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to parse count_users data: %s", str(e))
|
|
return {
|
|
"count": 0,
|
|
"applied_filter": filter_params,
|
|
}
|
|
|
|
async def summarize_users(
|
|
self,
|
|
filter_params: dict[str, Any] | None = None,
|
|
group_by: str = "enabled",
|
|
top: int = 20,
|
|
) -> dict[str, Any]:
|
|
"""Return grouped aggregates for users matching filter."""
|
|
filter_params = filter_params or {}
|
|
|
|
# Build AD filter (same as count_users)
|
|
ad_filter_parts = []
|
|
|
|
if "enabled" in filter_params:
|
|
enabled_val = "True" if filter_params["enabled"] else "False"
|
|
ad_filter_parts.append(f"Enabled -eq ${enabled_val}")
|
|
|
|
if "name_contains" in filter_params:
|
|
name_query = self._escape_ps_single_quoted(filter_params["name_contains"])
|
|
ad_filter_parts.append(f"DisplayName -like '*{name_query}*'")
|
|
|
|
if "username_prefix" in filter_params:
|
|
prefix = self._escape_ps_single_quoted(filter_params["username_prefix"])
|
|
ad_filter_parts.append(f"samAccountName -like '{prefix}*'")
|
|
|
|
if "ou_contains" in filter_params:
|
|
ou_part = self._escape_ps_single_quoted(filter_params["ou_contains"])
|
|
ad_filter_parts.append(f"DistinguishedName -like '*{ou_part}*'")
|
|
|
|
if "description_contains" in filter_params:
|
|
desc_part = self._escape_ps_single_quoted(filter_params["description_contains"])
|
|
ad_filter_parts.append(f"Description -like '*{desc_part}*'")
|
|
|
|
if ad_filter_parts:
|
|
ad_filter = " -and ".join(ad_filter_parts)
|
|
else:
|
|
ad_filter = "*"
|
|
|
|
# Determine grouping field
|
|
group_field_map = {
|
|
"enabled": "Enabled",
|
|
"ou": "DistinguishedName",
|
|
"department": "Department",
|
|
"title": "Title",
|
|
}
|
|
group_field = group_field_map.get(group_by, "Enabled")
|
|
|
|
# Determine required properties
|
|
required_props = ["Enabled", "lastLogonTimestamp", "MemberOf", "whenCreated"]
|
|
if group_by in ["department", "title", "ou"]:
|
|
if group_by == "department":
|
|
required_props.append("Department")
|
|
elif group_by == "title":
|
|
required_props.append("Title")
|
|
|
|
props_csv = ", ".join(set(required_props))
|
|
clamped_top = min(max(1, top), 50)
|
|
|
|
command = f"""
|
|
$filter = "{ad_filter}"
|
|
$users = Get-ADUser -Filter $filter -Properties {props_csv} -ErrorAction Stop
|
|
"""
|
|
|
|
# Handle last_logon_before_days post-filter
|
|
if "last_logon_before_days" in filter_params:
|
|
days = filter_params["last_logon_before_days"]
|
|
command += f"$users = $users | Where-Object {{ if ($_.lastLogonTimestamp) {{ $cutoff = (Get-Date).AddDays(-{days}).ToFileTime(); $_.lastLogonTimestamp -lt $cutoff }} else {{ $true }} }}\n"
|
|
|
|
# Handle group_any post-filter
|
|
if "group_any" in filter_params:
|
|
groups_json = json.dumps(filter_params["group_any"])
|
|
escaped_groups_json = groups_json.replace("'", "''")
|
|
command += f"$users = $users | Where-Object {{ $targetGroups = '{escaped_groups_json}' | ConvertFrom-Json; $userGroups = @($_.MemberOf | ForEach-Object {{ $g = Get-ADGroup $_ -ErrorAction SilentlyContinue; if ($g) {{ $g.Name }} }}); $found = $false; foreach ($tg in $targetGroups) {{ if ($userGroups -contains $tg) {{ $found = $true; break }} }}; $found }}\n"
|
|
|
|
# Group by logic
|
|
command += """
|
|
$users = @($users)
|
|
"""
|
|
if group_by == "enabled":
|
|
command += """
|
|
$buckets = $users | Group-Object Enabled | ForEach-Object {
|
|
@{
|
|
key = if ($_.Name -eq "True") { "Enabled" } else { "Disabled" }
|
|
count = $_.Count
|
|
}
|
|
} | Sort-Object count -Descending
|
|
"""
|
|
elif group_by == "ou":
|
|
command += """
|
|
$buckets = $users | ForEach-Object {
|
|
# Extract OU from DN
|
|
$dn = $_.DistinguishedName
|
|
if ($dn -match 'OU=([^,]+)') {
|
|
$matches[1]
|
|
} else {
|
|
"Root"
|
|
}
|
|
} | Group-Object | ForEach-Object {
|
|
@{
|
|
key = $_.Name
|
|
count = $_.Count
|
|
}
|
|
} | Sort-Object count -Descending
|
|
"""
|
|
elif group_by == "department":
|
|
command += """
|
|
$buckets = $users | ForEach-Object {
|
|
if ($_.Department) { $_.Department } else { "No Department" }
|
|
} | Group-Object | ForEach-Object {
|
|
@{
|
|
key = $_.Name
|
|
count = $_.Count
|
|
}
|
|
} | Sort-Object count -Descending
|
|
"""
|
|
elif group_by == "title":
|
|
command += """
|
|
$buckets = $users | ForEach-Object {
|
|
if ($_.Title) { $_.Title } else { "No Title" }
|
|
} | Group-Object | ForEach-Object {
|
|
@{
|
|
key = $_.Name
|
|
count = $_.Count
|
|
}
|
|
} | Sort-Object count -Descending
|
|
"""
|
|
elif group_by == "last_logon_bucket":
|
|
command += """
|
|
$now = Get-Date
|
|
$buckets = $users | ForEach-Object {
|
|
if ($_.lastLogonTimestamp) {
|
|
$lastLogon = [DateTime]::FromFileTime($_.lastLogonTimestamp)
|
|
$daysAgo = ($now - $lastLogon).Days
|
|
if ($daysAgo -lt 7) { "Last 7 days" }
|
|
elseif ($daysAgo -lt 30) { "Last 30 days" }
|
|
elseif ($daysAgo -lt 90) { "Last 90 days" }
|
|
else { "90+ days" }
|
|
} else {
|
|
"Never logged in"
|
|
}
|
|
} | Group-Object | ForEach-Object {
|
|
@{
|
|
key = $_.Name
|
|
count = $_.Count
|
|
}
|
|
} | Sort-Object count -Descending
|
|
"""
|
|
elif group_by == "created_month":
|
|
command += """
|
|
$buckets = $users | ForEach-Object {
|
|
if ($_.whenCreated) {
|
|
$_.whenCreated.ToString("yyyy-MM")
|
|
} else {
|
|
"Unknown"
|
|
}
|
|
} | Group-Object | ForEach-Object {
|
|
@{
|
|
key = $_.Name
|
|
count = $_.Count
|
|
}
|
|
} | Sort-Object key -Descending
|
|
"""
|
|
else:
|
|
command += """
|
|
$buckets = @(@{
|
|
key = "Unknown"
|
|
count = $users.Count
|
|
})
|
|
"""
|
|
|
|
command += f"""
|
|
@{{
|
|
buckets = @($buckets | Select-Object -First {clamped_top})
|
|
total = $users.Count
|
|
}} | ConvertTo-Json -Depth 3 -Compress
|
|
"""
|
|
|
|
result = await self._run_powershell(
|
|
command,
|
|
{"filter_params": filter_params, "group_by": group_by},
|
|
)
|
|
|
|
if not result["success"]:
|
|
logger.warning("summarize_users failed: %s", result["error"])
|
|
return {
|
|
"buckets": [],
|
|
"total": 0,
|
|
"applied_filter": filter_params,
|
|
}
|
|
|
|
if not result["data"]:
|
|
return {
|
|
"buckets": [],
|
|
"total": 0,
|
|
"applied_filter": filter_params,
|
|
}
|
|
|
|
try:
|
|
response = json.loads(result["data"])
|
|
buckets = response.get("buckets", [])
|
|
if not isinstance(buckets, list):
|
|
buckets = [buckets] if buckets else []
|
|
|
|
return {
|
|
"buckets": buckets,
|
|
"total": response.get("total", 0),
|
|
"applied_filter": filter_params,
|
|
}
|
|
except json.JSONDecodeError as e:
|
|
logger.error("Failed to parse summarize_users data: %s", str(e))
|
|
return {
|
|
"buckets": [],
|
|
"total": 0,
|
|
"applied_filter": filter_params,
|
|
}
|
|
|