Skip to content

Manager

psengine.enrich.lookup_mgr.LookupMgr

LookupMgr(rf_token: Optional[str] = None)

Enrichment of a single or a group of Entities.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: Optional[str] DEFAULT: None

Source code in psengine/enrich/lookup_mgr.py
def __init__(
    self,
    rf_token: Annotated[Optional[str], Doc('Recorded Future API token.')] = None,
):
    """Set up the logger and the Recorded Future API client for this manager.

    A truthy `rf_token` is handed to `RFClient` as `api_token`. When it is
    `None` or empty, `RFClient` is built without arguments (presumably it then
    resolves the token on its own - confirm against `RFClient`).
    """
    self.log = logging.getLogger(__name__)
    if rf_token:
        client = RFClient(api_token=rf_token)
    else:
        client = RFClient()
    self.rf_client = client

lookup

lookup(
    entity: str,
    entity_type: ALLOWED_ENTITIES,
    fields: Optional[list[str]] = None,
) -> EnrichmentData

Perform lookup of an entity based on its ID or name.

The entity can be either a Recorded Future ID or a readable entity name. The entity_type must always be specified. Allowed values include:

  • company
  • Company
  • company_by_domain
  • CyberVulnerability
  • domain
  • hash
  • Hash
  • InternetDomainName
  • ip
  • IpAddress
  • malware
  • Malware
  • Organization
  • url
  • URL
  • vulnerability

If fields are specified, they are added to the mandatory fields:

  • All entities except malware: ['entity', 'risk', 'timestamps']
  • For malware: ['entity', 'timestamps']
PARAMETER DESCRIPTION
entity

Name or Recorded Future ID of the entity.

TYPE: str

entity_type

Type of the entity to enrich.

TYPE: ALLOWED_ENTITIES

fields

Optional additional fields for enrichment.

TYPE: Optional[list[str]] DEFAULT: None

Endpoint

v2/{entity_type}/{entity}

Example
1
2
3
4
5
6
from psengine.enrich import LookupMgr

mgr = LookupMgr()
enriched_dom = mgr.lookup('idn:google.com', 'domain')
enriched_dom2 = mgr.lookup('google.com', 'domain')
enriched_company_via_id = mgr.lookup('A_BCDE', 'company')

Lookup with additional fields:

1
2
3
4
5
6
7
8
9
from psengine.enrich import LookupMgr

mgr = LookupMgr()
enriched_dom = mgr.lookup('idn:google.com', 'domain')
company_by_domain = mgr.lookup(
    'recordedfuture.com',
    entity_type='company_by_domain',
    fields=['curated']
)
To save to file:
1
2
3
4
5
6
7
from pathlib import Path
from json import dumps

OUTPUT_DIR = Path('your') / 'path'
OUTPUT_DIR.mkdir(exist_ok=True)
ip = mgr.lookup('1.1.1.1', 'ip')
(OUTPUT_DIR / f'{ip.entity}.json').write_text(dumps(ip.json(), indent=2))

If a 404 is received:

1
2
3
4
5
6
{
    'entity': entity,
    'entity_type': entity_type,
    'is_enriched': False,
    'content': '404 received. Nothing known on this entity',
}

If a 200 is received:

1
2
3
4
5
6
{
    'entity': entity,
    'entity_type': entity_type,
    'is_enriched': True,
    'content': the enriched data model
}
Raises:
ValidationError: If any supplied parameter is of incorrect type.
EnrichmentLookupError: If the lookup fails with a status code other than 200 or 404.

RETURNS DESCRIPTION
EnrichmentData

An object containing the enriched entity details.

Source code in psengine/enrich/lookup_mgr.py
@validate_call
@debug_call
def lookup(
    self,
    entity: Annotated[str, Doc('Name or Recorded Future ID of the entity.')],
    entity_type: Annotated[ALLOWED_ENTITIES, Doc('Type of the entity to enrich.')],
    fields: Annotated[
        Optional[list[str]], Doc('Optional additional fields for enrichment.')
    ] = None,
) -> Annotated[EnrichmentData, Doc('An object containing the enriched entity details.')]:
    """Perform lookup of an entity based on its ID or name.

    The `entity` can be either a Recorded Future ID or a readable entity name.
    The `entity_type` must always be specified. Allowed values include:

    - `company`
    - `Company`
    - `company_by_domain`
    - `CyberVulnerability`
    - `domain`
    - `hash`
    - `Hash`
    - `InternetDomainName`
    - `ip`
    - `IpAddress`
    - `malware`
    - `Malware`
    - `Organization`
    - `url`
    - `URL`
    - `vulnerability`

    If `fields` are specified, they are added to the mandatory fields:

    - All entities except malware: `['entity', 'risk', 'timestamps']`
    - For malware: `['entity', 'timestamps']`

    If `fields` is omitted or empty, only the mandatory fields are requested.

    Endpoint:
        `v2/{entity_type}/{entity}`

    Example:
        ```python
        from psengine.enrich import LookupMgr

        mgr = LookupMgr()
        enriched_dom = mgr.lookup('idn:google.com', 'domain')
        enriched_dom2 = mgr.lookup('google.com', 'domain')
        enriched_company_via_id = mgr.lookup('A_BCDE', 'company')
        ```

        Lookup with additional fields:
        ```python
        from psengine.enrich import LookupMgr

        mgr = LookupMgr()
        enriched_dom = mgr.lookup('idn:google.com', 'domain')
        company_by_domain = mgr.lookup(
            'recordedfuture.com',
            entity_type='company_by_domain',
            fields=['curated']
        )
        ```

        To save to file:
        ```python
        from pathlib import Path
        from json import dumps

        OUTPUT_DIR = Path('your') / 'path'
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        ip = mgr.lookup('1.1.1.1', 'ip')
        (OUTPUT_DIR / f'{ip.entity}.json').write_text(dumps(ip.json(), indent=2))
        ```

    If a 404 is received:
    ```python
    {
        'entity': entity,
        'entity_type': entity_type,
        'is_enriched': False,
        'content': '404 received. Nothing known on this entity',
    }
    ```

    If a 200 is received:
    ```python
    {
        'entity': entity,
        'entity_type': entity_type,
        'is_enriched': True,
        'content': the enriched data model
    }
    ```

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        EnrichmentLookupError: If the lookup fails with a status code other than 200 or 404.
    """
    # The type name is compared case-insensitively, so 'malware' and 'Malware' both match.
    mandatory_fields = MALWARE_FIELDS if entity_type.lower() == 'malware' else ENTITY_FIELDS
    # A missing or empty selection falls back to the mandatory fields before merging.
    requested_fields = self._merge_fields(fields or mandatory_fields, mandatory_fields)

    return self._lookup(entity, entity_type, requested_fields)

lookup_bulk

lookup_bulk(
    entity: list[str],
    entity_type: ALLOWED_ENTITIES,
    fields: list[str] = ENTITY_FIELDS,
    max_workers: Optional[int] = 0,
) -> list[EnrichmentData]

Perform lookup of multiple entities based on IDs or names.

The entity list can contain Recorded Future IDs or plain entity names. All entities must be of the same entity_type.

Allowed entity_type values include:

  • company
  • Company
  • company_by_domain
  • CyberVulnerability
  • domain
  • hash
  • Hash
  • InternetDomainName
  • ip
  • IpAddress
  • malware
  • Malware
  • Organization
  • url
  • URL
  • vulnerability

If fields are specified, they are added to the mandatory fields:

  • All entities except malware: ['entity', 'risk', 'timestamps']
  • For malware: ['entity', 'timestamps']
PARAMETER DESCRIPTION
entity

List of entity names or Recorded Future IDs.

TYPE: list[str]

entity_type

Type of the entities to enrich.

TYPE: ALLOWED_ENTITIES

fields

Optional additional fields for enrichment.

TYPE: list[str] DEFAULT: ENTITY_FIELDS

max_workers

Number of workers to multithread requests.

TYPE: Optional[int] DEFAULT: 0

Endpoint

v2/{entity_type}/{entity}

Example
from psengine.enrich import LookupMgr

mgr = LookupMgr()
data = {
    'IpAddress': ['1.1.1.1', '2.2.2.2'],
    'InternetDomainName': ['google.com', 'facebook.com']
}
results = []
for entity_type, entities in data.items():
    results.extend(mgr.lookup_bulk(entities, entity_type))

To save the results:

1
2
3
4
5
6
7
8
from pathlib import Path
from json import dumps

OUTPUT_DIR = Path('your') / 'path'
OUTPUT_DIR.mkdir(exist_ok=True)
results = mgr.lookup_bulk(['1.1.1.1', '8.8.8.8'], 'ip')
for entity in results:
    (OUTPUT_DIR / f'{entity.entity}.json').write_text(dumps(entity.json(), indent=2))

With multithreading:

1
2
3
4
from psengine.enrich import LookupMgr

mgr = LookupMgr()
domains = mgr.lookup_bulk(['google.com', 'facebook.com'], 'domain', max_workers=10)

If a 404 is received:

1
2
3
4
5
6
{
    'entity': entity,
    'entity_type': entity_type,
    'is_enriched': False,
    'content': '404 received. Nothing known on this entity',
}

If a 200 is received:

1
2
3
4
5
6
{
    'entity': entity,
    'entity_type': entity_type,
    'is_enriched': True,
    'content': the enriched data model
}

RAISES DESCRIPTION
ValidationError

if any supplied parameter is of incorrect type.

EnrichmentLookupError

If a lookup terminates with a return code other than 200 or 404.

RETURNS DESCRIPTION
list[EnrichmentData]

A list of objects containing the enriched entity details.

Source code in psengine/enrich/lookup_mgr.py
@validate_call
@debug_call
def lookup_bulk(
    self,
    entity: Annotated[list[str], Doc('List of entity names or Recorded Future IDs.')],
    entity_type: Annotated[ALLOWED_ENTITIES, Doc('Type of the entities to enrich.')],
    fields: Annotated[
        Optional[list[str]], Doc('Optional additional fields for enrichment.')
    ] = None,
    max_workers: Annotated[
        Optional[int], Doc('Number of workers to multithread requests.')
    ] = 0,
) -> Annotated[
    list[EnrichmentData], Doc('A list of objects containing the enriched entity details.')
]:
    """Perform lookup of multiple entities based on IDs or names.

    The `entity` list can contain Recorded Future IDs or plain entity names.
    All entities must be of the same `entity_type`.

    Allowed `entity_type` values include:

    - `company`
    - `Company`
    - `company_by_domain`
    - `CyberVulnerability`
    - `domain`
    - `hash`
    - `Hash`
    - `InternetDomainName`
    - `ip`
    - `IpAddress`
    - `malware`
    - `Malware`
    - `Organization`
    - `url`
    - `URL`
    - `vulnerability`

    If `fields` are specified, they are added to the mandatory fields:

    - All entities except malware: `['entity', 'risk', 'timestamps']`
    - For malware: `['entity', 'timestamps']`

    If `fields` is omitted or empty, only the mandatory fields for the given
    `entity_type` are requested.

    If `max_workers` is `0` or `None`, the lookups run one after another in the
    calling thread; otherwise they are dispatched through `MultiThreadingHelper`.

    Endpoint:
        `v2/{entity_type}/{entity}`

    Example:
        ```python
        from psengine.enrich import LookupMgr

        mgr = LookupMgr()
        data = {
            'IpAddress': ['1.1.1.1', '2.2.2.2'],
            'InternetDomainName': ['google.com', 'facebook.com']
        }
        results = []
        for entity_type, entities in data.items():
            results.extend(mgr.lookup_bulk(entities, entity_type))
        ```

        To save the results:
        ```python
        from pathlib import Path
        from json import dumps

        OUTPUT_DIR = Path('your') / 'path'
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        results = mgr.lookup_bulk(['1.1.1.1', '8.8.8.8'], 'ip')
        for entity in results:
            (OUTPUT_DIR / f'{entity.entity}.json').write_text(dumps(entity.json(), indent=2))
        ```

        With multithreading:
        ```python
        from psengine.enrich import LookupMgr

        mgr = LookupMgr()
        domains = mgr.lookup_bulk(['google.com', 'facebook.com'], 'domain', max_workers=10)
        ```

    If a 404 is received:
    ```python
    {
        'entity': entity,
        'entity_type': entity_type,
        'is_enriched': False,
        'content': '404 received. Nothing known on this entity',
    }
    ```

    If a 200 is received:
    ```python
    {
        'entity': entity,
        'entity_type': entity_type,
        'is_enriched': True,
        'content': the enriched data model
    }
    ```

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        EnrichmentLookupError: If a lookup terminates with a return code other than 200 or 404.
    """
    # The type name is compared case-insensitively, so 'malware' and 'Malware' both match.
    default_fields = MALWARE_FIELDS if entity_type.lower() == 'malware' else ENTITY_FIELDS
    # `fields` defaults to None rather than ENTITY_FIELDS so that a default malware
    # lookup falls back to MALWARE_FIELDS and does not request the 'risk' field.
    fields = self._merge_fields(fields or default_fields, default_fields)

    if max_workers:
        return MultiThreadingHelper.multithread_it(
            max_workers,
            self._lookup,
            iterator=entity,
            entity_type=entity_type,
            fields=fields,
        )

    # Sequential path: one request per entity, in input order.
    return [self._lookup(name, entity_type, fields) for name in entity]