Skip to content

Client

psengine.asi.client

ASIClient

ASIClient(
    api_token: Union[str, None] = None,
    http_proxy: str = None,
    https_proxy: str = None,
    verify: Union[str, bool] = None,
    auth: tuple[str, str] = None,
    cert: Union[str, tuple[str, str], None] = None,
    timeout: int = None,
    retries: int = None,
    backoff_factor: int = None,
    status_forcelist: list = None,
    pool_max_size: int = None,
)

Bases: BaseHTTPClient

Recorded Future Attack Surface Intelligence (ASI) API client.

PARAMETER DESCRIPTION
api_token

A Recorded Future ASI API key.

TYPE: Union[str, None] DEFAULT: None

http_proxy

An HTTP proxy URL.

TYPE: str DEFAULT: None

https_proxy

An HTTPS proxy URL.

TYPE: str DEFAULT: None

verify

An SSL verification flag or path to CA bundle.

TYPE: Union[str, bool] DEFAULT: None

auth

Basic Auth credentials.

TYPE: tuple[str, str] DEFAULT: None

cert

Client certificates.

TYPE: Union[str, tuple[str, str], None] DEFAULT: None

timeout

A request timeout. Defaults to 120.

TYPE: int DEFAULT: None

retries

A number of retries. Defaults to 5.

TYPE: int DEFAULT: None

backoff_factor

A backoff factor. Defaults to 1.

TYPE: int DEFAULT: None

status_forcelist

A list of status codes to force a retry. Defaults to [502, 503, 504].

TYPE: list DEFAULT: None

pool_max_size

The maximum number of connections in the pool. Defaults to 120.

TYPE: int DEFAULT: None

Source code in psengine/asi/client.py
def __init__(
    self,
    api_token: Annotated[
        Union[str, None],
        Doc('A Recorded Future ASI API key.'),
    ] = None,
    http_proxy: Annotated[str, Doc('An HTTP proxy URL.')] = None,
    https_proxy: Annotated[str, Doc('An HTTPS proxy URL.')] = None,
    verify: Annotated[
        Union[str, bool],
        Doc('An SSL verification flag or path to CA bundle.'),
    ] = None,
    auth: Annotated[tuple[str, str], Doc('Basic Auth credentials.')] = None,
    cert: Annotated[Union[str, tuple[str, str], None], Doc('Client certificates.')] = None,
    timeout: Annotated[int, Doc('A request timeout. Defaults to 120.')] = None,
    retries: Annotated[int, Doc('A number of retries. Defaults to 5.')] = None,
    backoff_factor: Annotated[int, Doc('A backoff factor. Defaults to 1.')] = None,
    status_forcelist: Annotated[
        list, Doc('A list of status codes to force a retry. Defaults to [502, 503, 504].')
    ] = None,
    pool_max_size: Annotated[
        int, Doc('The maximum number of connections in the pool. Defaults to 120.')
    ] = None,
):
    """Set up the HTTP transport and resolve the ASI API token.

    All transport options are forwarded unchanged to the parent initializer. The token
    comes from ``api_token`` when given, otherwise from the ``asi_token`` secret held by
    the client configuration.

    Raises:
        ValueError: If no token is available, or if the token fails the
            ``is_api_token_format_valid`` check.
    """
    transport_options = {
        'http_proxy': http_proxy,
        'https_proxy': https_proxy,
        'verify': verify,
        'auth': auth,
        'cert': cert,
        'timeout': timeout,
        'retries': retries,
        'backoff_factor': backoff_factor,
        'status_forcelist': status_forcelist,
        'pool_max_size': pool_max_size,
    }
    super().__init__(**transport_options)

    # An explicit argument wins; the configured secret is only the fallback.
    token = api_token or self.config.asi_token.get_secret_value()
    self._api_token = token

    if not token:
        raise ValueError('Missing Recorded Future ASI API token.')

    if is_api_token_format_valid(token):
        return

    raise ValueError(
        f'Invalid Recorded Future API token: must match regex {ASI_TOKEN_VALIDATION_REGEX}'
    )

backoff_factor instance-attribute

backoff_factor = (
    backoff_factor
    if backoff_factor is not None
    else client_backoff_factor
)

config instance-attribute

config = get_config()

http_proxy instance-attribute

http_proxy = (
    http_proxy if http_proxy is not None else http_proxy
)

https_proxy instance-attribute

https_proxy = (
    https_proxy if https_proxy is not None else https_proxy
)

log instance-attribute

log = getLogger(__name__)

pool_max_size instance-attribute

pool_max_size = (
    pool_max_size
    if pool_max_size is not None
    else client_pool_max_size
)

proxies instance-attribute

proxies = _set_proxies()

retries instance-attribute

retries = retries if retries is not None else client_retries

session instance-attribute

session = _create_session()

status_forcelist instance-attribute

status_forcelist = (
    status_forcelist
    if status_forcelist is not None
    else client_status_forcelist
)

timeout instance-attribute

timeout = timeout if timeout is not None else client_timeout

verify instance-attribute

verify = verify if verify is not None else client_ssl_verify

call

call(
    method: str,
    url: str,
    data: Union[dict, list[dict], bytes, None] = None,
    *,
    params: Union[dict, None] = None,
    headers: Union[dict, None] = None,
    **kwargs,
) -> Response

Invoke an HTTP request using the requests library.

PARAMETER DESCRIPTION
method

An HTTP method.

TYPE: str

url

A URL to make the request to.

TYPE: str

data

A request body.

TYPE: Union[dict, list[dict], bytes, None] DEFAULT: None

params

HTTP query parameters.

TYPE: Union[dict, None] DEFAULT: None

headers

If specified, overrides default headers and does not set the token.

TYPE: Union[dict, None] DEFAULT: None

RAISES DESCRIPTION
ValueError

If method is not one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.

HTTPError

If the response status is not 2xx.

JSONDecodeError

If the response contains malformed JSON.

ConnectTimeout

If the connection to the server times out.

ConnectionError

If the request fails before completing.

ReadTimeout

If the server did not send any data in time.

RETURNS DESCRIPTION
Response

A requests.Response object.

Source code in psengine/base_http_client.py
@debug_call
@validate_call
def call(
    self,
    method: Annotated[str, Doc('An HTTP method.')],
    url: Annotated[str, Doc('A URL to make the request to.')],
    data: Annotated[Union[dict, list[dict], bytes, None], Doc('A request body.')] = None,
    *,
    params: Annotated[Union[dict, None], Doc('HTTP query parameters.')] = None,
    headers: Annotated[
        Union[dict, None],
        Doc('If specified, overrides default headers and does not set the token.'),
    ] = None,
    **kwargs,
) -> Annotated[Response, Doc('A requests.Response object.')]:
    """Invoke an HTTP request using the `requests` library.

    A `User-Agent` header is added when the supplied headers do not contain one, and any
    body that is not already `bytes` is serialized to JSON before being sent. The
    caller's `headers` dictionary is never modified.

    When the response status triggers an `HTTPError`, the error text is extended with the
    `message` (or `error.message`) field of the response body when one is present.

    Raises:
        ValueError: If method is not one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.
        HTTPError: If the response status is not 2xx.
        JSONDecodeError: If the response contains malformed JSON.
        ConnectTimeout: If the connection to the server times out.
        ConnectionError: If the request fails before completing.
        ReadTimeout: If the server did not send any data in time.
    """
    method_func = self._choose_method_type(method)

    # Copy so that adding the User-Agent below does not leak into the caller's dict.
    headers = dict(headers) if headers else {}

    if 'User-Agent' not in headers:
        headers['User-Agent'] = self._get_user_agent_header()

    # Raw bytes are sent untouched; everything else (except None) goes out as JSON text.
    if data is not None and not isinstance(data, bytes):
        data = json.dumps(data)

    try:
        response = method_func(
            url=url,
            headers=headers,
            data=data,
            params=params,
            verify=self.verify,
            timeout=self.timeout,
            **kwargs,
        )
        self.log.debug(f'HTTP Status Code: {response.status_code}')

    except (ConnectionError, ConnectTimeout, ReadTimeout) as err:
        # Report the method actually used rather than a hard-coded verb.
        self.log.debug(f'{method} request failed. Cause: {err}')
        raise

    try:
        response.raise_for_status()

    except HTTPError as err:
        msg = str(err)
        try:
            error_body = response.json()
        except JSONDecodeError:
            error_body = {}

        # Error bodies are not guaranteed to be JSON objects; only dictionaries can carry
        # a `message`/`error` field, anything else must not mask the HTTPError.
        message = None
        if isinstance(error_body, dict):
            message = error_body.get('message') or error_body.get('error', {})
            if isinstance(message, dict):
                message = message.get('message')

        if message:
            msg += f', Cause: {message}'

        self.log.debug(f'{method} request failed. {msg}')

        raise HTTPError(msg, response=response) from err

    return response

can_connect

can_connect(
    method: str = 'get', url: str = BASE_URL
) -> bool

Check if the client can reach the specified API URL.

PARAMETER DESCRIPTION
method

An HTTP method.

TYPE: str DEFAULT: 'get'

url

A URL to test.recordedfuture.com.

TYPE: str DEFAULT: BASE_URL

RETURNS DESCRIPTION
bool

True if connection returns status 200, else False.

Source code in psengine/base_http_client.py
@debug_call
@validate_call
def can_connect(
    self,
    method: Annotated[str, Doc('An HTTP method.')] = 'get',
    url: Annotated[str, Doc('A URL to test.recordedfuture.com.')] = BASE_URL,
) -> Annotated[bool, Doc('True if connection returns status 200, else False.')]:
    """Report whether a request to the given URL completes without an error.

    Any exception raised while issuing the request or checking its status is logged
    and reported as ``False`` instead of being propagated.
    """
    try:
        response = self.call(method=method, url=url)
        response.raise_for_status()
    except Exception as err:  # noqa: BLE001
        # Deliberately broad: a connectivity probe should never raise to the caller.
        self.log.error(f'Error during connectivity test: {err}')
        return False

    return True

request

request(
    method: str,
    url: str,
    data: Union[dict, list[dict], bytes, None] = None,
    *,
    params: Optional[dict] = None,
    headers: Optional[dict] = None,
    **kwargs,
) -> Response

Perform an HTTP request against Recorded Future ASI.

PARAMETER DESCRIPTION
method

An HTTP method, one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.

TYPE: str

url

A URL or API path to make the request to.

TYPE: str

data

A request body.

TYPE: Union[dict, list[dict], bytes, None] DEFAULT: None

params

HTTP query parameters.

TYPE: Optional[dict] DEFAULT: None

headers

If specified, it overrides default headers and does not set the API key.

TYPE: Optional[dict] DEFAULT: None

RETURNS DESCRIPTION
Response

A requests.Response object.

Source code in psengine/asi/client.py
@debug_call
@validate_call
def request(
    self,
    method: Annotated[
        str, Doc('An HTTP method, one of GET, PUT, POST, DELETE, HEAD, OPTIONS, PATCH.')
    ],
    url: Annotated[str, Doc('A URL or API path to make the request to.')],
    data: Annotated[Union[dict, list[dict], bytes, None], Doc('A request body.')] = None,
    *,
    params: Annotated[Optional[dict], Doc('HTTP query parameters.')] = None,
    headers: Annotated[
        Optional[dict],
        Doc('If specified, it overrides default headers and does not set the API key.'),
    ] = None,
    **kwargs,
) -> Annotated[Response, Doc('A requests.Response object.')]:
    """Send an HTTP request to Recorded Future ASI.

    Headers supplied by the caller are used as given; when none are supplied, the
    headers produced by ``_prepare_headers`` are used. The request is then delegated
    to ``call``.
    """
    if not headers:
        headers = self._prepare_headers()

    response = self.call(
        method=method,
        url=url,
        data=data,
        params=params,
        headers=headers,
        **kwargs,
    )
    return response

request_paged

request_paged(
    method: str,
    url: str,
    data: Optional[dict] = None,
    *,
    params: Optional[dict] = None,
    headers: Optional[dict] = None,
    max_results: int = DEFAULT_LIMIT,
    objects_per_page: Optional[int] = Field(
        ge=1,
        le=MAX_ASI_PAGE_SIZE,
        default=DEFAULT_ASI_PAGE_SIZE,
    ),
    **kwargs,
) -> dict[str, Any]

Perform a paged request using ASI cursor-based pagination.

Automatically handles cursor-based pagination internally, aggregating all pages up to max_results.

PARAMETER DESCRIPTION
method

An HTTP method. Supports GET and POST.

TYPE: str

url

A URL or API path to make the request to.

TYPE: str

data

A request body.

TYPE: Optional[dict] DEFAULT: None

params

HTTP query parameters.

TYPE: Optional[dict] DEFAULT: None

headers

If specified, it overrides default headers and does not set the API key.

TYPE: Optional[dict] DEFAULT: None

max_results

The maximum number of results to return.

TYPE: int DEFAULT: DEFAULT_LIMIT

objects_per_page

Requested page size.

TYPE: Optional[int] DEFAULT: Field(ge=1, le=MAX_ASI_PAGE_SIZE, default=DEFAULT_ASI_PAGE_SIZE)

Example response structure

The data field contains records aggregated from all fetched pages.

The meta field is copied from the last fetched page.

{
    "data": [
        {"id": "record-1"},
        {"id": "record-2"}
    ],
    "meta": {
        "pagination": {
            "limit": 100,
            "total": 248,
            "next_cursor": "eyJ0eXBlIjogInNlY..."
        }
    }
}

RETURNS DESCRIPTION
dict[str, Any]

Paged records merged into a single dictionary.

Source code in psengine/asi/client.py
@debug_call
@validate_call
def request_paged(  # noqa: C901
    self,
    method: Annotated[str, Doc('An HTTP method. Supports GET and POST.')],
    url: Annotated[str, Doc('A URL or API path to make the request to.')],
    data: Annotated[Optional[dict], Doc('A request body.')] = None,
    *,
    params: Annotated[Optional[dict], Doc('HTTP query parameters.')] = None,
    headers: Annotated[
        Optional[dict],
        Doc('If specified, it overrides default headers and does not set the API key.'),
    ] = None,
    max_results: Annotated[
        int, Doc('The maximum number of results to return.')
    ] = DEFAULT_LIMIT,
    objects_per_page: Annotated[Optional[int], Doc('Requested page size.')] = Field(
        ge=1, le=MAX_ASI_PAGE_SIZE, default=DEFAULT_ASI_PAGE_SIZE
    ),
    **kwargs,
) -> Annotated[
    dict[str, Any],
    Doc(
        """
        Paged records merged into a single dictionary.

        """
    ),
]:
    """Perform a paged request using ASI cursor-based pagination.

    Automatically handles cursor-based pagination internally, aggregating all pages
    up to `max_results`.

    For GET requests the page size and cursor travel as the `limit` and `cursor` query
    parameters. For POST requests they travel in the request body under
    `pagination.limit` and `pagination.next_cursor`.

    Example response structure:
        The `data` field contains records aggregated from all fetched pages.

        The `meta` field is copied from the last fetched page. It is `None` when no
        page was fetched (for example when `max_results` is not positive).
        ```json
        {
            "data": [
                {"id": "record-1"},
                {"id": "record-2"}
            ],
            "meta": {
                "pagination": {
                    "limit": 100,
                    "total": 248,
                    "next_cursor": "eyJ0eXBlIjogInNlY..."
                }
            }
        }
        ```

    Raises:
        ValueError: If `method` is not GET or POST.
        JSONDecodeError: If a page response does not contain valid JSON.
        KeyError: If a page response lacks the `data` or `meta` field, or its
            pagination metadata lacks the `total` or `limit` field.
    """
    # Normalize once so the GET/POST comparisons below are case-insensitive.
    method = method.upper()
    if method not in ('GET', 'POST'):
        raise ValueError('Invalid method for paged request. Must be GET or POST')

    # NOTE(review): presumably seeds `limit` in the query params (GET) or under
    # `pagination` in the body (POST); the helper is not visible here, confirm.
    request_params, request_data = self._initialize_paged_request(
        method=method, params=params, data=data, limit=objects_per_page
    )

    all_results = []
    meta = None
    current_results = 0
    remaining_results = max_results

    while len(all_results) < max_results:
        if method == 'GET':
            # Never ask for more records than are still needed to reach max_results.
            request_params['limit'] = min(request_params['limit'], remaining_results)
            response = self.request(
                method=method,
                url=url,
                headers=headers,
                params=request_params,
                **kwargs,
            )

        else:
            # Same clamp as for GET, but the page size lives in the request body.
            request_data['pagination']['limit'] = min(
                request_data['pagination']['limit'], remaining_results
            )
            response = self.request(
                method=method,
                url=url,
                headers=headers,
                params=request_params,
                data=request_data,
                **kwargs,
            )

        # A page without parseable JSON or without `data` is logged and re-raised.
        try:
            json_response = response.json()
            page_results = json_response['data']
        except JSONDecodeError:
            self.log.error(f'Paged request does not contain valid JSON:\n{response.text}')
            raise
        except KeyError:
            self.log.error(f'Paged request does not contain `data` field:\n{response.text}')
            raise

        # Pagination metadata is mandatory: it drives the stop conditions below.
        try:
            meta = json_response['meta']
            total = meta['pagination']['total']
            limit = meta['pagination']['limit']
        except KeyError:
            msg = 'Paged request `meta`, does not contain the `total` or `limit` field:\n{}'
            self.log.error(msg.format(response.text))
            raise

        # `data` may be a list of records or a single record.
        if isinstance(page_results, list):
            all_results.extend(page_results)
        else:
            all_results.append(page_results)

        # Progress is counted by the page `limit` reported in the response metadata,
        # not by the number of records actually received.
        current_results += limit
        remaining_results = max_results - len(all_results)
        if current_results >= max_results or current_results >= total:
            break

        # A missing or null cursor means there is no further page to fetch.
        try:
            next_cursor = json_response['meta']['pagination']['next_cursor']
        except KeyError:
            break

        if next_cursor is None:
            break

        # Carry the cursor into the next request: query param for GET, body for POST.
        if method == 'GET':
            request_params['cursor'] = next_cursor
        else:
            request_data['pagination']['next_cursor'] = next_cursor

    # Trim in case the last page pushed the total past max_results.
    return {'data': all_results[:max_results], 'meta': meta}

set_urllib_log_level

set_urllib_log_level(level: str) -> None

Set log level for urllib3 library.

PARAMETER DESCRIPTION
level

A log level to be set: CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET.

TYPE: str

Source code in psengine/base_http_client.py
@debug_call
@validate_call
def set_urllib_log_level(
    self,
    level: Annotated[
        str, Doc('A log level to be set: CRITICAL, ERROR, WARNING, INFO, DEBUG, NOTSET.')
    ],
) -> None:
    """Apply the given log level to the `urllib3` logger.

    The level name is matched case-insensitively. An empty or unrecognized name only
    produces a warning and leaves the `urllib3` logger unchanged.
    """
    accepted_levels = ('CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET')

    # An empty value can never match, so it is mapped to None before the lookup.
    requested_level = level.upper() if level else None
    if requested_level not in accepted_levels:
        self.log.warning('Log level is empty or not valid')
        return

    urllib_logger = logging.getLogger('urllib3')
    urllib_logger.setLevel(requested_level)

is_api_token_format_valid

is_api_token_format_valid(token: str) -> bool

Check if the token format is valid.

The function performs a simple regex check but does not validate the token against the API.

PARAMETER DESCRIPTION
token

A Recorded Future ASI API token.

TYPE: str

RETURNS DESCRIPTION
bool

True if the token format is valid, False otherwise.

Source code in psengine/asi/client.py
@validate_call
def is_api_token_format_valid(
    token: Annotated[str, Doc('A Recorded Future ASI API token.')],
) -> Annotated[bool, Doc('True if the token format is valid, False otherwise.')]:
    """Tell whether the token matches the expected ASI token pattern.

    This is a purely syntactic check: the whole token must match
    ``ASI_TOKEN_VALIDATION_REGEX``. The token is not verified against the API.
    """
    pattern_match = re.fullmatch(ASI_TOKEN_VALIDATION_REGEX, token)
    return pattern_match is not None