Skip to content

Manager

psengine.collective_insights.collective_insights.CollectiveInsights

CollectiveInsights(rf_token: str = None)

Class for interacting with the Recorded Future Collective Insights API.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: str DEFAULT: None

Source code in psengine/collective_insights/collective_insights.py
def __init__(self, rf_token: str = None):
    """Initializes the CollectiveInsights object.

    Args:
        rf_token (str, optional): Recorded Future API token.
    """
    self.log = logging.getLogger(__name__)
    self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient()

submit

submit(
    insight: Union[Insight, list[Insight]],
    debug: bool = True,
    organization_ids: Optional[list] = None,
) -> InsightsIn

Submit a detection or insight to the Recorded Future Collective Insights API.

PARAMETER DESCRIPTION
insight

A detection or list of detections to submit.

TYPE: Union[Insight, list[Insight]]

debug

Whether the submission should appear in the SecOPS dashboard.

TYPE: bool DEFAULT: True

organization_ids

List of organization IDs.

TYPE: Optional[list] DEFAULT: None

Endpoint

collective-insights/detections

RAISES DESCRIPTION
CollectiveInsightsError

If connection error occurs.

ValidationError

If any supplied parameter is of incorrect type.

RETURNS DESCRIPTION
InsightsIn

Response from the Recorded Future API.

Source code in psengine/collective_insights/collective_insights.py
@validate_call
@debug_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=CollectiveInsightsError)
def submit(
    self,
    insight: Annotated[
        Union[Insight, list[Insight]], Doc('A detection or list of detections to submit.')
    ],
    debug: Annotated[
        bool, Doc('Whether the submission should appear in the SecOPS dashboard.')
    ] = True,
    organization_ids: Annotated[Optional[list], Doc('List of organization IDs.')] = None,
) -> Annotated[InsightsIn, Doc('Response from the Recorded Future API.')]:
    """Submit a detection or insight to the Recorded Future Collective Insights API.

    Endpoint:
        `collective-insights/detections`

    Raises:
        CollectiveInsightsError: If connection error occurs.
        ValidationError: If any supplied parameter is of incorrect type.
    """
    if not insight:
        raise ValueError('Insight cannot be empty')

    insight = insight if isinstance(insight, list) else [insight]

    ci_data = self._prepare_ci_request(insight, debug, organization_ids)
    response = self.rf_client.request(
        'post',
        url=EP_COLLECTIVE_INSIGHTS_DETECTIONS,
        data=ci_data.json(),
    )

    return InsightsIn.model_validate(response.json())

create

create(
    ioc_value: str,
    ioc_type: str,
    timestamp: str,
    detection_type: str,
    detection_sub_type: Optional[str] = None,
    detection_id: Optional[str] = None,
    detection_name: Optional[str] = None,
    ioc_field: Optional[str] = None,
    ioc_source_type: Optional[str] = None,
    incident_id: Optional[str] = None,
    incident_name: Optional[str] = None,
    incident_type: Optional[str] = None,
    mitre_codes: Union[list[str], str, None] = None,
    malwares: Union[list[str], str, None] = None,
    **kwargs,
) -> Insight

Create a new Insight object.

PARAMETER DESCRIPTION
ioc_value

The value of the IOC.

TYPE: str

ioc_type

The type of the IOC.

TYPE: str

timestamp

The timestamp associated with the detection as ISO 8601.

TYPE: str

detection_type

The type of the detection.

TYPE: str

detection_sub_type

The subtype of the detection.

TYPE: Optional[str] DEFAULT: None

detection_id

The ID of the detection.

TYPE: Optional[str] DEFAULT: None

detection_name

The name of the detection.

TYPE: Optional[str] DEFAULT: None

ioc_field

The field in which the IOC was detected.

TYPE: Optional[str] DEFAULT: None

ioc_source_type

The source type of the IOC.

TYPE: Optional[str] DEFAULT: None

incident_id

The ID of the incident.

TYPE: Optional[str] DEFAULT: None

incident_name

The name of the incident.

TYPE: Optional[str] DEFAULT: None

incident_type

The type of the incident.

TYPE: Optional[str] DEFAULT: None

mitre_codes

MITRE ATT&CK technique or tactic codes.

TYPE: Union[list[str], str, None] DEFAULT: None

malwares

Associated malware family or names.

TYPE: Union[list[str], str, None] DEFAULT: None

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

RETURNS DESCRIPTION
Insight

The created Insight object.

Source code in psengine/collective_insights/collective_insights.py
@validate_call
@debug_call
def create(
    self,
    ioc_value: Annotated[str, Doc('The value of the IOC.')],
    ioc_type: Annotated[str, Doc('The type of the IOC.')],
    timestamp: Annotated[str, Doc('The timestamp associated with the detection as ISO 8601.')],
    detection_type: Annotated[str, Doc('The type of the detection.')],
    detection_sub_type: Annotated[Optional[str], Doc('The subtype of the detection.')] = None,
    detection_id: Annotated[Optional[str], Doc('The ID of the detection.')] = None,
    detection_name: Annotated[Optional[str], Doc('The name of the detection.')] = None,
    ioc_field: Annotated[Optional[str], Doc('The field in which the IOC was detected.')] = None,
    ioc_source_type: Annotated[Optional[str], Doc('The source type of the IOC.')] = None,
    incident_id: Annotated[Optional[str], Doc('The ID of the incident.')] = None,
    incident_name: Annotated[Optional[str], Doc('The name of the incident.')] = None,
    incident_type: Annotated[Optional[str], Doc('The type of the incident.')] = None,
    mitre_codes: Annotated[
        Union[list[str], str, None], Doc('MITRE ATT&CK technique or tactic codes.')
    ] = None,
    malwares: Annotated[
        Union[list[str], str, None], Doc('Associated malware family or names.')
    ] = None,
    **kwargs,
) -> Annotated[Insight, Doc('The created Insight object.')]:
    """Create a new Insight object.

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
    """
    incident = {'id': incident_id, 'type': incident_type, 'name': incident_name}
    detection = {
        'id': detection_id,
        'name': detection_name,
        'type': detection_type,
        'sub_type': detection_sub_type,
    }
    ioc = {
        'type': ioc_type,
        'value': ioc_value,
        'source_type': ioc_source_type,
        'field': ioc_field,
    }
    data = {
        'timestamp': timestamp,
        'ioc': ioc,
        'incident': incident,
        'detection': detection,
        'mitre_codes': mitre_codes,
        'malwares': malwares,
    }
    data['incident'] = (
        None
        if isinstance(data['incident'], dict)
        and all(sub_v is None for sub_v in data['incident'].values())
        else data['incident']
    )
    if kwargs:
        data.update(kwargs)

    return Insight.model_validate(data)