Adding File Enrichment Modules
File enrichment modules for the main enrichment workflow are located in libs/file_enrichment_modules/file_enrichment_modules/.
To add a new module, create a new folder whose name follows Python's PEP 8 naming conventions:
Modules should have short, all-lowercase names. Underscores can be used in the module name if it improves readability. Python packages should also have short, all-lowercase names, although the use of underscores is discouraged.
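As a quick sanity check before creating the folder, the naming guidance above can be sketched as a small validator. The helper name `is_valid_module_name` and the 30-character cap are assumptions made for illustration; PEP 8 only says "short" and does not give a number.

```python
import re

# Hypothetical pattern: starts with a lowercase letter, then lowercase
# letters, digits, or underscores only.
_MODULE_NAME_RE = re.compile(r"[a-z][a-z0-9_]*")


def is_valid_module_name(name: str, max_length: int = 30) -> bool:
    """Return True if `name` looks like a short, all-lowercase module name.

    `max_length` is an arbitrary illustrative cap, not a PEP 8 rule.
    """
    return len(name) <= max_length and _MODULE_NAME_RE.fullmatch(name) is not None
```

For example, `is_valid_module_name("vnc_parser")` passes, while names such as `VncParser` or `vnc-parser` are rejected.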
Create a main analyzer.py file with your enrichment logic. Enrichment modules are fairly small, so the easiest approach is to find an example module and use it as a base, with an LLM to help draft your code.
If your module needs additional dependencies, you have two options. For either one, first install Poetry. To prevent version issues, we recommend installing Poetry 2.0.1 with pipx install poetry==2.0.1.
For the first option, you can cd to projects/file_enrichment or libs/file_enrichment_modules/ and run poetry add X for the needed library.
Alternatively (and more easily), you can create a pyproject.toml in the new module's folder. For example:
[tool.poetry]
name = "module"
version = "0.1.0"
description = "Enriches things"
authors = ["harmj0y <will@harmj0y.net>"]
package-mode = false
[tool.poetry.dependencies]
python = "^3.9"
Then in this folder, run poetry add X to add a new library. The dynamic module loader will install the necessary dependencies in a Poetry env for just that module.
Tips / Tricks
The async should_process() function determines whether the module should run on a file. You can check the file name, or any other field of the base enriched file, after fetching it with file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool):
...
    async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
        """Determine if this module should run based on file type."""
        file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)
        should_run = (
            file_enriched.file_name.lower().endswith(".ini")
            and "vnc" in file_enriched.file_name.lower()
            and "text" in file_enriched.magic_type.lower()
        )
        return should_run
...
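One way to make a name-and-type check like this easy to test is to pull the condition into a plain function that takes the enriched-file object. The sketch below is illustrative only: `FakeFileEnriched` and `looks_like_vnc_ini` are hypothetical names, and the stand-in class models just the two attributes the check reads (`file_name` and `magic_type`), not the real enriched-file type.

```python
from dataclasses import dataclass


@dataclass
class FakeFileEnriched:
    """Hypothetical stand-in exposing only the attributes the check reads."""

    file_name: str
    magic_type: str


def looks_like_vnc_ini(file_enriched) -> bool:
    """Same condition as the should_process() example, as a pure function."""
    name = file_enriched.file_name.lower()
    return (
        name.endswith(".ini")
        and "vnc" in name
        and "text" in file_enriched.magic_type.lower()
    )
```

Under these assumptions, should_process() could fetch the enriched file and simply return `looks_like_vnc_ini(file_enriched)`, while unit tests exercise the function without a database pool.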
Or you can use a Yara rule (or you could do both!):
...
        # Yara rule to check for DPAPI blob content
        self.yara_rule = yara_x.compile("""
rule has_dpapi_blob
{
    strings:
        $dpapi_header = { 01 00 00 00 D0 8C 9D DF 01 15 D1 11 8C 7A 00 C0 4F C2 97 EB }
        $dpapi_header_b64_1 = "AAAA0Iyd3wEV0RGMegDAT8KX6"
        $dpapi_header_b64_2 = "AQAAANCMnd8BFdERjHoAwE/Cl+"
        $dpapi_header_b64_3 = "EAAADQjJ3fARXREYx6AMBPwpfr"
    condition:
        $dpapi_header or $dpapi_header_b64_1 or $dpapi_header_b64_2 or $dpapi_header_b64_3
}
        """)

    async def should_process(self, object_id: str, file_path: str | None = None) -> bool:
        """Check if this file should be processed by scanning for DPAPI blobs.

        Args:
            object_id: The object ID of the file
            file_path: Optional path to already downloaded file
        """
        file_enriched = await get_file_enriched_async(object_id, self.asyncpg_pool)
        if file_enriched.size > self.size_limit:
            logger.debug(
                f"[dpapi_analyzer] file {file_enriched.path} ({file_enriched.object_id} / {file_enriched.size} bytes) exceeds the size limit of {self.size_limit} bytes, only analyzing the first {self.size_limit} bytes"
            )
        if file_path:
            # Use provided file path - read only the needed bytes
            with open(file_path, "rb") as f:
                num_bytes = min(file_enriched.size, self.size_limit)
                file_bytes = f.read(num_bytes)
        else:
            # Fallback to downloading the file itself
            num_bytes = file_enriched.size if file_enriched.size < self.size_limit else self.size_limit
            file_bytes = self.storage.download_bytes(file_enriched.object_id, length=num_bytes)
        should_run = len(self.yara_rule.scan(file_bytes).matching_rules) > 0
        return should_run
...
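The three base64 strings in the rule appear to correspond to the DPAPI header encoded at the three possible byte alignments within a base64 stream. The sketch below shows how such variants can be derived for any byte signature using only the standard library. The helper name `base64_variants` is hypothetical, and this is not necessarily how the strings in the rule were produced.

```python
import base64

# The raw DPAPI header bytes from the YARA rule above.
DPAPI_HEADER = bytes.fromhex("01000000D08C9DDF0115D1118C7A00C04FC297EB")


def base64_variants(needle: bytes) -> list[str]:
    """Return the stable base64 substring of `needle` for each byte alignment.

    Base64 encodes 3 bytes as 4 characters, so a signature can start at
    offset 0, 1, or 2 within a 3-byte group. Characters that also depend on
    unknown neighbouring bytes are trimmed from both ends.
    """
    variants = []
    for pad in range(3):
        encoded = base64.b64encode(b"\x00" * pad + needle).decode("ascii")
        # Leading characters that mix in bits from the preceding bytes.
        skip = (pad * 8 + 5) // 6
        # Number of characters fully determined by the known bits.
        stable_end = ((pad + len(needle)) * 8) // 6
        variants.append(encoded[skip:stable_end])
    return variants
```

Calling `base64_variants(DPAPI_HEADER)` yields one string per alignment, and each of the three base64 strings in the rule is contained in one of them. The same helper could be used to build base64 patterns for other binary headers.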
On Transforms
File transforms require a type (used as a title for display) and an object_id to reference the data to display.
Optional metadata is:
| Metadata Field | Type | Description |
|---|---|---|
| file_name | string | Name of the file (e.g., for downloads) |
| display_type_in_dashboard | display_type | How to display the transform in the dashboard |
| display_title | string | Title to display for the transform in the dashboard |
| default_display | bool | Set to true to make this transform the default display |
| offer_as_download | bool | Set to true to offer the transform in a download tab, downloaded as file_name |
Display Types are:
| Value | Description |
|---|---|
| monaco | Display in a Monaco editor, using the extension from file_name to help determine the language type. |
| pdf | Render as a PDF |
| image | Render as an image |
| markdown | Render as Markdown |
| null | Don't display content |
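To catch typos in metadata keys or display types early, the two tables above can be encoded in a small helper. This is a sketch only: `build_transform_metadata` and `DISPLAY_TYPES` are hypothetical names rather than part of the project, the `pdf` value is inferred from the "Render as a PDF" row, and treating `null` as a string value is an assumption.

```python
from typing import Any, Optional

# Display types taken from the table above ("pdf" is inferred, and "null"
# as a string is an assumption).
DISPLAY_TYPES = {"monaco", "pdf", "image", "markdown", "null"}


def build_transform_metadata(
    file_name: Optional[str] = None,
    display_type_in_dashboard: Optional[str] = None,
    display_title: Optional[str] = None,
    default_display: bool = False,
    offer_as_download: bool = False,
) -> dict[str, Any]:
    """Build a metadata dict containing only the fields that were set."""
    if display_type_in_dashboard is not None and display_type_in_dashboard not in DISPLAY_TYPES:
        raise ValueError(f"unknown display type: {display_type_in_dashboard!r}")

    metadata: dict[str, Any] = {}
    if file_name is not None:
        metadata["file_name"] = file_name
    if display_type_in_dashboard is not None:
        metadata["display_type_in_dashboard"] = display_type_in_dashboard
    if display_title is not None:
        metadata["display_title"] = display_title
    if default_display:
        metadata["default_display"] = True
    if offer_as_download:
        metadata["offer_as_download"] = True
    return metadata
```

The resulting dict can then be passed as the `metadata` argument when constructing a transform, in place of a hand-written literal.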
Examples
Example of setting a text file as the default display (in file_enrichment_modules/sqlite/analyzer.py):
with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as tmp_display_file:
    display = format_sqlite_data(database_data)
    tmp_display_file.write(display)
    tmp_display_file.flush()
    object_id = self.storage.upload_file(tmp_display_file.name)
    displayable_parsed = Transform(
        type="displayable_parsed",
        object_id=f"{object_id}",
        metadata={
            "file_name": f"{file_enriched.file_name}.txt",
            "display_type_in_dashboard": "monaco",
            "default_display": True,
        },
    )
enrichment_result.transforms = [displayable_parsed]
Example of offering a file for download (in file_enrichment_modules/dotnet/analyzer.py):
decompilation = Transform(
    type="decompilation",
    object_id=service_results["decompilation"]["object_id"],
    metadata={
        "file_name": f"{file_enriched.file_name}.zip",
        "offer_as_download": True,
    },
)
enrichment_result.transforms = [decompilation]