Skip to content

Client

psengine.rf_client

RFClient

RFClient(
    api_token: Union[str, None] = None,
    http_proxy: str = None,
    https_proxy: str = None,
    verify: Union[str, bool] = None,
    auth: tuple[str, str] = None,
    cert: Union[str, tuple[str, str], None] = None,
    timeout: int = None,
    retries: int = None,
    backoff_factor: int = None,
    status_forcelist: list = None,
    pool_max_size: int = None,
)

Bases: BaseHTTPClient

Recorded Future HTTP API client.

PARAMETER DESCRIPTION
api_token

An RF API token. Defaults to RF_TOKEN environment variable.

TYPE: Union[str, None] DEFAULT: None

http_proxy

An HTTP proxy URL.

TYPE: str DEFAULT: None

https_proxy

An HTTPS proxy URL.

TYPE: str DEFAULT: None

verify

An SSL verification flag or path to CA bundle.

TYPE: Union[str, bool] DEFAULT: None

auth

Basic Auth credentials.

TYPE: tuple[str, str] DEFAULT: None

cert

Client certificates.

TYPE: Union[str, tuple[str, str], None] DEFAULT: None

timeout

A request timeout. Defaults to 120.

TYPE: int DEFAULT: None

retries

A number of retries. Defaults to 5.

TYPE: int DEFAULT: None

backoff_factor

A backoff factor. Defaults to 1.

TYPE: int DEFAULT: None

status_forcelist

A list of status codes to force a retry. Defaults to [502, 503, 504].

TYPE: list DEFAULT: None

pool_max_size

The maximum number of connections in the pool. Defaults to 120.

TYPE: int DEFAULT: None

Source code in psengine/rf_client.py
def __init__(
    self,
    api_token: Annotated[
        Union[str, None], Doc('An RF API token. Defaults to RF_TOKEN environment variable.')
    ] = None,
    http_proxy: Annotated[str, Doc('An HTTP proxy URL.')] = None,
    https_proxy: Annotated[str, Doc('An HTTPS proxy URL.')] = None,
    verify: Annotated[
        Union[str, bool],
        Doc('An SSL verification flag or path to CA bundle.'),
    ] = None,
    auth: Annotated[tuple[str, str], Doc('Basic Auth credentials.')] = None,
    cert: Annotated[Union[str, tuple[str, str], None], Doc('Client certificates.')] = None,
    timeout: Annotated[int, Doc('A request timeout. Defaults to 120.')] = None,
    retries: Annotated[int, Doc('A number of retries. Defaults to 5.')] = None,
    backoff_factor: Annotated[int, Doc('A backoff factor. Defaults to 1.')] = None,
    status_forcelist: Annotated[
        list, Doc('A list of status codes to force a retry. Defaults to [502, 503, 504].')
    ] = None,
    pool_max_size: Annotated[
        int, Doc('The maximum number of connections in the pool. Defaults to 120.')
    ] = None,
):
    """Recorded Future HTTP API client.

    Raises:
        ValueError: If no API token is supplied or configured, or if the token does not
            pass `is_api_token_format_valid`.
    """
    # Transport-level settings are handled entirely by the base client.
    super().__init__(
        http_proxy=http_proxy,
        https_proxy=https_proxy,
        verify=verify,
        auth=auth,
        cert=cert,
        timeout=timeout,
        retries=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        pool_max_size=pool_max_size,
    )

    # An explicitly passed token wins; otherwise fall back to the token held by the config.
    self._api_token = api_token or self.config.rf_token.get_secret_value()
    if not self._api_token:
        raise ValueError('Missing Recorded Future API token.')
    if not is_api_token_format_valid(self._api_token):
        raise ValueError(
            f'Invalid Recorded Future API token. Must match regex {RF_TOKEN_VALIDATION_REGEX}'
        )

backoff_factor instance-attribute

backoff_factor = (
    backoff_factor
    if backoff_factor is not None
    else client_backoff_factor
)

config instance-attribute

config = get_config()

http_proxy instance-attribute

http_proxy = (
    http_proxy if http_proxy is not None else http_proxy
)

https_proxy instance-attribute

https_proxy = (
    https_proxy if https_proxy is not None else https_proxy
)

log instance-attribute

log = getLogger(__name__)

pool_max_size instance-attribute

pool_max_size = (
    pool_max_size
    if pool_max_size is not None
    else client_pool_max_size
)

proxies instance-attribute

proxies = _set_proxies()

retries instance-attribute

retries = retries if retries is not None else client_retries

session instance-attribute

session = _create_session()

status_forcelist instance-attribute

status_forcelist = (
    status_forcelist
    if status_forcelist is not None
    else client_status_forcelist
)

timeout instance-attribute

timeout = timeout if timeout is not None else client_timeout

verify instance-attribute

verify = verify if verify is not None else client_ssl_verify

call

call(
    method: str,
    url: str,
    data: Union[dict, list[dict], None] = None,
    *,
    params: Union[dict, None] = None,
    headers: Union[dict, None] = None,
    **kwargs,
) -> Response

Invoke an HTTP request using the requests library.

PARAMETER DESCRIPTION
method

An HTTP method.

TYPE: str

url

A URL to make the request to.

TYPE: str

data

A request body.

TYPE: Union[dict, list[dict], None] DEFAULT: None

params

HTTP query parameters.

TYPE: Union[dict, None] DEFAULT: None

headers

If specified, overrides default headers and does not set the token.

TYPE: Union[dict, None] DEFAULT: None

RAISES DESCRIPTION
ValueError

If method is not one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.

HTTPError

If the response status is not 2xx.

JSONDecodeError

If the response contains malformed JSON.

ConnectTimeout

If the connection to the server times out.

ConnectionError

If the request fails before completing.

ReadTimeout

If the server did not send any data in time.

RETURNS DESCRIPTION
Response

A requests.Response object.

Source code in psengine/base_http_client.py
@debug_call
@validate_call
def call(
    self,
    method: Annotated[str, Doc('An HTTP method.')],
    url: Annotated[str, Doc('A URL to make the request to.')],
    data: Annotated[Union[dict, list[dict], None], Doc('A request body.')] = None,
    *,
    params: Annotated[Union[dict, None], Doc('HTTP query parameters.')] = None,
    headers: Annotated[
        Union[dict, None],
        Doc('If specified, overrides default headers and does not set the token.'),
    ] = None,
    **kwargs,
) -> Annotated[Response, Doc('A requests.Response object.')]:
    """Invoke an HTTP request using the `requests` library.

    The request body is serialized with `json.dumps` when `data` is not None. A
    `User-Agent` header is added when the supplied headers do not already contain one.
    On an HTTP error status, the `message` (or `error.message`) field of a JSON object
    body, when present, is appended to the raised error's text.

    Raises:
        ValueError: If method is not one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.
        HTTPError: If the response status is not 2xx.
        JSONDecodeError: If the response contains malformed JSON.
        ConnectTimeout: If the connection to the server times out.
        ConnectionError: If the request fails before completing.
        ReadTimeout: If the server did not send any data in time.
    """
    method_func = self._choose_method_type(method)

    # Work on a copy so adding the User-Agent never mutates the caller's dict.
    headers = dict(headers) if headers else {}

    if 'User-Agent' not in headers:
        headers['User-Agent'] = self._get_user_agent_header()

    payload = json.dumps(data) if data is not None else None

    try:
        response = method_func(
            url=url,
            headers=headers,
            data=payload,
            params=params,
            verify=self.verify,
            timeout=self.timeout,
            **kwargs,
        )
        self.log.debug(f'HTTP Status Code: {response.status_code}')

    except (ConnectionError, ConnectTimeout, ReadTimeout) as err:
        # Report the method actually used rather than a hard-coded GET.
        self.log.debug(f'{method} request failed. Cause: {err}')
        raise

    try:
        response.raise_for_status()

    except HTTPError as err:
        msg = str(err)
        try:
            error_body = response.json()
        except JSONDecodeError:
            error_body = {}

        # An error body may be valid JSON without being an object (e.g. a list or a
        # string); only objects can carry the 'message' / 'error' fields.
        message = None
        if isinstance(error_body, dict):
            message = error_body.get('message') or error_body.get('error', {})
            if isinstance(message, dict):
                message = message.get('message')

        if message:
            msg += f', Cause: {message}'

        self.log.debug(f'{method} request failed. {msg}')

        raise HTTPError(msg, response=response) from err

    return response

can_connect

can_connect(
    method: str = 'get', url: str = BASE_URL
) -> bool

Check if the client can reach the specified API URL.

PARAMETER DESCRIPTION
method

An HTTP method.

TYPE: str DEFAULT: 'get'

url

A URL to test connectivity against.

TYPE: str DEFAULT: BASE_URL

RETURNS DESCRIPTION
bool

True if connection returns status 200, else False.

Source code in psengine/base_http_client.py
@debug_call
@validate_call
def can_connect(
    self,
    method: Annotated[str, Doc('An HTTP method.')] = 'get',
    url: Annotated[str, Doc('A URL to test.recordedfuture.com.')] = BASE_URL,
) -> Annotated[bool, Doc('True if connection returns status 200, else False.')]:
    """Report whether a request to the given URL completes without raising.

    Any exception raised while performing the request or checking its status is logged
    at error level and reported as False instead of propagating.
    """
    reachable = True
    try:
        self.call(method=method, url=url).raise_for_status()
    except Exception as err:  # noqa: BLE001
        self.log.error(f'Error during connectivity test: {err}')
        reachable = False
    return reachable

is_authorized

is_authorized(method: str, url: str, **kwargs) -> bool

Check if the request is authorized to a given Recorded Future API endpoint.

PARAMETER DESCRIPTION
method

An HTTP method.

TYPE: str

url

A URL to perform the check against.

TYPE: str

RETURNS DESCRIPTION
bool

True if authorized, False otherwise.

Source code in psengine/rf_client.py
@debug_call
@validate_call
def is_authorized(
    self,
    method: Annotated[str, Doc('An HTTP method.')],
    url: Annotated[str, Doc('A URL to perform the check against.')],
    **kwargs,
) -> Annotated[bool, Doc('True if authorized, False otherwise.')]:
    """Tell whether a request to the given endpoint answers with HTTP 200.

    The request is sent through `request`, with extra keyword arguments forwarded to it.
    Any exception raised along the way is logged at error level and treated as
    "not authorized".
    """
    try:
        status_code = self.request(method, url, **kwargs).status_code
    except Exception as err:  # noqa: BLE001
        self.log.error(f'Error during validation: {err}')
        return False
    return status_code == 200

request

request(
    method: str,
    url: str,
    data: Union[dict, list[dict], None] = None,
    *,
    params: Optional[dict] = None,
    headers: Optional[dict] = None,
    **kwargs,
) -> Response

Perform an HTTP request.

PARAMETER DESCRIPTION
method

An HTTP method, one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.

TYPE: str

url

A URL to make the request to.

TYPE: str

data

A request body.

TYPE: Union[dict, list[dict], None] DEFAULT: None

params

HTTP query parameters.

TYPE: Optional[dict] DEFAULT: None

headers

If specified, it overrides default headers and does not set the token.

TYPE: Optional[dict] DEFAULT: None

RAISES DESCRIPTION
ValidationError

If method is not one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.

RETURNS DESCRIPTION
Response

A requests.Response object.

Source code in psengine/rf_client.py
@debug_call
@validate_call
def request(
    self,
    method: Annotated[
        str, Doc('An HTTP method, one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.')
    ],
    url: Annotated[str, Doc('A URL to make the request to.')],
    data: Annotated[Union[dict, list[dict], None], Doc('A request body.')] = None,
    *,
    params: Annotated[Optional[dict], Doc('HTTP query parameters.')] = None,
    headers: Annotated[
        Optional[dict],
        Doc('If specified, it overrides default headers and does not set the token.'),
    ] = None,
    **kwargs,
) -> Annotated[Response, Doc('A requests.Response object.')]:
    """Send an HTTP request, filling in the client's prepared headers when none are given.

    Raises:
        ValidationError: If method is not one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.
    """
    # Empty or missing headers are replaced by the headers this client prepares itself.
    if not headers:
        headers = self._prepare_headers()

    call_args = {
        'method': method,
        'url': url,
        'headers': headers,
        'data': data,
        'params': params,
    }
    return self.call(**call_args, **kwargs)

request_paged

request_paged(
    method: str,
    url: str,
    max_results: int = 1000,
    data: Union[dict, list[dict], None] = None,
    *,
    params: Union[dict, None] = None,
    headers: Union[dict, None] = None,
    results_path: Union[str, list[str]] = 'data',
    offset_key: str = 'offset',
    **kwargs,
) -> list[dict]

Perform a paged HTTP request.

Please note that some RF APIs cannot paginate through more than 1000 results and will return an error (HTTP 400) if max_results exceeds that. APIs such as Identity support pagination beyond 1000 results.

PARAMETER DESCRIPTION
method

An HTTP method: GET or POST.

TYPE: str

url

A URL to make the request to.

TYPE: str

max_results

The maximum number of results to return.

TYPE: int DEFAULT: 1000

data

A request body.

TYPE: Union[dict, list[dict], None] DEFAULT: None

params

HTTP query parameters.

TYPE: Union[dict, None] DEFAULT: None

headers

If specified, it overrides default headers and does not set the token.

TYPE: Union[dict, None] DEFAULT: None

results_path

Path to extract paged results from.

TYPE: Union[str, list[str]] DEFAULT: 'data'

offset_key

Key to use for paging. Defaults to 'offset'.

TYPE: str DEFAULT: 'offset'

Example
>>> response = rfc.request_paged(
        method='post',
        url='https://api.recordedfuture.com/identity/credentials/search',
        max_results=1565,
        data={
            'domains': ['norsegods.online'],
            'filter': {'first_downloaded_gte': '2024-01-01T23:40:47.034Z'},
            'limit': 100,
        },
        results_path='identities',
        offset_key='offset',
    )

>>> response = rfc.request_paged(
        method='get',
        url='https://api.recordedfuture.com/v2/ip/search',
        params={'limit': 100, 'fields': 'entity', 'riskRule': 'dnsAbuse'},
        results_path='data.results',
        offset_key='from',
    )
RAISES DESCRIPTION
KeyError

If no results are found in the API response.

ValueError
  • If method is not GET or POST.
  • If results_path is invalid.
RETURNS DESCRIPTION
list[dict]

Resulting data.

Source code in psengine/rf_client.py
def request_paged(
    self,
    method: Annotated[str, Doc('An HTTP method: GET or POST.')],
    url: Annotated[str, Doc('A URL to make the request to.')],
    max_results: Annotated[int, Doc('The maximum number of results to return.')] = 1000,
    data: Annotated[Union[dict, list[dict], None], Doc('A request body.')] = None,
    *,
    params: Annotated[Union[dict, None], Doc('HTTP query parameters.')] = None,
    headers: Annotated[
        Union[dict, None],
        Doc('If specified, it overrides default headers and does not set the token.'),
    ] = None,
    results_path: Annotated[
        Union[str, list[str]], Doc('Path to extract paged results from.')
    ] = 'data',
    offset_key: Annotated[str, Doc("Key to use for paging. Defaults to 'offset'.")] = 'offset',
    **kwargs,
) -> Annotated[list[dict], Doc('Resulting data.')]:
    """Perform a paged HTTP request.

    Please note that some RF APIs cannot paginate through more than 1000 results and will
    return an error (HTTP 400) if `max_results` exceeds that. APIs such as Identity support
    pagination beyond 1000 results.

    The first page is fetched here; the remaining pages are collected by the GET- or
    POST-specific paging helper. An unsupported method is rejected before any request
    is sent.

    Example:
        ```python
        >>> response = rfc.request_paged(
                method='post',
                url='https://api.recordedfuture.com/identity/credentials/search',
                max_results=1565,
                data={
                    'domains': ['norsegods.online'],
                    'filter': {'first_downloaded_gte': '2024-01-01T23:40:47.034Z'},
                    'limit': 100,
                },
                results_path='identities',
                offset_key='offset',
            )

        >>> response = rfc.request_paged(
                method='get',
                url='https://api.recordedfuture.com/v2/ip/search',
                params={'limit': 100, 'fields': 'entity', 'riskRule': 'dnsAbuse'},
                results_path='data.results',
                offset_key='from',
            )
        ```

    Raises:
        KeyError: If no results are found in the API response.
        ValueError:
            - If method is not GET or POST.
            - If results_path is invalid.
    """
    results_paths = [results_path] if isinstance(results_path, str) else results_path

    try:
        results_expr = [jsonpath_ng.parse(p) for p in results_paths]
    except JsonPathParserError as err:
        raise ValueError(f'Invalid results_path: {results_path}') from err
    root_key = [self._get_root_key(e) for e in results_expr]

    # Reject unsupported methods up front so that, for example, a PUT or DELETE is never
    # actually sent before the caller is told paging does not support it. Non-string
    # values are left for `request` to validate exactly as before.
    if isinstance(method, str) and method.lower() not in ('get', 'post'):
        raise ValueError('Invalid method for paged request. Must be GET or POST')

    # Make the first request
    response = self.request(
        method=method,
        url=url,
        headers=headers,
        data=data,
        params=params,
        **kwargs,
    )

    try:
        json_response = response.json()
    except JSONDecodeError:
        self.log.debug(f'Paged request does not contain valid JSON:\n{response.text}')
        raise

    # Only some of the requested root keys may be present when several paths are given.
    present_roots = [r for r in root_key if r in json_response]
    if not present_roots:
        raise KeyError(results_path)

    all_results = []
    dict_results = defaultdict(list)

    # Inspect only the roots that exist: indexing a missing one would raise a bare
    # KeyError even though missing paths are tolerated further down.
    if all(len(json_response[r]) == 0 for r in present_roots):
        return all_results

    # Get the initial results from the first response and add them to the list
    if isinstance(results_path, str):
        all_results += self._get_matches(results_expr[0], json_response)
    else:
        for expr in results_expr:
            with suppress(KeyError):
                dict_results[str(expr)].extend(self._get_matches(expr, json_response))

    # A single string path is handed on as one expression, a list of paths as a list.
    paging_expr = results_expr[0] if isinstance(results_path, str) else results_expr

    if method.lower() == 'get':
        return self._request_paged_get(
            url=url,
            headers=headers,
            data=data,
            method=method,
            params=params,
            max_results=max_results,
            results_expr=paging_expr,
            offset_key=offset_key,
            json_response=json_response,
            all_results=all_results,
            **kwargs,
        )

    if method.lower() == 'post':
        return self._request_paged_post(
            url=url,
            method=method,
            headers=headers,
            data=data,
            params=params,
            max_results=max_results,
            results_expr=paging_expr,
            offset_key=offset_key,
            json_response=json_response,
            all_results=all_results,
            dict_results=dict_results,
            **kwargs,
        )

    # Still reachable for a non-string method that `request` accepted.
    raise ValueError('Invalid method for paged request. Must be GET or POST')

set_urllib_log_level

set_urllib_log_level(level: str) -> None

Set log level for urllib3 library.

PARAMETER DESCRIPTION
level

A log level to be set: CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET.

TYPE: str

Source code in psengine/base_http_client.py
@debug_call
@validate_call
def set_urllib_log_level(
    self,
    level: Annotated[
        str, Doc('A log level to be set: CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET.')
    ],
) -> None:
    """Apply the given level to the `urllib3` logger.

    The level name is matched case-insensitively. An empty or unrecognized value only
    produces a warning and leaves the logger untouched.
    """
    valid_levels = ('CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET')
    # An empty level normalizes to '', which is never a valid level name.
    normalized = level.upper() if level else ''
    if normalized not in valid_levels:
        self.log.warning('Log level is empty or not valid')
        return
    logging.getLogger('urllib3').setLevel(normalized)

is_api_token_format_valid

is_api_token_format_valid(token: str) -> bool

Check if the token format is valid.

The function performs a simple regex check but does not validate the token against the API.

PARAMETER DESCRIPTION
token

A Recorded Future API token.

TYPE: str

RETURNS DESCRIPTION
bool

True if the token format is valid, False otherwise.

Source code in psengine/rf_client.py
@validate_call
def is_api_token_format_valid(
    token: Annotated[str, Doc('A Recorded Future API token.')],
) -> Annotated[bool, Doc('True if the token format is valid, False otherwise.')]:
    """Tell whether the token matches `RF_TOKEN_VALIDATION_REGEX`.

    This is a format check only; the token is never sent to the API for verification.
    """
    matched = re.match(RF_TOKEN_VALIDATION_REGEX, token)
    return matched is not None