Skip to content
Snippets Groups Projects
Commit 55cdf912 authored by Stefano Borini's avatar Stefano Borini Committed by GitHub
Browse files

Merge pull request #86 from force-h2020/improve_event_handling

Improve event handling
parents 53bcc702 d1829c4b
No related branches found
No related tags found
No related merge requests found
...@@ -21,7 +21,7 @@ from .mco.i_mco_factory import IMCOFactory # noqa ...@@ -21,7 +21,7 @@ from .mco.i_mco_factory import IMCOFactory # noqa
from .mco.parameters.base_mco_parameter_factory import BaseMCOParameterFactory # noqa from .mco.parameters.base_mco_parameter_factory import BaseMCOParameterFactory # noqa
from .mco.parameters.base_mco_parameter import BaseMCOParameter # noqa from .mco.parameters.base_mco_parameter import BaseMCOParameter # noqa
from .mco.events import * # noqa from .core_driver_events import * # noqa
from .notification_listeners.i_notification_listener_factory import INotificationListenerFactory # noqa from .notification_listeners.i_notification_listener_factory import INotificationListenerFactory # noqa
from .notification_listeners.base_notification_listener import BaseNotificationListener # noqa from .notification_listeners.base_notification_listener import BaseNotificationListener # noqa
......
from traits.api import HasStrictTraits, Tuple
class BaseDriverEvent(HasStrictTraits):
""" Base event for the MCO driver."""
class MCOStartEvent(BaseDriverEvent):
""" The MCO driver should emit this event when the evaluation starts."""
input_names = Tuple()
output_names = Tuple()
class MCOFinishEvent(BaseDriverEvent):
""" The MCO driver should emit this event when the evaluation ends."""
class MCOProgressEvent(BaseDriverEvent):
""" The MCO driver should emit this event for every new evaluation that has
been completed. It carries data about the evaluation, specifically the
input data (MCO parameter values) and the resulting output (KPIs)."""
input = Tuple()
output = Tuple()
...@@ -14,6 +14,7 @@ from .io.workflow_reader import ( ...@@ -14,6 +14,7 @@ from .io.workflow_reader import (
InvalidVersionException, InvalidVersionException,
InvalidFileException InvalidFileException
) )
from .core_driver_events import MCOStartEvent, MCOFinishEvent, MCOProgressEvent
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
CORE_MCO_DRIVER_ID = plugin_id("core", "CoreMCODriver") CORE_MCO_DRIVER_ID = plugin_id("core", "CoreMCODriver")
...@@ -50,8 +51,27 @@ class CoreMCODriver(BaseCoreDriver): ...@@ -50,8 +51,27 @@ class CoreMCODriver(BaseCoreDriver):
mco_factory = mco_model.factory mco_factory = mco_model.factory
return mco_factory.create_optimizer() return mco_factory.create_optimizer()
@on_trait_change("mco:event") @on_trait_change("mco:started")
def _handle_mco_event(self, event): def _deliver_start_event(self):
output_names = []
for kpi in self.workflow.kpi_calculators:
output_names.extend(kpi.output_slot_names)
self._deliver_event(MCOStartEvent(
input_names=tuple(p.name for p in self.workflow.mco.parameters),
output_names=tuple(output_names)
))
@on_trait_change("mco:finished")
def _deliver_finished_event(self):
self._deliver_event(MCOFinishEvent())
@on_trait_change("mco:new_data")
def _deliver_mco_progress_event(self, data):
self._deliver_event(MCOProgressEvent(**data))
def _deliver_event(self, event):
""" Delivers an event to the listeners """
for listener in self.listeners[:]: for listener in self.listeners[:]:
try: try:
listener.deliver(event) listener.deliver(event)
......
...@@ -4,8 +4,6 @@ import itertools ...@@ -4,8 +4,6 @@ import itertools
import collections import collections
from force_bdss.api import BaseMCO from force_bdss.api import BaseMCO
from force_bdss.mco.events import MCOStartEvent, MCOFinishEvent, \
MCOProgressEvent
def rotated_range(start, stop, starting_value): def rotated_range(start, stop, starting_value):
...@@ -18,8 +16,6 @@ def rotated_range(start, stop, starting_value): ...@@ -18,8 +16,6 @@ def rotated_range(start, stop, starting_value):
class DummyDakotaOptimizer(BaseMCO): class DummyDakotaOptimizer(BaseMCO):
def run(self, model): def run(self, model):
self.notify_event(MCOStartEvent())
parameters = model.parameters parameters = model.parameters
values = [] values = []
...@@ -34,6 +30,7 @@ class DummyDakotaOptimizer(BaseMCO): ...@@ -34,6 +30,7 @@ class DummyDakotaOptimizer(BaseMCO):
application = self.factory.plugin.application application = self.factory.plugin.application
self.started = True
for value in value_iterator: for value in value_iterator:
ps = subprocess.Popen( ps = subprocess.Popen(
[sys.argv[0], [sys.argv[0],
...@@ -45,9 +42,9 @@ class DummyDakotaOptimizer(BaseMCO): ...@@ -45,9 +42,9 @@ class DummyDakotaOptimizer(BaseMCO):
out = ps.communicate( out = ps.communicate(
" ".join([str(v) for v in value]).encode("utf-8")) " ".join([str(v) for v in value]).encode("utf-8"))
out_data = out[0].decode("utf-8").split() out_data = out[0].decode("utf-8").split()
self.notify_event(MCOProgressEvent( self.new_data = {
input=tuple(value), 'input': tuple(value),
output=tuple(out_data), 'output': tuple(out_data)
)) }
self.notify_event(MCOFinishEvent()) self.finished = True
import abc import abc
from traits.api import ABCHasStrictTraits, Instance, Event from traits.api import ABCHasStrictTraits, Instance, Event, Dict, Str, Tuple
from force_bdss.mco.events import BaseMCOEvent
from .i_mco_factory import IMCOFactory from .i_mco_factory import IMCOFactory
...@@ -14,8 +13,14 @@ class BaseMCO(ABCHasStrictTraits): ...@@ -14,8 +13,14 @@ class BaseMCO(ABCHasStrictTraits):
#: A reference to the factory #: A reference to the factory
factory = Instance(IMCOFactory) factory = Instance(IMCOFactory)
#: Triggered when an event occurs. #: Triggered when the evaluation started.
event = Event(BaseMCOEvent) started = Event()
#: Triggered when the evaluation finished
finished = Event()
# Event triggered when the mco wants to send new data to listeners
new_data = Event(Dict(Str(), Tuple()))
def __init__(self, factory, *args, **kwargs): def __init__(self, factory, *args, **kwargs):
"""Initializes the MCO. """Initializes the MCO.
...@@ -39,18 +44,3 @@ class BaseMCO(ABCHasStrictTraits): ...@@ -39,18 +44,3 @@ class BaseMCO(ABCHasStrictTraits):
An instance of the model information, as created from An instance of the model information, as created from
create_model() create_model()
""" """
def notify_event(self, event):
"""Method based interface to deliver an event, instead of
assignment to traits.
Sends the event, synchronously. When the routine returns,
listeners have been fully informed (they might, however, handle
the event asynchronously at their convenience)
Parameters
----------
event: BaseMCOEvent
The event to deliver.
"""
self.event = event
from traits.api import HasStrictTraits, Tuple
class BaseMCOEvent(HasStrictTraits):
"""Base event for the MCO"""
class MCOStartEvent(BaseMCOEvent):
"""MCO should emit this event when the evaluation starts."""
class MCOFinishEvent(BaseMCOEvent):
"""MCO should emit this event when the evaluation ends."""
class MCOProgressEvent(BaseMCOEvent):
"""MCO should emit this event for every new evaluation that has been
completed. It carries data about the evaluation, specifically the
input data (MCO parameter values) and the resulting output (KPIs)."""
input = Tuple()
output = Tuple()
import unittest import unittest
from testfixtures import LogCapture from testfixtures import LogCapture
from force_bdss.mco.events import MCOStartEvent from force_bdss.core_driver_events import (
MCOStartEvent, MCOFinishEvent, MCOProgressEvent)
from force_bdss.notification_listeners.base_notification_listener import \ from force_bdss.notification_listeners.base_notification_listener import \
BaseNotificationListener BaseNotificationListener
from force_bdss.tests import fixtures from force_bdss.tests import fixtures
...@@ -47,16 +48,37 @@ class TestCoreMCODriver(unittest.TestCase): ...@@ -47,16 +48,37 @@ class TestCoreMCODriver(unittest.TestCase):
) )
self.assertEqual(len(driver.listeners), 1) self.assertEqual(len(driver.listeners), 1)
def test_event_handling(self): def test_start_event_handling(self):
driver = CoreMCODriver( driver = CoreMCODriver(
application=self.mock_application, application=self.mock_application,
) )
listener = driver.listeners[0] listener = driver.listeners[0]
mock_deliver = mock.Mock() mock_deliver = mock.Mock()
listener.__dict__["deliver"] = mock_deliver listener.__dict__["deliver"] = mock_deliver
event = MCOStartEvent() driver.mco.started = True
driver.mco.event = event self.assertIsInstance(mock_deliver.call_args[0][0], MCOStartEvent)
self.assertTrue(mock_deliver.call_args[0][0], event)
def test_finished_event_handling(self):
driver = CoreMCODriver(
application=self.mock_application,
)
listener = driver.listeners[0]
mock_deliver = mock.Mock()
listener.__dict__["deliver"] = mock_deliver
driver.mco.finished = True
self.assertIsInstance(mock_deliver.call_args[0][0], MCOFinishEvent)
def test_progress_event_handling(self):
driver = CoreMCODriver(
application=self.mock_application,
)
listener = driver.listeners[0]
mock_deliver = mock.Mock()
listener.__dict__["deliver"] = mock_deliver
driver.mco.new_data = {'input': (1, 2), 'output': (3, 4)}
self.assertIsInstance(mock_deliver.call_args[0][0], MCOProgressEvent)
self.assertEqual(mock_deliver.call_args[0][0].input, (1, 2))
self.assertEqual(mock_deliver.call_args[0][0].output, (3, 4))
def test_listener_init_exception(self): def test_listener_init_exception(self):
driver = CoreMCODriver( driver = CoreMCODriver(
...@@ -90,7 +112,7 @@ class TestCoreMCODriver(unittest.TestCase): ...@@ -90,7 +112,7 @@ class TestCoreMCODriver(unittest.TestCase):
listener.__dict__["deliver"] = mock_deliver listener.__dict__["deliver"] = mock_deliver
mock_deliver.side_effect = Exception() mock_deliver.side_effect = Exception()
with LogCapture() as capture: with LogCapture() as capture:
driver.mco.event = MCOStartEvent() driver.mco.started = True
self.assertTrue(mock_deliver.called) self.assertTrue(mock_deliver.called)
capture.check( capture.check(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment