Skip to content

Manager

psengine.threat_maps.threat_map_mgr.ThreatMapMgr

ThreatMapMgr(rf_token: str | None = None)

Manages requests for Recorded Future Threat Maps API.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: str | None DEFAULT: None

Source code in psengine/threat_maps/threat_map_mgr.py
def __init__(
    self,
    rf_token: Annotated[str | None, Doc('Recorded Future API token.')] = None,
):
    """Initialize the `ThreatMapMgr` object."""
    self.log = logging.getLogger(__name__)
    self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient()

fetch_available_maps

fetch_available_maps() -> list[ThreatMapInfo]

Fetch available threat maps for the organization.

Endpoint

threat/maps

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

ThreatMapInfoError

If connection error occurs.

RETURNS DESCRIPTION
list[ThreatMapInfo]

A list of available threat maps.

Source code in psengine/threat_maps/threat_map_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=ThreatMapInfoError)
def fetch_available_maps(
    self,
) -> Annotated[list[ThreatMapInfo], Doc('A list of available threat maps.')]:
    """Fetch available threat maps for the organization.

    Endpoint:
        `threat/maps`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        ThreatMapInfoError: If connection error occurs.
    """
    maps_response = self.rf_client.request(method='get', url=EP_THREAT_MAPS_LIST).json()['data']
    return [ThreatMapInfo.model_validate(entry) for entry in maps_response]

fetch_entity_categories

fetch_entity_categories(
    map_type: MAP_TYPE,
) -> list[EntityCategory]

Fetch the entity category taxonomy used to filter threat maps.

PARAMETER DESCRIPTION
map_type

Type of threat map.

TYPE: MAP_TYPE

Endpoint

threat/{type}/categories

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

ThreatMapCategoriesError

If connection error occurs.

RETURNS DESCRIPTION
list[EntityCategory]

A list of threat map taxonomy categories.

Source code in psengine/threat_maps/threat_map_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=ThreatMapCategoriesError)
def fetch_entity_categories(
    self,
    map_type: Annotated[MAP_TYPE, Doc('Type of threat map.')],
) -> Annotated[list[EntityCategory], Doc('A list of threat map taxonomy categories.')]:
    """Fetch the entity category taxonomy used to filter threat maps.

    Endpoint:
        `threat/{type}/categories`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        ThreatMapCategoriesError: If connection error occurs.
    """
    map_type = ThreatMapType(map_type)
    url = EP_CATEGORIES.format(map_type.category_slug)
    cat_response = self.rf_client.request(method='get', url=url).json()['data']
    return [EntityCategory.model_validate(ent) for ent in cat_response]

search_threat_actor

search_threat_actor(
    name: str | None = None,
    max_results: int | None = DEFAULT_LIMIT,
    actors_per_page: int | None = Field(
        ge=1, le=10000, default=DEFAULT_LIMIT
    ),
) -> list[ThreatActorProfile]

Search Recorded Future's threat actor database by name, alias, or classification.

PARAMETER DESCRIPTION
name

Free text search of threat actor names, common names, or aliases.

TYPE: str | None DEFAULT: None

max_results

Limit the total number of results returned.

TYPE: int | None DEFAULT: DEFAULT_LIMIT

actors_per_page

The number of threat actors per page for pagination.

TYPE: int | None DEFAULT: Field(ge=1, le=10000, default=DEFAULT_LIMIT)

Endpoint

threat/actor/search

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

ThreatActorSearchError

If connection error occurs.

RETURNS DESCRIPTION
list[ThreatActorProfile]

A list of threat actors matching the search criteria.

Source code in psengine/threat_maps/threat_map_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=ThreatActorSearchError)
def search_threat_actor(
    self,
    name: Annotated[
        str | None, Doc('Free text search of threat actor names, common names, or aliases.')
    ] = None,
    max_results: Annotated[
        int | None, Doc('Limit the total number of results returned.')
    ] = DEFAULT_LIMIT,
    actors_per_page: Annotated[
        int | None, Doc('The number of threat actors per page for pagination.')
    ] = Field(ge=1, le=10_000, default=DEFAULT_LIMIT),
) -> Annotated[
    list[ThreatActorProfile], Doc('A list of threat actors matching the search criteria.')
]:
    """Search Recorded Future's threat actor database by name, alias, or classification.

    Endpoint:
        `threat/actor/search`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        ThreatActorSearchError: If connection error occurs.
    """
    data = {
        'name': name,
        'limit': min(max_results or DEFAULT_LIMIT, actors_per_page or DEFAULT_LIMIT),
    }
    search_response = self.rf_client.request_paged(
        method='post',
        url=EP_ACTOR_SEARCH,
        data=data,
        results_path='data',
        offset_key='offset',
        max_results=max_results or DEFAULT_LIMIT,
    )
    return [ThreatActorProfile.model_validate(ta) for ta in search_response]

fetch_map

fetch_map(
    map_type: MAP_TYPE,
    org_id: str | None = None,
    malware: str | list[str] | None = None,
    actors: str | list[str] | None = None,
    categories: str | list[str] | None = None,
    watchlists: str | list[str] | None = None,
) -> ThreatMap

Fetch a threat map with optional entity, category, and watchlist filters.

PARAMETER DESCRIPTION
map_type

Type of threat map.

TYPE: MAP_TYPE

org_id

Organization ID.

TYPE: str | None DEFAULT: None

malware

Filter by malware entity ID(s).

TYPE: str | list[str] | None DEFAULT: None

actors

Filter by threat actor ID(s).

TYPE: str | list[str] | None DEFAULT: None

categories

Filter by category ID(s).

TYPE: str | list[str] | None DEFAULT: None

watchlists

Filter by watch list ID(s).

TYPE: str | list[str] | None DEFAULT: None

Endpoint

threat/map/{type} or threat/map/{org_id}/{type}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

ThreatMapFetchError

If connection error occurs.

RETURNS DESCRIPTION
ThreatMap

Threat map with entities matching filter criteria.

Source code in psengine/threat_maps/threat_map_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=ThreatMapFetchError)
def fetch_map(
    self,
    map_type: Annotated[MAP_TYPE, Doc('Type of threat map.')],
    org_id: Annotated[str | None, Doc('Organization ID.')] = None,
    malware: Annotated[str | list[str] | None, Doc('Filter by malware entity ID(s).')] = None,
    actors: Annotated[str | list[str] | None, Doc('Filter by threat actor ID(s).')] = None,
    categories: Annotated[str | list[str] | None, Doc('Filter by category ID(s).')] = None,
    watchlists: Annotated[str | list[str] | None, Doc('Filter by watch list ID(s).')] = None,
) -> Annotated[ThreatMap, Doc('Threat map with entities matching filter criteria.')]:
    """Fetch a threat map with optional entity, category, and watchlist filters.

    Endpoint:
        `threat/map/{type}` or `threat/map/{org_id}/{type}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        ThreatMapFetchError: If connection error occurs.
    """
    body = {'categories': categories, 'watchlists': watchlists}
    map_type = ThreatMapType(map_type).value
    if map_type is ThreatMapType.actors:
        body['actors'] = actors
    else:
        body['malware'] = malware

    url = (
        EP_THREAT_MAP_ORG.format(org_id, map_type) if org_id else EP_THREAT_MAP.format(map_type)
    )

    data = ThreatMapFetchIn.model_validate(body).json()
    map_response = self.rf_client.request(method='post', url=url, data=data).json()['data']
    return ThreatMap.model_validate(map_response)