Skip to content
Snippets Groups Projects
Unverified Commit 6da08dad authored by Stefano Borini's avatar Stefano Borini Committed by GitHub
Browse files

Merge pull request #139 from force-h2020/verifier

Introduce Verifier to check if a model can run and if not, report the errors.
parents e98a5611 a29fc327
No related branches found
No related tags found
No related merge requests found
......@@ -26,7 +26,7 @@ class TestExecution(unittest.TestCase):
def test_plain_invocation_mco(self):
with cd(fixtures.dirpath()):
try:
subprocess.check_output(["force_bdss", "test_empty.json"],
subprocess.check_output(["force_bdss", '--help'],
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
self.fail("force_bdss returned error at plain invocation.")
......
import unittest
from force_bdss.core.execution_layer import ExecutionLayer
from force_bdss.core.input_slot_info import InputSlotInfo
from force_bdss.core.output_slot_info import OutputSlotInfo
from force_bdss.core.verifier import verify_workflow
from force_bdss.core.workflow import Workflow
from force_bdss.tests.dummy_classes.extension_plugin import \
DummyExtensionPlugin
class TestVerifier(unittest.TestCase):
def setUp(self):
self.plugin = DummyExtensionPlugin()
self.workflow = Workflow()
def test_empty_workflow(self):
wf = self.workflow
errors = verify_workflow(wf)
self.assertEqual(len(errors), 2)
self.assertEqual(errors[0].subject, wf)
self.assertIn("no MCO", errors[0].error)
self.assertEqual(errors[1].subject, wf)
self.assertIn("no execution layers", errors[1].error)
def test_no_mco_parameters(self):
wf = self.workflow
wf.mco = self.plugin.mco_factories[0].create_model()
errors = verify_workflow(wf)
self.assertEqual(len(errors), 2)
self.assertEqual(errors[0].subject, wf.mco)
self.assertIn("no defined parameters", errors[0].error)
def test_parameters_empty_names(self):
wf = self.workflow
mco_factory = self.plugin.mco_factories[0]
wf.mco = mco_factory.create_model()
parameter_factory = mco_factory.parameter_factories()[0]
wf.mco.parameters.append(parameter_factory.create_model())
errors = verify_workflow(wf)
self.assertEqual(len(errors), 3)
self.assertEqual(errors[0].subject, wf.mco.parameters[0])
self.assertIn("empty name", errors[0].error)
self.assertEqual(errors[1].subject, wf.mco.parameters[0])
self.assertIn("empty type", errors[1].error)
def test_data_sources(self):
wf = self.workflow
mco_factory = self.plugin.mco_factories[0]
wf.mco = mco_factory.create_model()
parameter_factory = mco_factory.parameter_factories()[0]
wf.mco.parameters.append(parameter_factory.create_model())
wf.mco.parameters[0].name = "name"
wf.mco.parameters[0].type = "type"
layer = ExecutionLayer()
wf.execution_layers.append(layer)
ds_factory = self.plugin.data_source_factories[0]
ds_model = ds_factory.create_model()
layer.data_sources.append(ds_model)
errors = verify_workflow(wf)
self.assertEqual(errors[0].subject, ds_model)
self.assertIn("Missing input slot name assignment", errors[0].error)
ds_model.input_slot_info.append(
InputSlotInfo(name="name")
)
errors = verify_workflow(wf)
self.assertEqual(errors[0].subject, ds_model)
self.assertIn("Missing output slot name assignment", errors[0].error)
ds_model.output_slot_info.append(
OutputSlotInfo(name="name")
)
errors = verify_workflow(wf)
self.assertEqual(len(errors), 0)
ds_model.input_slot_info[0].name = ''
errors = verify_workflow(wf)
self.assertEqual(len(errors), 1)
self.assertIn("Undefined name for input parameter", errors[0].error)
ds_model.output_slot_info[0].name = ''
errors = verify_workflow(wf)
self.assertEqual(len(errors), 2)
self.assertIn("Undefined name for output parameter", errors[1].error)
import logging
from traits.api import HasStrictTraits, Str, Any
logger = logging.getLogger(__name__)
class VerifierError(HasStrictTraits):
subject = Any()
error = Str()
def verify_workflow(workflow):
"""Verifies if the workflow can be executed, and specifies where the
error occurs and why.
"""
result = []
result.extend(_check_mco(workflow))
result.extend(_check_execution_layers(workflow))
return result
def _check_mco(workflow):
errors = []
if workflow.mco is None:
errors.append(
VerifierError(
subject=workflow,
error="Workflow has no MCO"
)
)
return errors
mco = workflow.mco
if len(mco.parameters) == 0:
errors.append(VerifierError(subject=mco,
error="MCO has no defined parameters"))
for param in mco.parameters:
if len(param.name.strip()) == 0:
errors.append(VerifierError(subject=param,
error="Parameter has empty name"))
if len(param.type.strip()) == 0:
errors.append(VerifierError(subject=param,
error="Parameter has empty type"))
return errors
def _check_execution_layers(workflow):
errors = []
layers = workflow.execution_layers
if len(layers) == 0:
errors.append(
VerifierError(
subject=workflow,
error="Workflow has no execution layers"
)
)
return errors
for layer in layers:
if len(layer.data_sources) == 0:
errors.append(VerifierError(subject=layer,
error="Layer has no data sources"))
for ds in layer.data_sources:
errors.extend(_check_data_source(ds))
return errors
def _check_data_source(data_source_model):
errors = []
factory = data_source_model.factory
try:
data_source = factory.create_data_source()
except Exception:
logger.exception("Unable to create data source from factory"
" '{}', plugin '{}'. This might indicate a "
"programming error".format(factory.id,
factory.plugin.id))
raise
try:
input_slots, output_slots = data_source.slots(data_source_model)
except Exception:
logger.exception(
"Unable to retrieve slot information from data source"
" created by factory '{}', plugin '{}'. This might "
"indicate a programming error.".format(
data_source.factory.id,
data_source.factory.plugin.id))
raise
if len(input_slots) != len(data_source_model.input_slot_info):
errors.append(VerifierError(
subject=data_source_model,
error="Missing input slot name assignment"))
for idx, info in enumerate(data_source_model.input_slot_info):
if len(info.name.strip()) == 0:
errors.append(VerifierError(
subject=data_source_model,
error="Undefined name for input parameter {}".format(idx)))
if len(output_slots) != len(data_source_model.output_slot_info):
errors.append(VerifierError(
subject=data_source_model,
error="Missing output slot name assignment"))
for idx, info in enumerate(data_source_model.output_slot_info):
if len(info.name.strip()) == 0:
errors.append(VerifierError(
subject=data_source_model,
error="Undefined name for output parameter {}".format(idx)))
return errors
......@@ -3,6 +3,7 @@ import logging
from traits.api import on_trait_change, Instance, List
from force_bdss.core.verifier import verify_workflow
from force_bdss.ids import InternalPluginID
from force_bdss.mco.base_mco import BaseMCO
from force_bdss.notification_listeners.base_notification_listener import \
......@@ -25,6 +26,21 @@ class CoreMCODriver(BaseCoreDriver):
@on_trait_change("application:started")
def application_started(self):
try:
workflow = self.workflow
except Exception:
log.exception("Unable to open workflow file.")
sys.exit(1)
errors = verify_workflow(workflow)
if len(errors) != 0:
log.error("Unable to execute workflow due to verification "
"errors :")
for err in errors:
log.error(err.error)
sys.exit(1)
try:
mco = self.mco
except Exception:
......@@ -51,13 +67,8 @@ class CoreMCODriver(BaseCoreDriver):
self.listeners[:] = []
def _mco_default(self):
try:
workflow = self.workflow
except Exception:
log.exception("Unable to open workflow file.")
raise
mco_model = workflow.mco
mco_model = self.workflow.mco
if mco_model is None:
log.info("No MCO defined. Nothing to do. Exiting.")
sys.exit(0)
......
from force_bdss.core.slot import Slot
from force_bdss.data_sources.base_data_source import BaseDataSource
from force_bdss.data_sources.base_data_source_factory import \
BaseDataSourceFactory
......@@ -9,7 +10,11 @@ class DummyDataSource(BaseDataSource):
pass
def slots(self, model):
return (), ()
return (
Slot(type="TYPE1"),
), (
Slot(type="TYPE2"),
)
class DummyDataSourceModel(BaseDataSourceModel):
......
{
"version": "1",
"workflow": {
"mco": {
"id": "force.bdss.enthought.plugin.test.v0.factory.dummy_mco",
"model_data": {
"parameters": [
{
"id": "force.bdss.enthought.plugin.test.v0.factory.dummy_mco.parameter.dummy_mco_parameter",
"model_data": {
"name": "foo",
"type": "PRESSURE"
}
}
]
}
},
"execution_layers": [
[
{
"id": "force.bdss.enthought.plugin.test.v0.factory.dummy_data_source",
"model_data": {
"power": 1.0,
"cuba_type_in": "PRESSURE",
"cuba_type_out": "PRESSURE",
"input_slot_info": [
{
"source": "Environment",
"name": "foo"
}
],
"output_slot_info": [
{
"name": "bar",
"is_kpi": true
}
]
}
}
]
],
"notification_listeners": [
]
}
}
{
"version": "1",
"workflow": {
"mco": {
"id": "force.bdss.enthought.plugin.test.v0.factory.probe_mco",
"model_data": {
"parameters": [
{
"id": "force.bdss.enthought.plugin.test.v0.factory.probe_mco.parameter.probe_mco_parameter",
"model_data": {
"name": "foo",
"type": "PRESSURE"
}
}
]
}
},
"execution_layers": [
[
{
"id": "force.bdss.enthought.plugin.test.v0.factory.probe_data_source",
"model_data": {
"input_slot_info": [
{
"source": "Environment",
"name": "foo"
}
],
"output_slot_info": [
{
"name": "bar",
"is_kpi": true
}
]
}
}
]
],
"notification_listeners": [
{
"id": "force.bdss.enthought.plugin.test.v0.factory.probe_notification_listener",
"model_data": {
}
}
]
}
}
......@@ -4,10 +4,11 @@ from force_bdss.api import (
BaseDataSourceFactory, BaseDataSourceModel, BaseDataSource,
Slot
)
from force_bdss.core.data_value import DataValue
def run_func(*args, **kwargs):
return []
def run_func(model, parameters):
return [DataValue() for _ in range(model.output_slots_size)]
class ProbeDataSource(BaseDataSource):
......@@ -35,8 +36,8 @@ class ProbeDataSourceModel(BaseDataSourceModel):
input_slots_type = Str('PRESSURE')
output_slots_type = Str('PRESSURE')
input_slots_size = Int(0)
output_slots_size = Int(0)
input_slots_size = Int(1)
output_slots_size = Int(1)
@on_trait_change('input_slots_type,output_slots_type,'
'input_slots_size,output_slots_size')
......@@ -51,8 +52,8 @@ class ProbeDataSourceFactory(BaseDataSourceFactory):
input_slots_type = Str('PRESSURE')
output_slots_type = Str('PRESSURE')
input_slots_size = Int(0)
output_slots_size = Int(0)
input_slots_size = Int(1)
output_slots_size = Int(1)
raises_on_create_model = Bool(False)
raises_on_create_data_source = Bool(False)
......
......@@ -52,7 +52,7 @@ class ProbeMCOCommunicator(BaseMCOCommunicator):
send_called = Bool(False)
receive_called = Bool(False)
nb_output_data_values = Int(0)
nb_output_data_values = Int(1)
def send_to_mco(self, model, kpi_results):
self.send_called = True
......@@ -60,12 +60,13 @@ class ProbeMCOCommunicator(BaseMCOCommunicator):
def receive_from_mco(self, model):
self.receive_called = True
return [
DataValue() for _ in range(self.nb_output_data_values)
DataValue(name="whatever", value=1.0)
for _ in range(self.nb_output_data_values)
]
class ProbeMCOFactory(BaseMCOFactory):
nb_output_data_values = Int(0)
nb_output_data_values = Int(1)
raises_on_create_model = Bool(False)
raises_on_create_optimizer = Bool(False)
......
......@@ -38,7 +38,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
application.get_plugin = mock.Mock(
return_value=self.registry
)
application.workflow_filepath = fixtures.get("test_null.json")
application.workflow_filepath = fixtures.get("test_probe.json")
self.mock_application = application
def test_initialization(self):
......@@ -49,7 +49,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
def test_error_for_non_matching_mco_parameters(self):
mco_factory = self.registry.mco_factories[0]
mco_factory.nb_output_data_values = 1
mco_factory.nb_output_data_values = 2
driver = CoreEvaluationDriver(
application=self.mock_application)
with testfixtures.LogCapture():
......@@ -62,7 +62,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
def test_error_for_incorrect_output_slots(self):
def run(self, *args, **kwargs):
return [DataValue()]
return [DataValue(), DataValue()]
ds_factory = self.registry.data_source_factories[0]
ds_factory.run_function = run
driver = CoreEvaluationDriver(application=self.mock_application)
......@@ -70,7 +70,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
with six.assertRaisesRegex(
self,
RuntimeError,
"The number of data values \(1 values\)"
"The number of data values \(2 values\)"
" returned by 'test_data_source' does not match"
" the number of output slots"):
driver.application_started()
......@@ -78,11 +78,11 @@ class TestCoreEvaluationDriver(unittest.TestCase):
def test_error_for_missing_ds_output_names(self):
def run(self, *args, **kwargs):
return [DataValue()]
return [DataValue(), DataValue()]
ds_factory = self.registry.data_source_factories[0]
ds_factory.run_function = run
ds_factory.output_slots_size = 1
ds_factory.output_slots_size = 2
driver = CoreEvaluationDriver(
application=self.mock_application,
)
......@@ -90,7 +90,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
with six.assertRaisesRegex(
self,
RuntimeError,
"The number of data values \(1 values\)"
"The number of data values \(2 values\)"
" returned by 'test_data_source' does not match"
" the number of user-defined names"):
driver.application_started()
......@@ -323,7 +323,7 @@ class TestCoreEvaluationDriver(unittest.TestCase):
('force_bdss.core_evaluation_driver', 'INFO',
'Creating communicator'),
('force_bdss.core_evaluation_driver', 'INFO',
'Received data from MCO: \n'),
'Received data from MCO: \n whatever = 1.0 (AVERAGE)'),
('force_bdss.core_evaluation_driver', 'INFO',
'Computing data layer 0'),
('force_bdss.core_evaluation_driver', 'ERROR',
......
......@@ -28,7 +28,7 @@ class TestCoreMCODriver(unittest.TestCase):
application.get_plugin = mock.Mock(
return_value=self.factory_registry_plugin
)
application.workflow_filepath = fixtures.get("test_null.json")
application.workflow_filepath = fixtures.get("test_probe.json")
self.mock_application = application
def test_initialization(self):
......@@ -202,3 +202,37 @@ class TestCoreMCODriver(unittest.TestCase):
"'force.bdss.enthought.plugin.test.v0'"
" raised exception. This might indicate "
'a programming error in the plugin.'),)
def test_nonexistent_file(self):
self.mock_application.workflow_filepath = fixtures.get(
"test_nonexistent.json")
driver = CoreMCODriver(
application=self.mock_application,
)
with LogCapture() as capture:
with self.assertRaises(SystemExit):
driver.application_started()
capture.check(
('force_bdss.core_mco_driver', 'ERROR',
'Unable to open workflow file.'),
)
def test_non_valid_file(self):
self.mock_application.workflow_filepath = fixtures.get(
"test_null.json")
driver = CoreMCODriver(
application=self.mock_application,
)
with LogCapture() as capture:
with self.assertRaises(SystemExit):
driver.application_started()
capture.check(
('force_bdss.core_mco_driver',
'ERROR',
'Unable to execute workflow due to verification errors :'),
('force_bdss.core_mco_driver', 'ERROR',
'MCO has no defined parameters'),
('force_bdss.core_mco_driver', 'ERROR',
'Missing input slot name assignment'),
('force_bdss.core_mco_driver', 'ERROR',
'Missing output slot name assignment'))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment