Skip to content

Risklists

Introduction

The RisklistMgr class of the risklists module allows you to fetch risklists. A risklist is a file that contains a list of indicators with different levels of maliciousness. These risklists are often used as correlation files in SIEM tools.

In Recorded Future, the concept of a "default" risklist means a risklist with usually up to 100,000 indicators with a score from 65 and above.

See the API Reference for internal details of the module.

Notes

  • The fetch_risklist method returns a generator object. If it needs to be saved to a file, you should transform it to a list first.
  • Even though there are CSV-based risklists, PSEngine converts them to JSON.
  • With this module, fetching custom risklists is possible if any has been built for your enterprise.

Examples

Warning

The following examples demonstrate how to use this module. Be sure to add appropriate error handling as needed; all possible errors for each method or function are listed in the API Reference page.

Additionally, you must configure the RF_TOKEN environment variable before getting started. For instructions, see Learn.

1: Fetch and save the default domain risklist as JSON

In this example, we fetch the risklist with the fetch_risklist method, giving the arguments of default for the type of risklist and domain for the type of indicator. Since the file is converted by PSEngine into a JSON-like structure, we can convert the generator to a list and then save it to a file with json.dumps.

import json
from pathlib import Path

from psengine.risklists import RisklistMgr

OUTPUT_DIR = Path.cwd() / 'risklists'
OUTPUT_DIR.mkdir(exist_ok=True)


mgr = RisklistMgr()

risklist = list(mgr.fetch_risklist('default', 'domain'))

out_file = OUTPUT_DIR / 'default_domain_risklist.json'
out_file.write_text(json.dumps(risklist, indent=4))

After executing the script, you should have a file called default_domain_risklist.json in the risklists directory. However, you will see the content of the EvidenceDetails block is a JSON-like string.

To have a complete JSON, we can use the validate argument of the fetch_risklist method to:

  • Validate that each entry of the risklist respects a model
  • Dump the validated model with the fields we need

In the code below, we are performing the same operations but passing the DefaultRiskList object (a pydantic model) to fetch_risklist, and saving the results to a file.

What will happen is that while the risklist is converted to JSON, it also gets validated. The DefaultRiskList model is already present in PSEngine, but a custom model can be used as well; see Example 2.

import json
from pathlib import Path

from psengine.risklists import RisklistMgr
from psengine.risklists.models import DefaultRiskList

OUTPUT_DIR = Path.cwd() / 'risklists'
OUTPUT_DIR.mkdir(exist_ok=True)

mgr = RisklistMgr()

risklist = list(
    mgr.fetch_risklist(
        'default', 'domain', validate=DefaultRiskList
    )
)

out_file = (
    OUTPUT_DIR / 'default_domain_risklist_validated.json'
)
out_file.write_text(
    json.dumps(
        [entry.json() for entry in risklist], indent=4
    )
)

2: Fetch and save a custom risklist as JSON and perform validation

In this example, we assume that we want to build a script that ingests the Threat Actor–related indicators from the Recorded Future risklist ta_ip_risklist_v2.csv.

The risklist has the following headers:

  • Name,
  • Risk,
  • RiskString,
  • EvidenceDetails,
  • Sources,
  • ThreatActorIDs,
  • ThreatActorNames,
  • ThreatActorAliases,
  • ThreatActorCategories,
  • ThreatActorNotes,
  • IndicatorNotes

We want to validate and save the risklist without the EvidenceDetails, Sources, and the related notes fields. These fields have been excluded only for the sake of making the example shorter.

This example is a bit longer, but what we are doing is defining the TARisklist model, which inherits from RFBaseModel. In the TARisklist model, we define how the fields should be organized based on the needs of our tool. We could have left the fields untouched, but it is often required to slightly manipulate some of the data for easier ingestion.

The whole data manipulation is done using only pydantic constructs, like BeforeValidator and @field_validator. They transform the data from one shape to another, specifically from a dictionary to a list of dictionaries and from a JSON-like string to a JSON object, respectively.

Once the model is defined, we can fetch the risklist, validate the content, and save it to a file, same as the previous examples.

import json
from pathlib import Path
from typing import Annotated, Union

from psengine.common_models import RFBaseModel
from psengine.risklists import RisklistMgr
from pydantic import BeforeValidator, Field, field_validator

OUTPUT_DIR = Path.cwd() / 'risklists'
OUTPUT_DIR.mkdir(exist_ok=True)


class TADetail(RFBaseModel):
    """Threat actor details."""

    id_: str = Field(alias='id')
    value: Union[list[str], str]


def arrange_data(data) -> list[dict]:
    """Unpacking fields as a list of dictionaries."""
    data = json.loads(data)
    return [{'id': k, 'value': v} for k, v in data.items()]


class TARisklist(RFBaseModel):
    """Custom TA Risklist validator."""

    ioc: str = Field(validation_alias='Name')
    risk_score: int = Field(validation_alias='Risk')
    risk_string: str = Field(validation_alias='RiskString')
    ta_ids: list[str] = Field(
        validation_alias='ThreatActorIDs'
    )
    ta_names: Annotated[
        list[TADetail], BeforeValidator(arrange_data)
    ] = Field(validation_alias='ThreatActorNames')
    ta_aliases: Annotated[
        list[TADetail], BeforeValidator(arrange_data)
    ] = Field(validation_alias='ThreatActorAliases')
    ta_categories: Annotated[
        list[TADetail], BeforeValidator(arrange_data)
    ] = Field(validation_alias='ThreatActorCategories')

    @field_validator('ta_ids', mode='before')
    @classmethod
    def parse_ta_ids(cls, field: str) -> list[str]:
        """ta_ids field from string to list."""
        return json.loads(field)


mgr = RisklistMgr()
risklist = list(
    mgr.fetch_risklist(
        '/public/risklists/ta_ip_risklist_v2.csv',
        validate=TARisklist,
    )
)

out_file = OUTPUT_DIR / 'ta_risklist_ip.json'
out_file.write_text(
    json.dumps(
        [entry.json() for entry in risklist], indent=4
    )
)