Unverified commit 2b03fb7b authored by Stefano Borini, committed by GitHub

Merge pull request #161 from force-h2020/extract-execution-module

Refactoring: Moves execution to a separate module.
parents 796041af 70ae2a00
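In practice, the execution entry points now live in force_bdss.core.execution and are re-exported from the package root (see the __init__.py hunk below), instead of being private helpers of core_evaluation_driver. A minimal usage sketch of the new import path follows; the DataValue names are hypothetical and `workflow` stands in for a fully configured Workflow instance, so the final call is shown commented out:

from force_bdss.core.data_value import DataValue
from force_bdss.core.execution import execute_workflow

# Hypothetical MCO-style inputs; the names must match the workflow's
# input slot bindings.
initial_values = [
    DataValue(name="in1", value=10),
    DataValue(name="in2", value=15),
]

# execute_workflow runs each ExecutionLayer in order and returns the
# DataValues whose names match the MCO's KPI specifications.
# kpi_results = execute_workflow(workflow, initial_values)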
@@ -11,6 +11,7 @@ from .core.kpi_specification import KPISpecification # noqa
from .core.execution_layer import ExecutionLayer # noqa
from .core.verifier import verify_workflow # noqa
from .core.verifier import VerifierError # noqa
from .core.execution import execute_layer, execute_workflow # noqa
from .data_sources.base_data_source_model import BaseDataSourceModel # noqa
from .data_sources.base_data_source import BaseDataSource # noqa
import logging
from force_bdss.core.data_value import DataValue
log = logging.getLogger(__name__)
def execute_workflow(workflow, data_values):
"""Executes the given workflow using the list of data values.
Returns a list of data values for the KPI results
Parameters
----------
workflow: Workflow
The instance of the workflow
data_values: List
The data values that the MCO generally provides.
Returns
-------
list: A list of DataValues containing the KPI results.
"""
available_data_values = data_values[:]
for index, layer in enumerate(workflow.execution_layers):
log.info("Computing data layer {}".format(index))
ds_results = execute_layer(layer, available_data_values)
available_data_values += ds_results
log.info("Aggregating KPI data")
kpi_names = [kpi.name for kpi in workflow.mco.kpis]
kpi_results = [
dv
for dv in available_data_values
if dv.name in kpi_names
]
return kpi_results
def execute_layer(layer, environment_data_values):
"""Helper routine.
Performs the evaluation of a single layer.
At the moment we have a single layer of DataSources followed
by a single layer of KPI calculators.
Parameters
----------
layer: ExecutionLayer
A list of the models for all the data sources
environment_data_values: list
A list of data values to submit to the evaluators.
NOTE: The above parameter is going to go away as soon as we move
to unlimited layers and remove the distinction between data sources
and KPI calculators.
"""
results = []
for model in layer.data_sources:
factory = model.factory
try:
data_source = factory.create_data_source()
except Exception:
log.exception(
"Unable to create data source from factory '{}' "
"in plugin '{}'. This may indicate a programming "
"error in the plugin".format(
factory.id,
factory.plugin.id))
raise
# Get the slots for this data source. These must be matched to
# the appropriate values in the environment data values.
# Matching is by position.
in_slots, out_slots = data_source.slots(model)
# Binding performs the extraction of the specified data values
# satisfying the above input slots from the environment data values
# considering what the user specified in terms of names (which is
# in the model input slot info).
# The resulting data are the ones picked by name from the
# environment data values, and in the appropriate ordering as
# needed by the input slots.
passed_data_values = _bind_data_values(
environment_data_values,
model.input_slot_info,
in_slots)
# execute data source, passing only relevant data values.
log.info("Evaluating for Data Source {}".format(
factory.name))
log.info("Passed values:")
for idx, dv in enumerate(passed_data_values):
log.info("{}: {}".format(idx, dv))
try:
res = data_source.run(model, passed_data_values)
except Exception:
log.exception(
"Evaluation could not be performed. "
"Run method raised exception.")
raise
if not isinstance(res, list):
error_txt = (
"The run method of data source {} must return a list."
" It returned instead {}. Fix the run() method to return"
" the appropriate entity.".format(
factory.name,
type(res)
))
log.error(error_txt)
raise RuntimeError(error_txt)
if len(res) != len(out_slots):
error_txt = (
"The number of data values ({} values) returned"
" by '{}' does not match the number"
" of output slots it specifies ({} values)."
" This is likely a plugin error.").format(
len(res), factory.name, len(out_slots)
)
log.error(error_txt)
raise RuntimeError(error_txt)
if len(res) != len(model.output_slot_info):
error_txt = (
"The number of data values ({} values) returned"
" by '{}' does not match the number"
" of user-defined names specified ({} values)."
" This is either a plugin error or a file"
" error.").format(
len(res),
factory.name,
len(model.output_slot_info)
)
log.error(error_txt)
raise RuntimeError(error_txt)
for idx, dv in enumerate(res):
if not isinstance(dv, DataValue):
error_txt = (
"The result list returned by DataSource {} contains"
" an entry that is not a DataValue. An entry of type"
" {} was instead found in position {}."
" Fix the DataSource.run() method"
" to return the appropriate entity.".format(
factory.name,
type(dv),
idx
)
)
log.error(error_txt)
raise RuntimeError(error_txt)
# At this point, the returned data values are unnamed.
# Add the names as specified by the user.
for dv, output_slot_info in zip(res, model.output_slot_info):
dv.name = output_slot_info.name
# If the name was not specified, simply discard the value,
# because apparently the user is not interested in it.
res = [r for r in res if r.name != ""]
results.extend(res)
log.info("Returned values:")
for idx, dv in enumerate(res):
log.info("{}: {}".format(idx, dv))
# Finally, return all the computed data values from all evaluators,
# properly named.
return results
def _bind_data_values(available_data_values,
model_slot_map,
slots):
"""
Given the named data values in the environment, the slots a given
data source expects, and the user-specified names for each of these
slots, returns those data values with the requested names, ordered
in the correct order as specified by the slot map.
"""
passed_data_values = []
lookup_map = {dv.name: dv for dv in available_data_values}
if len(slots) != len(model_slot_map):
raise RuntimeError("The length of the slots is not equal to"
" the length of the slot map. This may"
" indicate a file error.")
try:
for slot, slot_map in zip(slots, model_slot_map):
passed_data_values.append(lookup_map[slot_map.name])
except KeyError:
raise RuntimeError(
"Unable to find requested name '{}' in available "
"data values. Current data value names: {}".format(
slot_map.name,
list(lookup_map.keys())))
return passed_data_values
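As an aside, the binding contract documented above can be illustrated in a few lines of plain Python (the length check and error handling are omitted, and the names are hypothetical): values are looked up by the names given in the model's input_slot_info and returned in slot order, which is exactly what the test below exercises.

from force_bdss.core.data_value import DataValue
from force_bdss.core.input_slot_info import InputSlotInfo

environment = [DataValue(name="foo", value=1), DataValue(name="bar", value=2)]
slot_map = [InputSlotInfo(name="bar"), InputSlotInfo(name="foo")]

# Select by user-specified name, ordered as the input slots expect.
lookup = {dv.name: dv for dv in environment}
bound = [lookup[info.name] for info in slot_map]
assert bound[0].value == 2 and bound[1].value == 1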
import unittest
import testfixtures
import six
from force_bdss.core.execution_layer import ExecutionLayer
from force_bdss.core.kpi_specification import KPISpecification
from force_bdss.core.output_slot_info import OutputSlotInfo
from force_bdss.core.workflow import Workflow
from force_bdss.tests.probe_classes.data_source import ProbeDataSourceFactory
from force_bdss.core.input_slot_info import InputSlotInfo
from force_bdss.core.data_value import DataValue
from force_bdss.core.slot import Slot
from force_bdss.tests.probe_classes.factory_registry_plugin import \
ProbeFactoryRegistryPlugin
from force_bdss.tests.probe_classes.mco import ProbeMCOFactory
from force_bdss.core.execution import execute_workflow, execute_layer, \
_bind_data_values
class TestExecution(unittest.TestCase):
def setUp(self):
self.registry = ProbeFactoryRegistryPlugin()
self.plugin = self.registry.plugin
def test_bind_data_values(self):
data_values = [
DataValue(name="foo"),
DataValue(name="bar"),
DataValue(name="baz")
]
slot_map = (
InputSlotInfo(name="baz"),
InputSlotInfo(name="bar")
)
slots = (
Slot(),
Slot()
)
result = _bind_data_values(data_values, slot_map, slots)
self.assertEqual(result[0], data_values[2])
self.assertEqual(result[1], data_values[1])
# Check the errors. Only one slot map for two slots.
slot_map = (
InputSlotInfo(name="baz"),
)
with testfixtures.LogCapture():
with six.assertRaisesRegex(
self,
RuntimeError,
"The length of the slots is not equal to the length of"
" the slot map"):
_bind_data_values(data_values, slot_map, slots)
# missing value in the given data values.
slot_map = (
InputSlotInfo(name="blap"),
InputSlotInfo(name="bar")
)
with testfixtures.LogCapture():
with six.assertRaisesRegex(
self,
RuntimeError,
"Unable to find requested name 'blap' in available"
" data values."):
_bind_data_values(data_values, slot_map, slots)
def test_compute_layer_results(self):
data_values = [
DataValue(name="foo"),
DataValue(name="bar"),
DataValue(name="baz"),
DataValue(name="quux")
]
def run(self, *args, **kwargs):
return [DataValue(value=1), DataValue(value=2), DataValue(value=3)]
ds_factory = self.registry.data_source_factories[0]
ds_factory.input_slots_size = 2
ds_factory.output_slots_size = 3
ds_factory.run_function = run
evaluator_model = ds_factory.create_model()
evaluator_model.input_slot_info = [
InputSlotInfo(name="foo"),
InputSlotInfo(name="quux")
]
evaluator_model.output_slot_info = [
OutputSlotInfo(name="one"),
OutputSlotInfo(name=""),
OutputSlotInfo(name="three")
]
res = execute_layer(
ExecutionLayer(data_sources=[evaluator_model]),
data_values,
)
self.assertEqual(len(res), 2)
self.assertEqual(res[0].name, "one")
self.assertEqual(res[0].value, 1)
self.assertEqual(res[1].name, "three")
self.assertEqual(res[1].value, 3)
def test_multilayer_execution(self):
# The multilayer performs the following execution
# layer 0: in1 + in2 | in3 + in4
# res1 res2
# layer 1: res1 + res2
# res3
# layer 2: res3 * res1
# res4
# layer 3: res4 * res2
# out1
# Final result should be
# out1 = (in1 + in2 + in3 + in4) * (in1 + in2) * (in3 + in4)
#      = 35 * 25 * 10 = 8750 for the inputs below
data_values = [
DataValue(value=10, name="in1"),
DataValue(value=15, name="in2"),
DataValue(value=3, name="in3"),
DataValue(value=7, name="in4")
]
def adder(model, parameters):
first = parameters[0].value
second = parameters[1].value
return [DataValue(value=(first+second))]
adder_factory = ProbeDataSourceFactory(
self.plugin,
input_slots_size=2,
output_slots_size=1,
run_function=adder)
def multiplier(model, parameters):
first = parameters[0].value
second = parameters[1].value
return [DataValue(value=(first*second))]
multiplier_factory = ProbeDataSourceFactory(
self.plugin,
input_slots_size=2,
output_slots_size=1,
run_function=multiplier)
mco_factory = ProbeMCOFactory(self.plugin)
mco_model = mco_factory.create_model()
mco_model.kpis = [
KPISpecification(name="out1")
]
wf = Workflow(
mco=mco_model,
execution_layers=[
ExecutionLayer(),
ExecutionLayer(),
ExecutionLayer(),
ExecutionLayer()
]
)
# Layer 0
model = adder_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="in1"),
InputSlotInfo(name="in2")
]
model.output_slot_info = [
OutputSlotInfo(name="res1")
]
wf.execution_layers[0].data_sources.append(model)
model = adder_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="in3"),
InputSlotInfo(name="in4")
]
model.output_slot_info = [
OutputSlotInfo(name="res2")
]
wf.execution_layers[0].data_sources.append(model)
# layer 1
model = adder_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="res1"),
InputSlotInfo(name="res2")
]
model.output_slot_info = [
OutputSlotInfo(name="res3")
]
wf.execution_layers[1].data_sources.append(model)
# layer 2
model = multiplier_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="res3"),
InputSlotInfo(name="res1")
]
model.output_slot_info = [
OutputSlotInfo(name="res4")
]
wf.execution_layers[2].data_sources.append(model)
# layer 3
model = multiplier_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="res4"),
InputSlotInfo(name="res2")
]
model.output_slot_info = [
OutputSlotInfo(name="out1")
]
wf.execution_layers[3].data_sources.append(model)
kpi_results = execute_workflow(wf, data_values)
self.assertEqual(len(kpi_results), 1)
self.assertEqual(kpi_results[0].value, 8750)
@@ -3,7 +3,7 @@ import logging
from traits.api import on_trait_change
from force_bdss.core.data_value import DataValue
from force_bdss.core.execution import execute_workflow
from force_bdss.ids import InternalPluginID
from .base_core_driver import BaseCoreDriver
@@ -51,174 +51,6 @@ class CoreEvaluationDriver(BaseCoreDriver):
mco_communicator.send_to_mco(mco_model, kpi_results)
def execute_workflow(workflow, data_values):
"""Executes the given workflow using the list of data values.
Returns a list of data values for the KPI results
"""
available_data_values = data_values[:]
for index, layer in enumerate(workflow.execution_layers):
log.info("Computing data layer {}".format(index))
ds_results = _compute_layer_results(
available_data_values,
layer,
)
available_data_values += ds_results
log.info("Aggregating KPI data")
kpi_results = []
kpi_names = [kpi.name for kpi in workflow.mco.kpis]
kpi_results = [
dv
for dv in available_data_values
if dv.name in kpi_names
]
return kpi_results
def _compute_layer_results(environment_data_values,
layer,
):
"""Helper routine.
Performs the evaluation of a single layer.
At the moment we have a single layer of DataSources followed
by a single layer of KPI calculators.
Parameters
----------
environment_data_values: list
A list of data values to submit to the evaluators.
layer: ExecutionLayer
A list of the models for all the data sources
NOTE: The above parameter is going to go away as soon as we move
to unlimited layers and remove the distinction between data sources
and KPI calculators.
"""
results = []
for model in layer.data_sources:
factory = model.factory
try:
data_source = factory.create_data_source()
except Exception:
log.exception(
"Unable to create data source from factory '{}' "
"in plugin '{}'. This may indicate a programming "
"error in the plugin".format(
factory.id,
factory.plugin.id))
raise
# Get the slots for this data source. These must be matched to
# the appropriate values in the environment data values.
# Matching is by position.
in_slots, out_slots = data_source.slots(model)
# Binding performs the extraction of the specified data values
# satisfying the above input slots from the environment data values
# considering what the user specified in terms of names (which is
# in the model input slot info).
# The resulting data are the ones picked by name from the
# environment data values, and in the appropriate ordering as
# needed by the input slots.
passed_data_values = _bind_data_values(
environment_data_values,
model.input_slot_info,
in_slots)
# execute data source, passing only relevant data values.
log.info("Evaluating for Data Source {}".format(
factory.name))
log.info("Passed values:")
for idx, dv in enumerate(passed_data_values):
log.info("{}: {}".format(idx, dv))
try:
res = data_source.run(model, passed_data_values)
except Exception:
log.exception(
"Evaluation could not be performed. "
"Run method raised exception.")
raise
if not isinstance(res, list):
error_txt = (
"The run method of data source {} must return a list."
" It returned instead {}. Fix the run() method to return"
" the appropriate entity.".format(
factory.name,
type(res)
))
log.error(error_txt)
raise RuntimeError(error_txt)
if len(res) != len(out_slots):
error_txt = (
"The number of data values ({} values) returned"
" by '{}' does not match the number"
" of output slots it specifies ({} values)."
" This is likely a plugin error.").format(
len(res), factory.name, len(out_slots)
)
log.error(error_txt)
raise RuntimeError(error_txt)
if len(res) != len(model.output_slot_info):
error_txt = (
"The number of data values ({} values) returned"
" by '{}' does not match the number"
" of user-defined names specified ({} values)."
" This is either a plugin error or a file"
" error.").format(
len(res),
factory.name,
len(model.output_slot_info)
)
log.error(error_txt)
raise RuntimeError(error_txt)
for idx, dv in enumerate(res):
if not isinstance(dv, DataValue):
error_txt = (
"The result list returned by DataSource {} contains"
" an entry that is not a DataValue. An entry of type"
" {} was instead found in position {}."
" Fix the DataSource.run() method"
" to return the appropriate entity.".format(
factory.name,
type(dv),
idx
)
)
log.error(error_txt)
raise RuntimeError(error_txt)
# At this point, the returned data values are unnamed.
# Add the names as specified by the user.
for dv, output_slot_info in zip(res, model.output_slot_info):
dv.name = output_slot_info.name
# If the name was not specified, simply discard the value,
# because apparently the user is not interested in it.
res = [r for r in res if r.name != ""]
results.extend(res)
log.info("Returned values:")
for idx, dv in enumerate(res):
log.info("{}: {}".format(idx, dv))
# Finally, return all the computed data values from all evaluators,
# properly named.
return results
def _get_data_values_from_mco(model, communicator):
"""Helper method.
Receives the data (in order) from the MCO, and bind them to the
@@ -255,33 +87,3 @@ def _get_data_values_from_mco(model, communicator):
# Exclude those who have no name set.
return [dv for dv in mco_data_values if dv.name != ""]
def _bind_data_values(available_data_values,
model_slot_map,
slots):
"""
Given the named data values in the environment, the slots a given
data source expects, and the user-specified names for each of these
slots, returns those data values with the requested names, ordered
in the correct order as specified by the slot map.
"""
passed_data_values = []
lookup_map = {dv.name: dv for dv in available_data_values}
if len(slots) != len(model_slot_map):
raise RuntimeError("The length of the slots is not equal to"
" the length of the slot map. This may"
" indicate a file error.")
try:
for slot, slot_map in zip(slots, model_slot_map):
passed_data_values.append(lookup_map[slot_map.name])
except KeyError:
raise RuntimeError(
"Unable to find requested name '{}' in available "
"data values. Current data value names: {}".format(
slot_map.name,
list(lookup_map.keys())))
return passed_data_values
@@ -3,19 +3,11 @@ import unittest
import testfixtures
import six
from force_bdss.core.execution_layer import ExecutionLayer
from force_bdss.core.kpi_specification import KPISpecification
from force_bdss.core.output_slot_info import OutputSlotInfo
from force_bdss.core.workflow import Workflow
from force_bdss.tests.probe_classes.factory_registry_plugin import \
ProbeFactoryRegistryPlugin
from force_bdss.tests.probe_classes.data_source import ProbeDataSourceFactory
from force_bdss.core.input_slot_info import InputSlotInfo
from force_bdss.core.data_value import DataValue
from force_bdss.core.slot import Slot
from force_bdss.tests import fixtures
from force_bdss.tests.probe_classes.mco import ProbeMCOFactory
try:
import mock
@@ -25,10 +17,7 @@ except ImportError:
from envisage.api import Application
from force_bdss.core_evaluation_driver import (
CoreEvaluationDriver,
execute_workflow,
_bind_data_values,
_compute_layer_results
CoreEvaluationDriver
)
@@ -131,207 +120,6 @@ class TestCoreEvaluationDriver(unittest.TestCase):
" the number of user-defined names"):
driver.application_started()
def test_bind_data_values(self):
data_values = [
DataValue(name="foo"),
DataValue(name="bar"),
DataValue(name="baz")
]
slot_map = (
InputSlotInfo(name="baz"),
InputSlotInfo(name="bar")
)
slots = (
Slot(),
Slot()
)
result = _bind_data_values(data_values, slot_map, slots)
self.assertEqual(result[0], data_values[2])
self.assertEqual(result[1], data_values[1])
# Check the errors. Only one slot map for two slots.
slot_map = (
InputSlotInfo(name="baz"),
)
with testfixtures.LogCapture():
with six.assertRaisesRegex(
self,
RuntimeError,
"The length of the slots is not equal to the length of"
" the slot map"):
_bind_data_values(data_values, slot_map, slots)
# missing value in the given data values.
slot_map = (
InputSlotInfo(name="blap"),
InputSlotInfo(name="bar")
)
with testfixtures.LogCapture():
with six.assertRaisesRegex(
self,
RuntimeError,
"Unable to find requested name 'blap' in available"
" data values."):
_bind_data_values(data_values, slot_map, slots)
def test_compute_layer_results(self):
data_values = [
DataValue(name="foo"),
DataValue(name="bar"),
DataValue(name="baz"),
DataValue(name="quux")
]
def run(self, *args, **kwargs):
return [DataValue(value=1), DataValue(value=2), DataValue(value=3)]
ds_factory = self.registry.data_source_factories[0]
ds_factory.input_slots_size = 2
ds_factory.output_slots_size = 3
ds_factory.run_function = run
evaluator_model = ds_factory.create_model()
evaluator_model.input_slot_info = [
InputSlotInfo(name="foo"),
InputSlotInfo(name="quux")
]
evaluator_model.output_slot_info = [
OutputSlotInfo(name="one"),
OutputSlotInfo(name=""),
OutputSlotInfo(name="three")
]
res = _compute_layer_results(
data_values,
ExecutionLayer(data_sources=[evaluator_model]),
)
self.assertEqual(len(res), 2)
self.assertEqual(res[0].name, "one")
self.assertEqual(res[0].value, 1)
self.assertEqual(res[1].name, "three")
self.assertEqual(res[1].value, 3)
def test_multilayer_execution(self):
# The multilayer performs the following execution
# layer 0: in1 + in2 | in3 + in4
# res1 res2
# layer 1: res1 + res2
# res3
# layer 2: res3 * res1
# res4
# layer 3: res4 * res2
# out1
# Final result should be
# out1 = (in1 + in2 + in3 + in4) * (in1 + in2) * (in3 + in4)
data_values = [
DataValue(value=10, name="in1"),
DataValue(value=15, name="in2"),
DataValue(value=3, name="in3"),
DataValue(value=7, name="in4")
]
def adder(model, parameters):
first = parameters[0].value
second = parameters[1].value
return [DataValue(value=(first+second))]
adder_factory = ProbeDataSourceFactory(
self.plugin,
input_slots_size=2,
output_slots_size=1,
run_function=adder)
def multiplier(model, parameters):
first = parameters[0].value
second = parameters[1].value
return [DataValue(value=(first*second))]
multiplier_factory = ProbeDataSourceFactory(
self.plugin,
input_slots_size=2,
output_slots_size=1,
run_function=multiplier)
mco_factory = ProbeMCOFactory(self.plugin)
mco_model = mco_factory.create_model()
mco_model.kpis = [
KPISpecification(name="out1")
]
wf = Workflow(
mco=mco_model,
execution_layers=[
ExecutionLayer(),
ExecutionLayer(),
ExecutionLayer(),
ExecutionLayer()
]
)
# Layer 0
model = adder_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="in1"),
InputSlotInfo(name="in2")
]
model.output_slot_info = [
OutputSlotInfo(name="res1")
]
wf.execution_layers[0].data_sources.append(model)
model = adder_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="in3"),
InputSlotInfo(name="in4")
]
model.output_slot_info = [
OutputSlotInfo(name="res2")
]
wf.execution_layers[0].data_sources.append(model)
# layer 1
model = adder_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="res1"),
InputSlotInfo(name="res2")
]
model.output_slot_info = [
OutputSlotInfo(name="res3")
]
wf.execution_layers[1].data_sources.append(model)
# layer 2
model = multiplier_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="res3"),
InputSlotInfo(name="res1")
]
model.output_slot_info = [
OutputSlotInfo(name="res4")
]
wf.execution_layers[2].data_sources.append(model)
# layer 3
model = multiplier_factory.create_model()
model.input_slot_info = [
InputSlotInfo(name="res4"),
InputSlotInfo(name="res2")
]
model.output_slot_info = [
OutputSlotInfo(name="out1")
]
wf.execution_layers[3].data_sources.append(model)
kpi_results = execute_workflow(wf, data_values)
self.assertEqual(len(kpi_results), 1)
self.assertEqual(kpi_results[0].value, 8750)
def test_mco_communicator_broken(self):
self.registry.mco_factories[0].raises_on_create_communicator = True
driver = CoreEvaluationDriver(
@@ -367,9 +155,9 @@ class TestCoreEvaluationDriver(unittest.TestCase):
'Creating communicator'),
('force_bdss.core_evaluation_driver', 'INFO',
'Received data from MCO: \n whatever = 1.0 (AVERAGE)'),
('force_bdss.core_evaluation_driver', 'INFO',
('force_bdss.core.execution', 'INFO',
'Computing data layer 0'),
('force_bdss.core_evaluation_driver', 'ERROR',
('force_bdss.core.execution', 'ERROR',
'Unable to create data source from factory '
"'force.bdss.enthought.plugin.test.v0"
".factory.probe_data_source' in plugin "