Skip to content
Snippets Groups Projects
workflow_reader.py 7.46 KiB
Newer Older

from traits.api import HasStrictTraits, Instance

from force_bdss.core.input_slot_map import InputSlotInfo
Stefano Borini's avatar
Stefano Borini committed
from force_bdss.core.workflow import Workflow
from ..factory_registry_plugin import IFactoryRegistryPlugin
log = logging.getLogger(__name__)

SUPPORTED_FILE_VERSIONS = ["1"]


class InvalidFileException(Exception):
    """Raised if the file is invalid for some reason"""


class InvalidVersionException(InvalidFileException):
    """Raised if the version tag does not satisfy the currently
    supported list."""


class WorkflowReader(HasStrictTraits):
    """
    Reads the workflow from a file.
    """
    #: The Factory registry. The reader needs it to create the
    #: specific model objects.
    factory_registry = Instance(IFactoryRegistryPlugin)
                 factory_registry,
        """Initializes the reader.

        Parameters
        ----------
        factory_registry: FactoryRegistryPlugin
            The factory registry that provides lookup services
            for a factory identified by a given id.
        self.factory_registry = factory_registry

        super(WorkflowReader, self).__init__(*args, **kwargs)

    def read(self, file):
        """Reads the file and returns a Workflow object.
        If any problem is found, raises an InvalidFileException or a
        derived, more specialized exception.

        Parameters
        ----------
        file: File
            A file object containing the data of the workflow in the
            appropriate json format.

        Returns
        -------
        Workflow
            An instance of the model tree, rooted at Workflow.

        Raises
        ------
        InvalidFileException
            Raised if the file is corrupted or cannot be read by this reader.
        """
        json_data = json.load(file)

        try:
            version = json_data["version"]
        except KeyError:
            log.error("File missing version information")
            raise InvalidFileException("Corrupted input file, no version"
                                       " specified")

        if version not in SUPPORTED_FILE_VERSIONS:
                "File contains version {} that is not in the "
                "list of supported versions {}".format(
Stefano Borini's avatar
Stefano Borini committed
                    version, SUPPORTED_FILE_VERSIONS)
            raise InvalidVersionException(
                "File version {} not supported".format(json_data["version"]))

        wf = Workflow()

Stefano Borini's avatar
Stefano Borini committed
            wf_data = json_data["workflow"]
            wf.mco = self._extract_mco(wf_data)
            wf.execution_layers[:] = self._extract_execution_layers(wf_data)
Stefano Borini's avatar
Stefano Borini committed
            wf.kpi_calculators[:] = self._extract_kpi_calculators(wf_data)
            wf.notification_listeners[:] = \
                self._extract_notification_listeners(wf_data)
Stefano Borini's avatar
Stefano Borini committed
            log.error("Could not read file {}".format(file), exc_info=True)
Stefano Borini's avatar
Stefano Borini committed
            raise InvalidFileException("Could not read file {}. "
                                       "Unable to find key {}".format(file, e))
Stefano Borini's avatar
Stefano Borini committed
        return wf
    def _extract_mco(self, wf_data):
        """Extracts the MCO from the workflow dictionary data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        a BaseMCOModel instance of the specific MCO driver, or None
        if no MCO is specified in the file (as in the case of premature
        saving).
        """
        registry = self.factory_registry
        mco_data = wf_data.get("mco")
        if mco_data is None:
            # The file was saved without setting an MCO.
            # The file is valid, we simply can't run any optimization yet.
            return None

        mco_id = mco_data["id"]
        mco_factory = registry.mco_factory_by_id(mco_id)
        model_data = wf_data["mco"]["model_data"]
        model_data["parameters"] = self._extract_mco_parameters(
            mco_id,
            model_data["parameters"])
        model = mco_factory.create_model(
            wf_data["mco"]["model_data"])
Stefano Borini's avatar
Stefano Borini committed
        return model
    def _extract_execution_layers(self, wf_data):
        """Extracts the data sources from the workflow dictionary data.
        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        list of BaseDataSourceModel instances. Each BaseDataSourceModel is an
        instance of the specific model class. The list can be empty.
        registry = self.factory_registry
        layers = []
        for el_entry in wf_data["execution_layers"]:
            layer = []

            for ds_entry in el_entry:
                ds_id = ds_entry["id"]
                ds_factory = registry.data_source_factory_by_id(ds_id)
                model_data = ds_entry["model_data"]
                model_data["input_slot_maps"] = self._extract_input_slot_maps(
                    model_data["input_slot_maps"]
                )
                layer.append(ds_factory.create_model(model_data))
            layers.append(layer)

        return layers
    def _extract_kpi_calculators(self, wf_data):
        """Extracts the KPI calculators from the workflow dictionary data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        list of BaseKPICalculatorModel instances. Each BaseKPICalculatorModel
        is an instance of the specific model class. The list can be
        registry = self.factory_registry
        for kpic_entry in wf_data["kpi_calculators"]:
            kpic_factory = registry.kpi_calculator_factory_by_id(kpic_id)
            model_data = kpic_entry["model_data"]
            model_data["input_slot_maps"] = self._extract_input_slot_maps(
                model_data["input_slot_maps"]
            )
                kpic_factory.create_model(model_data)
    def _extract_mco_parameters(self, mco_id, parameters_data):
Stefano Borini's avatar
Stefano Borini committed
        """Extracts the MCO parameters from the data as dictionary.

        Parameters
        ----------
        parameters_data: dict
            The content of the parameter data key in the MCO model data.

        Returns
        -------
        List of instances of a subclass of BaseMCOParameter
        """
        registry = self.factory_registry

        parameters = []

        for p in parameters_data:
            id = p["id"]
            factory = registry.mco_parameter_factory_by_id(mco_id, id)
            model = factory.create_model(p["model_data"])
            parameters.append(model)

    def _extract_input_slot_maps(self, maps_data):
        return [InputSlotInfo(**d) for d in maps_data]

    def _extract_notification_listeners(self, wf_data):
        registry = self.factory_registry
        listeners = []
        for nl_entry in wf_data["notification_listeners"]:
            nl_id = nl_entry["id"]
            nl_factory = registry.notification_listener_factory_by_id(nl_id)
            model_data = nl_entry["model_data"]
            listeners.append(nl_factory.create_model(model_data))

        return listeners