feat: add functionality to list disabled AD accounts and update related API
This commit is contained in:
parent
275d69cd31
commit
e7d986a3c5
@ -317,6 +317,58 @@ class ActiveDirectoryIdentityBackend:
|
|||||||
logger.error("Failed to parse member data: %s", str(e))
|
logger.error("Failed to parse member data: %s", str(e))
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
async def list_disabled_accounts(self, limit: int = 5000) -> list[dict[str, Any]]:
|
||||||
|
"""Return disabled AD user accounts with normalized fields."""
|
||||||
|
clamped_limit = min(max(1, limit), 10000)
|
||||||
|
command = f"""
|
||||||
|
$users = @(Get-ADUser -Filter {{ Enabled -eq $false }} -Properties GivenName,Surname,DisplayName,Enabled,DistinguishedName,Description,lastLogonTimestamp,whenCreated,Department,Title,EmailAddress -ErrorAction Stop | Select-Object -First {clamped_limit})
|
||||||
|
|
||||||
|
$items = @($users | ForEach-Object {{
|
||||||
|
$lastLogon = if ($_.lastLogonTimestamp) {{
|
||||||
|
[DateTime]::FromFileTime($_.lastLogonTimestamp).ToUniversalTime().ToString('o')
|
||||||
|
}} else {{ '' }}
|
||||||
|
$whenCreated = if ($_.whenCreated) {{
|
||||||
|
$_.whenCreated.ToUniversalTime().ToString('o')
|
||||||
|
}} else {{ '' }}
|
||||||
|
|
||||||
|
@{{
|
||||||
|
username = $_.SamAccountName
|
||||||
|
display_name = if ($_.DisplayName) {{ $_.DisplayName }} else {{ '' }}
|
||||||
|
first_name = if ($_.GivenName) {{ $_.GivenName }} else {{ '' }}
|
||||||
|
last_name = if ($_.Surname) {{ $_.Surname }} else {{ '' }}
|
||||||
|
enabled = $_.Enabled
|
||||||
|
ou = $_.DistinguishedName
|
||||||
|
description = if ($_.Description) {{ $_.Description }} else {{ '' }}
|
||||||
|
last_logon_utc = $lastLogon
|
||||||
|
when_created_utc = $whenCreated
|
||||||
|
department = if ($_.Department) {{ $_.Department }} else {{ '' }}
|
||||||
|
title = if ($_.Title) {{ $_.Title }} else {{ '' }}
|
||||||
|
email = if ($_.EmailAddress) {{ $_.EmailAddress }} else {{ '' }}
|
||||||
|
}}
|
||||||
|
}})
|
||||||
|
|
||||||
|
$items | ConvertTo-Json -Depth 3 -Compress
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = await self._run_powershell(command, {"limit": clamped_limit})
|
||||||
|
if not result["success"]:
|
||||||
|
logger.warning("list_disabled_accounts failed: %s", result["error"])
|
||||||
|
return []
|
||||||
|
|
||||||
|
if not result["data"]:
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
items = json.loads(result["data"])
|
||||||
|
if isinstance(items, list):
|
||||||
|
return items
|
||||||
|
if isinstance(items, dict):
|
||||||
|
return [items]
|
||||||
|
return []
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logger.error("Failed to parse disabled account data: %s", str(e))
|
||||||
|
return []
|
||||||
|
|
||||||
async def find_stale_users(self, days: int) -> list[dict[str, Any]]:
|
async def find_stale_users(self, days: int) -> list[dict[str, Any]]:
|
||||||
"""Get users with no logon activity in N days using lastLogonTimestamp."""
|
"""Get users with no logon activity in N days using lastLogonTimestamp."""
|
||||||
if days < 0:
|
if days < 0:
|
||||||
|
|||||||
@ -145,15 +145,15 @@ def register(mcp: FastMCP) -> None:
|
|||||||
return [{"sAMAccountName": u} for u in usernames]
|
return [{"sAMAccountName": u} for u in usernames]
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
async def ad_get_disabled_accounts() -> list[dict]:
|
async def ad_get_disabled_accounts(limit: int = 5000) -> list[dict]:
|
||||||
"""Return all disabled user accounts in Active Directory.
|
"""Return all disabled user accounts in Active Directory.
|
||||||
|
|
||||||
userAccountControl value 514 = normal account (512) + disabled (2).
|
userAccountControl value 514 = normal account (512) + disabled (2).
|
||||||
"""
|
"""
|
||||||
if _USE_MOCK:
|
if _USE_MOCK:
|
||||||
return [u for u in M.AD_USERS if u.get("userAccountControl") == "514"]
|
disabled = [u for u in M.AD_USERS if u.get("userAccountControl") == "514"]
|
||||||
resp = await _get_ad().query_users(filter_params={"enabled": False}, page_size=200)
|
return disabled[: max(1, limit)]
|
||||||
return resp.get("items", [])
|
return await _get_ad().list_disabled_accounts(limit=limit)
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
async def ad_get_stale_accounts(days_inactive: int = 90) -> list[dict]:
|
async def ad_get_stale_accounts(days_inactive: int = 90) -> list[dict]:
|
||||||
|
|||||||
15
scripts/verify-disabled-accounts.ps1
Normal file
15
scripts/verify-disabled-accounts.ps1
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# verify-disabled-accounts.ps1
|
||||||
|
# Returns the total number of disabled user accounts in Active Directory.
|
||||||
|
|
||||||
|
[CmdletBinding()]
|
||||||
|
param()
|
||||||
|
|
||||||
|
try {
|
||||||
|
$disabled = Get-ADUser -Filter { Enabled -eq $false } -ErrorAction Stop
|
||||||
|
$count = @($disabled).Count
|
||||||
|
Write-Output "Disabled accounts: $count"
|
||||||
|
}
|
||||||
|
catch {
|
||||||
|
Write-Error "Failed to query Active Directory: $_"
|
||||||
|
exit 1
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user