Skip to content

Manager

psengine.detection.detection_mgr.DetectionMgr

DetectionMgr(rf_token: Optional[str] = None)

Class to manage DetectionRules and interaction with the Detection API.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: Optional[str] DEFAULT: None

Source code in psengine/detection/detection_mgr.py
def __init__(
    self,
    rf_token: Annotated[Optional[str], Doc('Recorded Future API token.')] = None,
):
    """Set up the logger and the API client used by this `DetectionMgr`.

    A truthy `rf_token` is handed to `RFClient` as `api_token`; otherwise the
    client is constructed without arguments.
    """
    self.log = logging.getLogger(__name__)

    if rf_token:
        client = RFClient(api_token=rf_token)
    else:
        client = RFClient()
    self.rf_client = client

fetch

fetch(doc_id: str) -> Optional[DetectionRule]

Fetch a detection rule based on its ID.

PARAMETER DESCRIPTION
doc_id

Detection rule ID to look up.

TYPE: str

Endpoint

detection-rule/search

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

DetectionRuleFetchError

If the underlying search for the given ID fails.

RETURNS DESCRIPTION
Optional[DetectionRule]

The detection rule found for the given ID, or None if no rule is found.

Source code in psengine/detection/detection_mgr.py
@debug_call
@validate_call
def fetch(
    self,
    doc_id: Annotated[str, Doc('Detection rule ID to look up.')],
) -> Annotated[Optional[DetectionRule], Doc('The detection rule found for the given ID.')]:
    """Fetch a detection rule based on its ID.

    Delegates to `search` filtered by `doc_id` and returns the first result. When the
    search yields no rule, an info message is logged and `None` is returned.

    Endpoint:
        `detection-rule/search`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        DetectionRuleFetchError: If the underlying search raises `DetectionRuleSearchError`.
    """
    try:
        result = self.search(doc_id=doc_id)
    except DetectionRuleSearchError as e:
        # Re-raise under the fetch-specific error type, keeping the search error as cause.
        raise DetectionRuleFetchError(f'Error in fetching of {doc_id}') from e

    if result:
        return result[0]

    # An empty result is not treated as an error: log it and signal "not found" with None.
    self.log.info('No rule found for id %s', doc_id)
    return None

search

search(
    detection_rule: Union[list[str], str, None] = None,
    entities: Optional[list[str]] = None,
    created_before: Optional[str] = None,
    created_after: Optional[str] = None,
    updated_before: Optional[str] = None,
    updated_after: Optional[str] = None,
    doc_id: Optional[str] = None,
    title: Optional[str] = None,
    tagged_entities: Optional[bool] = None,
    max_results: Optional[int] = DEFAULT_LIMIT,
) -> list[DetectionRule]

Search for detection rules based on various filter criteria.

PARAMETER DESCRIPTION
detection_rule

Types of detection rules to search for.

TYPE: Union[list[str], str, None] DEFAULT: None

entities

List of entities to filter the search.

TYPE: Optional[list[str]] DEFAULT: None

created_before

Filter for rules created before this date.

TYPE: Optional[str] DEFAULT: None

created_after

Filter for rules created after this date.

TYPE: Optional[str] DEFAULT: None

updated_before

Filter for rules updated before this date.

TYPE: Optional[str] DEFAULT: None

updated_after

Filter for rules updated after this date.

TYPE: Optional[str] DEFAULT: None

doc_id

Filter by document ID.

TYPE: Optional[str] DEFAULT: None

title

Filter by title.

TYPE: Optional[str] DEFAULT: None

tagged_entities

Whether to filter by tagged entities.

TYPE: Optional[bool] DEFAULT: None

max_results

Limit the total number of results returned.

TYPE: Optional[int] DEFAULT: DEFAULT_LIMIT

Endpoint

detection-rule/search

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

DetectionRuleSearchError

If a connection error occurs.

RETURNS DESCRIPTION
list[DetectionRule]

A list of detection rules matching the search criteria.

Source code in psengine/detection/detection_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=DetectionRuleSearchError)
def search(
    self,
    detection_rule: Annotated[
        Union[list[str], str, None], Doc('Types of detection rules to search for.')
    ] = None,
    entities: Annotated[
        Optional[list[str]], Doc('List of entities to filter the search.')
    ] = None,
    created_before: Annotated[
        Optional[str], Doc('Filter for rules created before this date.')
    ] = None,
    created_after: Annotated[
        Optional[str], Doc('Filter for rules created after this date.')
    ] = None,
    updated_before: Annotated[
        Optional[str], Doc('Filter for rules updated before this date.')
    ] = None,
    updated_after: Annotated[
        Optional[str], Doc('Filter for rules updated after this date.')
    ] = None,
    doc_id: Annotated[Optional[str], Doc('Filter by document ID.')] = None,
    title: Annotated[Optional[str], Doc('Filter by title.')] = None,
    tagged_entities: Annotated[
        Optional[bool], Doc('Whether to filter by tagged entities.')
    ] = None,
    max_results: Annotated[
        Optional[int], Doc('Limit the total number of results returned.')
    ] = DEFAULT_LIMIT,
) -> Annotated[
    list[DetectionRule], Doc('A list of detection rules matching the search criteria.')
]:
    """Search for detection rules matching the supplied filters.

    A single string given as `detection_rule` is wrapped in a one-element list before
    the request payload is built. The payload is validated through
    `DetectionRuleSearchOut` and posted via the client's paged request helper.

    Endpoint:
        `detection-rule/search`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        DetectionRuleSearchError: If a connection error occurs.
    """
    # Normalise a lone rule type to the list form used in the filter.
    rule_types = [detection_rule] if isinstance(detection_rule, str) else detection_rule
    created_range = {'before': created_before, 'after': created_after}
    updated_range = {'before': updated_before, 'after': updated_after}

    payload = DetectionRuleSearchOut.model_validate(
        {
            'filter': {
                'types': rule_types,
                'entities': entities,
                'created': created_range,
                'updated': updated_range,
                'doc_id': doc_id,
                'title': title,
            },
            'tagged_entities': tagged_entities,
            'limit': SEARCH_LIMIT,
        }
    )

    response = self.rf_client.request_paged(
        'post',
        EP_DETECTION_RULES,
        data=payload.json(),
        results_path='result',
        offset_key='offset',
        max_results=max_results,
    )

    # The paged helper may hand back something other than a list; in that case
    # pull the 'result' entry out of its JSON body.
    if not isinstance(response, list):
        response = response.json().get('result', [])

    return [DetectionRule.model_validate(raw_rule) for raw_rule in response]

psengine.detection.detection_mgr.SEARCH_LIMIT module-attribute

# Sent as the 'limit' field of the detection-rule search request body built by `search`.
SEARCH_LIMIT = 100