"""Reader that loads a Workflow model tree from a JSON workflow file."""

import json
import logging

from traits.api import HasStrictTraits, Instance

from ..bundle_registry_plugin import BundleRegistryPlugin
from ..workspecs.workflow import Workflow

#: File format versions this reader accepts. The value of the top-level
#: "version" key in a workflow file must compare equal to one of these.
SUPPORTED_FILE_VERSIONS = [
    "1",
]


class InvalidFileException(Exception):
    """Signals that a workflow file could not be turned into a model,
    for example because it is malformed or lacks required keys."""


class InvalidVersionException(InvalidFileException):
    """Signals that the file's version tag is not one of the versions
    this reader knows how to parse."""


class WorkflowReader(HasStrictTraits):
    """
    Reads the workflow from a file.
    """
    #: The bundle registry. The reader needs it to create the
    #: bundle-specific model objects.
    bundle_registry = Instance(BundleRegistryPlugin)
    def __init__(self, bundle_registry, *args, **kwargs):
        """Creates a reader bound to a bundle registry.

        Parameters
        ----------
        bundle_registry: BundleRegistryPlugin
            Registry consulted to find the bundle (and hence the model
            class) associated with each id encountered in the file.

        Any further positional or keyword arguments are forwarded
        unchanged to the base-class initializer.
        """
        # The registry is stored before the base initializer runs, so it is
        # already set when the remaining trait values are processed.
        self.bundle_registry = bundle_registry
        super(WorkflowReader, self).__init__(*args, **kwargs)

    def read(self, file):
        """Reads the file and returns a Workflow object.

        If any problem is found, raises an InvalidFileException or a
        derived, more specialized exception.

        Parameters
        ----------
        file: File
            A file object containing the data of the workflow in the
            appropriate json format.

        Returns
        -------
        Workflow
            An instance of the model tree, rooted at Workflow.

        Raises
        ------
        InvalidFileException
            Raised if the file is corrupted or cannot be read by this reader:
            it is not valid JSON, its top level is not a JSON object, or a
            required key is missing.
        InvalidVersionException
            Raised if the file's "version" is not in SUPPORTED_FILE_VERSIONS.
        """
        # A file that is not valid JSON is a corrupted file from the caller's
        # point of view; report it through the documented exception instead
        # of letting the decoder's ValueError escape.
        try:
            json_data = json.load(file)
        except ValueError as e:
            logging.exception("Could not parse file as JSON")
            raise InvalidFileException(
                "Corrupted input file, not valid JSON: {}".format(e))

        # Key lookups below only make sense on a JSON object; any other
        # top-level value (list, number, ...) would raise a TypeError.
        if not isinstance(json_data, dict):
            logging.error("File top-level entity is not a JSON object")
            raise InvalidFileException(
                "Corrupted input file, expected a JSON object at top level")

        try:
            version = json_data["version"]
        except KeyError:
            logging.error("File missing version information")
            raise InvalidFileException("Corrupted input file, no version"
                                       " specified")

        if version not in SUPPORTED_FILE_VERSIONS:
            logging.error(
                "File contains version {} that is not in the "
                "list of supported versions {}".format(
                    version, SUPPORTED_FILE_VERSIONS))
            raise InvalidVersionException(
                "File version {} not supported".format(version))

        wf = Workflow()

        # Any missing key while walking the workflow section means the file
        # does not have the expected structure.
        try:
            wf_data = json_data["workflow"]
            wf.mco = self._extract_mco(wf_data)
            wf.data_sources[:] = self._extract_data_sources(wf_data)
            wf.kpi_calculators[:] = self._extract_kpi_calculators(wf_data)
        except KeyError as e:
            logging.exception("Could not read file")
            raise InvalidFileException("Could not read file. "
                                       "Unable to find key {}".format(e))

        return wf
    def _extract_mco(self, wf_data):
        """Extracts the MCO from the workflow dictionary data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        a BaseMCOModel instance of the bundle-specific MCO driver, or None
        if no MCO is specified in the file (as in the case of premature
        saving).

        Raises
        ------
        KeyError
            If the MCO entry lacks its "id" or "model_data" key, or the
            model data lacks its "parameters" key.
        """
        mco_data = wf_data.get("mco")
        if mco_data is None:
            # The file was saved without setting an MCO.
            # The file is valid, we simply can't run any optimization yet.
            return None

        mco_id = mco_data["id"]
        mco_bundle = self.bundle_registry.mco_bundle_by_id(mco_id)

        # Work on a shallow copy so the caller's dictionary is not mutated.
        # Replacing "parameters" in place would leave model objects inside
        # the raw data, and a second extraction from it would then fail.
        model_data = dict(mco_data["model_data"])
        model_data["parameters"] = self._extract_mco_parameters(
            mco_id,
            model_data["parameters"])

        return mco_bundle.create_model(model_data)

    def _extract_data_sources(self, wf_data):
        """Builds the data source models listed in the workflow data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        list of BaseDataSourceModel instances, one per entry of the
        "data_sources" list and in the same order. Each one is an instance
        of the bundle specific model class. The list can be empty.
        """
        registry = self.bundle_registry
        # Each entry names its bundle by "id"; that bundle turns the entry's
        # "model_data" into a model object.
        return [
            registry.data_source_bundle_by_id(entry["id"]).create_model(
                entry["model_data"])
            for entry in wf_data["data_sources"]
        ]
    def _extract_kpi_calculators(self, wf_data):
        """Builds the KPI calculator models listed in the workflow data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        list of BaseKPICalculatorModel instances, one per entry of the
        "kpi_calculators" list and in the same order. Each one is an
        instance of the bundle specific model class. The list can be empty.
        """
        registry = self.bundle_registry
        # Resolve each entry's bundle from its "id", then let that bundle
        # build the model from the entry's "model_data".
        return [
            registry.kpi_calculator_bundle_by_id(item["id"]).create_model(
                item["model_data"])
            for item in wf_data["kpi_calculators"]
        ]
    def _extract_mco_parameters(self, mco_id, parameters_data):
Stefano Borini's avatar
Stefano Borini committed
        """Extracts the MCO parameters from the data as dictionary.

        Parameters
        ----------
        parameters_data: dict
            The content of the parameter data key in the MCO model data.

        Returns
        -------
        List of instances of a subclass of BaseMCOParameter
        """
        registry = self.bundle_registry

        parameters = []

        for p in parameters_data:
            id = p["id"]
            factory = registry.mco_parameter_factory_by_id(mco_id, id)
            model = factory.create_model(p["model_data"])
            parameters.append(model)