diff --git a/force_bdss/core/workflow.py b/force_bdss/core/workflow.py index 8cdb20c21436f9bb8c386992a31b18ed842b3a99..c0750a25c72354392a8150c4b5d2db8ee81d56e0 100644 --- a/force_bdss/core/workflow.py +++ b/force_bdss/core/workflow.py @@ -13,9 +13,12 @@ class Workflow(HasStrictTraits): #: Can be None if no MCO has been specified yet. mco = Instance(BaseMCOModel, allow_none=True) - #: Contains the factory-specific DataSource Model objects. - #: The list can be empty - data_sources = List(BaseDataSourceModel) + #: The execution layers. Execution starts from the first layer, + #: where all data sources are executed in sequence. It then passes all + #: the computed data to the second layer, then the third etc. + #: For now, the final execution is performed by the KPI layer, but this + #: will go away when we remove the KPI calculators. + execution_layers = List(List(BaseDataSourceModel)) #: Contains the factory-specific KPI Calculator Model objects. #: The list can be empty diff --git a/force_bdss/core_evaluation_driver.py b/force_bdss/core_evaluation_driver.py index 96a717383b1d019a225e99f315e37c06cb99d2be..289c8c1de0941b88070774cfc594a9c781f79e31 100644 --- a/force_bdss/core_evaluation_driver.py +++ b/force_bdss/core_evaluation_driver.py @@ -41,21 +41,34 @@ class CoreEvaluationDriver(BaseCoreDriver): mco_data_values = _get_data_values_from_mco( mco_model, mco_communicator) - log.info("Computing data layer 1") + kpi_results = execute_workflow(workflow, mco_data_values) + + mco_communicator.send_to_mco(mco_model, kpi_results) + + +def execute_workflow(workflow, data_values): + """Executes the given workflow using the list of data values. + Returns a list of data values for the KPI results + """ + + available_data_values = data_values[:] + for index, layer in enumerate(workflow.execution_layers): + log.info("Computing data layer {}".format(index)) ds_results = _compute_layer_results( - mco_data_values, - workflow.data_sources, + available_data_values, + layer, "create_data_source" ) + available_data_values += ds_results - log.info("Computing data layer 2") - kpi_results = _compute_layer_results( - ds_results + mco_data_values, - workflow.kpi_calculators, - "create_kpi_calculator" - ) + log.info("Computing KPI layer") + kpi_results = _compute_layer_results( + available_data_values, + workflow.kpi_calculators, + "create_kpi_calculator" + ) - mco_communicator.send_to_mco(mco_model, kpi_results) + return kpi_results def _compute_layer_results(environment_data_values, diff --git a/force_bdss/io/tests/test_workflow_writer.py b/force_bdss/io/tests/test_workflow_writer.py index 74a8dbf90ff1b776f1bccbc9f40a73462a1d16cf..fc197c306921d8ff7fe6de1ecd6340249398b7c6 100644 --- a/force_bdss/io/tests/test_workflow_writer.py +++ b/force_bdss/io/tests/test_workflow_writer.py @@ -3,6 +3,10 @@ import unittest from six import StringIO +from force_bdss.data_sources.base_data_source_factory import \ + BaseDataSourceFactory +from force_bdss.data_sources.base_data_source_model import BaseDataSourceModel +from force_bdss.data_sources.i_data_source_factory import IDataSourceFactory from force_bdss.factory_registry_plugin import FactoryRegistryPlugin from force_bdss.io.workflow_reader import WorkflowReader from force_bdss.mco.parameters.base_mco_parameter import BaseMCOParameter @@ -36,6 +40,13 @@ class TestWorkflowWriter(unittest.TestCase): self.mock_registry.mco_factory_by_id = mock.Mock( return_value=mock_mco_factory) + datasource_factory = BaseDataSourceFactory( + id=factory_id("enthought", "mock2"), plugin=None) + + self.mock_registry.data_source_factory_by_id = mock.Mock( + return_value=datasource_factory + ) + def test_write(self): wfwriter = WorkflowWriter() fp = StringIO() @@ -45,7 +56,7 @@ class TestWorkflowWriter(unittest.TestCase): self.assertIn("version", result) self.assertIn("workflow", result) self.assertIn("mco", result["workflow"]) - self.assertIn("data_sources", result["workflow"]) + self.assertIn("execution_layers", result["workflow"]) self.assertIn("kpi_calculators", result["workflow"]) def test_write_and_read(self): @@ -58,6 +69,9 @@ class TestWorkflowWriter(unittest.TestCase): wf_result = wfreader.read(fp) self.assertEqual(wf_result.mco.factory.id, wf.mco.factory.id) + self.assertEqual(len(wf_result.execution_layers), 2) + self.assertEqual(len(wf_result.execution_layers[0]), 2) + self.assertEqual(len(wf_result.execution_layers[1]), 1) def _create_mock_workflow(self): wf = Workflow() @@ -73,6 +87,18 @@ class TestWorkflowWriter(unittest.TestCase): ) ) ] + wf.execution_layers = [ + [ + BaseDataSourceModel(mock.Mock(spec=IDataSourceFactory, + id=factory_id("enthought", "mock2"))), + BaseDataSourceModel(mock.Mock(spec=IDataSourceFactory, + id=factory_id("enthought", "mock2"))), + ], + [ + BaseDataSourceModel(mock.Mock(spec=IDataSourceFactory, + id=factory_id("enthought", "mock2"))) + ] + ] return wf def test_write_and_read_empty_workflow(self): diff --git a/force_bdss/io/workflow_reader.py b/force_bdss/io/workflow_reader.py index 3222733c555f34f27ddd63d155906cc878df44a9..4f0ce4a06f3b67ef57a795c7e1214e27199a3f75 100644 --- a/force_bdss/io/workflow_reader.py +++ b/force_bdss/io/workflow_reader.py @@ -89,7 +89,7 @@ class WorkflowReader(HasStrictTraits): try: wf_data = json_data["workflow"] wf.mco = self._extract_mco(wf_data) - wf.data_sources[:] = self._extract_data_sources(wf_data) + wf.execution_layers[:] = self._extract_execution_layers(wf_data) wf.kpi_calculators[:] = self._extract_kpi_calculators(wf_data) wf.notification_listeners[:] = \ self._extract_notification_listeners(wf_data) @@ -131,7 +131,7 @@ class WorkflowReader(HasStrictTraits): wf_data["mco"]["model_data"]) return model - def _extract_data_sources(self, wf_data): + def _extract_execution_layers(self, wf_data): """Extracts the data sources from the workflow dictionary data. Parameters @@ -146,17 +146,21 @@ class WorkflowReader(HasStrictTraits): """ registry = self.factory_registry - data_sources = [] - for ds_entry in wf_data["data_sources"]: - ds_id = ds_entry["id"] - ds_factory = registry.data_source_factory_by_id(ds_id) - model_data = ds_entry["model_data"] - model_data["input_slot_maps"] = self._extract_input_slot_maps( - model_data["input_slot_maps"] - ) - data_sources.append(ds_factory.create_model(model_data)) - - return data_sources + layers = [] + for el_entry in wf_data["execution_layers"]: + layer = [] + + for ds_entry in el_entry: + ds_id = ds_entry["id"] + ds_factory = registry.data_source_factory_by_id(ds_id) + model_data = ds_entry["model_data"] + model_data["input_slot_maps"] = self._extract_input_slot_maps( + model_data["input_slot_maps"] + ) + layer.append(ds_factory.create_model(model_data)) + layers.append(layer) + + return layers def _extract_kpi_calculators(self, wf_data): """Extracts the KPI calculators from the workflow dictionary data. diff --git a/force_bdss/io/workflow_writer.py b/force_bdss/io/workflow_writer.py index a1e6f8212135de20ff8710f769d79e36983dc26e..22a7f185040f75afffe1b066c1c3111608a72459 100644 --- a/force_bdss/io/workflow_writer.py +++ b/force_bdss/io/workflow_writer.py @@ -28,9 +28,9 @@ class WorkflowWriter(HasStrictTraits): "kpi_calculators": [ self._model_data(kpic) for kpic in workflow.kpi_calculators], - "data_sources": [ - self._model_data(ds) - for ds in workflow.data_sources], + "execution_layers": [ + self._execution_layer_data(el) + for el in workflow.execution_layers], "notification_listeners": [ self._model_data(nl) for nl in workflow.notification_listeners @@ -65,6 +65,15 @@ class WorkflowWriter(HasStrictTraits): data["model_data"]["parameters"] = parameters_data return data + def _execution_layer_data(self, layer): + """Extracts the execution layer list of DataSource models""" + data = [] + + for ds in layer: + data.append(self._model_data(ds)) + + return data + def _model_data(self, model): """ Extracts the data from an external model and returns its dictionary diff --git a/force_bdss/tests/fixtures/test_empty.json b/force_bdss/tests/fixtures/test_empty.json index 4984d568725e4fca080c28f36d2fc9bf72bf1cca..00c6b2f2c28e9e39c2fad25d947c2b0cf13c5d38 100644 --- a/force_bdss/tests/fixtures/test_empty.json +++ b/force_bdss/tests/fixtures/test_empty.json @@ -2,7 +2,7 @@ "version": "1", "workflow": { "mco": null, - "data_sources": [ + "execution_layers": [ ], "kpi_calculators": [ ], diff --git a/force_bdss/tests/fixtures/test_null.json b/force_bdss/tests/fixtures/test_null.json index 8a0a016aed40e63b7ae68981e7a89bf39ec1867a..409716123f2fc8280ff5fd17b7b83b0316aabd1d 100644 --- a/force_bdss/tests/fixtures/test_null.json +++ b/force_bdss/tests/fixtures/test_null.json @@ -8,16 +8,18 @@ ] } }, - "data_sources": [ - { - "id": "force.bdss.enthought.factory.test_ds", - "model_data": { - "input_slot_maps": [ - ], - "output_slot_names": [ - ] + "execution_layers": [ + [ + { + "id": "force.bdss.enthought.factory.test_ds", + "model_data": { + "input_slot_maps": [ + ], + "output_slot_names": [ + ] + } } - } + ] ], "kpi_calculators": [ { diff --git a/force_bdss/tests/test_core_evaluation_driver.py b/force_bdss/tests/test_core_evaluation_driver.py index a6084e73294e123a6570e3a2086b08cdb948729d..748d8b0a111ed2108861b8cdd52d945886f0c48d 100644 --- a/force_bdss/tests/test_core_evaluation_driver.py +++ b/force_bdss/tests/test_core_evaluation_driver.py @@ -3,6 +3,7 @@ import unittest import testfixtures import six +from force_bdss.core.workflow import Workflow from force_bdss.tests.probe_classes.factory_registry_plugin import \ ProbeFactoryRegistryPlugin from force_bdss.tests.probe_classes.mco import ProbeMCOFactory @@ -22,8 +23,12 @@ except ImportError: from envisage.api import Application -from force_bdss.core_evaluation_driver import CoreEvaluationDriver, \ - _bind_data_values, _compute_layer_results +from force_bdss.core_evaluation_driver import ( + CoreEvaluationDriver, + execute_workflow, + _bind_data_values, + _compute_layer_results +) class TestCoreEvaluationDriver(unittest.TestCase): @@ -225,6 +230,110 @@ class TestCoreEvaluationDriver(unittest.TestCase): self.assertEqual(res[1].name, "three") self.assertEqual(res[1].value, 3) + def test_multilayer_execution(self): + # The multilayer peforms the following execution + # layer 0: in1 + in2 | in3 + in4 + # res1 res2 + # layer 1: res1 + res2 + # res3 + # layer 2: res3 * res1 + # res4 + # kpi layer: res4 * res2 + # + # Final result should be + # ((in1 + in2 + in3 + in4) * (in1 + in2) * (in3 + in4) + + data_values = [ + DataValue(value=10, name="in1"), + DataValue(value=15, name="in2"), + DataValue(value=3, name="in3"), + DataValue(value=7, name="in4") + ] + + def adder(model, parameters): + + first = parameters[0].value + second = parameters[1].value + return [DataValue(value=(first+second))] + + adder_factory = ProbeDataSourceFactory( + None, + input_slots_size=2, + output_slots_size=1, + run_function=adder) + + def multiplier(model, parameters): + first = parameters[0].value + second = parameters[1].value + return [DataValue(value=(first*second))] + + multiplier_factory = ProbeDataSourceFactory( + None, + input_slots_size=2, + output_slots_size=1, + run_function=multiplier) + + multiplier_kpi_factory = ProbeKPICalculatorFactory( + None, + input_slots_size=2, + output_slots_size=1, + run_function=multiplier) + + wf = Workflow( + execution_layers=[ + [], + [], + [] + ] + ) + # Layer 0 + model = adder_factory.create_model() + model.input_slot_maps = [ + InputSlotMap(name="in1"), + InputSlotMap(name="in2") + ] + model.output_slot_names = ["res1"] + wf.execution_layers[0].append(model) + + model = adder_factory.create_model() + model.input_slot_maps = [ + InputSlotMap(name="in3"), + InputSlotMap(name="in4") + ] + model.output_slot_names = ["res2"] + wf.execution_layers[0].append(model) + + # layer 1 + model = adder_factory.create_model() + model.input_slot_maps = [ + InputSlotMap(name="res1"), + InputSlotMap(name="res2") + ] + model.output_slot_names = ["res3"] + wf.execution_layers[1].append(model) + + # layer 2 + model = multiplier_factory.create_model() + model.input_slot_maps = [ + InputSlotMap(name="res3"), + InputSlotMap(name="res1") + ] + model.output_slot_names = ["res4"] + wf.execution_layers[2].append(model) + + # KPI layer + model = multiplier_kpi_factory.create_model() + model.input_slot_maps = [ + InputSlotMap(name="res4"), + InputSlotMap(name="res2") + ] + model.output_slot_names = ["out1"] + wf.kpi_calculators.append(model) + + kpi_results = execute_workflow(wf, data_values) + self.assertEqual(len(kpi_results), 1) + self.assertEqual(kpi_results[0].value, 8750) + def test_empty_slot_name_skips_data_value(self): """Checks if leaving a slot name empty will skip the data value in the final output