Skip to content

Auto Yara Manager

psengine.malware_intel.auto_yara_mgr.AutoYaraMgr

AutoYaraMgr(rf_token: str = None)

Manages requests for Recorded Future Malware Intelligence API Auto YARA feature.

PARAMETER DESCRIPTION
rf_token

Recorded Future API token.

TYPE: str DEFAULT: None

Source code in psengine/malware_intel/auto_yara_mgr.py
def __init__(self, rf_token: str = None):
    """Initializes the `AutoYaraMgr` object.

    Args:
        rf_token (str, optional): Recorded Future API token.
    """
    self.log = logging.getLogger(__name__)
    self.rf_client = RFClient(api_token=rf_token) if rf_token else RFClient()

create_rule_job

create_rule_job(
    hashes: list[str], name: str, query: str | None = None
) -> AutoYaraJobCreateOut

Create a new Auto YARA rule generation job based on the hashes and/or query provided.

PARAMETER DESCRIPTION
hashes

The list of hashes to use.

TYPE: list[str]

name

The job name.

TYPE: str

query

The filtering query to perform.

TYPE: str | None DEFAULT: None

Endpoint

/malware-intelligence/v1/auto-yara/jobs

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoYaraJobCreationError

If API error occurs.

RETURNS DESCRIPTION
AutoYaraJobCreateOut

Job creation confirmation containing the job ID.

Source code in psengine/malware_intel/auto_yara_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoYaraJobCreationError)
def create_rule_job(
    self,
    hashes: Annotated[list[str], Doc('The list of hashes to use.')],
    name: Annotated[str, Doc('The job name.')],
    query: Annotated[str | None, Doc('The filtering query to perform.')] = None,
) -> Annotated[AutoYaraJobCreateOut, Doc('Job creation confirmation containing the job ID.')]:
    """Create a new Auto YARA rule generation job based on the hashes and/or query provided.

    Endpoint:
        `/malware-intelligence/v1/auto-yara/jobs`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoYaraJobCreationError: If API error occurs.
    """
    data = {'hashes': hashes, 'name': name}
    if query is not None:
        data['query'] = query

    data = self.rf_client.request('post', EP_AUTO_YARA_JOBS, data).json()
    return AutoYaraJobCreateOut.model_validate(data)

fetch_rule_jobs

fetch_rule_jobs() -> AutoYaraJobsOut

Fetch all the Auto Yara rule generation jobs created by the user.

Endpoint

/malware-intelligence/v1/auto-yara/jobs

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoYaraFetchJobsError

If API error occurs.

RETURNS DESCRIPTION
AutoYaraJobsOut

The list of Auto Yara rule generation jobs created by the user.

Source code in psengine/malware_intel/auto_yara_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoYaraFetchJobsError)
def fetch_rule_jobs(
    self,
) -> Annotated[
    AutoYaraJobsOut, Doc('The list of Auto Yara rule generation jobs created by the user.')
]:
    """Fetch all the Auto Yara rule generation jobs created by the user.

    Endpoint:
        `/malware-intelligence/v1/auto-yara/jobs`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoYaraFetchJobsError: If API error occurs.
    """
    data = self.rf_client.request('get', EP_AUTO_YARA_JOBS).json()
    return AutoYaraJobsOut.model_validate(data)

fetch_rule_job_result

fetch_rule_job_result(
    job_id: str,
    sanitize: bool | None = None,
    wait_until_finished: bool = False,
) -> AutoYaraJobOut

Fetch the result of a specific Auto YARA rule generation job.

A newly created job will typically progress through CREATED and then RUNNING while the YARA rule is being generated. During those states, job.yara_rule_str is None.

PARAMETER DESCRIPTION
job_id

The job ID to fetch.

TYPE: str

sanitize

Return a sanitized version of the rule when true.

TYPE: bool | None DEFAULT: None

wait_until_finished

When true, keep polling until the job status is FINISHED.

TYPE: bool DEFAULT: False

The terminal statuses are
  • FAILED: rule generation failed, so job.yara_rule_str remains None.
  • FINISHED: rule generation succeeded, and job.yara_rule_str is available.
Endpoint

/malware-intelligence/v1/auto-yara/jobs/{job_id}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoYaraFetchJobError

If API error occurs or if polling times out / job fails.

RETURNS DESCRIPTION
AutoYaraJobOut

The details of the requested YARA rule job.

Source code in psengine/malware_intel/auto_yara_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoYaraFetchJobError)
def fetch_rule_job_result(
    self,
    job_id: Annotated[str, Doc('The job ID to fetch.')],
    sanitize: Annotated[
        bool | None, Doc('Return a sanitized version of the rule when true.')
    ] = None,
    wait_until_finished: Annotated[
        bool,
        Doc('When true, keep polling until the job status is FINISHED.'),
    ] = False,
) -> Annotated[AutoYaraJobOut, Doc('The details of the requested YARA rule job.')]:
    """Fetch the result of a specific Auto YARA rule generation job.

    A newly created job will typically progress through `CREATED` and then `RUNNING` while
    the YARA rule is being generated. During those states, `job.yara_rule_str` is `None`.

    The terminal statuses are:
        - `FAILED`: rule generation failed, so `job.yara_rule_str` remains `None`.
        - `FINISHED`: rule generation succeeded, and `job.yara_rule_str` is available.


    Endpoint:
        `/malware-intelligence/v1/auto-yara/jobs/{job_id}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoYaraFetchJobError: If API error occurs or if polling times out / job fails.
    """
    kwargs = {'params': {'sanitize': sanitize}} if sanitize is not None else {}
    if not wait_until_finished:
        data = self.rf_client.request(
            'get', EP_AUTO_YARA_JOB_ID.format(job_id), **kwargs
        ).json()
        return AutoYaraJobOut.model_validate(data)

    status = ''
    for _ in range(JOB_POOL_RETRIES):
        data = self.rf_client.request(
            'get', EP_AUTO_YARA_JOB_ID.format(job_id), **kwargs
        ).json()
        result = AutoYaraJobOut.model_validate(data)
        status = result.job.status.upper()

        if status == 'FINISHED':
            return result

        if status == 'FAILED':
            raise AutoYaraFetchJobError(
                message=f'Auto YARA job {job_id} failed while waiting for FINISHED status.'
            )

        time.sleep(JOB_POOL_INTERTVAL_SECONDS)

    raise AutoYaraFetchJobError(
        message=(
            f'Timed out waiting for Auto YARA job {job_id} to finish. Last status: {status}'
        )
    )

edit_rule_str

edit_rule_str(
    job_id: str, yara_rule_str: str
) -> AutoYaraJobEditOut

Edit an existing Yara rule job by modifying its YARA rule string.

PARAMETER DESCRIPTION
job_id

The job ID to which the Auto Yara rule belongs.

TYPE: str

yara_rule_str

The new YARA rule string value to apply.

TYPE: str

Endpoint

/malware-intelligence/v1/auto-yara/jobs/edit

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoYaraJobEditError

If API error occurs.

RETURNS DESCRIPTION
AutoYaraJobEditOut

Edit confirmation containing the job ID.

Source code in psengine/malware_intel/auto_yara_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoYaraJobEditError)
def edit_rule_str(
    self,
    job_id: Annotated[str, Doc('The job ID to which the Auto Yara rule belongs.')],
    yara_rule_str: Annotated[str, Doc('The new YARA rule string value to apply.')],
) -> Annotated[AutoYaraJobEditOut, Doc('Edit confirmation containing the job ID.')]:
    """Edit an existing Yara rule job by modifying its YARA rule string.

    Endpoint:
        `/malware-intelligence/v1/auto-yara/jobs/edit`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoYaraJobEditError: If API error occurs.
    """
    data = self.rf_client.request(
        'post', EP_AUTO_YARA_JOBS_EDIT, {'job_id': job_id, 'yara_rule_str': yara_rule_str}
    ).json()
    return AutoYaraJobEditOut.model_validate(data)

delete_rule_job

delete_rule_job(job_id: str) -> AutoYaraJobDeleteOut

Delete a created Auto Yara job and with it the generated YARA rule.

PARAMETER DESCRIPTION
job_id

The job ID to delete.

TYPE: str

Endpoint

/malware-intelligence/v1/auto-yara/jobs/{job_id}

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoYaraJobDeletionError

If API error occurs.

RETURNS DESCRIPTION
AutoYaraJobDeleteOut

A confirmation of deletion.

Source code in psengine/malware_intel/auto_yara_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoYaraJobDeletionError)
def delete_rule_job(
    self,
    job_id: Annotated[str, Doc('The job ID to delete.')],
) -> Annotated[AutoYaraJobDeleteOut, Doc('A confirmation of deletion.')]:
    """Delete a created Auto Yara job and with it the generated YARA rule.

    Endpoint:
        `/malware-intelligence/v1/auto-yara/jobs/{job_id}`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoYaraJobDeletionError: If API error occurs.
    """
    data = self.rf_client.request('delete', EP_AUTO_YARA_JOB_ID.format(job_id)).json()
    return AutoYaraJobDeleteOut.model_validate(data)

retry_failed_rule_job

retry_failed_rule_job(job_id: str) -> AutoYaraJobRetryOut

Retry a failed Auto YARA rule generation job.

PARAMETER DESCRIPTION
job_id

The job ID to retry.

TYPE: str

Endpoint

/malware-intelligence/v1/auto-yara/jobs/{job_id}/retry

RAISES DESCRIPTION
ValidationError

If any supplied parameter is of incorrect type.

AutoYaraJobRetryError

If API error occurs.

RETURNS DESCRIPTION
AutoYaraJobRetryOut

A confirmation of retry.

Source code in psengine/malware_intel/auto_yara_mgr.py
@debug_call
@validate_call
@connection_exceptions(ignore_status_code=[], exception_to_raise=AutoYaraJobRetryError)
def retry_failed_rule_job(
    self,
    job_id: Annotated[str, Doc('The job ID to retry.')],
) -> Annotated[AutoYaraJobRetryOut, Doc('A confirmation of retry.')]:
    """Retry a failed Auto YARA rule generation job.

    Endpoint:
        `/malware-intelligence/v1/auto-yara/jobs/{job_id}/retry`

    Raises:
        ValidationError: If any supplied parameter is of incorrect type.
        AutoYaraJobRetryError: If API error occurs.
    """
    data = self.rf_client.request('post', EP_AUTO_YARA_JOB_ID_RETRY.format(job_id)).json()
    return AutoYaraJobRetryOut.model_validate(data)