"""YAML-based attribute container definitions file."""
import yaml
from acstore import errors
from acstore.containers import interface
from acstore.helpers import schema
# TODO: merge this into interface.AttributeContainer once Plaso has been
# changed to no longer support attribute containers without a schema.
[docs]
class AttributeContainerWithSchema(interface.AttributeContainer):
    """Attribute container with schema.

    Base class of the attribute container classes that are created at runtime
    from YAML definitions by YAMLAttributeContainerDefinitionsFile.
    """

    # Schema of the container. The classes generated from a YAML definition
    # replace this with a dictionary that maps every attribute name onto the
    # name of its data type. The dictionary defined here is shared by all
    # classes that do not override it, so it should not be modified in place.
    SCHEMA = {}
[docs]
class YAMLAttributeContainerDefinitionsFile:
    """YAML-based attribute container definitions file.

    A YAML-based attribute container definitions file contains one or more
    attribute container definitions, each stored as a separate YAML document.
    An attribute container definition consists of:

      name: windows_eventlog_message_file
      attributes:
      - name: path
        type: str
      - name: windows_path
        type: str

    Where:
    * name, unique identifier of the attribute container;
    * attributes, defines the attributes of the container.
    """

    # Not referenced by the methods of this class: data types are validated
    # with schema.SchemaHelper.HasDataType() instead.
    _SUPPORTED_DATA_TYPES = frozenset(
        ["AttributeContainerIdentifier", "bool", "int", "str", "timestamp"]
    )

    # Keys that are allowed at the top level of a definition.
    _SUPPORTED_KEYS = frozenset(["attributes", "name"])

    def _ReadDefinition(self, definition_values):
        """Reads a definition from a dictionary.

        The definition is validated and turned into a new class that is derived
        from AttributeContainerWithSchema. The class is named after the
        container name converted to CamelCase, has CONTAINER_TYPE set to the
        container name, has one class attribute (defaulting to None) per
        defined attribute and has SCHEMA set to a dictionary that maps the
        attribute names onto their data types.

        Args:
            definition_values (dict[str, object]): attribute container definition
                values.

        Returns:
            type[AttributeContainerWithSchema]: an attribute container class.

        Raises:
            ParseError: if the definition is not set or incorrect.
        """
        if not definition_values:
            raise errors.ParseError("Missing attribute container definition values.")

        # A YAML document is not necessarily a mapping, it can also be a
        # sequence or a scalar. Reject those here instead of failing with an
        # AttributeError on the .get() calls below.
        if not isinstance(definition_values, dict):
            raise errors.ParseError(
                "Invalid attribute container definition values: not a mapping."
            )

        different_keys = set(definition_values) - self._SUPPORTED_KEYS
        if different_keys:
            # YAML keys are not necessarily strings, hence str(). Sorting keeps
            # the error message deterministic.
            different_keys = ", ".join(sorted(str(key) for key in different_keys))
            raise errors.ParseError(f"Undefined keys: {different_keys:s}")

        container_name = definition_values.get("name")
        if not container_name:
            raise errors.ParseError(
                "Invalid attribute container definition missing name."
            )

        # YAML parses unquoted values such as 123 or yes as int or bool.
        if not isinstance(container_name, str):
            raise errors.ParseError(
                f"Invalid attribute container definition name: {container_name!s} "
                f"is not a string."
            )

        attributes = definition_values.get("attributes")
        if not attributes:
            raise errors.ParseError(
                f"Invalid attribute container definition: {container_name:s} "
                f"missing attributes."
            )

        if not isinstance(attributes, (list, tuple)):
            raise errors.ParseError(
                f"Invalid attribute container definition: {container_name:s} "
                f"attributes is not a list."
            )

        class_name = "".join(element.title() for element in container_name.split("_"))
        class_attributes = {"CONTAINER_TYPE": container_name}
        container_schema = {}

        for attribute_index, attribute_values in enumerate(attributes):
            if not isinstance(attribute_values, dict):
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} "
                    f"attribute: {attribute_index:d} is not a mapping."
                )

            attribute_name = attribute_values.get("name")
            if not attribute_name:
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} name "
                    f"missing of attribute: {attribute_index:d}."
                )

            if not isinstance(attribute_name, str):
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} name "
                    f"of attribute: {attribute_index:d} is not a string."
                )

            # "SCHEMA" is only added to class_attributes after this loop, so
            # it is checked explicitly. Otherwise an attribute with that name
            # would silently be replaced by the schema of the container.
            if attribute_name == "SCHEMA" or attribute_name in class_attributes:
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} "
                    f"attribute: {attribute_name:s} already set."
                )

            attribute_data_type = attribute_values.get("type")
            if not attribute_data_type:
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} type "
                    f"missing of attribute: {attribute_name:s}."
                )

            # A value that is not a string can never name a data type. It is
            # checked first so that HasDataType() is only called with a string
            # and formatted with !s since the s format code requires a string.
            if not isinstance(
                attribute_data_type, str
            ) or not schema.SchemaHelper.HasDataType(attribute_data_type):
                raise errors.ParseError(
                    f"Invalid attribute container definition: {container_name:s} type "
                    f"attribute: {attribute_name:s} unsupported data type: "
                    f"{attribute_data_type!s}."
                )

            class_attributes[attribute_name] = None
            container_schema[attribute_name] = attribute_data_type

        class_attributes["SCHEMA"] = container_schema

        # TODO: add support for _SERIALIZABLE_PROTECTED_ATTRIBUTES.

        return type(class_name, (AttributeContainerWithSchema,), class_attributes)

    def _ReadFromFileObject(self, file_object):
        """Reads the definitions from a file-like object.

        Every YAML document in the file-like object is read as one definition.
        Since this is a generator, errors are raised while iterating.

        Args:
            file_object (file): definitions file-like object.

        Yields:
            type[AttributeContainerWithSchema]: an attribute container class.

        Raises:
            ParseError: if a definition is not set or incorrect.
        """
        for yaml_definition in yaml.safe_load_all(file_object):
            yield self._ReadDefinition(yaml_definition)

    def ReadFromFile(self, path):
        """Reads the definitions from a YAML file.

        The file is opened as UTF-8 encoded text. Since this is a generator,
        the file is only opened, and errors are only raised, while iterating.

        Args:
            path (str): path to a definitions file.

        Yields:
            type[AttributeContainerWithSchema]: an attribute container class.

        Raises:
            ParseError: if a definition is not set or incorrect.
        """
        with open(path, encoding="utf-8") as file_object:
            yield from self._ReadFromFileObject(file_object)