Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/detectors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ garak.detectors
garak.detectors.any
garak.detectors.ansiescape
garak.detectors.continuation
garak.detectors.custom
garak.detectors.dan
garak.detectors.divergence
garak.detectors.encoding
Expand Down
8 changes: 8 additions & 0 deletions docs/source/garak.detectors.custom.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
garak.detectors.custom
======================

.. automodule:: garak.detectors.custom
:members:
:undoc-members:
:show-inheritance:

313 changes: 313 additions & 0 deletions garak/detectors/custom.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the name "custom" add much? I can see it makes sense to group them as done here. Though looking at the probe names, we have probes.function - detectors.function seems a fine analogue to that. And then detectors.regex could work for the other. I'll think on this some more.

Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
"""Custom detectors for flexible success criteria.

This module provides customizable detectors that allow users to define
their own success criteria without writing detector classes:

1. RegexDetector - Match regex patterns in outputs
2. FunctionDetector - Call user-provided functions for custom logic

CLI Examples:
# Regex detector
garak --detectors custom.RegexDetector \\
--detector_options '{"custom": {"RegexDetector": {"patterns": ["api.?key","sk-[A-Za-z0-9]{32,}"]}}}

# Function detector
garak --detectors custom.FunctionDetector \\
--detector_options '{"custom": {"FunctionDetector": {"function_name": "mymodule#check_harmful"}}}' \\
--probes dan.Dan_11_0

# Or use config file
garak --detectors custom.RegexDetector \\
--detector_option_file detector_config.json \\
--probes dan.Dan_11_0
Comment on lines +9 to +22
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please manage through Configurable mechanism and not CLI

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cli command based on a detector config file is viable here, however reference to using --config for a consolidated example may add clarity.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry, yes, missed that these were viable existing options

"""

import importlib
import logging
import re
from typing import List

from garak import _config
import garak.attempt
from garak.detectors.base import Detector
from garak.resources.function_detector import dummy

class RegexDetector(Detector):
"""Detector that matches regex patterns in model outputs.

Useful for detecting specific strings,
code patterns, credentials, or any text that matches regex patterns.

Configuration:
{
"custom": {
"RegexDetector": {
"patterns": ["pattern1", "pattern2", ...],
"match_type": "any", # or "all"
"case_sensitive": false
}
}
}

Examples:
# Detect credentials
--detector_options '{"custom": {"RegexDetector": {
"patterns": ["api.?key", "sk-[A-Za-z0-9]{32,}"]
}}}'

# Detect dangerous commands
--detector_options '{"custom": {"RegexDetector": {
"patterns": ["sudo", "rm -rf", "DROP TABLE"],
"match_type": "any",
"case_sensitive": false
}}}'
"""
lang_spec = "*"

DEFAULT_PARAMS = Detector.DEFAULT_PARAMS | {
"patterns": [], # users must provide patterns
"match_type": "any", # "any" or "all"
"case_sensitive": False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it makes sense to expose multiline / re.M here?

Copy link
Collaborator

@jmartin-tech jmartin-tech Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to offer this as a set of strings corresponding to allowed enum values from re.RegexFlag to | together. Named something like re_flags?

This could even be initialized to the re.NOFLAG value:

Suggested change
"case_sensitive": False,
"re_flags": [ "NOFLAG" ],

If accepting this idea be sure to add validation in __init__.

}

active = False
tags = []

def __init__(self, config_root=_config):
"""Initialize RegexDetector from config.

Args:
config_root: Configuration root object containing detector configuration

Raises:
ValueError: If no patterns provided or invalid regex pattern
"""
super().__init__(config_root=config_root)

# Normalize to list
if isinstance(self.patterns, str):
self.patterns = [self.patterns]

# Validate match_type
self.match_type = str(self.match_type).lower()
if self.match_type not in ("any", "all"):
raise ValueError(f"match_type must be 'any' or 'all', got '{self.match_type}'")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please log and skip, rather than risking aborting the run


# Compile regex patterns
flags = 0 if self.case_sensitive else re.IGNORECASE
try:
self.compiled_patterns = [
re.compile(pattern, flags) for pattern in self.patterns
]
except re.error as e:
raise ValueError(f"Invalid regex pattern: {e}") from e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please log and skip, rather than risking aborting the run


# Update description
self.description = (
f"Regex detector matching {self.match_type} of {len(self.patterns)} pattern(s)"
)

logging.info(
"RegexDetector initialized with %d patterns (match_type=%s, case_sensitive=%s)",
len(self.patterns), self.match_type, self.case_sensitive
)

def detect(self, attempt: garak.attempt.Attempt) -> List[float]:
"""Detect if outputs match regex patterns.

Returns 1.0 if match found (vulnerability), 0.0 otherwise.

Args:
attempt: Attempt object containing outputs to check

Returns:
List of scores (0.0 or 1.0) for each output
"""
results = []
for output in attempt.outputs:
output_text = output.text if output else None
if output_text is None:
results.append(None)
continue

if not self.compiled_patterns:
results.append(0.0) # default to safe if no patterns are provided
continue

# Check patterns based on match_type
if self.match_type == "any":
# Match if ANY pattern is found
matched = any(
pattern.search(output_text) for pattern in self.compiled_patterns
)
elif self.match_type == "all":
# Match only if ALL patterns are found
matched = all(
pattern.search(output_text) for pattern in self.compiled_patterns
)
else:
raise ValueError(f"Invalid match_type: {self.match_type}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please log and skip, rather than risking aborting the run. Failing detector can safely return [None] * input_length. Would be best if we accepted detectors returning flat None on overall detector failure but I'm not sure that's possible yet.


results.append(1.0 if matched else 0.0)

return results


class FunctionDetector(Detector):
"""Detector that calls user-provided function for custom detection logic.

Inspired by function.py generator. The user specifies a function in the format
'module#function', and it's called with each output string.

The function must have signature: function(output_text: str) -> float
where return value is 0.0 (safe) to 1.0 (detected).

Configuration (via --detector_options or --detector_option_file):
{
"custom": {
"FunctionDetector": {
"function_name": "mymodule#myfunction",
"kwargs": { # Optional: additional args passed to function
"threshold": 0.5,
"custom_param": "value"
}
}
}
}

Examples:
# In mydetectors.py:
def check_length(output_text, min_length=100):
return 1.0 if len(output_text) > min_length else 0.0

# CLI:
--detector_options '{"custom": {"FunctionDetector": {
"function_name": "mydetectors#check_length",
"kwargs": {"min_length": 150}
}}}'

# In mydetectors.py:
def check_pii(output_text):
import re
has_ssn = bool(re.search(r'\\d{3}-\\d{2}-\\d{4}', output_text))
has_email = bool(re.search(r'\\S+@\\S+', output_text))
return 1.0 if (has_ssn or has_email) else 0.0

# CLI:
--detector_options '{"custom": {"FunctionDetector": {
"function_name": "mydetectors#check_pii"
}}}'
"""

DEFAULT_PARAMS = Detector.DEFAULT_PARAMS | {
"function_name": f"{dummy.__name__}#always_pass", # users must provide a function name. We provide a dummy function for tests.
"kwargs": {}, # Optional: additional kwargs for function
}

active = False
tags = []
lang_spec = "*"

def __init__(self, config_root=_config):
"""Initialize FunctionDetector from config.

Loads the specified function dynamically using importlib, similar to
how function.py generator works.

Args:
config_root: Configuration root object containing detector configuration

Raises:
ValueError: If function_name not provided or invalid format
ImportError: If module cannot be imported
AttributeError: If function not found in module
"""
super().__init__(config_root=config_root)

self.function_name = str(self.function_name)

# Parse module#function format
if "#" not in self.function_name:
raise ValueError(
f"function_name must be in format 'module#function', got '{self.function_name}'"
)
Comment on lines +231 to +233
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what the best way of handling this is post-inference. There may be other detectors configured, for example. Please log and skip, rather than risking aborting the run


module_name, func_name = self.function_name.split("#", 1)

# Load the module and function
try:
module = importlib.import_module(module_name)
except ImportError as e:
raise ImportError(
f"Could not import module '{module_name}' for FunctionDetector"
) from e
Comment on lines +238 to +243
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is module discarded once the constructor returns?


try:
self.detection_function = getattr(module, func_name)
except AttributeError as e:
raise AttributeError(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please log and skip, rather than risking aborting the run

f"Function '{func_name}' not found in module '{module_name}'"
) from e

# Validate function is callable
if not callable(self.detection_function):
raise ValueError(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please log and skip, rather than risking aborting the run

f"{module_name}#{func_name} is not callable"
)

# Update description
self.description = f"Function detector using {self.function_name}"

logging.info(
"FunctionDetector initialized with function %s (kwargs=%s)",
self.function_name, self.kwargs
)

def detect(self, attempt: garak.attempt.Attempt) -> List[float]:
"""Call user function to detect issues in outputs.

The function receives the output text (str) and returns a score (float).
Additional kwargs from config are passed to the function.

Args:
attempt: Attempt object containing outputs to check

Returns:
List of scores (0.0-1.0) for each output
"""
results = []
for output in attempt.outputs:
output_text = output.text if output else None
if output_text is None:
results.append(None)
continue

try:
# Call user's function with output text
score = self.detection_function(output_text, **self.kwargs)

# Validate return type
if not isinstance(score, (int, float)):
logging.error(
"Function %s returned non-numeric value: %s (type: %s)",
self.function_name, score, type(score)
)
score = 0.0

# Clamp to 0-1 range (shall we raise if outside range?)
score_clamped = max(0.0, min(1.0, float(score)))
if score != score_clamped:
logging.warning(
"Function %s returned score outside 0-1 range: %s (clamped to %s)",
self.function_name, score, score_clamped
)
results.append(score_clamped)

except Exception as e:
logging.error(
"Error calling detection function %s: %s",
self.function_name, e, exc_info=e
)
results.append(None) # Default to None on error

return results
2 changes: 2 additions & 0 deletions garak/resources/function_detector/dummy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def always_pass(output_text: str) -> float:
return 1.0
Loading