Skip to content

Manager

psengine.classic_alerts.classic_alert_mgr.ClassicAlertMgr

ClassicAlertMgr(rf_token: str = None)

Alert Manager for Classic Alert (v3) API.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: str DEFAULT: None

Source code in psengine/classic_alerts/classic_alert_mgr.py
def __init__(self, rf_token: str = None):
    """Set up the logger and the API client used by the manager.

    Args:
        rf_token (str, optional): Recorded Future API token. When it is omitted (or empty),
            the client is created without an explicit token.
    """
    self.log = logging.getLogger(__name__)
    if rf_token:
        self.rf_client = RFClient(api_token=rf_token)
    else:
        self.rf_client = RFClient()

fetch

fetch(
    id_: str = Field(min_length=4),
    fields: Optional[list[str]] = ALL_CA_FIELDS,
    tagged_text: Optional[bool] = None,
) -> ClassicAlert

Fetch a specific alert.

The alert can be saved to a file as shown below:

PARAMETER DESCRIPTION
id_

The alert ID to be fetched.

TYPE: str DEFAULT: Field(min_length=4)

fields

Fields to include in the search result.

Note: The default fields are id, log, title, rule, which are always retrieved. Any provided fields are added to these.

TYPE: Optional[list[str]] DEFAULT: ALL_CA_FIELDS

tagged_text

Entities in the alert title and message body will be marked up with entity IDs.

TYPE: Optional[bool] DEFAULT: None

Example
1
2
3
4
5
6
7
8
9
from pathlib import Path
from json import dumps
from psengine.classic_alerts import ClassicAlertMgr

mgr = ClassicAlertMgr()
alert = mgr.fetch('zVEe6k')
OUTPUT_DIR = Path('your') / 'path'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
(OUTPUT_DIR / f'{alert.id_}.json').write_text(dumps(alert.json(), indent=2))
Endpoint

v3/alerts/{id_}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AlertFetchError

If a fetch of the alert via the API fails.

RETURNS DESCRIPTION
ClassicAlert

ClassicAlert model.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AlertFetchError)
def fetch(
    self,
    id_: Annotated[str, Doc('The alert ID to be fetched.')] = Field(min_length=4),
    fields: Annotated[
        Optional[list[str]],
        Doc(
            """
            Fields to include in the search result.

            **Note:**
            Default fields are `id`, `log`, `title`, `rule` which are always retrieved.
            Any provided fields are added to these.
            """
        ),
    ] = ALL_CA_FIELDS,
    tagged_text: Annotated[
        Optional[bool],
        Doc('Entities in the alert title and message body will be marked up with entity IDs.'),
    ] = None,
) -> Annotated[ClassicAlert, Doc('ClassicAlert model.')]:
    """Fetch a specific alert.

    The alert can be saved to a file as shown below:

    Example:
        ```python
        from pathlib import Path
        from json import dumps
        from psengine.classic_alerts import ClassicAlertMgr

        mgr = ClassicAlertMgr()
        alert = mgr.fetch('zVEe6k')
        OUTPUT_DIR = Path('your') / 'path'
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        (OUTPUT_DIR / f'{alert.id_}.json').write_text(dumps(alert.json(), indent=2))
        ```

    Endpoint:
        `v3/alerts/{id_}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AlertFetchError: If a fetch of the alert via the API fails.
    """
    # Merge the requested fields with the required ones. `dict.fromkeys` removes duplicates
    # while keeping insertion order, so identical calls always send the same `fields` value
    # (a plain `set` would yield an arbitrary order).
    merged_fields = dict.fromkeys((fields or []) + REQUIRED_CA_FIELDS)
    params = {'fields': ','.join(merged_fields)}

    # The flag is only sent when it is explicitly enabled.
    if tagged_text:
        params['taggedText'] = tagged_text

    self.log.info(f'Fetching alert: {id_}')
    response = self.rf_client.request(
        'get', url=EP_CLASSIC_ALERTS_ID.format(id_), params=params
    ).json()
    return ClassicAlert.model_validate(response.get('data'))

fetch_all_images

fetch_all_images(alert: ClassicAlert) -> None

Fetch all images from an alert and store them in the alert object under @images.

PARAMETER DESCRIPTION
alert

Alert to fetch images from.

TYPE: ClassicAlert

Endpoint

v3/alerts/image

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
def fetch_all_images(
    self,
    alert: Annotated[ClassicAlert, Doc('Alert to fetch images from.')],
) -> None:
    """Download every image referenced by an alert and attach it to the alert under `@images`.

    Each hit of the alert is scanned for entities whose type is `Image`; every such entity
    is fetched with `fetch_image` and handed to `alert.store_image`.

    Endpoint:
        `v3/alerts/image`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
    """
    # Lazy generator: images are still fetched and stored one at a time, in hit order.
    image_ids = (
        entity.id_
        for hit in alert.hits
        for entity in hit.entities
        if entity.type_ == 'Image'
    )
    for image_id in image_ids:
        alert.store_image(image_id, self.fetch_image(image_id))

fetch_bulk

fetch_bulk(
    ids: list[str],
    fields: Optional[list[str]] = ALL_CA_FIELDS,
    tagged_text: Optional[bool] = None,
    max_workers: Optional[int] = 0,
) -> list[ClassicAlert]

Fetch multiple alerts.

PARAMETER DESCRIPTION
ids

Alert IDs that should be fetched.

TYPE: list[str]

fields

Fields to include in the search result.

Note: The default fields are id, log, title, rule, which are always retrieved. Any provided fields are added to these.

TYPE: Optional[list[str]] DEFAULT: ALL_CA_FIELDS

tagged_text

Entities in the alert title and message body will be marked up with entity IDs.

TYPE: Optional[bool] DEFAULT: None

max_workers

Number of workers to multithread requests.

TYPE: Optional[int] DEFAULT: 0

Example
from json import dumps
from pathlib import Path
from psengine.classic_alerts import ClassicAlertMgr

mgr = ClassicAlertMgr()
alerts = mgr.fetch_bulk(ids=['zVEe6k', 'zVHPXX'])
OUTPUT_DIR = Path('your/path')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
for i, alert in enumerate(alerts):
    (OUTPUT_DIR / f'filename_{i}.json').write_text(dumps(alert.json(), indent=2))

Alternatively, save all alerts to a single file:

1
2
3
4
5
6
7
8
9
from json import dump
from pathlib import Path
from psengine.classic_alerts import ClassicAlertMgr

mgr = ClassicAlertMgr()
OUTPUT_FILE = Path('your/path/file')
alerts = mgr.fetch_bulk(ids=['zVEe6k', 'zVHPXX'])
with OUTPUT_FILE.open('w') as f:
    dump([alert.json() for alert in alerts], f, indent=2)
Endpoint

v3/alerts/{id_}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AlertFetchError

If a fetch of the alert via the API fails.

RETURNS DESCRIPTION
list[ClassicAlert]

List of ClassicAlert models.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
def fetch_bulk(
    self,
    ids: Annotated[list[str], Doc('Alert IDs that should be fetched.')],
    fields: Annotated[
        Optional[list[str]],
        Doc(
            """
            Fields to include in the search result.

            **Note:**
            Default fields are `id`, `log`, `title`, `rule` which are always retrieved.
            Any provided fields are added to these.
            """
        ),
    ] = ALL_CA_FIELDS,
    tagged_text: Annotated[
        Optional[bool],
        Doc('Entities in the alert title and message body will be marked up with entity IDs.'),
    ] = None,
    max_workers: Annotated[
        Optional[int], Doc('Number of workers to multithread requests.')
    ] = 0,
) -> Annotated[list[ClassicAlert], Doc('List of ClassicAlert models.')]:
    """Fetch multiple alerts.

    Every ID is fetched through `fetch`. With a non-zero `max_workers` the calls are
    dispatched through `MultiThreadingHelper.multithread_it`; otherwise they run one
    after another in the order of `ids`.

    Example:
        ```python
        from json import dumps
        from pathlib import Path
        from psengine.classic_alerts import ClassicAlertMgr

        mgr = ClassicAlertMgr()
        alerts = mgr.fetch_bulk(ids=['zVEe6k', 'zVHPXX'])
        OUTPUT_DIR = Path('your/path')
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        for i, alert in enumerate(alerts):
            (OUTPUT_DIR / f'filename_{i}.json').write_text(dumps(alert.json(), indent=2))
        ```

        Alternatively, save all alerts to a single file:

        ```python
        from json import dump
        from pathlib import Path
        from psengine.classic_alerts import ClassicAlertMgr

        mgr = ClassicAlertMgr()
        OUTPUT_FILE = Path('your/path/file')
        alerts = mgr.fetch_bulk(ids=['zVEe6k', 'zVHPXX'])
        with OUTPUT_FILE.open('w') as f:
            dump([alert.json() for alert in alerts], f, indent=2)
        ```

    Endpoint:
        `v3/alerts/{id_}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AlertFetchError: If a fetch of the alert via the API fails.
    """
    self.log.info(f'Fetching alerts: {ids}')

    # `0` (the default) or `None` disables multithreading.
    if not max_workers:
        return [self.fetch(id_, fields, tagged_text) for id_ in ids]

    return MultiThreadingHelper.multithread_it(
        max_workers,
        self.fetch,
        iterator=ids,
        fields=fields,
        tagged_text=tagged_text,
    )

fetch_hits

fetch_hits(
    ids: Union[str, list[str]],
    tagged_text: Optional[bool] = None,
) -> list[ClassicAlertHit]

Fetch a list of all the data that caused the alert to trigger (hits).

PARAMETER DESCRIPTION
ids

One or more alert IDs to fetch.

TYPE: Union[str, list[str]]

tagged_text

Entities in the alert title and message body will be marked up with entity IDs.

TYPE: Optional[bool] DEFAULT: None

Endpoint

v3/alerts/hits

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AlertFetchError

If a fetch of the alert hit via the API fails.

RETURNS DESCRIPTION
list[ClassicAlertHit]

List of ClassicAlertHit models.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AlertFetchError)
def fetch_hits(
    self,
    ids: Annotated[Union[str, list[str]], Doc('One or more alert IDs to fetch.')],
    tagged_text: Annotated[
        Optional[bool],
        Doc('Entities in the alert title and message body will be marked up with entity IDs.'),
    ] = None,
) -> Annotated[list[ClassicAlertHit], Doc('List of ClassicAlertHit models.')]:
    """Retrieve the hits (the data that caused the alerts to trigger) for one or more alerts.

    A list of IDs is sent to the API as a single comma-separated string; a plain string
    is sent unchanged.

    Endpoint:
        `v3/alerts/hits`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AlertFetchError: If a fetch of the alert hit via the API fails.
    """
    ids = ','.join(ids) if isinstance(ids, list) else ids
    query = {'ids': ids}

    # The flag is only sent when it is explicitly enabled.
    if tagged_text:
        query['taggedText'] = tagged_text

    self.log.info(f'Fetching hits for alerts: {ids}')
    payload = self.rf_client.request('get', url=EP_CLASSIC_ALERTS_HITS, params=query).json()
    raw_hits = payload.get('data', [])
    return [ClassicAlertHit.model_validate(raw_hit) for raw_hit in raw_hits]

fetch_image

fetch_image(id_: str) -> bytes

Fetch an image.

PARAMETER DESCRIPTION
id_

Image ID to fetch, for example: img:d4620c6a-c789-48aa-b652-b47e0d06d91a

TYPE: str

Endpoint

v3/alerts/image

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AlertImageFetchError

If a fetch of the alert image via the API fails.

RETURNS DESCRIPTION
bytes

Image content.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AlertImageFetchError)
def fetch_image(
    self,
    id_: Annotated[
        str, Doc('Image ID to fetch, for example: img:d4620c6a-c789-48aa-b652-b47e0d06d91a')
    ],
) -> Annotated[bytes, Doc('Image content.')]:
    """Download a single image and return the body of the API response.

    Endpoint:
        `v3/alerts/image`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AlertImageFetchError: If a fetch of the alert image via the API fails.
    """
    self.log.info(f'Fetching image: {id_}')
    query = {'id': id_}
    return self.rf_client.request('get', url=EP_CLASSIC_ALERTS_IMAGE, params=query).content

fetch_rules

fetch_rules(
    freetext: Union[str, list[str], None] = None,
    max_results: int = Field(
        default=DEFAULT_LIMIT, ge=1, le=1000
    ),
) -> list[AlertRuleOut]

Search for alerting rules.

PARAMETER DESCRIPTION
freetext

Filter by a freetext search.

TYPE: Union[str, list[str], None] DEFAULT: None

max_results

Maximum number of rules to return. Maximum 1000.

TYPE: int DEFAULT: Field(default=DEFAULT_LIMIT, ge=1, le=1000)

Endpoint

v2/alert/rules

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type or value.

NoRulesFoundError

If a rule has not been found.

RETURNS DESCRIPTION
list[AlertRuleOut]

List of AlertRule models.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
def fetch_rules(
    self,
    freetext: Annotated[
        Union[str, list[str], None], Doc('Filter by a freetext search.')
    ] = None,
    max_results: Annotated[
        int, Doc('Maximum number of rules to return. Maximum 1000.')
    ] = Field(default=DEFAULT_LIMIT, ge=1, le=1000),
) -> Annotated[list[AlertRuleOut], Doc('List of AlertRule models.')]:
    """Search for alerting rules.

    With no `freetext` (or an empty one) a single unfiltered search is performed.
    A string triggers one filtered search. A list triggers one search per entry, each
    limited to the number of rules still missing to reach `max_results`; searching stops
    as soon as that many rules have been collected.

    Endpoint:
        `v2/alert/rules`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type or value.
        NoRulesFoundError: If a rule has not been found.
    """
    if not freetext:
        return self._fetch_rules(max_results=max_results)

    if isinstance(freetext, str):
        return self._fetch_rules(freetext, max_results)

    rules = []
    for text in freetext:
        remaining = max_results - len(rules)
        # Stop once the cap is reached: asking for zero (or a negative number of)
        # results is never a meaningful request.
        if remaining <= 0:
            break
        rules += self._fetch_rules(text, remaining)
    return rules

search

search(
    triggered: Optional[str] = None,
    status: Optional[str] = None,
    rule_id: Union[str, list[str], None] = None,
    freetext: Optional[str] = None,
    tagged_text: Optional[bool] = None,
    order_by: Optional[str] = None,
    direction: Optional[str] = None,
    fields: Optional[list[str]] = REQUIRED_CA_FIELDS,
    max_results: Optional[int] = Field(
        ge=1, le=1000, default=DEFAULT_LIMIT
    ),
    max_workers: Optional[int] = Field(
        ge=0, le=50, default=0
    ),
    alerts_per_page: Optional[int] = Field(
        ge=1, le=1000, default=ALERTS_PER_PAGE
    ),
) -> list[ClassicAlert]

Search for triggered alerts.

Makes paginated requests in batches of alerts_per_page, up to max_results.

PARAMETER DESCRIPTION
triggered

Filter on triggered time. Format: -1d or [2017-07-30,2017-07-31].

TYPE: Optional[str] DEFAULT: None

status

Filter on status, such as: New, Resolved, Pending, Dismissed.

TYPE: Optional[str] DEFAULT: None

rule_id

Filter by one or more Alert Rule IDs.

TYPE: Union[str, list[str], None] DEFAULT: None

freetext

Filter by a freetext search.

TYPE: Optional[str] DEFAULT: None

tagged_text

Entities in the alert title and message body will be marked up.

TYPE: Optional[bool] DEFAULT: None

order_by

Sort by a specific field, such as: triggered.

TYPE: Optional[str] DEFAULT: None

direction

Sort direction, such as: asc or desc.

TYPE: Optional[str] DEFAULT: None

fields

Fields to include in the search result.

Note: The default fields are id, log, title, rule, which are always retrieved. Any provided fields are added to these.

TYPE: Optional[list[str]] DEFAULT: REQUIRED_CA_FIELDS

max_results

Maximum number of records to return. Maximum 1000.

TYPE: Optional[int] DEFAULT: Field(ge=1, le=1000, default=DEFAULT_LIMIT)

max_workers

Number of workers to use for concurrent fetches. Applied only when multiple rule_id values are provided.

TYPE: Optional[int] DEFAULT: Field(ge=0, le=50, default=0)

alerts_per_page

Number of items to retrieve per page.

TYPE: Optional[int] DEFAULT: Field(ge=1, le=1000, default=ALERTS_PER_PAGE)

Warning

Paginating with a high number of items per page may lead to timeout errors from the API.

Endpoint

v3/alerts/

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AlertSearchError

If connection error occurs.

RETURNS DESCRIPTION
list[ClassicAlert]

List of ClassicAlert models.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AlertSearchError)
def search(
    self,
    triggered: Annotated[
        Optional[str], Doc('Filter on triggered time. Format: -1d or [2017-07-30,2017-07-31].')
    ] = None,
    status: Annotated[
        Optional[str],
        Doc('Filter on status, such as: `New`, `Resolved`, `Pending`, `Dismissed`.'),
    ] = None,
    rule_id: Annotated[
        Union[str, list[str], None], Doc('Filter by one or more Alert Rule IDs.')
    ] = None,
    freetext: Annotated[Optional[str], Doc('Filter by a freetext search.')] = None,
    tagged_text: Annotated[
        Optional[bool], Doc('Entities in the alert title and message body will be marked up.')
    ] = None,
    order_by: Annotated[
        Optional[str], Doc('Sort by a specific field, such as: `triggered`.')
    ] = None,
    direction: Annotated[
        Optional[str], Doc('Sort direction, such as: `asc` or `desc`.')
    ] = None,
    fields: Annotated[
        Optional[list[str]],
        Doc(
            """
            Fields to include in the search result.

            **Note:**
            Default fields are `id`, `log`, `title`, `rule` which are always retrieved.
            Any provided fields are added to these.
            """
        ),
    ] = REQUIRED_CA_FIELDS,
    max_results: Annotated[
        Optional[int], Doc('Maximum number of records to return. Maximum 1000.')
    ] = Field(ge=1, le=1000, default=DEFAULT_LIMIT),
    max_workers: Annotated[
        Optional[int],
        Doc(
            """
            Number of workers to use for concurrent fetches.
            Applied only when multiple `rule_id` values are provided.
            """
        ),
    ] = Field(ge=0, le=50, default=0),
    alerts_per_page: Annotated[
        Optional[int], Doc('Number of items to retrieve per page.')
    ] = Field(ge=1, le=1000, default=ALERTS_PER_PAGE),
) -> Annotated[list[ClassicAlert], Doc('List of ClassicAlert models.')]:
    """Search for triggered alerts.

    Makes paginated requests in batches of `alerts_per_page`, up to `max_results`.

    When `rule_id` is a non-empty list, one search is run per rule ID (concurrently if
    `max_workers` is non-zero) and the results are concatenated. The same `max_results`
    is passed to each of those searches, so the combined list is not capped at
    `max_results`. An empty list is treated like `None` (no rule filter).

    Warning:
        Paginating with a high number of items per page may lead to timeout errors from the API.

    Endpoint:
        `v3/alerts/`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AlertSearchError: If connection error occurs.
    """
    # Keyword arguments shared by every underlying `_search` call.
    params = {
        'triggered': triggered,
        'status': status,
        'freetext': freetext,
        'tagged_text': tagged_text,
        'order_by': order_by,
        'direction': direction,
        'fields': fields,
        'max_results': DEFAULT_LIMIT if max_results is None else max_results,
        'alerts_per_page': alerts_per_page,
    }

    if isinstance(rule_id, list) and rule_id:
        if max_workers:
            per_rule_results = MultiThreadingHelper.multithread_it(
                max_workers, self._search, iterator=rule_id, **params
            )
        else:
            per_rule_results = (self._search(rule, **params) for rule in rule_id)
        # Flatten the per-rule result lists into one list of alerts.
        return list(chain.from_iterable(per_rule_results))

    if isinstance(rule_id, str):
        return self._search(rule_id, **params)

    # `None` or an empty list: search without a rule filter.
    return self._search(**params)

update

update(updates: list[dict])

Update one or more alerts.

It is possible to update the assignee, statusInPortal, and a note tied to the alert.

PARAMETER DESCRIPTION
updates

List of updates to perform.

TYPE: list[dict]

Example
1
2
3
4
5
6
7
8
9
[
    {
        "id": "string",
        "assignee": "string",
        "status": "unassigned",
        "note": "string",
        "statusInPortal": "New"
    }
]
Endpoint

v2/alert/update

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AlertUpdateError)
def update(
    self,
    updates: Annotated[list[dict], Doc('List of updates to perform.')],
):
    """Apply a batch of updates to one or more alerts.

    The assignee, `statusInPortal`, and a note tied to the alert can be updated.
    The list is posted as-is and the decoded JSON body of the response is returned.

    Example:
        ```python
        [
            {
                "id": "string",
                "assignee": "string",
                "status": "unassigned",
                "note": "string",
                "statusInPortal": "New"
            }
        ]
        ```

    Endpoint:
        `v2/alert/update`
    """
    self.log.info(f'Updating alerts: {updates}')
    response = self.rf_client.request('post', url=EP_CLASSIC_ALERTS_UPDATE, data=updates)
    return response.json()

update_status

update_status(ids: Union[str, list[str]], status: str)

Update the status of one or several alerts.

PARAMETER DESCRIPTION
ids

One or more alert IDs.

TYPE: Union[str, list[str]]

status

Status to update to.

TYPE: str

Endpoint

v2/alert/update

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AlertUpdateError

If connection error occurs.

Source code in psengine/classic_alerts/classic_alert_mgr.py
@debug_call
@validate_call
def update_status(
    self,
    ids: Annotated[Union[str, list[str]], Doc('One or more alert IDs.')],
    status: Annotated[str, Doc('Status to update to.')],
):
    """Set the same status on one or several alerts.

    A string of IDs is split on commas; a list is used as given. One update entry
    (`id` + `statusInPortal`) is built per alert ID and passed to `update`.

    Endpoint:
        `v2/alert/update`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AlertUpdateError: If connection error occurs.
    """
    if isinstance(ids, list):
        alert_ids = ids
    else:
        alert_ids = ids.split(',')

    return self.update(
        [{'id': alert_id, 'statusInPortal': status} for alert_id in alert_ids]
    )