Newer
Older
class WorkflowWriter(HasStrictTraits):
"""A Writer for writing the Workflow onto disk.
"""
"""Writes the workflow model object to a file f in JSON format.
Parameters
----------
workflow: Workflow
The Workflow instance to write to file
f: File
A file object on which to write the workflow, properly serialized
into JSON.
"""
data["workflow"] = self._workflow_data(workflow)
json.dump(data, f, indent=4)
def _workflow_data(self, workflow):
    """Builds the dictionary representation of a workflow.

    Parameters
    ----------
    workflow: Workflow
        The Workflow instance whose content is extracted.

    Returns
    -------
    dict
        A dictionary with three entries: "mco" (the result of
        ``_mco_data`` on ``workflow.mco``), "execution_layers" (one
        ``_execution_layer_data`` result per execution layer) and
        "notification_listeners" (one ``_model_data`` result per
        notification listener).
    """
    workflow_data = {
        "mco": self._mco_data(workflow.mco),
        "execution_layers": [
            self._execution_layer_data(el)
            for el in workflow.execution_layers
        ],
        "notification_listeners": [
            self._model_data(nl)
            for nl in workflow.notification_listeners
        ],
    }
    return workflow_data
def _mco_data(self, mco):
"""Extracts the data from the MCO object and returns its dictionary.
If the MCO is None, returns None"""
if mco is None:
return None
for param in data["model_data"]["parameters"]:
{
"id": param.factory.id,
data["model_data"]["parameters"] = parameters_data
kpis_data = []
for kpi in data["model_data"]["kpis"]:
kpis_data.append(
traits_to_dict(kpi)
)
data["model_data"]["kpis"] = kpis_data
def _execution_layer_data(self, layer):
    """Returns the extracted data of every DataSource model in a layer.

    Each entry of ``layer.data_sources`` is passed through
    ``_model_data``; the results are returned as a list in the same
    order as the data sources.
    """
    return [self._model_data(source) for source in layer.data_sources]
Extracts the data from an external model and returns its dictionary
"model_data": state