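# "Newer" / "Older": these appear to be navigation labels left over from the
# page this listing was copied from; they are not part of the module.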
import json
import logging

from traits.api import HasStrictTraits, Instance

from force_bdss.core.input_slot_map import InputSlotInfo
from ..factory_registry_plugin import IFactoryRegistryPlugin

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class InvalidFileException(Exception):
    """Raised if the file is invalid for some reason."""
class InvalidVersionException(InvalidFileException):
    """Raised if the version tag does not satisfy the currently
    supported list.

    Derives from InvalidFileException, so handlers for the base class
    also catch version mismatches.
    """
class WorkflowReader(HasStrictTraits):
    """
    Reads the workflow from a file.

    The reader parses a JSON document, checks its ``version`` tag against
    ``SUPPORTED_FILE_VERSIONS`` and builds a ``Workflow`` model tree, using
    the factory registry to create the model object for each entry.
    """
    #: The Factory registry. The reader needs it to create the
    #: specific model objects.
    factory_registry = Instance(IFactoryRegistryPlugin)

    def __init__(self, factory_registry, *args, **kwargs):
        """Initializes the reader.

        Parameters
        ----------
        factory_registry: FactoryRegistryPlugin
            The factory registry that provides lookup services
            for a factory identified by a given id.
        """
        self.factory_registry = factory_registry

        super(WorkflowReader, self).__init__(*args, **kwargs)

    def read(self, file):
        """Reads the file and returns a Workflow object.
        If any problem is found, raises an InvalidFileException or a
        derived, more specialized exception.

        Parameters
        ----------
        file: File
            A file object containing the data of the workflow in the
            appropriate json format.

        Returns
        -------
        Workflow
            An instance of the model tree, rooted at Workflow.

        Raises
        ------
        InvalidFileException
            Raised if the version tag is missing or if a required key
            cannot be found in the workflow data.
        InvalidVersionException
            Raised if the version tag is not in SUPPORTED_FILE_VERSIONS.

        Notes
        -----
        Errors raised by ``json.load`` itself (malformed JSON) are not
        wrapped and propagate to the caller unchanged.
        """
        json_data = json.load(file)

        try:
            version = json_data["version"]
        except KeyError:
            log.error("File missing version information")
            raise InvalidFileException("Corrupted input file, no version"
                                       " specified")

        if version not in SUPPORTED_FILE_VERSIONS:
            log.error(
                "File contains version {} that is not in the "
                "list of supported versions {}".format(
                    version, SUPPORTED_FILE_VERSIONS))
            raise InvalidVersionException(
                "File version {} not supported".format(version))

        wf = Workflow()

        try:
            # The helpers below all take "the content of the workflow key
            # in the top level dictionary data".
            wf_data = json_data["workflow"]
            # NOTE(review): assumes the Workflow model exposes an ``mco``
            # attribute -- confirm against the Workflow class.
            wf.mco = self._extract_mco(wf_data)
            wf.execution_layers[:] = self._extract_execution_layers(wf_data)
            # NOTE(review): _extract_kpi_calculators is not wired in here;
            # confirm whether Workflow still carries KPI calculators.
            wf.notification_listeners[:] = \
                self._extract_notification_listeners(wf_data)
        except KeyError as e:
            # Log with traceback, consistently with the other error paths,
            # before translating into the reader's own exception type.
            log.exception("Could not read file {}".format(file))
            raise InvalidFileException("Could not read file {}. "
                                       "Unable to find key {}".format(file, e))

        return wf

    def _extract_mco(self, wf_data):
        """Extracts the MCO from the workflow dictionary data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        a BaseMCOModel instance of the specific MCO driver, or None
        if no MCO is specified in the file (as in the case of premature
        saving).
        """
        registry = self.factory_registry

        mco_data = wf_data.get("mco")
        if mco_data is None:
            # The file was saved without setting an MCO.
            # The file is valid, we simply can't run any optimization yet.
            return None

        mco_id = mco_data["id"]
        mco_factory = registry.mco_factory_by_id(mco_id)
        model_data = mco_data["model_data"]
        # The raw parameter entries are replaced in place by model objects
        # before the whole dict is handed to the factory.
        model_data["parameters"] = self._extract_mco_parameters(
            mco_id, model_data["parameters"])
        return mco_factory.create_model(model_data)

    def _extract_execution_layers(self, wf_data):
        """Extracts the execution layers from the workflow dictionary data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        list of lists of data source model instances, one inner list per
        entry of the "execution_layers" key. Each model is created by the
        data source factory matching the entry id. The list can be empty.
        """
        registry = self.factory_registry

        layers = []
        for el_entry in wf_data["execution_layers"]:
            layer = []
            for ds_entry in el_entry:
                ds_id = ds_entry["id"]
                ds_factory = registry.data_source_factory_by_id(ds_id)
                model_data = ds_entry["model_data"]
                # Replace the plain dicts with InputSlotInfo objects
                # before the model is created.
                model_data["input_slot_info"] = self._extract_input_slot_info(
                    model_data["input_slot_info"]
                )
                layer.append(ds_factory.create_model(model_data))
            layers.append(layer)

        return layers

    def _extract_kpi_calculators(self, wf_data):
        """Extracts the KPI calculators from the workflow dictionary data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        list of BaseKPICalculatorModel instances. Each BaseKPICalculatorModel
        is an instance of the specific model class. The list can be
        empty.
        """
        registry = self.factory_registry

        kpi_calculators = []
        for kpic_entry in wf_data["kpi_calculators"]:
            kpic_id = kpic_entry["id"]
            kpic_factory = registry.kpi_calculator_factory_by_id(kpic_id)
            model_data = kpic_entry["model_data"]
            # Replace the plain dicts with InputSlotInfo objects
            # before the model is created.
            model_data["input_slot_info"] = self._extract_input_slot_info(
                model_data["input_slot_info"]
            )
            kpi_calculators.append(
                kpic_factory.create_model(model_data)
            )

        return kpi_calculators

    def _extract_mco_parameters(self, mco_id, parameters_data):
        """Extracts the MCO parameters from the data as dictionary.

        Parameters
        ----------
        mco_id: str
            The id of the MCO the parameters belong to. It is used,
            together with each parameter id, to look up the factory.
        parameters_data: dict
            The content of the parameter data key in the MCO model data.

        Returns
        -------
        List of instances of a subclass of BaseMCOParameter
        """
        registry = self.factory_registry

        parameters = []
        for p in parameters_data:
            parameter_id = p["id"]
            factory = registry.mco_parameter_factory_by_id(
                mco_id, parameter_id)
            model = factory.create_model(p["model_data"])
            parameters.append(model)

        return parameters

    def _extract_input_slot_info(self, info):
        """Converts a list of dictionaries into InputSlotInfo objects.

        Parameters
        ----------
        info: list of dict
            Each dictionary is passed as keyword arguments to InputSlotInfo.

        Returns
        -------
        list of InputSlotInfo, in the same order as the input.
        """
        return [InputSlotInfo(**d) for d in info]

    def _extract_notification_listeners(self, wf_data):
        """Extracts the notification listeners from the workflow
        dictionary data.

        Parameters
        ----------
        wf_data: dict
            the content of the workflow key in the top level dictionary data.

        Returns
        -------
        list of notification listener model instances, each created by the
        notification listener factory matching the entry id. The list can
        be empty.
        """
        registry = self.factory_registry

        listeners = []
        for nl_entry in wf_data["notification_listeners"]:
            nl_id = nl_entry["id"]
            nl_factory = registry.notification_listener_factory_by_id(nl_id)
            model_data = nl_entry["model_data"]
            listeners.append(nl_factory.create_model(model_data))

        return listeners