Skip to content

Enrichment

Introduction

The LookupMgr and SoarMgr classes of the enrich module allows to enrich entities and indicators with more contextual information.

The LookupMgr allows for one by one enrichment of entities, with the possibility of specifying which type of information you want to get, for example: the location of an IP address.

The SoarMgr is used for bulk enrichments, the returned payload is the same for each entity type with no possibility of getting specific information for each entity type. It is more generic but allows for quicker enrichment of information like the risk score and risk rules of all the indicators.

See the Lookup API Reference and the Soar API Reference for internal details of the module.

Note

When performing enrichment with LookupMgr and fields is specified, the field you use are added on top of the default fields. The default fields are based on the entity type you are enriching:

  • If the entity is a malware the fields are entity and timestamp
  • For all the other entities the fields are entity, risk and timestamps.

Examples

Warning

Below are some examples of usage of the module. Consider adding error handling as necessary. All the errors that can be raised by each method or function are specified in the API Reference page.

Also, you need to configure the RF_TOKEN environment variable before starting. See Learn.

Example 1: Enrich a vulnerability to get the CVSSv3 information.

Tip

To replicate this example the token you are using must have Vulnerability Module access. If you don't have it change the entity to enrich in a domain or IP address or use the Example 2 as reference.

This example uses the LookupMgr.lookup method to get the enrichment data of the CVE, adding the cvssv3 field. Before printing the result, it is needed to check if the CVE has been enriched, we do that with the is_enriched boolean attribute, and if it is we print the result as JSON. Note that the result is stored under the content, which can be an object or a string depending on the API response. If the entity has not been found it will be a string containing a 404 message.

1
2
3
4
5
6
from psengine.enrich import LookupMgr

mgr = LookupMgr()
cve = mgr.lookup('CVE-2012-1535', 'vulnerability', fields=['cvssv3'])
if cve.is_enriched:
    print(cve.content.cvssv3.json())

This example uses the LookupMgr.lookup_bulk method to enrich two URLs. Note that the lookup_bulk is not a real bulk enrichment. The real calls are still one per entity, but it is a convenient method when specific fields are needed.

Here the links fields is specified, along with the max_workers which determines the number of threads to use. In this case one per call. See the Guidelines page to check the recommended amount of threads to use.

1
2
3
4
5
6
7
8
9
from psengine.enrich import LookupMgr

URLS = ['http://www.example.com/1', 'http://www.example.com/2']
mgr = LookupMgr()

urls = mgr.lookup_bulk(URLS, 'url', fields=['links'], max_workers=2)
for url in urls:
    if url.is_enriched:
        print(url)

Note that the EnrichedData object returned by the LookupMgr methods, as per most of the other objects returned by PSEngine, can be printed. This will return a formatted string with some information around the entity. Executing this code you should see something like this:

EnrichedURL: http://www.example.com/1, Risk Score: 0, Last Seen: 2024-06-10 23:59:59
EnrichedURL: http://www.example.com/2, Risk Score: 0, Last Seen: 2024-06-10 23:59:59

Example 3: Bulk enrich a CSV file containing IP addresses and get the risk score. Save the results in a new file.

This example starts with writing the to_enrich.csv, this is just a convenient way of allowing you to replicate the example. In a real application these lines are not needed, as you will only need to provide the list of IPs to enrich. The core of the example starts at the SoarMgr initialization.

We start by reading the content of the file where our IPs are stored and save the values in ips_to_enrich. Then we call the soar method to enrich the IPs. The soar method returns the same payload even if Recorded Future has not information about that specific entity, that is why there is no need of checking if the entity has been enriched.

In the last step we create a new file with two columns, ip and score and we save the IP address with the related score.

import csv
from pathlib import Path

from psengine.enrich import SoarMgr

OUTPUT_DIR = Path(__file__).parent / 'enrich'
OUTPUT_DIR.mkdir(exist_ok=True)

to_enrich_file = OUTPUT_DIR / 'to_enrich.csv'
to_enrich_file.write_text('ip\n1.1.1.1\n2.2.2.2')

enriched_file = OUTPUT_DIR / 'enriched.csv'

mgr = SoarMgr()

with to_enrich_file.open(newline='') as f:
    reader = csv.DictReader(f)
    ips_to_enrich = [row['ip'] for row in reader]

enriched_ips = mgr.soar(ip=ips_to_enrich)

with enriched_file.open('w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['ip', 'score'])
    for ip, enriched in zip(ips_to_enrich, enriched_ips):
        writer.writerow([ip, enriched.content.risk.score])