Source code for acstore.helpers.yaml_definitions_file

"""YAML-based attribute container definitions file."""

import yaml

from acstore import errors
from acstore.containers import interface
from acstore.helpers import schema


# TODO: merge this into interface.AttributeContainer once Plaso has been
# changed to no longer support attributes containers without a schema.
class AttributeContainerWithSchema(interface.AttributeContainer):
    """Attribute container that carries a schema.

    Subclasses are expected to replace SCHEMA with their own mapping; the
    base value is an empty dictionary.
    """

    # Schema of the container; empty by default.
    SCHEMA = {}
class YAMLAttributeContainerDefinitionsFile:
    """YAML-based attribute container definitions file.

    A YAML-based attribute container definitions file contains one or more
    attribute container definitions. An attribute container definition
    consists of:

    name: windows_eventlog_message_file
    attributes:
    - name: path
      type: str
    - name: windows_path
      type: str

    Where:
    * name, unique identifier of the attribute container;
    * attributes, defines the attributes of the container.
    """

    _SUPPORTED_DATA_TYPES = frozenset(
        ["AttributeContainerIdentifier", "bool", "int", "str", "timestamp"]
    )

    _SUPPORTED_KEYS = frozenset(["attributes", "name"])

    def _ReadDefinition(self, definition_values):
        """Reads a definition from a dictionary.

        Every structural problem in the definition is reported as a
        ParseError, including values of an unexpected type (for example a
        YAML document that is a list or a scalar instead of a mapping).

        Args:
          definition_values (dict[str, object]): attribute container definition
              values.

        Returns:
          AttributeContainer: an attribute container.

        Raises:
          ParseError: if the definition is not set or incorrect.
        """
        if not definition_values:
            raise errors.ParseError("Missing attribute container definition values.")

        # A YAML document can be any node type; only a mapping is a definition.
        if not isinstance(definition_values, dict):
            raise errors.ParseError(
                "Invalid attribute container definition: expected a mapping, "
                f"got: {type(definition_values).__name__:s}."
            )

        different_keys = set(definition_values) - self._SUPPORTED_KEYS
        if different_keys:
            # YAML keys are not necessarily strings; stringify and sort them so
            # the message can always be built and is deterministic.
            different_keys = ", ".join(sorted(str(key) for key in different_keys))
            raise errors.ParseError(f"Undefined keys: {different_keys:s}")

        container_name = definition_values.get("name")
        if not container_name:
            raise errors.ParseError(
                "Invalid attribute container definition missing name."
            )

        if not isinstance(container_name, str):
            raise errors.ParseError(
                "Invalid attribute container definition name: "
                f"{container_name!r} is not a string."
            )

        attributes = definition_values.get("attributes")
        if not attributes:
            raise errors.ParseError(
                f"Invalid attribute container definition: {container_name:s} "
                "missing attributes."
            )

        if not isinstance(attributes, (list, tuple)):
            raise errors.ParseError(
                f"Invalid attribute container definition: {container_name:s} "
                "attributes is not a sequence."
            )

        class_name = "".join(
            element.title() for element in container_name.split("_")
        )

        container_schema = {}

        # Both class-level names are reserved up front so that an attribute
        # named CONTAINER_TYPE or SCHEMA is rejected instead of being silently
        # overwritten.
        class_attributes = {
            "CONTAINER_TYPE": container_name,
            "SCHEMA": container_schema,
        }

        for attribute_index, attribute_values in enumerate(attributes):
            if not isinstance(attribute_values, dict):
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} "
                    f"attribute: {attribute_index:d} is not a mapping."
                )

            attribute_name = attribute_values.get("name")
            if not attribute_name:
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} name "
                    f"missing of attribute: {attribute_index:d}."
                )

            if not isinstance(attribute_name, str):
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} name "
                    f"of attribute: {attribute_index:d} is not a string."
                )

            if attribute_name in class_attributes:
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} "
                    f"attribute: {attribute_name:s} already set."
                )

            attribute_data_type = attribute_values.get("type")
            if not attribute_data_type:
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} type "
                    f"missing of attribute: {attribute_name:s}."
                )

            # The isinstance check comes first so that a non-string type never
            # reaches the schema helper.
            if not isinstance(
                attribute_data_type, str
            ) or not schema.SchemaHelper.HasDataType(attribute_data_type):
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} type "
                    f"attribute: {attribute_name:s} unsupported data type: "
                    f"{attribute_data_type!s}."
                )

            class_attributes[attribute_name] = None
            container_schema[attribute_name] = attribute_data_type

        # TODO: add support for _SERIALIZABLE_PROTECTED_ATTRIBUTES.

        # Dynamically create the container class from the collected attributes.
        return type(class_name, (AttributeContainerWithSchema,), class_attributes)

    def _ReadFromFileObject(self, file_object):
        """Reads the definitions from a file-like object.

        Args:
          file_object (file): definitions file-like object.

        Yields:
          AttributeContainer: an attribute container.

        Raises:
          ParseError: if a definition is not set or incorrect.
        """
        # safe_load_all yields one object per YAML document in the stream.
        for yaml_definition in yaml.safe_load_all(file_object):
            yield self._ReadDefinition(yaml_definition)

    def ReadFromFile(self, path):
        """Reads the definitions from a YAML file.

        Args:
          path (str): path to a definitions file.

        Yields:
          AttributeContainer: an attribute container.

        Raises:
          ParseError: if a definition is not set or incorrect.
        """
        # The file stays open while definitions are being yielded because the
        # YAML documents are parsed lazily.
        with open(path, encoding="utf-8") as file_object:
            yield from self._ReadFromFileObject(file_object)