Skip to content

Auto Sigma Manager

psengine.malware_intel.auto_sigma_mgr.AutoSigmaMgr

AutoSigmaMgr(rf_token: str = None)

Manages requests for Recorded Future Malware Intelligence API Auto Sigma feature.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: str DEFAULT: None

Source code in psengine/malware_intel/auto_sigma_mgr.py
def __init__(self, rf_token: str | None = None):
    """Initializes the `AutoSigmaMgr` object.

    Args:
        rf_token (str | None, optional): Recorded Future API token. When it is
            omitted or empty, `RFClient` is constructed with no arguments.
    """
    self.log = logging.getLogger(__name__)
    # Forward the token only when one was supplied; a falsy token (None or '')
    # falls back to the argument-less RFClient constructor.
    self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient()

create_rule_job

create_rule_job(
    name: str,
    query: str,
    start_date: str,
    end_date: str | None = None,
) -> AutoSigmaJobCreateOut

Create a new Auto Sigma rule generation job.

PARAMETER DESCRIPTION
name

The name of the Auto Sigma job.

TYPE: str

query

The query used to select files to build rules for.

TYPE: str

start_date

The earliest date to include in the query.

TYPE: str

end_date

The latest date to include in the query.

TYPE: str | None DEFAULT: None

Endpoint

/malware-intelligence/v1/auto-sigma/jobs

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoSigmaJobCreationError

If API error occurs.

RETURNS DESCRIPTION
AutoSigmaJobCreateOut

Job creation confirmation containing the job ID.

Source code in psengine/malware_intel/auto_sigma_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoSigmaJobCreationError)
def create_rule_job(
    self,
    name: Annotated[str, Doc('The name of the Auto Sigma job.')],
    query: Annotated[str, Doc('The query used to select files to build rules for.')],
    start_date: Annotated[str, Doc('The earliest date to include in the query.')],
    end_date: Annotated[str | None, Doc('The latest date to include in the query.')] = None,
) -> Annotated[AutoSigmaJobCreateOut, Doc('Job creation confirmation containing the job ID.')]:
    """Submit a new Auto Sigma rule generation job.

    The request body always carries `name`, `query` and `start_date`; `end_date` is
    included only when a value is supplied.

    Endpoint:
        `/malware-intelligence/v1/auto-sigma/jobs`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoSigmaJobCreationError: If API error occurs.
    """
    # An unset end date is omitted from the payload rather than sent as null.
    optional_fields = {} if end_date is None else {'end_date': end_date}
    payload = {'name': name, 'query': query, 'start_date': start_date, **optional_fields}

    response = self.rf_client.request('post', EP_AUTO_SIGMA_JOBS, payload)
    return AutoSigmaJobCreateOut.model_validate(response.json())

fetch_rule_jobs

fetch_rule_jobs(
    limit: int | None = DEFAULT_LIMIT,
) -> AutoSigmaJobsOut

Fetch all Auto Sigma rule generation jobs created by the user.

PARAMETER DESCRIPTION
limit

Maximum number of jobs to return.

TYPE: int | None DEFAULT: DEFAULT_LIMIT

Endpoint

/malware-intelligence/v1/auto-sigma/get_jobs

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoSigmaFetchJobsError

If API error occurs.

RETURNS DESCRIPTION
AutoSigmaJobsOut

The list of Auto Sigma rule generation jobs created by the user.

Source code in psengine/malware_intel/auto_sigma_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoSigmaFetchJobsError)
def fetch_rule_jobs(
    self,
    limit: Annotated[int | None, Doc('Maximum number of jobs to return.')] = DEFAULT_LIMIT,
) -> Annotated[
    AutoSigmaJobsOut,
    Doc('The list of Auto Sigma rule generation jobs created by the user.'),
]:
    """Retrieve the Auto Sigma rule generation jobs created by the user.

    The `limit` value is sent as-is in the request body.

    Endpoint:
        `/malware-intelligence/v1/auto-sigma/get_jobs`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoSigmaFetchJobsError: If API error occurs.
    """
    request_body = {'limit': limit}
    response = self.rf_client.request('post', EP_AUTO_SIGMA_GET_JOBS, data=request_body)
    return AutoSigmaJobsOut.model_validate(response.json())

fetch_rule_job_result

fetch_rule_job_result(
    job_id: str, wait_until_finished: bool = False
) -> AutoSigmaJobOut

Fetch the result of a specific Auto Sigma rule generation job.

A newly created job typically moves through CREATED and then RUNNING while Sigma rules and patterns are being generated.

PARAMETER DESCRIPTION
job_id

The job ID to fetch.

TYPE: str

wait_until_finished

When true, keep polling until the job status is FINISHED; polling also stops with an error if the job reports FAILED or the retry limit is reached.

TYPE: bool DEFAULT: False

The terminal statuses are:
  • FAILED: generation failed.
  • FINISHED: generation succeeded.
Endpoint

/malware-intelligence/v1/auto-sigma/jobs/{job_id}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoSigmaFetchJobError

If API error occurs or if polling times out / job fails.

RETURNS DESCRIPTION
AutoSigmaJobOut

The details of the requested Sigma rule job.

Source code in psengine/malware_intel/auto_sigma_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoSigmaFetchJobError)
def fetch_rule_job_result(
    self,
    job_id: Annotated[str, Doc('The job ID to fetch.')],
    wait_until_finished: Annotated[
        bool,
        Doc('When true, keep polling until the job status is FINISHED.'),
    ] = False,
) -> Annotated[AutoSigmaJobOut, Doc('The details of the requested Sigma rule job.')]:
    """Fetch the result of a specific Auto Sigma rule generation job.

    A newly created job typically moves through `CREATED` and then `RUNNING` while
    Sigma rules and patterns are being generated.

    The terminal statuses are:
        - `FAILED`: generation failed.
        - `FINISHED`: generation succeeded.

    Without `wait_until_finished` the job is fetched once and returned whatever its
    status. With it, the job is fetched up to `JOB_POOL_RETRIES` times, pausing
    `JOB_POOL_INTERTVAL_SECONDS` between attempts, until it reports `FINISHED`.

    Endpoint:
        `/malware-intelligence/v1/auto-sigma/jobs/{job_id}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoSigmaFetchJobError: If API error occurs or if polling times out / job fails.
    """
    job_url = EP_AUTO_SIGMA_JOB_ID.format(job_id)

    if not wait_until_finished:
        data = self.rf_client.request('get', job_url).json()
        return AutoSigmaJobOut.model_validate(data)

    status = ''
    for attempt in range(JOB_POOL_RETRIES):
        data = self.rf_client.request('get', job_url).json()
        result = AutoSigmaJobOut.model_validate(data)
        # Compare case-insensitively by normalising the reported status.
        status = result.status.upper()

        if status == 'FINISHED':
            return result

        if status == 'FAILED':
            raise AutoSigmaFetchJobError(
                message=(f'Auto Sigma job {job_id} failed while waiting for FINISHED status.')
            )

        # Pause only between attempts: sleeping after the final attempt would just
        # delay the timeout error below without another poll taking place.
        if attempt < JOB_POOL_RETRIES - 1:
            time.sleep(JOB_POOL_INTERTVAL_SECONDS)

    raise AutoSigmaFetchJobError(
        message=(
            f'Timed out waiting for Auto Sigma job {job_id} to finish. Last status: {status}'
        )
    )

edit_rule_str

edit_rule_str(
    job_id: str,
    rule_id: str,
    yaml_str: str | None = None,
    status: str | None = None,
) -> AutoSigmaJobEditOut

Edit an existing Auto Sigma rule within a job by modifying its YAML rule string.

PARAMETER DESCRIPTION
job_id

The job ID to which the Auto Sigma rule belongs.

TYPE: str

rule_id

The Auto Sigma rule ID to change.

TYPE: str

yaml_str

New Sigma rule YAML to apply.

TYPE: str | None DEFAULT: None

status

New Sigma rule status to apply. Supported values: True Positive, False Positive, Benign Behavior, No Root Cause, Needs Tuning, New.

TYPE: str | None DEFAULT: None

Endpoint

/malware-intelligence/v1/auto-sigma/jobs/{job_id}/{rule_id}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoSigmaJobEditError

If API error occurs.

RETURNS DESCRIPTION
AutoSigmaJobEditOut

Edit confirmation.

Source code in psengine/malware_intel/auto_sigma_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoSigmaJobEditError)
def edit_rule_str(
    self,
    job_id: Annotated[str, Doc('The job ID to which the Auto Sigma rule belongs.')],
    rule_id: Annotated[str, Doc('The Auto Sigma rule ID to change.')],
    yaml_str: Annotated[str | None, Doc('New Sigma rule YAML to apply.')] = None,
    status: Annotated[
        str | None,
        Doc(
            """New Sigma rule status to apply. Supported values:
                - True Positive,
                - False Positive,
                - Benign Behavior,
                - No Root Cause,
                - Needs Tuning,
                - New."""
        ),
    ] = None,
) -> Annotated[AutoSigmaJobEditOut, Doc('Edit confirmation')]:
    """Update the YAML rule string and/or the status of an Auto Sigma rule in a job.

    Only the fields that are supplied are sent: `yaml_str` goes out as `rule_yaml` and
    `status` as `status`; arguments left as `None` are omitted from the request body.

    Endpoint:
        `/malware-intelligence/v1/auto-sigma/jobs/{job_id}/{rule_id}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoSigmaJobEditError: If API error occurs.
    """
    candidate_fields = {'rule_yaml': yaml_str, 'status': status}
    payload = {key: value for key, value in candidate_fields.items() if value is not None}

    endpoint = EP_AUTO_SIGMA_JOB_ID_RULE_ID.format(job_id, rule_id)
    updated = self.rf_client.request('post', endpoint, payload).json()
    # The API response is carried in `updated`; the IDs are echoed from the arguments.
    return AutoSigmaJobEditOut(job_id=job_id, rule_id=rule_id, updated=updated)

delete_rule_job

delete_rule_job(job_id: str) -> AutoSigmaJobDeleteOut

Delete a created Auto Sigma job and its generated Sigma rules.

PARAMETER DESCRIPTION
job_id

The job ID to delete.

TYPE: str

Endpoint

/malware-intelligence/v1/auto-sigma/jobs/{job_id}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoSigmaJobDeletionError

If API error occurs.

RETURNS DESCRIPTION
AutoSigmaJobDeleteOut

A confirmation of deletion.

Source code in psengine/malware_intel/auto_sigma_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoSigmaJobDeletionError)
def delete_rule_job(
    self,
    job_id: Annotated[str, Doc('The job ID to delete.')],
) -> Annotated[AutoSigmaJobDeleteOut, Doc('A confirmation of deletion.')]:
    """Remove an Auto Sigma job together with the Sigma rules generated for it.

    Endpoint:
        `/malware-intelligence/v1/auto-sigma/jobs/{job_id}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoSigmaJobDeletionError: If API error occurs.
    """
    endpoint = EP_AUTO_SIGMA_JOB_ID.format(job_id)
    response = self.rf_client.request('delete', endpoint)
    return AutoSigmaJobDeleteOut.model_validate(response.json())

retry_failed_rule_job

retry_failed_rule_job(job_id: str) -> AutoSigmaJobRetryOut

Retry a failed Auto Sigma rule generation job.

PARAMETER DESCRIPTION
job_id

The job ID to retry.

TYPE: str

Endpoint

/malware-intelligence/v1/auto-sigma/jobs/{job_id}/retry

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoSigmaJobRetryError

If API error occurs.

RETURNS DESCRIPTION
AutoSigmaJobRetryOut

A confirmation of retry.

Source code in psengine/malware_intel/auto_sigma_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoSigmaJobRetryError)
def retry_failed_rule_job(
    self,
    job_id: Annotated[str, Doc('The job ID to retry.')],
) -> Annotated[AutoSigmaJobRetryOut, Doc('A confirmation of retry.')]:
    """Request a retry of an Auto Sigma rule generation job that failed.

    Endpoint:
        `/malware-intelligence/v1/auto-sigma/jobs/{job_id}/retry`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoSigmaJobRetryError: If API error occurs.
    """
    endpoint = EP_AUTO_SIGMA_JOB_ID_RETRY.format(job_id)
    response = self.rf_client.request('post', endpoint)
    return AutoSigmaJobRetryOut.model_validate(response.json())