Skip to content
Snippets Groups Projects

Communicative workflow reader

Merged Adham Hashibon requested to merge communicative-workflow-reader into master
1 file
+ 1
1
Compare changes
  • Side-by-side
  • Inline
@@ -9,20 +9,34 @@ from force_bdss.core.output_slot_info import OutputSlotInfo
from force_bdss.core.workflow import Workflow
from ..factory_registry_plugin import IFactoryRegistryPlugin
log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
SUPPORTED_FILE_VERSIONS = ["1"]
class InvalidFileException(Exception):
"""Raised if the file is invalid for some reason"""
class BaseWorkflowReaderException(Exception):
"""Base exception for the reader errors."""
class InvalidVersionException(InvalidFileException):
class InvalidFileException(BaseWorkflowReaderException):
"""Raised for a generic file being invalid for some
reason, e.g. incorrect format or missing keys.
"""
class InvalidVersionException(BaseWorkflowReaderException):
"""Raised if the version tag does not satisfy the currently
supported list."""
class MissingPluginException(BaseWorkflowReaderException):
"""Raised if the file requires a plugin we cannot find."""
class ModelInstantiationFailedException(BaseWorkflowReaderException):
"""Raised if we can't instantiate the model from a plugin"""
class WorkflowReader(HasStrictTraits):
"""
Reads the workflow from a file.
@@ -67,18 +81,30 @@ class WorkflowReader(HasStrictTraits):
------
InvalidFileException
Raised if the file is corrupted or cannot be read by this reader.
InvalidVersionException
Raised if the version is not supported.
MissingPluginException
The file cannot be opened because it needs a plugin that is not
available.
ModelInstantiationFailedException
When instantiating the model for a given plugin, an exception
occurred. This is likely due to a coding error in the plugin.
"""
json_data = json.load(file)
try:
version = json_data["version"]
except KeyError:
log.error("File missing version information")
logger.error("File missing version information")
raise InvalidFileException("Corrupted input file, no version"
" specified")
if version not in SUPPORTED_FILE_VERSIONS:
log.error(
logger.error(
"File contains version {} that is not in the "
"list of supported versions {}".format(
version, SUPPORTED_FILE_VERSIONS)
@@ -95,13 +121,13 @@ class WorkflowReader(HasStrictTraits):
wf.notification_listeners[:] = \
self._extract_notification_listeners(wf_data)
except KeyError as e:
log.error("Could not read file {}".format(file), exc_info=True)
raise InvalidFileException(
msg = (
"Could not read file {}. "
"Unable to find key {}."
"The plugin responsible for the missing "
"key may be missing or broken.".format(file, e)
)
"Unable to find key {}. "
"The file might be corrupted or unsupported.".format(file, e))
logger.exception(msg)
raise InvalidFileException(msg)
return wf
def _extract_mco(self, wf_data):
@@ -127,13 +153,30 @@ class WorkflowReader(HasStrictTraits):
return None
mco_id = mco_data["id"]
mco_factory = registry.mco_factory_by_id(mco_id)
try:
mco_factory = registry.mco_factory_by_id(mco_id)
except KeyError:
raise MissingPluginException(
"Could not read file. "
"The plugin responsible for the missing "
"key '{}' may be missing or broken.".format(mco_id)
)
model_data = wf_data["mco"]["model_data"]
model_data["parameters"] = self._extract_mco_parameters(
mco_id,
model_data["parameters"])
model = mco_factory.create_model(
wf_data["mco"]["model_data"])
try:
model = mco_factory.create_model(model_data)
except Exception as e:
msg = (
"Unable to create model for MCO {}: {}. "
"This is likely due to a coding error in the plugin. "
"Check the logs for more information.".format(
mco_id, e))
logger.exception(msg)
raise ModelInstantiationFailedException(msg)
return model
def _extract_execution_layers(self, wf_data):
@@ -157,7 +200,16 @@ class WorkflowReader(HasStrictTraits):
for ds_entry in el_entry:
ds_id = ds_entry["id"]
ds_factory = registry.data_source_factory_by_id(ds_id)
try:
ds_factory = registry.data_source_factory_by_id(ds_id)
except KeyError:
raise MissingPluginException(
"Could not read file. "
"The plugin responsible for the missing data source "
"key '{}' may be missing or broken.".format(ds_id)
)
model_data = ds_entry["model_data"]
model_data["input_slot_info"] = self._extract_input_slot_info(
model_data["input_slot_info"]
@@ -166,8 +218,21 @@ class WorkflowReader(HasStrictTraits):
self._extract_output_slot_info(
model_data["output_slot_info"]
)
layer.data_sources.append(
ds_factory.create_model(model_data))
try:
ds_model = ds_factory.create_model(model_data)
except Exception as e:
msg = (
"Unable to create model for DataSource {} : {}. "
"This is likely due to a coding "
"error in the plugin. Check the logs for more "
"information.".format(ds_id, e)
)
logger.exception(msg)
raise ModelInstantiationFailedException(msg)
layer.data_sources.append(ds_model)
layers.append(layer)
return layers
@@ -189,9 +254,29 @@ class WorkflowReader(HasStrictTraits):
parameters = []
for p in parameters_data:
id = p["id"]
factory = registry.mco_parameter_factory_by_id(mco_id, id)
model = factory.create_model(p["model_data"])
parameter_id = p["id"]
try:
factory = registry.mco_parameter_factory_by_id(
mco_id, parameter_id)
except KeyError:
raise MissingPluginException(
"Could not read file. "
"The plugin responsible for the missing MCO '{}' "
"parameter key '{}' may be missing or broken.".format(
mco_id, parameter_id)
)
try:
model = factory.create_model(p["model_data"])
except Exception as e:
msg = (
"Unable to create model for MCO {} parameter {} : {}. "
"This is likely due to an error in the plugin. "
"Check the logs for more information.".format(
mco_id, parameter_id, e))
logger.exception(msg)
raise ModelInstantiationFailedException(msg)
parameters.append(model)
return parameters
@@ -207,8 +292,29 @@ class WorkflowReader(HasStrictTraits):
listeners = []
for nl_entry in wf_data["notification_listeners"]:
nl_id = nl_entry["id"]
nl_factory = registry.notification_listener_factory_by_id(nl_id)
try:
nl_factory = registry.notification_listener_factory_by_id(
nl_id)
except KeyError:
raise MissingPluginException(
"Could not read file. "
"The plugin responsible for the missing "
"notification listener key '{}' may be missing "
"or broken.".format(nl_id)
)
model_data = nl_entry["model_data"]
listeners.append(nl_factory.create_model(model_data))
try:
model = nl_factory.create_model(model_data)
except Exception as e:
msg = (
"Unable to create model for Notification Listener "
"{} : {}. This is likely due to an error in the plugin. "
"Check the logs for more information.".format(nl_id, e))
logger.exception(msg)
raise ModelInstantiationFailedException(msg)
listeners.append(model)
return listeners
Loading