diff --git a/force_bdss/core/execution_layer.py b/force_bdss/core/execution_layer.py
new file mode 100644
index 0000000000000000000000000000000000000000..2957e6ad88563662d44fd512ba90394e9bcb8e88
--- /dev/null
+++ b/force_bdss/core/execution_layer.py
@@ -0,0 +1,10 @@
+from traits.api import HasStrictTraits, List
+
+from force_bdss.data_sources.base_data_source_model import BaseDataSourceModel
+
+
+class ExecutionLayer(HasStrictTraits):
+    """Represents a single layer in the execution stack.
+    It contains a list of the data source models that must be executed.
+    """
+    data_source_models = List(BaseDataSourceModel)
diff --git a/force_bdss/core/workflow.py b/force_bdss/core/workflow.py
index d7d29231b9883a4493a91f8bc35290159b8c313f..f28748477079471d42568a2a7dddcdd0005a4b4f 100644
--- a/force_bdss/core/workflow.py
+++ b/force_bdss/core/workflow.py
@@ -1,6 +1,6 @@
 from traits.api import HasStrictTraits, Instance, List
 
-from force_bdss.data_sources.base_data_source_model import BaseDataSourceModel
+from force_bdss.core.execution_layer import ExecutionLayer
 from force_bdss.mco.base_mco_model import BaseMCOModel
 from force_bdss.notification_listeners.base_notification_listener_model \
     import BaseNotificationListenerModel
@@ -15,9 +15,7 @@ class Workflow(HasStrictTraits):
     #: The execution layers. Execution starts from the first layer,
     #: where all data sources are executed in sequence. It then passes all
     #: the computed data to the second layer, then the third etc.
-    #: For now, the final execution is performed by the KPI layer, but this
-    #: will go away when we remove the KPI calculators.
-    execution_layers = List(List(BaseDataSourceModel))
+    execution_layers = List(ExecutionLayer)
 
     #: Contains information about the listeners to be setup
     notification_listeners = List(BaseNotificationListenerModel)
diff --git a/force_bdss/core_evaluation_driver.py b/force_bdss/core_evaluation_driver.py
index 274e04c037c06fefbfbac4da48ae87ee2894f44c..4d31d305e514d4420cc2fd1f01850c1d5e4a3016 100644
--- a/force_bdss/core_evaluation_driver.py
+++ b/force_bdss/core_evaluation_driver.py
@@ -60,14 +60,14 @@ def execute_workflow(workflow, data_values):
         )
         available_data_values += ds_results
 
-    log.info("Computing KPI layer")
+    log.info("Aggregating KPI data")
     kpi_results = [dv for dv in available_data_values if dv.is_kpi]
 
     return kpi_results
 
 
 def _compute_layer_results(environment_data_values,
-                           evaluator_models,
+                           layer,
                            ):
     """Helper routine.
     Performs the evaluation of a single layer.
@@ -79,7 +79,7 @@ def _compute_layer_results(environment_data_values,
     environment_data_values: list
         A list of data values to submit to the evaluators.
 
-    evaluator_models: list
+    layer: ExecutionLayer
         A list of the models for all the data sources
 
     NOTE: The above parameter is going to go away as soon as we move
@@ -88,7 +88,7 @@ def _compute_layer_results(environment_data_values,
     """
     results = []
 
-    for model in evaluator_models:
+    for model in layer.data_source_models:
         factory = model.factory
         data_source = factory.create_data_source()
 
diff --git a/force_bdss/core_mco_driver.py b/force_bdss/core_mco_driver.py
index b7dfda63a0022798bf607871d456dec6e45afc6b..41d5771183e527eb229da2a625ba2ca7ef7c7ef0 100644
--- a/force_bdss/core_mco_driver.py
+++ b/force_bdss/core_mco_driver.py
@@ -57,7 +57,7 @@ class CoreMCODriver(BaseCoreDriver):
     def _deliver_start_event(self):
         output_names = []
         for layer in self.workflow.execution_layers:
-            for data_source in layer:
+            for data_source in layer.data_source_models:
                 output_names.extend(info.name
                                     for info in data_source.output_slot_info
                                     if info.is_kpi
diff --git a/force_bdss/io/tests/test_workflow_writer.py b/force_bdss/io/tests/test_workflow_writer.py
index db58071091bc91e32c9b7b7168a7517bcf98fcc5..bc3164efa07ea81209d38afba824841c7498d5dd 100644
--- a/force_bdss/io/tests/test_workflow_writer.py
+++ b/force_bdss/io/tests/test_workflow_writer.py
@@ -3,6 +3,7 @@ import unittest
 
 from six import StringIO
 
+from force_bdss.core.execution_layer import ExecutionLayer
 from force_bdss.data_sources.base_data_source_factory import \
     BaseDataSourceFactory
 from force_bdss.data_sources.base_data_source_model import BaseDataSourceModel
@@ -69,8 +70,10 @@ class TestWorkflowWriter(unittest.TestCase):
         self.assertEqual(wf_result.mco.factory.id, wf.mco.factory.id)
 
         self.assertEqual(len(wf_result.execution_layers), 2)
-        self.assertEqual(len(wf_result.execution_layers[0]), 2)
-        self.assertEqual(len(wf_result.execution_layers[1]), 1)
+        self.assertEqual(
+            len(wf_result.execution_layers[0].data_source_models), 2)
+        self.assertEqual(
+            len(wf_result.execution_layers[1].data_source_models), 1)
 
     def _create_mock_workflow(self):
         wf = Workflow()
@@ -87,16 +90,19 @@ class TestWorkflowWriter(unittest.TestCase):
             )
         ]
         wf.execution_layers = [
-            [
-                BaseDataSourceModel(mock.Mock(spec=IDataSourceFactory,
-                                    id=factory_id("enthought", "mock2"))),
-                BaseDataSourceModel(mock.Mock(spec=IDataSourceFactory,
-                                    id=factory_id("enthought", "mock2"))),
-            ],
-            [
-                BaseDataSourceModel(mock.Mock(spec=IDataSourceFactory,
-                                    id=factory_id("enthought", "mock2")))
-            ]
+            ExecutionLayer(data_source_models=[
+                BaseDataSourceModel(
+                    mock.Mock(spec=IDataSourceFactory,
+                              id=factory_id("enthought", "mock2"))),
+                BaseDataSourceModel(
+                    mock.Mock(spec=IDataSourceFactory,
+                              id=factory_id("enthought", "mock2"))),
+            ]),
+            ExecutionLayer(data_source_models=[
+                BaseDataSourceModel(
+                    mock.Mock(spec=IDataSourceFactory,
+                              id=factory_id("enthought", "mock2")))
+            ])
         ]
         return wf
 
diff --git a/force_bdss/io/workflow_reader.py b/force_bdss/io/workflow_reader.py
index a63925c666a83385436e6f790bf21163f19bc63b..14f8adafd0e154bd13b64b0be8aa4ca3b2735aea 100644
--- a/force_bdss/io/workflow_reader.py
+++ b/force_bdss/io/workflow_reader.py
@@ -3,6 +3,7 @@ import logging
 
 from traits.api import HasStrictTraits, Instance
 
+from force_bdss.core.execution_layer import ExecutionLayer
 from force_bdss.core.input_slot_info import InputSlotInfo
 from force_bdss.core.output_slot_info import OutputSlotInfo
 from force_bdss.core.workflow import Workflow
@@ -148,7 +149,7 @@ class WorkflowReader(HasStrictTraits):
         layers = []
 
         for el_entry in wf_data["execution_layers"]:
-            layer = []
+            layer = ExecutionLayer()
 
             for ds_entry in el_entry:
                 ds_id = ds_entry["id"]
@@ -161,7 +162,8 @@ class WorkflowReader(HasStrictTraits):
                     self._extract_output_slot_info(
                         model_data["output_slot_info"]
                     )
-                layer.append(ds_factory.create_model(model_data))
+                layer.data_source_models.append(
+                    ds_factory.create_model(model_data))
 
             layers.append(layer)
         return layers
diff --git a/force_bdss/io/workflow_writer.py b/force_bdss/io/workflow_writer.py
index f47981f7a77609adc6a472c9eb223e9265fad3aa..5248a9ec0a7a90b2ae68cc4a4bfc5dbbbfce1819 100644
--- a/force_bdss/io/workflow_writer.py
+++ b/force_bdss/io/workflow_writer.py
@@ -66,7 +66,7 @@ class WorkflowWriter(HasStrictTraits):
         """Extracts the execution layer list of DataSource models"""
         data = []
 
-        for ds in layer:
+        for ds in layer.data_source_models:
             data.append(self._model_data(ds))
 
         return data
diff --git a/force_bdss/tests/test_core_evaluation_driver.py b/force_bdss/tests/test_core_evaluation_driver.py
index ffbbf86b32c1e23e9b49ad89758eaf9a11c11460..05ee3b30e2830baba81556ac7bc733f5b2dfe5cd 100644
--- a/force_bdss/tests/test_core_evaluation_driver.py
+++ b/force_bdss/tests/test_core_evaluation_driver.py
@@ -3,6 +3,7 @@ import unittest
 import testfixtures
 import six
 
+from force_bdss.core.execution_layer import ExecutionLayer
 from force_bdss.core.output_slot_info import OutputSlotInfo
 from force_bdss.core.workflow import Workflow
 from force_bdss.tests.probe_classes.factory_registry_plugin import \
@@ -179,7 +180,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
 
         res = _compute_layer_results(
             data_values,
-            [evaluator_model],
+            ExecutionLayer(data_source_models=[evaluator_model]),
         )
         self.assertEqual(len(res), 2)
         self.assertEqual(res[0].name, "one")
@@ -232,10 +233,10 @@ class TestCoreEvaluationDriver(unittest.TestCase):
 
         wf = Workflow(
             execution_layers=[
-                [],
-                [],
-                [],
-                []
+                ExecutionLayer(),
+                ExecutionLayer(),
+                ExecutionLayer(),
+                ExecutionLayer()
             ]
         )
         # Layer 0
@@ -247,7 +248,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
         model.output_slot_info = [
             OutputSlotInfo(name="res1")
         ]
-        wf.execution_layers[0].append(model)
+        wf.execution_layers[0].data_source_models.append(model)
 
         model = adder_factory.create_model()
         model.input_slot_info = [
@@ -257,7 +258,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
         model.output_slot_info = [
             OutputSlotInfo(name="res2")
         ]
-        wf.execution_layers[0].append(model)
+        wf.execution_layers[0].data_source_models.append(model)
 
         # layer 1
         model = adder_factory.create_model()
@@ -268,7 +269,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
         model.output_slot_info = [
             OutputSlotInfo(name="res3")
         ]
-        wf.execution_layers[1].append(model)
+        wf.execution_layers[1].data_source_models.append(model)
 
         # layer 2
         model = multiplier_factory.create_model()
@@ -279,7 +280,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
         model.output_slot_info = [
             OutputSlotInfo(name="res4")
         ]
-        wf.execution_layers[2].append(model)
+        wf.execution_layers[2].data_source_models.append(model)
 
         # layer 3
         model = multiplier_factory.create_model()
@@ -290,7 +291,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
         model.output_slot_info = [
             OutputSlotInfo(name="out1", is_kpi=True)
         ]
-        wf.execution_layers[3].append(model)
+        wf.execution_layers[3].data_source_models.append(model)
 
         kpi_results = execute_workflow(wf, data_values)
         self.assertEqual(len(kpi_results), 1)
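
The sketch below illustrates how a Workflow is now assembled from ExecutionLayer instances instead of bare lists of models. It mirrors the mock-based construction in test_workflow_writer.py above; the import path for IDataSourceFactory, the helper name and the literal id string are illustrative assumptions, not something this changeset adds.

# Illustrative sketch only, not part of the changeset: building a Workflow
# from ExecutionLayer instances, mirroring test_workflow_writer.py above.
from unittest import mock

from force_bdss.core.execution_layer import ExecutionLayer
from force_bdss.core.workflow import Workflow
from force_bdss.data_sources.base_data_source_model import BaseDataSourceModel
# Assumed import path for the factory interface used as the mock spec.
from force_bdss.data_sources.i_data_source_factory import IDataSourceFactory


def mock_data_source_model():
    """Return a BaseDataSourceModel backed by a mocked factory."""
    factory = mock.Mock(spec=IDataSourceFactory,
                        id="enthought.test.factory.mock2")  # placeholder id
    return BaseDataSourceModel(factory)


wf = Workflow(
    execution_layers=[
        # First layer: two data sources, executed before the second layer.
        ExecutionLayer(data_source_models=[mock_data_source_model(),
                                           mock_data_source_model()]),
        # Second layer: a single data source.
        ExecutionLayer(data_source_models=[mock_data_source_model()]),
    ]
)

# Code that used to iterate the bare list now goes through the named trait.
for layer in wf.execution_layers:
    print(len(layer.data_source_models))

Wrapping the list in a HasStrictTraits container also gives each layer an obvious place to grow per-layer behaviour later without changing the Workflow trait again.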
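The serialized workflow format appears unchanged by this diff: WorkflowWriter still emits each execution layer as a plain list of data source entries, and WorkflowReader only wraps that list in an ExecutionLayer in memory. The rough shape below is inferred from the reader code shown above; the "model_data" key and the id value are assumptions, since the corresponding lines are not part of this changeset, and the surrounding "mco" and "notification_listeners" sections are elided.

# Assumed on-file shape consumed by the reader code in this diff; keys not
# visible here (notably "model_data") are inferred, not confirmed.
wf_data = {
    "execution_layers": [
        [  # first layer: still a plain list on disk, wrapped only in memory
            {
                "id": "enthought.test.factory.mock2",  # placeholder id
                "model_data": {
                    "input_slot_info": [],
                    "output_slot_info": [],
                },
            },
        ],
        [],  # second, empty layer
    ],
}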