Enrichment Module Development Guide
This guide covers how to create new enrichment modules for Nemesis. Enrichment modules analyze files and extract security-relevant information like credentials, hashes, metadata, and indicators of compromise.
There are two ways to build a new module:
- Manual: Follow the sections below to understand the module structure, implement the protocol, and wire up testing yourself. Best for learning how things work under the hood.
- Claude Code skill (easy mode): Run the `/new-enrichment-module` skill in Claude Code to get a guided, interactive workflow that handles scaffolding, library selection, implementation, and testing. Jump to Quick Start with Claude Code to get started.
- Codex skill (easy mode): Run the `$enrichment-module-builder` skill from the repo root to get a guided, interactive workflow that handles scaffolding, library selection, implementation, and testing. Jump to Quick Start with Codex to get started.
Table of Contents
- Module Structure
- Protocol Interface
- Detection Patterns
- Output Types
- Finding Categories & Severity
- Common Patterns
- Testing
- Quick Start with Claude Code
- Quick Start with Codex
- Reference Modules
Module Structure
Each enrichment module lives in its own directory under libs/file_enrichment_modules/file_enrichment_modules/. The folder name should follow Python's PEP8 naming conventions:
Modules should have short, all-lowercase names. Underscores can be used in the module name if it improves readability. Python packages should also have short, all-lowercase names, although the use of underscores is discouraged.
```
libs/file_enrichment_modules/file_enrichment_modules/{module_name}/
├── analyzer.py       # Required: Main module code with create_enrichment_module()
├── pyproject.toml    # Optional: Module-specific dependencies
└── rules.yar         # Optional: YARA rules for detection
```
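As a quick illustration of the naming rule above, a candidate folder name can be sanity-checked with a small regex. The pattern below is our own sketch of the PEP 8 guidance, not Nemesis code:

```python
import re

# Sketch of the naming guidance quoted above: short, all-lowercase,
# with optional underscores. The regex itself is an assumption.
MODULE_NAME_RE = re.compile(r"[a-z][a-z0-9_]*")


def is_valid_module_name(name: str) -> bool:
    return MODULE_NAME_RE.fullmatch(name) is not None


print(is_valid_module_name("chromium_cookies"))  # lowercase + underscores: accepted
print(is_valid_module_name("MyModule"))          # uppercase: rejected
```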
Required: analyzer.py
The analyzer.py file must export a create_enrichment_module() factory function:
```python
from common.models import EnrichmentResult
from file_enrichment_modules.module_loader import EnrichmentModule


class MyAnalyzer(EnrichmentModule):
    name: str = "my_analyzer"
    dependencies: list[str] = []

    def __init__(self):
        from common.storage import StorageS3

        self.storage = StorageS3()
        self.asyncpg_pool = None  # Injected at runtime
        self.workflows = ["default"]

    async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
        # Return True if this module should analyze this file
        ...

    async def process(self, object_id: str, file_path: str | None = None) -> EnrichmentResult | None:
        # Analyze the file and return results
        ...


def create_enrichment_module() -> EnrichmentModule:
    return MyAnalyzer()
```
Optional: pyproject.toml
If your module needs dependencies that aren't in the base file_enrichment_modules package, first install uv, then choose one of two options.
Option 1: `cd` into projects/file_enrichment or libs/file_enrichment_modules/ and run `uv add X` for each needed library.
Option 2 (easier): Create a pyproject.toml in the new module folder:
```toml
[project]
name = "my_module"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "some-library>=1.0.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```
Then in this folder, run uv add X to add a new library. The dynamic module loader will install the necessary dependencies in a virtual env for just that module.
Protocol Interface
All modules must implement the EnrichmentModule protocol:
```python
from typing import Protocol

import asyncpg

from common.models import EnrichmentResult


class EnrichmentModule(Protocol):
    name: str                          # Unique module identifier
    dependencies: list[str]            # Other modules this depends on
    asyncpg_pool: asyncpg.Pool | None  # Database pool (injected at runtime)

    async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
        """Determine if this module should process the given file.

        Args:
            object_id: UUID of the file in the database
            file_path: Optional local path to the file (for performance)

        Returns:
            True if the module should process this file
        """
        ...

    async def process(self, object_id: str, file_path: str | None = None) -> EnrichmentResult | None:
        """Process the file and return enrichment results.

        Args:
            object_id: UUID of the file in the database
            file_path: Optional local path to the file

        Returns:
            EnrichmentResult with findings/transforms, or None on failure
        """
        ...
```
Important: In addition to the protocol fields above, modules must set `self.workflows = ["default"]` in their `__init__` method. The workflow engine filters modules by this attribute — without it, your module will load but never execute.
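To make the requirement concrete, here is a minimal, self-contained stub of the attributes a module instance carries after `__init__`. The class is illustrative only; the protocol base class, storage client, and analysis logic are omitted:

```python
class MinimalAnalyzer:
    """Illustrative stub of the attributes described above (not real Nemesis code)."""

    name: str = "minimal_analyzer"
    dependencies: list[str] = []

    def __init__(self):
        self.asyncpg_pool = None      # injected at runtime
        self.workflows = ["default"]  # required, or the module loads but never runs


module = MinimalAnalyzer()
print(module.workflows)
```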
Detection Patterns
Choose the appropriate detection pattern based on your target file type:
1. Magic Type / MIME Type Matching
Best for: Files with distinctive magic signatures (PE, ELF, PDF, SQLite, etc.)
```python
async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
    file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)

    # Check magic type
    return "PE32" in file_enriched.magic_type

    # Or check MIME type instead:
    # return file_enriched.mime_type == "application/x-sqlite3"
```
Example modules: container (uses is_container() helper), sqlite
2. File Extension Matching
Best for: Files identified by extension (.keytab, .pem, .lnk)
```python
async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
    file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)

    # Check extension
    return file_enriched.extension and file_enriched.extension.lower() in [".keytab", ".kt"]
```
Example modules: keytab (extension OR YARA), lnk
3. Filename Matching
Best for: Configuration files with specific names (.git-credentials, web.config)
```python
async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
    file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)

    # Check filename + plaintext
    return file_enriched.is_plaintext and file_enriched.file_name.lower() in [
        ".git-credentials",
        ".gitcredentials",
    ]
```
Example modules: gitcredentials, filezilla
4. YARA Rule Matching
Best for: Files with distinctive binary signatures or content patterns
```python
import yara_x


class MyAnalyzer(EnrichmentModule):
    def __init__(self):
        self.yara_rule = yara_x.compile("""
            rule target_file {
                strings:
                    $header = { 05 02 }  // Magic bytes
                condition:
                    $header at 0
            }
        """)

    async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
        file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)

        # Read bytes for the YARA scan
        num_bytes = min(file_enriched.size, 1000)
        if file_path:
            with open(file_path, "rb") as f:
                file_bytes = f.read(num_bytes)
        else:
            file_bytes = self.storage.download_bytes(object_id, length=num_bytes)

        return len(self.yara_rule.scan(file_bytes).matching_rules) > 0
```
Example modules: pe, keytab, dpapi_blob
5. Combined Detection
For higher confidence, combine multiple detection methods:
```python
async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
    file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)

    # Method 1: Extension check (fast)
    if file_enriched.file_name.lower().endswith(".keytab"):
        return True

    # Method 2: YARA verification (slower but more accurate)
    file_bytes = self.storage.download_bytes(object_id, length=1000)
    return len(self.yara_rule.scan(file_bytes).matching_rules) > 0
```
Example modules: chromium_cookies (magic + YARA + filename), office_doc (extension OR magic)
6. Process All Files
For scanners that should run on everything:
```python
async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
    return True  # Run on all files
```
Example modules: yara (scans all files with custom rules)
Output Types
EnrichmentResult Structure
```python
from common.models import EnrichmentResult, Finding, Transform

result = EnrichmentResult(
    module_name=self.name,
    results={"parsed_data": {...}},  # Raw parsed data (stored in DB)
    findings=[...],                  # Security findings
    transforms=[...],                # Derived files
    dependencies=self.dependencies,  # Module dependencies
)
```
1. Results (Raw Data)
Store parsed data that doesn't fit findings/transforms:
```python
result.results = {
    "headers": {...},
    "sections": [...],
    "imports": [...],
}
```
2. Findings
Security-relevant discoveries with severity ratings:
```python
from common.models import Finding, FindingCategory, FindingOrigin, FileObject

# Create display data (shown in UI)
summary_markdown = "# Credentials Found\n\n* Username: admin\n* Password: ..."
display_data = FileObject(
    type="finding_summary",
    metadata={"summary": summary_markdown},
)

finding = Finding(
    category=FindingCategory.CREDENTIAL,
    finding_name="git_credentials_detected",
    origin_type=FindingOrigin.ENRICHMENT_MODULE,
    origin_name=self.name,
    object_id=file_enriched.object_id,
    severity=7,
    raw_data={"credentials": [...]},  # Structured data
    data=[display_data],              # Display objects
)

result.findings.append(finding)
```
3. Transforms
Derived files uploaded to storage. Transforms require a type (used as a title for display) and an object_id to reference the data to display.
Transform Metadata Reference
| Metadata Field | Type | Description |
|---|---|---|
| file_name | string | Name of the file (i.e., for downloads) |
| display_type_in_dashboard | display_type | How to display in the dashboard |
| display_title | string | Title to display for the transform in the dashboard |
| default_display | bool | true to set this transform as the default display |
| offer_as_download | bool | If true, the transform is offered in a download tab, downloading as file_name |
Display Types
| Value | Description |
|---|---|
| monaco | Display in a Monaco editor, using the extension from file_name to help determine the language type. |
| pdf | Render as a PDF |
| image | Render as an image |
| markdown | Render as markdown |
| null | Don't display content |
Basic Transform Example
```python
import tempfile

from common.models import Transform

# Create a report/derived file
with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as tmp:
    tmp.write("# Analysis Report\n...")
    tmp.flush()
    transform_id = self.storage.upload_file(tmp.name)

transform = Transform(
    type="analysis_report",
    object_id=str(transform_id),
    metadata={
        "file_name": f"{file_enriched.file_name}_analysis.md",
        "display_type_in_dashboard": "markdown",  # or "monaco", "hex"
        "default_display": True,     # Show by default
        "offer_as_download": False,  # Don't offer a download tab
    },
)
result.transforms.append(transform)
```
Setting a Text File as Default Display
From file_enrichment_modules/sqlite/analyzer.py:
```python
with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as tmp_display_file:
    display = format_sqlite_data(database_data)
    tmp_display_file.write(display)
    tmp_display_file.flush()
    object_id = self.storage.upload_file(tmp_display_file.name)

    displayable_parsed = Transform(
        type="displayable_parsed",
        object_id=f"{object_id}",
        metadata={
            "file_name": f"{file_enriched.file_name}.txt",
            "display_type_in_dashboard": "monaco",
            "default_display": True,
        },
    )
    enrichment_result.transforms = [displayable_parsed]
```
Offering a File for Download
From file_enrichment_modules/dotnet/analyzer.py:
```python
decompilation = Transform(
    type="decompilation",
    object_id=service_results["decompilation"]["object_id"],
    metadata={
        "file_name": f"{file_enriched.file_name}.zip",
        "offer_as_download": True,
    },
)
enrichment_result.transforms = [decompilation]
```
Finding Categories & Severity
Categories
```python
from common.models import FindingCategory

FindingCategory.CREDENTIAL      # Usernames, passwords, tokens, API keys
FindingCategory.EXTRACTED_HASH  # Password hashes, encryption keys
FindingCategory.EXTRACTED_DATA  # Parsed configuration, metadata
FindingCategory.VULNERABILITY   # Security misconfigurations, weaknesses
FindingCategory.YARA_MATCH      # YARA rule matches
FindingCategory.PII             # Personally identifiable information
FindingCategory.MISC            # Other security findings
FindingCategory.INFORMATIONAL   # Low-priority info
```
Severity Scale (0-10)
| Severity | Use For | Examples |
|---|---|---|
| 9-10 | Critical credentials, active exploits | Domain admin creds, cleartext passwords |
| 7-8 | High-value credentials, keys | Kerberos keytabs, API tokens, SSH keys |
| 5-6 | Medium findings | YARA matches, password hashes |
| 3-4 | Low findings | Metadata disclosure, expired certs |
| 1-2 | Informational | Debug info, version strings |
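To keep ratings consistent across findings, a module can use a small lookup keyed by finding name. The mapping below is a hypothetical sketch that follows the bands in the table above; the finding names are examples, not Nemesis constants:

```python
# Hypothetical severity map following the 0-10 bands above.
# Finding names are illustrative examples only.
SEVERITY_BY_FINDING: dict[str, int] = {
    "domain_admin_credentials": 10,  # critical credentials
    "kerberos_keytab": 8,            # high-value keys
    "yara_match": 5,                 # medium finding
    "expired_certificate": 3,        # low finding
    "version_string": 1,             # informational
}


def severity_for(finding_name: str, default: int = 5) -> int:
    """Look up a severity, falling back to a medium default."""
    return SEVERITY_BY_FINDING.get(finding_name, default)
```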
Common Patterns
Getting File Metadata
```python
from common.state_helpers import get_file_enriched_async


async def process(self, object_id: str, file_path: str | None = None):
    file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)

    # Available fields:
    # file_enriched.file_name     - Original filename
    # file_enriched.extension     - File extension
    # file_enriched.size          - File size in bytes
    # file_enriched.magic_type    - Magic file type
    # file_enriched.mime_type     - MIME type
    # file_enriched.is_plaintext  - True if text file
    # file_enriched.is_container  - True if archive
    # file_enriched.hashes        - Dict with md5, sha1, sha256
    # file_enriched.path          - Original path on source system
```
Reading File Contents
```python
# Option 1: Use the provided file_path (preferred for performance)
if file_path:
    with open(file_path, "rb") as f:
        content = f.read()
else:
    # Option 2: Download from storage
    with self.storage.download(object_id) as temp_file:
        with open(temp_file.name, "rb") as f:
            content = f.read()

# Option 3: Read specific bytes (for detection)
header_bytes = self.storage.download_bytes(object_id, length=1000)
```
Standard Process Pattern
```python
async def process(self, object_id: str, file_path: str | None = None) -> EnrichmentResult | None:
    try:
        file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)
        if file_path:
            return self._analyze_file(file_path, file_enriched)
        else:
            with self.storage.download(object_id) as temp_file:
                return self._analyze_file(temp_file.name, file_enriched)
    except Exception:
        logger.exception(message="Error in process()", object_id=object_id)
        return None

def _analyze_file(self, file_path: str, file_enriched) -> EnrichmentResult | None:
    """Actual analysis logic (sync method)."""
    result = EnrichmentResult(module_name=self.name)
    # ... analysis code ...
    return result
```
Testing
Standalone Testing with Test Harness
Use the test harness to test modules without running Nemesis:
```python
import pytest

from tests.harness import ModuleTestHarness, FileEnrichedFactory
from file_enrichment_modules.my_module.analyzer import MyAnalyzer


@pytest.mark.asyncio
async def test_should_process_target_file():
    harness = ModuleTestHarness()

    # Register a test file
    harness.register_file(
        object_id="test-uuid",
        local_path="/path/to/test/file",
        file_enriched=FileEnrichedFactory.create_pe_file(object_id="test-uuid"),
    )

    # Test the module
    async with harness.create_module(MyAnalyzer) as module:
        assert await module.should_process("test-uuid") is True


@pytest.mark.asyncio
async def test_process_extracts_data():
    harness = ModuleTestHarness()
    harness.register_file(
        object_id="test-uuid",
        local_path="/path/to/test/file",
        file_enriched=FileEnrichedFactory.create_pe_file(object_id="test-uuid"),
    )

    async with harness.create_module(MyAnalyzer) as module:
        result = await module.process("test-uuid")
        assert result is not None
        assert result.module_name == "my_analyzer"
        assert len(result.findings) > 0
```
Run Tests
```shell
cd libs/file_enrichment_modules
uv run pytest tests/test_my_module.py -v
```
Integration Testing
For full integration testing with Nemesis running:
```shell
# Start dev environment
./tools/nemesis-ctl.sh start dev

# Submit test file
cd projects/cli
uv run python -m nemesis_cli.main submit --file /path/to/sample

# Check results in Hasura or via SQL
```
Quick Start with Claude Code
If you have Claude Code available or another coding AI agent, the /new-enrichment-module skill provides a guided workflow that handles design, implementation, and testing.
Usage
Launch Claude Code from the Nemesis project root and run:
```
/new-enrichment-module <description of file type to support>
```

Examples:

```
/new-enrichment-module Windows Prefetch files (.pf)
/new-enrichment-module SSH private keys (RSA, ECDSA, Ed25519)
/new-enrichment-module macOS Keychain database files
/new-enrichment-module KeePass database files (.kdbx)
```
What the Skill Does
The skill walks through 8 steps, pausing at review gates for your input:
| Step | What Happens | Review Gate? |
|---|---|---|
| 1. Problem Analysis | Gathers requirements about target file types and data to extract | |
| 2. Output Mode | Choose Findings, Parsing-Only, or Hybrid mode | Yes |
| 3. Library Research | Searches for and evaluates parsing libraries | Yes |
| 4. Sample File | Obtain or generate a test file | Yes |
| 5. Detection Strategy | Builds `should_process()` using magic types, extensions, YARA, etc. | |
| 6. Implementation | Creates `analyzer.py`, `pyproject.toml`, and `rules.yar` as needed | |
| 7. Standalone Tests | Writes and runs unit tests using the test harness | |
| 8. Integration Test | Submits the test file to a running Nemesis instance and verifies results | Yes |
The review gates let you steer library choices, output format, and test file selection before the skill commits to an approach.
Prerequisites
For the full workflow including integration testing (step 8), start Nemesis in development mode first:
./tools/nemesis-ctl.sh start dev
The skill can still scaffold and unit-test a module without Nemesis running, but the final integration test requires a live instance.
Output
When complete, the skill produces a ready-to-use module at libs/file_enrichment_modules/file_enrichment_modules/{module_name}/ with:
- `analyzer.py` — Full module implementation with `should_process()` and `process()`
- `pyproject.toml` — Created if the module needs dependencies beyond the base package
- `rules.yar` — Created if the detection strategy uses YARA rules
- Unit tests in `tests/` using the test harness
Quick Start with Codex
If you are using Codex in this repository, the $enrichment-module-builder skill provides a guided workflow that handles design, implementation, and testing.
Usage
Launch Codex from the Nemesis project root and run:
```
$enrichment-module-builder <description of file type to support>
```

Examples:

```
$enrichment-module-builder Windows Prefetch files (.pf)
$enrichment-module-builder SSH private keys (RSA, ECDSA, Ed25519)
$enrichment-module-builder macOS Keychain database files
$enrichment-module-builder KeePass database files (.kdbx)
```
What the Skill Does
The skill walks through 8 steps, pausing at review gates for your input:
| Step | What Happens | Review Gate? |
|---|---|---|
| 1. Problem Analysis | Gathers requirements about target file types and data to extract | |
| 2. Output Mode | Choose Findings, Parsing-Only, or Hybrid mode | Yes |
| 3. Library Research | Searches for and evaluates parsing libraries | Yes |
| 4. Sample File | Obtain or generate a test file | Yes |
| 5. Detection Strategy | Builds `should_process()` using magic types, extensions, YARA, etc. | |
| 6. Implementation | Creates `analyzer.py`, `pyproject.toml`, and `rules.yar` as needed | |
| 7. Standalone Tests | Writes and runs unit tests using the test harness | |
| 8. Integration Test | Submits the test file to a running Nemesis instance and verifies results | Yes |
The review gates let you steer library choices, output format, and test file selection before the skill commits to an approach.
Prerequisites
For the full workflow including integration testing (step 8), start Nemesis in development mode first:
./tools/nemesis-ctl.sh start dev
The skill can still scaffold and unit-test a module without Nemesis running, but the final integration test requires a live instance.
Output
When complete, the skill produces a ready-to-use module at libs/file_enrichment_modules/file_enrichment_modules/{module_name}/ with:
- `analyzer.py` — Full module implementation with `should_process()` and `process()`
- `pyproject.toml` — Created if the module needs dependencies beyond the base package
- `rules.yar` — Created if the detection strategy uses YARA rules
- Unit tests in `tests/` using the test harness
Reference Modules
These modules demonstrate the major patterns:
| Module | Detection | Output | Key Feature |
|---|---|---|---|
| `pe` | Magic + YARA | Findings + Transforms | Complex parsing with lief |
| `yara` | All files | Findings | YARA rule management |
| `chromium_cookies` | Magic + YARA + filename | Findings + Transforms | SQLite + DPAPI |
| `gitcredentials` | Filename + plaintext | Findings | Simple text parsing |
| `group_policy_preferences` | YARA + plaintext | Findings | XML + crypto |
| `container` | `is_container()` helper | Transforms | Archive handling |
| `keytab` | Extension OR YARA | Findings | Binary struct parsing |
| `office_doc` | Extension OR magic | Findings + Transforms | Multi-format handling |
Browse these at: libs/file_enrichment_modules/file_enrichment_modules/