Skip to content

Manager

psengine.enrich.soar_mgr.SoarMgr

SoarMgr(rf_token: Optional[str] = None)

Perform SOAR enrichment of entities.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: Optional[str] DEFAULT: None

Source code in psengine/enrich/soar_mgr.py
def __init__(
    self,
    rf_token: Annotated[Optional[str], Doc('Recorded Future API token.')] = None,
):
    """Set up the logger and the Recorded Future client used by `SoarMgr`.

    When `rf_token` is falsy (``None`` or empty), `RFClient` is built without an
    explicit token argument.
    """
    self.log = logging.getLogger(__name__)

    # Only forward the token when one was actually supplied.
    if rf_token:
        self.rf_client = RFClient(api_token=rf_token)
    else:
        self.rf_client = RFClient()

soar

soar(
    ip: Optional[list[str]] = None,
    domain: Optional[list[str]] = None,
    hash_: Optional[list[str]] = None,
    vulnerability: Optional[list[str]] = None,
    url: Optional[list[str]] = None,
    companybydomain: Optional[list[str]] = None,
    max_workers: Optional[int] = 0,
) -> list[SOAREnrichOut]

Enrich multiple types of IOCs via the SOAR API.

This method supports batch processing of IOC types including IPs, domains, hashes, vulnerabilities, URLs, and company domains. Uses multithreading if max_workers > 0.

PARAMETER DESCRIPTION
ip

List of IP addresses to enrich.

TYPE: Optional[list[str]] DEFAULT: None

domain

List of domains to enrich.

TYPE: Optional[list[str]] DEFAULT: None

hash_

List of file hashes to enrich.

TYPE: Optional[list[str]] DEFAULT: None

vulnerability

List of vulnerabilities to enrich.

TYPE: Optional[list[str]] DEFAULT: None

url

List of URLs to enrich.

TYPE: Optional[list[str]] DEFAULT: None

companybydomain

List of company domains to enrich.

TYPE: Optional[list[str]] DEFAULT: None

max_workers

Number of workers to multithread requests.

TYPE: Optional[int] DEFAULT: 0

Endpoint

v2/soar/enrichment

Example

Simple bulk enrichment:

1
2
3
4
from psengine.enrich import SoarMgr

mgr = SoarMgr()
ips = mgr.soar(ip=['1.1.1.1'])

With multithreading:

1
2
3
4
from psengine.enrich import SoarMgr

mgr = SoarMgr()
ips = mgr.soar(ip=['1.1.1.1'], max_workers=10)

Save enriched results to file:

from pathlib import Path
from json import dumps
from psengine.enrich import SoarMgr

mgr = SoarMgr()

OUTPUT_DIR = Path('your') / 'path'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
data = mgr.soar(ip=['1.1.1.1', '8.8.8.8'])
for ip in data:
    (OUTPUT_DIR / f'{ip.entity}.json').write_text(dumps(ip.json(), indent=2))

RAISES DESCRIPTION
ValueError

If no parameters are provided or all provided lists are empty.

ValidationError

If any supplied parameter is of an incorrect type.

EnrichmentSoarError

If an HTTP or JSON decoding error occurs during enrichment.

RETURNS DESCRIPTION
list[SOAREnrichOut]

A list of enriched data for the provided IOCs.

Source code in psengine/enrich/soar_mgr.py
@validate_call
@debug_call
def soar(
    self,
    ip: Annotated[Optional[list[str]], Doc('List of IP addresses to enrich.')] = None,
    domain: Annotated[Optional[list[str]], Doc('List of domains to enrich.')] = None,
    hash_: Annotated[Optional[list[str]], Doc('List of file hashes to enrich.')] = None,
    vulnerability: Annotated[
        Optional[list[str]], Doc('List of vulnerabilities to enrich.')
    ] = None,
    url: Annotated[Optional[list[str]], Doc('List of URLs to enrich.')] = None,
    companybydomain: Annotated[
        Optional[list[str]], Doc('List of company domains to enrich.')
    ] = None,
    max_workers: Annotated[
        Optional[int], Doc('Number of workers to multithread requests.')
    ] = 0,
) -> Annotated[list[SOAREnrichOut], Doc('A list of enriched data for the provided IOCs.')]:
    """Enrich multiple types of IOCs via the SOAR API.

    This method supports batch processing of IOC types including IPs, domains, hashes,
    vulnerabilities, URLs, and company domains. Uses multithreading if `max_workers` > 0.

    Endpoint:
        `v2/soar/enrichment`

    Example:
        Simple bulk enrichment:
        ```python
        from psengine.enrich import SoarMgr

        mgr = SoarMgr()
        ips = mgr.soar(ip=['1.1.1.1'])
        ```

        With multithreading:
        ```python
        from psengine.enrich import SoarMgr

        mgr = SoarMgr()
        ips = mgr.soar(ip=['1.1.1.1'], max_workers=10)
        ```

        Save enriched results to file:
        ```python
        from pathlib import Path
        from json import dumps
        from psengine.enrich import SoarMgr

        mgr = SoarMgr()

        OUTPUT_DIR = Path('your') / 'path'
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        data = mgr.soar(ip=['1.1.1.1', '8.8.8.8'])
        for ip in data:
            (OUTPUT_DIR / f'{ip.entity}.json').write_text(dumps(ip.json(), indent=2))
        ```

    Raises:
        ValueError: If no parameters are provided or all provided lists are empty.
        ValidationError: If any supplied parameter is of an incorrect type.
        EnrichmentSoarError: If an HTTP or JSON decoding error occurs during enrichment.
    """
    candidates = {
        'ip': ip,
        'domain': domain,
        'hash': hash_,
        'vulnerability': vulnerability,
        'url': url,
        'companybydomain': companybydomain,
    }
    # Drop IOC types that were not supplied (None) or supplied as empty lists.
    iocs = {ioc_type: values for ioc_type, values in candidates.items() if values}
    if not iocs:
        raise ValueError('At least one parameter must be used')

    batches = self._batched_cross_entity(iocs, SOAR_POST_ROWS)
    if max_workers:
        # Fan the batches out across worker threads; one result per batch.
        per_batch = MultiThreadingHelper.multithread_it(
            max_workers,
            self._fetch_data,
            iterator=batches,
        )
    else:
        per_batch = [self._fetch_data(batch) for batch in batches]

    # Each batch yields its own collection of results; flatten them into one list.
    return list(chain.from_iterable(per_batch))