Skip to content
Snippets Groups Projects
Commit 8750e85f authored by Stefano Borini's avatar Stefano Borini
Browse files

Added accurate workflow writer and tests.

parent b2eeec05
No related branches found
No related tags found
1 merge request!29Extract io layer to writer/reader class
import unittest
import json
from six import StringIO
from force_bdss.bundle_registry_plugin import BundleRegistryPlugin
from force_bdss.io.workflow_reader import WorkflowReader
try:
import mock
except ImportError:
from unittest import mock
from force_bdss.id_generators import bundle_id
from force_bdss.io.workflow_writer import WorkflowWriter
from force_bdss.mco.base_mco_model import BaseMCOModel
from force_bdss.mco.i_multi_criteria_optimizer_bundle import \
IMultiCriteriaOptimizerBundle
from force_bdss.workspecs.workflow import Workflow
class TestWorkflowWriter(unittest.TestCase):
def setUp(self):
self.mock_registry = mock.Mock(spec=BundleRegistryPlugin)
mock_mco_bundle = mock.Mock(spec=IMultiCriteriaOptimizerBundle,
id=bundle_id("enthought", "mock"))
mock_mco_model = mock.Mock(
spec=BaseMCOModel,
bundle=mock_mco_bundle
)
mock_mco_bundle.create_model = mock.Mock(
return_value = mock_mco_model
)
self.mock_registry.mco_bundle_by_id = mock.Mock(
return_value=mock_mco_bundle)
def test_write(self):
wfwriter = WorkflowWriter()
fp = StringIO()
wf = self._create_mock_workflow()
wfwriter.write(wf, fp)
result = json.loads(fp.getvalue())
self.assertIn("version", result)
self.assertIn("workflow", result)
self.assertIn("multi_criteria_optimizer", result["workflow"])
self.assertIn("data_sources", result["workflow"])
self.assertIn("kpi_calculators", result["workflow"])
def test_write_and_read(self):
wfwriter = WorkflowWriter()
fp = StringIO()
wf = self._create_mock_workflow()
wfwriter.write(wf, fp)
fp.seek(0)
wfreader = WorkflowReader(self.mock_registry)
wf_result = wfreader.read(fp)
self.assertEqual(wf_result.multi_criteria_optimizer.bundle.id,
wf.multi_criteria_optimizer.bundle.id)
def _create_mock_workflow(self):
wf = Workflow()
wf.multi_criteria_optimizer = BaseMCOModel(
mock.Mock(
spec=IMultiCriteriaOptimizerBundle,
id=bundle_id("enthought", "mock")))
return wf
...@@ -60,7 +60,13 @@ class WorkflowReader(HasStrictTraits): ...@@ -60,7 +60,13 @@ class WorkflowReader(HasStrictTraits):
def _extract_mco(self, json_data): def _extract_mco(self, json_data):
registry = self.bundle_registry registry = self.bundle_registry
mco_id = json_data["multi_criteria_optimizer"]["id"] mco_data = json_data.get("multi_criteria_optimizer")
if mco_data is None:
# The file was saved without setting an MCO.
# The file is valid, we simply can't run any optimization yet.
return None
mco_id = mco_data["id"]
mco_bundle = registry.mco_bundle_by_id(mco_id) mco_bundle = registry.mco_bundle_by_id(mco_id)
return mco_bundle.create_model( return mco_bundle.create_model(
json_data["multi_criteria_optimizer"]["model_data"]) json_data["multi_criteria_optimizer"]["model_data"])
......
...@@ -5,31 +5,33 @@ from ..bundle_registry_plugin import BundleRegistryPlugin ...@@ -5,31 +5,33 @@ from ..bundle_registry_plugin import BundleRegistryPlugin
class WorkflowWriter(HasStrictTraits): class WorkflowWriter(HasStrictTraits):
bundle_registry = Instance(BundleRegistryPlugin)
def __init__(self, bundle_registry, *args, **kwargs):
self.bundle_registry = bundle_registry
super(WorkflowWriter, self).__init__(*args, **kwargs)
def write(self, workflow, f): def write(self, workflow, f):
data = { data = {
"version": "1", "version": "1",
"workflow": {}
} }
data["multi_criteria_optimizer"] = \ wf_data = data["workflow"]
workflow.multi_criteria_optimizer.__getstate__() wf_data["multi_criteria_optimizer"] = {
"id": workflow.multi_criteria_optimizer.bundle.id,
"model_data": workflow.multi_criteria_optimizer.__getstate__()
}
kpic_data = [] kpic_data = []
for kpic in workflow.kpi_calculators: for kpic in workflow.kpi_calculators:
kpic_data.append(kpic.__getstate__()) kpic_data.append({
"id": kpic.bundle.id,
"model_data": kpic.__getstate__()}
)
data["kpi_calculators"] = kpic_data wf_data["kpi_calculators"] = kpic_data
ds_data = [] ds_data = []
for ds in workflow.data_sources: for ds in workflow.data_sources:
ds_data.append(ds.__getstate__()) ds_data.append({
"id": ds.bundle.id,
"model_data": ds.__getstate__()
})
data["data_sources"] = ds_data wf_data["data_sources"] = ds_data
json.dump(data, f) json.dump(data, f)
...@@ -6,6 +6,6 @@ from ..mco.base_mco_model import BaseMCOModel ...@@ -6,6 +6,6 @@ from ..mco.base_mco_model import BaseMCOModel
class Workflow(HasStrictTraits): class Workflow(HasStrictTraits):
multi_criteria_optimizer = Instance(BaseMCOModel) multi_criteria_optimizer = Instance(BaseMCOModel, allow_none=True)
data_sources = List(BaseDataSourceModel) data_sources = List(BaseDataSourceModel)
kpi_calculators = List(BaseKPICalculatorModel) kpi_calculators = List(BaseKPICalculatorModel)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment