From 0d5c921a943a044c758ad2ea2accc9a8a2904c14 Mon Sep 17 00:00:00 2001 From: Nathan Castaldi Date: Wed, 15 Apr 2026 09:11:58 -0400 Subject: [PATCH] fix: update Active Directory adapter to use ActiveDirectoryIdentityBackend --- ...n-fixPsCommandInjectionAdAdapter.prompt.md | 141 ++++++++++++++++++ nexus-mcp/src/shards/identity.py | 5 +- 2 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 .github/prompts/plan-fixPsCommandInjectionAdAdapter.prompt.md diff --git a/.github/prompts/plan-fixPsCommandInjectionAdAdapter.prompt.md b/.github/prompts/plan-fixPsCommandInjectionAdAdapter.prompt.md new file mode 100644 index 0000000..ca6b122 --- /dev/null +++ b/.github/prompts/plan-fixPsCommandInjectionAdAdapter.prompt.md @@ -0,0 +1,141 @@ +# Plan: Fix PowerShell Command Injection in `ad_adapter.py` + +## ๐ŸŒฟ Branch Identity + +**Suggested Branch Name:** `fix/ps-command-injection-ad-adapter` + +**Purpose:** Closes CRITICAL #2 from the code health report โ€” eliminates a PowerShell command injection vulnerability (OWASP A03) in `nexus-mcp/lib/ad_adapter.py` before any production connection to the AD server. + +--- + +## ๐Ÿš€ The Fix: Parameterized PowerShell Execution + +**Plain English:** Right now, when a username like `jane.doe` is looked up, Python builds a PowerShell command *as a string* with the username baked into it โ€” like handing someone a note that says `"search for jane.doe"`. A malicious input like `'; Remove-Item C:\Important; '` would make the note say *"search for nothing, then delete files."* + +The fix passes user values through a separate channel โ€” **OS environment variables** โ€” so they can never change the *structure* of the command, only its *data*. PowerShell reads them via `$env:NEXUS_ARG0`, exactly like reading from a locked box, not from the note itself. No new libraries are needed. + +--- + +## ๐Ÿ’ป Implementation Plan + +This is a 5-phase change, all contained to one file plus its test file. + +### Phase 1 โ€” Harden `_run_powershell()` *(blocks all other phases)* + +In `nexus-mcp/lib/ad_adapter.py`: + +1. Add `import os` to the top-of-file imports (line 1โ€“6 block). +2. Change the `_run_powershell()` signature to add `env_params: dict[str, str] | None = None`. +3. Before the `create_subprocess_exec` call, build: `proc_env = {**os.environ, **(env_params or {})}`. +4. Pass `env=proc_env` to `asyncio.create_subprocess_exec(...)`. +5. Fix the `cred_block` injection surface: replace `'{escaped_password}'` and `'{self.username}'` with `$env:NEXUS_PS_PASSWORD` and `$env:NEXUS_PS_USERNAME` in the PowerShell string. Inject those values via `proc_env` from `self.password` / `self.username` directly (they are server-config values, not user input, but this eliminates the escape-quote surface entirely). + +### Phase 2 โ€” Fix `get_user()`, `get_user_groups()`, `get_group_members()`, `get_computer()` *(parallel โ€” all follow the same pattern)* + +For each: +1. Add a length guard at the top: `if len(input_var) > 256: raise ValueError(...)`. +2. Remove the `_escape_ps_single_quoted()` call. +3. Pass the value via `env_params={"NEXUS_ARG0": input_var}` in the `_run_powershell()` call. +4. In the PowerShell command string, replace `'{escaped_X}'` with `'$($env:NEXUS_ARG0)'`. + +**Before** (line ~113 in `get_user()`): +```python +escaped_username = self._escape_ps_single_quoted(username) +command = f""" + $user = Get-ADUser -Filter "samAccountName -eq '{escaped_username}'" ... +""" +result = await self._run_powershell(command, {"username": username}) +``` + +**After:** +```python +if len(username) > 256: + raise ValueError("username too long") +command = """ + $user = Get-ADUser -Filter "samAccountName -eq '$($env:NEXUS_ARG0)'" ... +""" +result = await self._run_powershell(command, {"username": username}, env_params={"NEXUS_ARG0": username}) +``` + +### Phase 3 โ€” Fix `search_users_by_name()` *(parallel with Phase 2)* + +1. Add length guard: `if len(query.strip()) > 100: raise ValueError(...)`. +2. Remove `escaped_query` interpolation. +3. Pass `env_params={"NEXUS_ARG0": query}`. +4. In the PowerShell script, add `$query = $env:NEXUS_ARG0` as the first line. All downstream PS-internal variable uses (`$query`, `$tokens`, `$adFilter`) already operate on that PS variable โ€” they stay as-is. + +### Phase 4 โ€” Fix `query_users()`, `count_users()`, `summarize_users()` *(parallel with Phase 2)* + +These build dynamic filters from multiple string params. Each filter string value gets its own env var: + +| Python key | Env var | Max length | +| :------------------- | :------------------- | :--------- | +| `name_contains` | `NEXUS_FILTER_NAME` | 256 | +| `username_prefix` | `NEXUS_FILTER_PREFIX`| 64 | +| `ou_contains` | `NEXUS_FILTER_OU` | 512 | +| `description_contains`| `NEXUS_FILTER_DESC` | 256 | +| `group_any` (JSON) | `NEXUS_FILTER_GROUPS`| 2048 | + +In the PowerShell filter fragment, replace the Python-interpolated value with the env var reference. For example: +```powershell +# Before (Python builds this): "DisplayName -like '*{escaped_query}*'" +# After: "DisplayName -like '*$($env:NEXUS_FILTER_NAME)*'" +``` + +Collect all env vars into a single `env_params` dict and pass once to `_run_powershell()`. + +### Phase 5 โ€” Tests *(depends on Phases 1โ€“4)* + +Add to `nexus-mcp/tests/identity_tests/test_ad_adapter.py`: + +1. **`test_get_user_injection_attempt`** โ€” calls `get_user("'; Remove-Item C:\\test; '")` with `_run_powershell` mocked. Asserts it returns `None` (not raised, not crashed) โ€” proving the method still works with hostile input. +2. **`test_search_users_injection_attempt`** โ€” same for `search_users_by_name`. +3. **`test_run_powershell_passes_env_params`** โ€” mocks `asyncio.create_subprocess_exec`, calls `_run_powershell("echo test", env_params={"NEXUS_ARG0": "val"})`, asserts the `env` kwarg passed to `create_subprocess_exec` contains `"NEXUS_ARG0": "val"`. +4. **`test_get_user_input_too_long`** โ€” asserts `ValueError` is raised for a 257-char username. + +> **Note on existing test impact:** The `cred_block` fix in Phase 1 affects `test_run_powershell_with_credentials` โ€” that test currently asserts on the command string content. It will need updating to assert on env var presence instead of password interpolation in the command string. + +--- + +## ๐Ÿ› ๏ธ Deployment Steps + +```bash +# 1. Create and switch to the feature branch +git checkout -b fix/ps-command-injection-ad-adapter + +# 2. No new packages required โ€” os is stdlib, pyproject.toml is unchanged + +# 3. Run existing tests before making changes (baseline) +cd nexus-mcp +python -m pytest tests/identity_tests/test_ad_adapter.py -v + +# 4. Implement the phases above in ad_adapter.py and test_ad_adapter.py + +# 5. Run tests after changes +python -m pytest tests/identity_tests/test_ad_adapter.py -v + +# 6. Run full nexus-mcp test suite to confirm no regressions +python -m pytest tests/ -v --ignore=tests/integration_test_audit_shard.py +``` + +--- + +## ๐Ÿงช How to Verify It Works + +**Success metric โ€” automated:** +``` +tests/identity_tests/test_ad_adapter.py::TestActiveDirectoryBackend::test_get_user_injection_attempt PASSED +tests/identity_tests/test_ad_adapter.py::TestActiveDirectoryBackend::test_run_powershell_passes_env_params PASSED +tests/identity_tests/test_ad_adapter.py::TestActiveDirectoryBackend::test_get_user_input_too_long PASSED +``` + +**Success metric โ€” code audit (zero output expected):** +```bash +# Should return nothing โ€” no more _escape_ps_single_quoted calls in method bodies +grep -n "_escape_ps_single_quoted(" nexus-mcp/lib/ad_adapter.py | grep -v "def _escape" + +# Should return nothing โ€” no more Python f-string interpolation of user values into PS command strings +grep -n "escaped_" nexus-mcp/lib/ad_adapter.py +``` + +**Out of scope:** `archive/Identity/` tree. The static method `_escape_ps_single_quoted()` remains in place, marked `# DEPRECATED`, to avoid breaking archive callers. diff --git a/nexus-mcp/src/shards/identity.py b/nexus-mcp/src/shards/identity.py index 1ac0c8d..4a3c9b5 100644 --- a/nexus-mcp/src/shards/identity.py +++ b/nexus-mcp/src/shards/identity.py @@ -21,12 +21,11 @@ _USE_MOCK = os.getenv("USE_MOCK", "false").lower() == "true" _ad = None _entra = None - def _get_ad(): global _ad if _ad is None: - from ad_adapter import ActiveDirectoryAdapter - _ad = ActiveDirectoryAdapter() + from ad_adapter import ActiveDirectoryIdentityBackend + _ad = ActiveDirectoryIdentityBackend() return _ad