diff --git a/force_bdss/api.py b/force_bdss/api.py index 129f5ccd7c1e1e3048282abc9b8f0173bd735187..828570458e60ba2d9b117fc32f9279b2ec28696d 100644 --- a/force_bdss/api.py +++ b/force_bdss/api.py @@ -20,3 +20,12 @@ from .mco.i_mco_factory import IMCOFactory # noqa from .mco.parameters.base_mco_parameter_factory import BaseMCOParameterFactory # noqa from .mco.parameters.base_mco_parameter import BaseMCOParameter # noqa + +from .mco.events import * # noqa + +from .notification_listeners.i_notification_listener_factory import INotificationListenerFactory # noqa +from .notification_listeners.base_notification_listener import BaseNotificationListener # noqa +from .notification_listeners.base_notification_listener_factory import BaseNotificationListenerFactory # noqa +from .notification_listeners.base_notification_listener_model import BaseNotificationListenerModel # noqa + +from .local_traits import (ZMQSocketURL, Identifier) # noqa diff --git a/force_bdss/base_extension_plugin.py b/force_bdss/base_extension_plugin.py index a3f2bdf474c4cc4e8cd20d15945597a705f42e52..692b9fa72898aafc938854d14042890cc50bed48 100644 --- a/force_bdss/base_extension_plugin.py +++ b/force_bdss/base_extension_plugin.py @@ -1,6 +1,8 @@ from envisage.plugin import Plugin from traits.trait_types import List +from .notification_listeners.i_notification_listener_factory import \ + INotificationListenerFactory from .ids import ExtensionPointID from .data_sources.i_data_source_factory import IDataSourceFactory from .kpi.i_kpi_calculator_factory import IKPICalculatorFactory @@ -39,3 +41,8 @@ class BaseExtensionPlugin(Plugin): IKPICalculatorFactory, contributes_to=ExtensionPointID.KPI_CALCULATOR_FACTORIES ) + + notification_listener_factories = List( + INotificationListenerFactory, + contributes_to=ExtensionPointID.NOTIFICATION_LISTENER_FACTORIES + ) diff --git a/force_bdss/core/workflow.py b/force_bdss/core/workflow.py index d7cfb50193a9ad46035da21faab84ebce6800d9a..8cdb20c21436f9bb8c386992a31b18ed842b3a99 100644 --- a/force_bdss/core/workflow.py +++ b/force_bdss/core/workflow.py @@ -3,6 +3,8 @@ from traits.api import HasStrictTraits, Instance, List from force_bdss.data_sources.base_data_source_model import BaseDataSourceModel from force_bdss.kpi.base_kpi_calculator_model import BaseKPICalculatorModel from force_bdss.mco.base_mco_model import BaseMCOModel +from force_bdss.notification_listeners.base_notification_listener_model \ + import BaseNotificationListenerModel class Workflow(HasStrictTraits): @@ -18,3 +20,6 @@ class Workflow(HasStrictTraits): #: Contains the factory-specific KPI Calculator Model objects. #: The list can be empty kpi_calculators = List(BaseKPICalculatorModel) + + #: Contains information about the listeners to be setup + notification_listeners = List(BaseNotificationListenerModel) diff --git a/force_bdss/core_mco_driver.py b/force_bdss/core_mco_driver.py index 457f853163c238bc493b79cb3a2675a12fcfcade..e2678daee1837df4cf8595ee4618008a988c47d3 100644 --- a/force_bdss/core_mco_driver.py +++ b/force_bdss/core_mco_driver.py @@ -1,9 +1,13 @@ from __future__ import print_function import sys +import logging -from traits.api import on_trait_change +from traits.api import on_trait_change, Instance, List +from force_bdss.mco.base_mco import BaseMCO +from force_bdss.notification_listeners.base_notification_listener import \ + BaseNotificationListener from .ids import plugin_id from .base_core_driver import BaseCoreDriver from .io.workflow_reader import ( @@ -11,6 +15,7 @@ from .io.workflow_reader import ( InvalidFileException ) +log = logging.getLogger(__name__) CORE_MCO_DRIVER_ID = plugin_id("core", "CoreMCODriver") @@ -20,8 +25,21 @@ class CoreMCODriver(BaseCoreDriver): """ id = CORE_MCO_DRIVER_ID + mco = Instance(BaseMCO, allow_none=True) + + listeners = List(Instance(BaseNotificationListener)) + @on_trait_change("application:started") def application_started(self): + self.mco.run(self.workflow.mco) + + @on_trait_change("application:stopping") + def application_stopping(self): + for listener in self.listeners: + self._finalize_listener(listener) + self.listeners[:] = [] + + def _mco_default(self): try: workflow = self.workflow except (InvalidVersionException, InvalidFileException) as e: @@ -30,5 +48,50 @@ class CoreMCODriver(BaseCoreDriver): mco_model = workflow.mco mco_factory = mco_model.factory - mco = mco_factory.create_optimizer() - mco.run(mco_model) + return mco_factory.create_optimizer() + + @on_trait_change("mco:event") + def _handle_mco_event(self, event): + for listener in self.listeners[:]: + try: + listener.deliver(event) + except Exception as e: + log.error( + "Exception while delivering to listener {}: {}".format( + listener.__class__.__name__, + str(e) + )) + self._finalize_listener(listener) + self.listeners.remove(listener) + + def _listeners_default(self): + listeners = [] + + for nl_model in self.workflow.notification_listeners: + factory = nl_model.factory + try: + listener = factory.create_listener() + listener.initialize(nl_model) + except Exception as e: + log.error( + "Failed to create or initialize " + "listener with id {}: {}".format( + factory.id, str(e))) + else: + listeners.append(listener) + + return listeners + + def _finalize_listener(self, listener): + """Helper method. Finalizes a listener and handles possible + exceptions. it does _not_ remove the listener from the listener + list. + """ + try: + listener.finalize() + except Exception as e: + log.error( + "Exception while finalizing listener {}: {}".format( + listener.__class__.__name__, + str(e) + )) diff --git a/force_bdss/core_plugins/dummy/dummy_dakota/dakota_optimizer.py b/force_bdss/core_plugins/dummy/dummy_dakota/dakota_optimizer.py index 5b1291a93e01a111b289f410fd03578f02bb850d..9f9de5d87192fa818b1bd4e9ae49a510b3489827 100644 --- a/force_bdss/core_plugins/dummy/dummy_dakota/dakota_optimizer.py +++ b/force_bdss/core_plugins/dummy/dummy_dakota/dakota_optimizer.py @@ -4,6 +4,8 @@ import itertools import collections from force_bdss.api import BaseMCO +from force_bdss.mco.events import MCOStartEvent, MCOFinishEvent, \ + MCOProgressEvent def rotated_range(start, stop, starting_value): @@ -16,6 +18,8 @@ def rotated_range(start, stop, starting_value): class DummyDakotaOptimizer(BaseMCO): def run(self, model): + self.notify_event(MCOStartEvent()) + parameters = model.parameters values = [] @@ -40,5 +44,10 @@ class DummyDakotaOptimizer(BaseMCO): out = ps.communicate( " ".join([str(v) for v in value]).encode("utf-8")) - print("{}: {}".format(" ".join([str(v) for v in value]), - out[0].decode("utf-8"))) + out_data = out[0].decode("utf-8").split() + self.notify_event(MCOProgressEvent( + input=tuple(value), + output=tuple(out_data), + )) + + self.notify_event(MCOFinishEvent()) diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/__init__.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..d8d72417135abc21b32c6483c00f53958e11cf91 --- /dev/null +++ b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener.py @@ -0,0 +1,24 @@ +from __future__ import print_function + +from force_bdss.api import ( + BaseNotificationListener, + MCOStartEvent, + MCOFinishEvent, + MCOProgressEvent +) + + +class DummyNotificationListener(BaseNotificationListener): + def deliver(self, event): + if isinstance(event, (MCOStartEvent, MCOFinishEvent)): + print(event.__class__.__name__) + elif isinstance(event, MCOProgressEvent): + print(event.__class__.__name__, event.input, event.output) + else: + print(event.__class__.__name__) + + def initialize(self, model): + print("Initializing") + + def finalize(self): + print("Finalizing") diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener_factory.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..e8e7c37e202def30541b77353921d087fc97b6d1 --- /dev/null +++ b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener_factory.py @@ -0,0 +1,23 @@ +from traits.api import String + +from force_bdss.api import ( + factory_id, + BaseNotificationListenerFactory) + +from .dummy_notification_listener import DummyNotificationListener +from .dummy_notification_listener_model import DummyNotificationListenerModel + + +class DummyNotificationListenerFactory(BaseNotificationListenerFactory): + id = String(factory_id("enthought", "dummy_notification_listener")) + + name = String("Dummy Notification Listener") + + def create_model(self, model_data=None): + if model_data is None: + model_data = {} + + return DummyNotificationListenerModel(self, **model_data) + + def create_listener(self): + return DummyNotificationListener(self) diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener_model.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener_model.py new file mode 100644 index 0000000000000000000000000000000000000000..0e2c5babca7c8b79501fdf064e728a77a5eaebc5 --- /dev/null +++ b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener_model.py @@ -0,0 +1,5 @@ +from force_bdss.api import BaseNotificationListenerModel + + +class DummyNotificationListenerModel(BaseNotificationListenerModel): + pass diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/__init__.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..82741a48cbf9da91e31916eddc5aa17505640bb2 --- /dev/null +++ b/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener.py @@ -0,0 +1,37 @@ +import unittest + +from force_bdss.api import MCOStartEvent, MCOProgressEvent, MCOFinishEvent +from force_bdss.notification_listeners.base_notification_listener_factory \ + import \ + BaseNotificationListenerFactory +from force_bdss.notification_listeners.base_notification_listener_model \ + import \ + BaseNotificationListenerModel +from force_bdss.tests.utils import captured_output + +try: + import mock +except ImportError: + from unittest import mock + +from force_bdss.core_plugins.dummy.dummy_notification_listener \ + .dummy_notification_listener import \ + DummyNotificationListener + + +class TestDummyNotificationListener(unittest.TestCase): + def test_initialization(self): + listener = DummyNotificationListener( + mock.Mock(spec=BaseNotificationListenerFactory)) + model = mock.Mock(spec=BaseNotificationListenerModel) + with captured_output() as (out, err): + listener.initialize(model) + listener.deliver(MCOStartEvent()) + listener.deliver(MCOProgressEvent()) + listener.deliver(MCOFinishEvent()) + listener.finalize() + + self.assertEqual( + out.getvalue(), + "Initializing\nMCOStartEvent\nMCOProgressEvent () ()\n" + "MCOFinishEvent\nFinalizing\n") diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener_factory.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..10af546d7900afe7559f1cea610b8e744140eee2 --- /dev/null +++ b/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener_factory.py @@ -0,0 +1,26 @@ + +import unittest + +from envisage.plugin import Plugin + +from force_bdss.core_plugins.dummy.dummy_notification_listener\ + .dummy_notification_listener_factory import \ + DummyNotificationListenerFactory + +try: + import mock +except ImportError: + from unittest import mock + + +class TestDummyNotificationListenerFactory(unittest.TestCase): + def test_create_methods(self): + factory = DummyNotificationListenerFactory(mock.Mock(spec=Plugin)) + model = factory.create_model() + self.assertEqual(model.factory, factory) + + model = factory.create_model({}) + self.assertEqual(model.factory, factory) + + listener = factory.create_listener() + self.assertEqual(listener.factory, factory) diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener_model.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener_model.py new file mode 100644 index 0000000000000000000000000000000000000000..94f3294334377541e7cd08eb30f31cdd5aa18604 --- /dev/null +++ b/force_bdss/core_plugins/dummy/dummy_notification_listener/tests/test_dummy_notification_listener_model.py @@ -0,0 +1,20 @@ +import unittest + +from force_bdss.core_plugins.dummy.dummy_notification_listener\ + .dummy_notification_listener_factory import \ + DummyNotificationListenerFactory +from force_bdss.core_plugins.dummy.dummy_notification_listener\ + .dummy_notification_listener_model import \ + DummyNotificationListenerModel + +try: + import mock +except ImportError: + from unittest import mock + + +class TestDummyNotificationListenerModel(unittest.TestCase): + def test_initialization(self): + factory = mock.Mock(spec=DummyNotificationListenerFactory) + model = DummyNotificationListenerModel(factory) + self.assertEqual(model.factory, factory) diff --git a/force_bdss/core_plugins/dummy/dummy_plugin.py b/force_bdss/core_plugins/dummy/dummy_plugin.py index f67df4b0eee0cfdc924b49bb5343b0d5cedbe832..63a229b3ac9199b6c71d89b6eabaa799b857a0d0 100644 --- a/force_bdss/core_plugins/dummy/dummy_plugin.py +++ b/force_bdss/core_plugins/dummy/dummy_plugin.py @@ -1,4 +1,8 @@ from force_bdss.api import BaseExtensionPlugin, plugin_id +from .dummy_notification_listener.dummy_notification_listener_factory import ( + DummyNotificationListenerFactory +) +from .ui_notification.ui_notification_factory import UINotificationFactory from .csv_extractor.csv_extractor_factory import CSVExtractorFactory from .kpi_adder.kpi_adder_factory import KPIAdderFactory from .dummy_dakota.dakota_factory import DummyDakotaFactory @@ -21,3 +25,8 @@ class DummyPlugin(BaseExtensionPlugin): def _kpi_calculator_factories_default(self): return [DummyKPICalculatorFactory(self), KPIAdderFactory(self)] + + def _notification_listener_factories_default(self): + return [DummyNotificationListenerFactory(self), + UINotificationFactory(self) + ] diff --git a/force_bdss/core_plugins/dummy/tests/test_direct_execution.py b/force_bdss/core_plugins/dummy/tests/test_direct_execution.py index 871e579295bf54e5963d23279461b09bf9601575..b0f34b63318eb18f32cc6633230842f61a7177b3 100644 --- a/force_bdss/core_plugins/dummy/tests/test_direct_execution.py +++ b/force_bdss/core_plugins/dummy/tests/test_direct_execution.py @@ -20,6 +20,7 @@ class DummyFactoryRegistryPlugin(FactoryRegistryPlugin): mco_factories = List() kpi_calculator_factories = List() data_source_factories = List() + notification_listener_factories = List() def mock_factory_registry_plugin(): @@ -30,6 +31,8 @@ def mock_factory_registry_plugin(): plugin.kpi_calculator_factories factory_registry_plugin.data_source_factories = \ plugin.data_source_factories + factory_registry_plugin.notification_listener_factories = \ + plugin.notification_listener_factories return factory_registry_plugin diff --git a/force_bdss/core_plugins/dummy/ui_notification/__init__.py b/force_bdss/core_plugins/dummy/ui_notification/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/force_bdss/core_plugins/dummy/ui_notification/tests/__init__.py b/force_bdss/core_plugins/dummy/ui_notification/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/force_bdss/core_plugins/dummy/ui_notification/tests/test_ui_notification.py b/force_bdss/core_plugins/dummy/ui_notification/tests/test_ui_notification.py new file mode 100644 index 0000000000000000000000000000000000000000..356c5a039b69f6ed8f4a3f7fac2a3a5abfb9983d --- /dev/null +++ b/force_bdss/core_plugins/dummy/ui_notification/tests/test_ui_notification.py @@ -0,0 +1,163 @@ +import unittest +from testfixtures import LogCapture + +from force_bdss.core_plugins.dummy.ui_notification.ui_notification import \ + UINotification +from force_bdss.core_plugins.dummy.ui_notification.ui_notification_factory \ + import \ + UINotificationFactory +from force_bdss.core_plugins.dummy.ui_notification.ui_notification_model \ + import \ + UINotificationModel +from force_bdss.mco.events import MCOStartEvent, MCOProgressEvent, \ + MCOFinishEvent + +try: + import mock +except ImportError: + from unittest import mock + +import zmq + + +class TestUINotification(unittest.TestCase): + def setUp(self): + factory = mock.Mock(spec=UINotificationFactory) + self.model = UINotificationModel(factory) + self.model.identifier = "an_id" + + listener = UINotification(factory) + self.sync_socket = mock.Mock(spec=zmq.Socket) + self.sync_socket.recv_string = mock.Mock() + self.sync_socket.recv_string.side_effect = [ + "HELLO\nan_id\n1", + "GOODBYE\nan_id\n1" + ] + + self.pub_socket = mock.Mock(spec=zmq.Socket) + self.context = mock.Mock(spec=zmq.Context) + self.context.socket.side_effect = [self.pub_socket, + self.sync_socket] + listener.__class__._create_context = mock.Mock( + return_value=self.context) + + self.listener = listener + + def test_deliver(self): + listener = self.listener + listener.initialize(self.model) + self.assertEqual( + self.sync_socket.send_string.call_args[0][0], + 'HELLO\nan_id\n1') + + listener.deliver(MCOStartEvent()) + self.assertEqual( + self.pub_socket.send_string.call_args[0][0], + 'EVENT\nan_id\nMCO_START') + + listener.deliver(MCOProgressEvent(input=(1, 2, 3), output=(4, 5))) + self.assertEqual( + self.pub_socket.send_string.call_args[0][0], + 'EVENT\nan_id\nMCO_PROGRESS\n1 2 3\n4 5') + + listener.deliver(MCOFinishEvent()) + self.assertEqual( + self.pub_socket.send_string.call_args[0][0], + 'EVENT\nan_id\nMCO_FINISH') + + def test_finalize(self): + listener = self.listener + listener.initialize(self.model) + listener.finalize() + self.assertTrue(self.context.term.called) + self.assertTrue(self.sync_socket.close.called) + self.assertTrue(self.pub_socket.close.called) + self.assertIsNone(listener._context) + self.assertIsNone(listener._sync_socket) + self.assertIsNone(listener._pub_socket) + + def test_initialize(self): + listener = self.listener + listener.initialize(self.model) + self.assertEqual( + self.sync_socket.send_string.call_args[0][0], + 'HELLO\nan_id\n1') + + def test_polling(self): + self.sync_socket.poll.return_value = 0 + listener = self.listener + with LogCapture() as capture: + listener.initialize(self.model) + capture.check( + ("force_bdss.core_plugins.dummy.ui_notification.ui_notification", # noqa + "INFO", + "Could not connect to UI server after 1000 ms. Continuing without UI notification." # noqa + ), + ) + + self.assertIsNone(listener._context) + + def test_wrong_init_recv_string(self): + listener = self.listener + + self.sync_socket.recv_string.side_effect = [ + "HELLO\nnot_the_right_id\n1", + "GOODBYE\nan_id\n1" + ] + + with LogCapture() as capture: + listener.initialize(self.model) + capture.check( + ("force_bdss.core_plugins.dummy.ui_notification.ui_notification", # noqa + "ERROR", + "Unexpected reply in sync negotiation with UI server. " + "'HELLO\nnot_the_right_id\n1'" # noqa + ), + ) + + self.assertIsNone(listener._context) + + def test_deliver_without_context(self): + self.listener.deliver(MCOStartEvent()) + self.assertFalse(self.pub_socket.send_string.called) + + def test_finalize_without_context(self): + self.listener.finalize() + self.assertFalse(self.sync_socket.send_string.called) + + def test_finalize_no_response(self): + self.sync_socket.poll.side_effect = [1, 0] + listener = self.listener + listener.initialize(self.model) + with LogCapture() as capture: + listener.finalize() + capture.check( + ("force_bdss.core_plugins.dummy.ui_notification.ui_notification", # noqa + "INFO", + "Could not close connection to UI server after 1000 ms." # noqa + ), + ) + + self.assertIsNone(listener._context) + + def test_wrong_finalize_recv_string(self): + listener = self.listener + self.sync_socket.poll.side_effect = [1, 1] + self.sync_socket.recv_string.side_effect = [ + "HELLO\nan_id\n1", + "GOODBYE\nnot_the_right_id\n1" + ] + + listener.initialize(self.model) + + with LogCapture() as capture: + listener.finalize() + capture.check( + ("force_bdss.core_plugins.dummy.ui_notification.ui_notification", # noqa + "ERROR", + "Unexpected reply in goodbye sync negotiation with UI server. " # noqa + "'GOODBYE\nnot_the_right_id\n1'" # noqa + ), + ) + + self.assertIsNone(listener._context) diff --git a/force_bdss/core_plugins/dummy/ui_notification/tests/test_ui_notification_factory.py b/force_bdss/core_plugins/dummy/ui_notification/tests/test_ui_notification_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..b56c5c95f14fcadd4fffe273ce746533be37b09c --- /dev/null +++ b/force_bdss/core_plugins/dummy/ui_notification/tests/test_ui_notification_factory.py @@ -0,0 +1,40 @@ +import unittest + +from envisage.plugin import Plugin + +from force_bdss.core_plugins.dummy.ui_notification.ui_notification import \ + UINotification +from force_bdss.core_plugins.dummy.ui_notification.ui_notification_factory \ + import \ + UINotificationFactory +from force_bdss.core_plugins.dummy.ui_notification.ui_notification_model \ + import \ + UINotificationModel + +try: + import mock +except ImportError: + from unittest import mock + + +class TestUINotificationFactory(unittest.TestCase): + def test_initialization(self): + factory = UINotificationFactory(mock.Mock(spec=Plugin)) + self.assertEqual( + factory.id, + "force.bdss.enthought.factory.ui_notification") + + def test_create_model(self): + factory = UINotificationFactory(mock.Mock(spec=Plugin)) + model = factory.create_model() + self.assertIsInstance(model, UINotificationModel) + self.assertEqual(model.factory, factory) + + model = factory.create_model({}) + self.assertIsInstance(model, UINotificationModel) + self.assertEqual(model.factory, factory) + + def test_create_listener(self): + factory = UINotificationFactory(mock.Mock(spec=Plugin)) + listener = factory.create_listener() + self.assertIsInstance(listener, UINotification) diff --git a/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py b/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py new file mode 100644 index 0000000000000000000000000000000000000000..f3e644c0b7ca925666632e1b23eabaf927d23475 --- /dev/null +++ b/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py @@ -0,0 +1,127 @@ +import logging +from traits.api import Instance, String + +from force_bdss.api import ( + BaseNotificationListener, + MCOStartEvent, + MCOFinishEvent, + MCOProgressEvent +) + +import zmq + +log = logging.getLogger(__name__) + + +class UINotification(BaseNotificationListener): + """ + Notification engine for the UI. Uses zeromq for the traffic handling. + """ + #: The ZMQ context. If None, it means that the service is unavailable. + _context = Instance(zmq.Context) + + #: The pubsub socket. + _pub_socket = Instance(zmq.Socket) + + #: The synchronization socket to communicate with the server (UI) + _sync_socket = Instance(zmq.Socket) + + #: Unique identifier from the UI. To be returned in the protocol. + _identifier = String() + + #: The protocol version that this plugin delivers + _proto_version = "1" + + def initialize(self, model): + self._identifier = model.identifier + self._context = self._create_context() + + self._pub_socket = self._context.socket(zmq.PUB) + self._pub_socket.setsockopt(zmq.LINGER, 0) + self._pub_socket.connect(model.pub_url) + + self._sync_socket = self._context.socket(zmq.REQ) + self._sync_socket.setsockopt(zmq.LINGER, 0) + self._sync_socket.connect(model.sync_url) + + msg = "HELLO\n{}\n{}".format(self._identifier, self._proto_version) + self._sync_socket.send_string(msg) + events = self._sync_socket.poll(1000, zmq.POLLIN) + + if events == 0: + log.info("Could not connect to UI server after 1000 ms. " + "Continuing without UI notification.") + self._close_and_clear_sockets() + return + + recv = self._sync_socket.recv_string() + + if recv != msg: + log.error( + ("Unexpected reply in sync" + " negotiation with UI server. '{}'".format(recv))) + self._close_and_clear_sockets() + return + + def deliver(self, event): + if not self._context: + return + + msg = _format_event(event, self._identifier) + if msg is not None: + self._pub_socket.send_string(msg) + + def finalize(self): + if not self._context: + return + + msg = "GOODBYE\n{}\n{}".format(self._identifier, self._proto_version) + self._sync_socket.send_string(msg) + events = self._sync_socket.poll(1000, zmq.POLLIN) + if events == 0: + log.info("Could not close connection to UI server after " + "1000 ms.") + self._close_and_clear_sockets() + return + + recv = self._sync_socket.recv_string() + + if recv != msg: + log.error( + ("Unexpected reply in goodbye sync" + " negotiation with UI server. '{}'".format(recv))) + + self._close_and_clear_sockets() + + def _close_and_clear_sockets(self): + if self._pub_socket: + self._pub_socket.close() + + if self._sync_socket: + self._sync_socket.close() + + if self._context: + self._context.term() + + self._pub_socket = None + self._sync_socket = None + self._context = None + + def _create_context(self): + return zmq.Context() + + +def _format_event(event, identifier): + """Converts the event into a byte sequence to be transferred via zmq""" + if isinstance(event, MCOStartEvent): + data = "MCO_START" + elif isinstance(event, MCOFinishEvent): + data = "MCO_FINISH" + elif isinstance(event, MCOProgressEvent): + data = "MCO_PROGRESS\n{}\n{}".format( + " ".join([str(x) for x in event.input]), + " ".join([str(x) for x in event.output])) + else: + return None + + return "EVENT\n{}\n{}".format(identifier, data) diff --git a/force_bdss/core_plugins/dummy/ui_notification/ui_notification_factory.py b/force_bdss/core_plugins/dummy/ui_notification/ui_notification_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..39cee9b53bd08a828ef1b92d978954c6d6b56fc0 --- /dev/null +++ b/force_bdss/core_plugins/dummy/ui_notification/ui_notification_factory.py @@ -0,0 +1,21 @@ +from traits.api import String + +from force_bdss.api import factory_id, BaseNotificationListenerFactory + +from .ui_notification import UINotification +from .ui_notification_model import UINotificationModel + + +class UINotificationFactory(BaseNotificationListenerFactory): + id = String(factory_id("enthought", "ui_notification")) + + name = String("UI Notification") + + def create_model(self, model_data=None): + if model_data is None: + model_data = {} + + return UINotificationModel(self, **model_data) + + def create_listener(self): + return UINotification(self) diff --git a/force_bdss/core_plugins/dummy/ui_notification/ui_notification_model.py b/force_bdss/core_plugins/dummy/ui_notification/ui_notification_model.py new file mode 100644 index 0000000000000000000000000000000000000000..7737cfc95fae26c01f85e04dcf2f15967f1e1efb --- /dev/null +++ b/force_bdss/core_plugins/dummy/ui_notification/ui_notification_model.py @@ -0,0 +1,14 @@ +from traits.api import String +from force_bdss.api import ( + BaseNotificationListenerModel, ZMQSocketURL) + + +class UINotificationModel(BaseNotificationListenerModel): + #: The socket URL where the UI will be found. Synchronization port. + sync_url = ZMQSocketURL() + + #: The socket URL where the UI will be found. PubSub port. + pub_url = ZMQSocketURL() + + #: Unique identifier assigned by the UI to recognize the connection. + identifier = String() diff --git a/force_bdss/factory_registry_plugin.py b/force_bdss/factory_registry_plugin.py index 80d5944f2f63033ca0d50ea50d58ec13f2ebd176..67e785173c9986a4dccbd98a8d6aaa124dcaf250 100644 --- a/force_bdss/factory_registry_plugin.py +++ b/force_bdss/factory_registry_plugin.py @@ -3,12 +3,12 @@ from envisage.plugin import Plugin from traits.api import List from force_bdss.ids import ExtensionPointID +from force_bdss.notification_listeners.i_notification_listener_factory import \ + INotificationListenerFactory from .data_sources.i_data_source_factory import ( IDataSourceFactory) from .kpi.i_kpi_calculator_factory import IKPICalculatorFactory -from .mco.i_mco_factory import ( - IMCOFactory -) +from .mco.i_mco_factory import IMCOFactory FACTORY_REGISTRY_PLUGIN_ID = "force.bdss.plugins.factory_registry" @@ -43,6 +43,13 @@ class FactoryRegistryPlugin(Plugin): List(IKPICalculatorFactory), id=ExtensionPointID.KPI_CALCULATOR_FACTORIES) + #: Notification listeners are pluggable entities that will listen + #: to MCO events and act accordingly. + notification_listener_factories = ExtensionPoint( + List(INotificationListenerFactory), + id=ExtensionPointID.NOTIFICATION_LISTENER_FACTORIES + ) + def data_source_factory_by_id(self, id): """Finds a given data source factory by means of its id. The ID is as obtained by the function factory_id() in the @@ -61,9 +68,7 @@ class FactoryRegistryPlugin(Plugin): if ds.id == id: return ds - raise KeyError( - "Requested data source {} but don't know how " - "to find it.".format(id)) + raise KeyError(id) def kpi_calculator_factory_by_id(self, id): """Finds a given kpi factory by means of its id. @@ -83,9 +88,7 @@ class FactoryRegistryPlugin(Plugin): if kpic.id == id: return kpic - raise KeyError( - "Requested kpi calculator {} but don't know how " - "to find it.".format(id)) + raise KeyError(id) def mco_factory_by_id(self, id): """Finds a given Multi Criteria Optimizer (MCO) factory by means of @@ -105,8 +108,7 @@ class FactoryRegistryPlugin(Plugin): if mco.id == id: return mco - raise KeyError("Requested MCO {} but don't know how " - "to find it.".format(id)) + raise KeyError(id) def mco_parameter_factory_by_id(self, mco_id, parameter_id): """Retrieves the MCO parameter factory for a given MCO id and @@ -134,5 +136,24 @@ class FactoryRegistryPlugin(Plugin): if factory.id == parameter_id: return factory - raise KeyError("Requested MCO parameter {}:{} but don't know" - " how to find it.".format(mco_id, parameter_id)) + raise KeyError(parameter_id) + + def notification_listener_factory_by_id(self, id): + """Finds a given notification listener by means of its id. + The ID is as obtained by the function bundle_id() in the + plugin api. + + Parameters + ---------- + id: str + The identifier returned by the bundle_id() function. + + Raises + ------ + KeyError: if the entry is not found. + """ + for nl in self.notification_listener_factories: + if nl.id == id: + return nl + + raise KeyError(id) diff --git a/force_bdss/ids.py b/force_bdss/ids.py index 2bee28c2960f62e8a42f37bf071c385a2dacf912..336b64a90e0ee2a5b7be42e5d8b471a5d3c85915 100644 --- a/force_bdss/ids.py +++ b/force_bdss/ids.py @@ -2,7 +2,7 @@ import six class ExtensionPointID: - """The envisage extension points ids for the factorys ExtensionPoints. + """The envisage extension points ids for the factories ExtensionPoints. These are populated by the envisage plugins. The plugin developer generally does not have to handle these identifiers, @@ -12,6 +12,8 @@ class ExtensionPointID: MCO_FACTORIES = 'force.bdss.mco.factories' DATA_SOURCE_FACTORIES = 'force.bdss.data_source.factories' KPI_CALCULATOR_FACTORIES = 'force.bdss.kpi_calculator.factories' + NOTIFICATION_LISTENER_FACTORIES = \ + 'force.bdss.notification_listener.factories' def factory_id(producer, identifier): diff --git a/force_bdss/io/tests/test_workflow_reader.py b/force_bdss/io/tests/test_workflow_reader.py index b0704c45766410731a52a6e3a6e535ab1a5eb0dc..8ff216d88a04897742bcafbdd28094c40d1c54f8 100644 --- a/force_bdss/io/tests/test_workflow_reader.py +++ b/force_bdss/io/tests/test_workflow_reader.py @@ -39,6 +39,15 @@ class TestWorkflowReader(unittest.TestCase): with self.assertRaises(InvalidFileException): self.wfreader.read(self._as_json_stringio(data)) + def test_missing_key(self): + data = { + "version": "1", + "workflow": {} + } + + with self.assertRaises(InvalidFileException): + self.wfreader.read(self._as_json_stringio(data)) + def _as_json_stringio(self, data): fp = StringIO() json.dump(data, fp) diff --git a/force_bdss/io/workflow_reader.py b/force_bdss/io/workflow_reader.py index df20c7606e2e7624eaa92e9c8ffcdd6e950bd51a..2227de1558a7085d207229b1fae20558dc72085d 100644 --- a/force_bdss/io/workflow_reader.py +++ b/force_bdss/io/workflow_reader.py @@ -89,10 +89,12 @@ class WorkflowReader(HasStrictTraits): wf.mco = self._extract_mco(wf_data) wf.data_sources[:] = self._extract_data_sources(wf_data) wf.kpi_calculators[:] = self._extract_kpi_calculators(wf_data) + wf.notification_listeners[:] = \ + self._extract_notification_listeners(wf_data) except KeyError as e: - logging.exception("Could not read file") - raise InvalidFileException("Could not read file. " - "Unable to find key {}".format(e)) + logging.exception("Could not read file {}".format(file)) + raise InvalidFileException("Could not read file {}. " + "Unable to find key {}".format(file, e)) return wf def _extract_mco(self, wf_data): @@ -211,3 +213,14 @@ class WorkflowReader(HasStrictTraits): def _extract_input_slot_maps(self, maps_data): return [InputSlotMap(**d) for d in maps_data] + + def _extract_notification_listeners(self, wf_data): + registry = self.factory_registry + listeners = [] + for nl_entry in wf_data["notification_listeners"]: + nl_id = nl_entry["id"] + nl_factory = registry.notification_listener_factory_by_id(nl_id) + model_data = nl_entry["model_data"] + listeners.append(nl_factory.create_model(model_data)) + + return listeners diff --git a/force_bdss/io/workflow_writer.py b/force_bdss/io/workflow_writer.py index 8335d5b3a95ff669c863b6c7a3a8175aaf312ef7..3a2eee725c519502759f240a7e29c554f1e0f458 100644 --- a/force_bdss/io/workflow_writer.py +++ b/force_bdss/io/workflow_writer.py @@ -17,9 +17,7 @@ class WorkflowWriter(HasStrictTraits): A file object on which to write the workflow, properly serialized into JSON. """ - data = { - "version": "1", - } + data = dict(version="1") data["workflow"] = self._workflow_data(workflow) json.dump(data, f) @@ -32,7 +30,11 @@ class WorkflowWriter(HasStrictTraits): for kpic in workflow.kpi_calculators], "data_sources": [ self._model_data(ds) - for ds in workflow.data_sources] + for ds in workflow.data_sources], + "notification_listeners": [ + self._model_data(nl) + for nl in workflow.notification_listeners + ] } return workflow_data diff --git a/force_bdss/local_traits.py b/force_bdss/local_traits.py index 1c5e3f157e4409f2b7cbb61d60877cc6974ef9f4..f2aa3859b2c8488d50dce64996d0ce098ac0066a 100644 --- a/force_bdss/local_traits.py +++ b/force_bdss/local_traits.py @@ -1,4 +1,5 @@ -from traits.api import Regex, String +import re +from traits.api import Regex, BaseStr, String #: Used for variable names, but allow also empty string as it's the default #: case and it will be present if the workflow is saved before actually @@ -6,6 +7,26 @@ from traits.api import Regex, String Identifier = Regex(regex="(^[^\d\W]\w*\Z|^\Z)") +class ZMQSocketURL(BaseStr): + def validate(self, object, name, value): + super(ZMQSocketURL, self).validate(object, name, value) + m = re.match( + "tcp://(\\d{1,3})\.(\\d{1,3})\.(\\d{1,3})\.(\\d{1,3}):(\\d+)", + value) + if m is None: + self.error(object, name, value) + + a, b, c, d, port = m.groups() + + if not all(map(lambda x: 0 <= int(x) <= 255, (a, b, c, d))): + self.error(object, name, value) + + if not (1 <= int(port) <= 65535): + self.error(object, name, value) + + return value + + #: Identifies a CUBA type with its key. At the moment a String with #: no validation, but will come later. CUBAType = String() diff --git a/force_bdss/mco/base_mco.py b/force_bdss/mco/base_mco.py index 1fb2664c21e7b6dc515374047985cff82c3791d0..ae9d89b72092c962ebc7f6415d3fa8166613711e 100644 --- a/force_bdss/mco/base_mco.py +++ b/force_bdss/mco/base_mco.py @@ -1,7 +1,8 @@ import abc -from traits.api import ABCHasStrictTraits, Instance +from traits.api import ABCHasStrictTraits, Instance, Event +from force_bdss.mco.events import BaseMCOEvent from .i_mco_factory import IMCOFactory @@ -13,6 +14,9 @@ class BaseMCO(ABCHasStrictTraits): #: A reference to the factory factory = Instance(IMCOFactory) + #: Triggered when an event occurs. + event = Event(BaseMCOEvent) + def __init__(self, factory, *args, **kwargs): """Initializes the MCO. @@ -35,3 +39,18 @@ class BaseMCO(ABCHasStrictTraits): An instance of the model information, as created from create_model() """ + + def notify_event(self, event): + """Method based interface to deliver an event, instead of + assignment to traits. + + Sends the event, synchronously. When the routine returns, + listeners have been fully informed (they might, however, handle + the event asynchronously at their convenience) + + Parameters + ---------- + event: BaseMCOEvent + The event to deliver. + """ + self.event = event diff --git a/force_bdss/mco/events.py b/force_bdss/mco/events.py new file mode 100644 index 0000000000000000000000000000000000000000..216e1effa30d7b1e81e0a8f82312310bb3fae583 --- /dev/null +++ b/force_bdss/mco/events.py @@ -0,0 +1,21 @@ +from traits.api import HasStrictTraits, Tuple + + +class BaseMCOEvent(HasStrictTraits): + """Base event for the MCO""" + + +class MCOStartEvent(BaseMCOEvent): + """MCO should emit this event when the evaluation starts.""" + + +class MCOFinishEvent(BaseMCOEvent): + """MCO should emit this event when the evaluation ends.""" + + +class MCOProgressEvent(BaseMCOEvent): + """MCO should emit this event for every new evaluation that has been + completed. It carries data about the evaluation, specifically the + input data (MCO parameter values) and the resulting output (KPIs).""" + input = Tuple() + output = Tuple() diff --git a/force_bdss/notification_listeners/__init__.py b/force_bdss/notification_listeners/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/force_bdss/notification_listeners/base_notification_listener.py b/force_bdss/notification_listeners/base_notification_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..e4f94647250c7b82b08e638fdf2295cf191135d0 --- /dev/null +++ b/force_bdss/notification_listeners/base_notification_listener.py @@ -0,0 +1,54 @@ +import abc + +from traits.api import ABCHasStrictTraits, Instance + +from .i_notification_listener_factory import INotificationListenerFactory + + +class BaseNotificationListener(ABCHasStrictTraits): + """Base class for the Notification Listener. + + Inherit this class for your listener implementation + """ + #: A reference to the factory + factory = Instance(INotificationListenerFactory) + + def __init__(self, factory, *args, **kwargs): + """Initializes the notification listener. + + Parameters + ---------- + factory: BaseNotificationListener + The factory this Notification Listener belongs to + """ + self.factory = factory + super(BaseNotificationListener, self).__init__(*args, **kwargs) + + def initialize(self, model): + """ + Method used to initialize persistent state of the listener using + information from the model. + + Reimplement it in your Notification Listener to perform special + initialization of state that survives across deliver() invocations, + such as setting up a connection, or opening a file. + """ + + def finalize(self): + """ + Method used to finalize state of the listener. + + Reimplement it in your Notification Listener to perform special + finalization of state that survives across deliver() invocations, + such as closing a connection, or closing a file. + """ + + @abc.abstractmethod + def deliver(self, event): + """Delivers the event to the recipient + + Parameters + ---------- + event: MCOEvent + The event to notify. + """ diff --git a/force_bdss/notification_listeners/base_notification_listener_factory.py b/force_bdss/notification_listeners/base_notification_listener_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..08e2b59f90f2518d99445ffd09568e23e1b84045 --- /dev/null +++ b/force_bdss/notification_listeners/base_notification_listener_factory.py @@ -0,0 +1,50 @@ +import abc + +from traits.api import ABCHasStrictTraits, Instance, String, provides +from envisage.plugin import Plugin + +from .i_notification_listener_factory import INotificationListenerFactory + + +@provides(INotificationListenerFactory) +class BaseNotificationListenerFactory(ABCHasStrictTraits): + """Base class for notification listeners. + Notification listeners are extensions that receive event notifications + from the MCO and perform an associated action. + """ + #: identifier of the factory + id = String() + + #: Name of the factory. User friendly for UI + name = String() + + #: A reference to the containing plugin + plugin = Instance(Plugin) + + def __init__(self, plugin, *args, **kwargs): + """Initializes the instance. + + Parameters + ---------- + plugin: Plugin + The plugin that holds this factory. + """ + self.plugin = plugin + super(BaseNotificationListenerFactory, self).__init__(*args, **kwargs) + + @abc.abstractmethod + def create_listener(self): + """ + Creates an instance of the listener. + """ + + @abc.abstractmethod + def create_model(self, model_data=None): + """ + Creates an instance of the model. + + Parameters + ---------- + model_data: dict + Data to use to fill the model. + """ diff --git a/force_bdss/notification_listeners/base_notification_listener_model.py b/force_bdss/notification_listeners/base_notification_listener_model.py new file mode 100644 index 0000000000000000000000000000000000000000..ef7e58b044cc31e1be9bacd3772c8b2658cf901a --- /dev/null +++ b/force_bdss/notification_listeners/base_notification_listener_model.py @@ -0,0 +1,22 @@ +from traits.api import ABCHasStrictTraits, Instance + +from force_bdss.notification_listeners.i_notification_listener_factory import \ + INotificationListenerFactory + + +class BaseNotificationListenerModel(ABCHasStrictTraits): + """Base class for the specific Notification Listener models. + This model will also provide, through traits/traitsui magic the View + that will appear in the workflow manager UI. + + In your definition, your specific model must reimplement this class. + """ + #: A reference to the creating factory, so that we can + #: retrieve it as the originating factory. + factory = Instance(INotificationListenerFactory, + visible=False, + transient=True) + + def __init__(self, factory, *args, **kwargs): + self.factory = factory + super(BaseNotificationListenerModel, self).__init__(*args, **kwargs) diff --git a/force_bdss/notification_listeners/i_notification_listener_factory.py b/force_bdss/notification_listeners/i_notification_listener_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..c4240d80500dfe343c5ae01216bca4588d2f1ea0 --- /dev/null +++ b/force_bdss/notification_listeners/i_notification_listener_factory.py @@ -0,0 +1,21 @@ +from traits.api import Interface, String, Instance +from envisage.plugin import Plugin + + +class INotificationListenerFactory(Interface): + """Envisage required interface for the BaseNotificationListenerFactory. + You should not need to use this directly. + + Refer to the BaseNotificationListenerFactory for documentation. + """ + id = String() + + name = String() + + plugin = Instance(Plugin) + + def create_listener(self): + """""" + + def create_model(self, model_data=None): + """""" diff --git a/force_bdss/tests/fixtures/test_csv.json b/force_bdss/tests/fixtures/test_csv.json index c70795734c8ba06f3d7f746a3121c1399e504506..d9d6e35bbb644d659031530caca14db4f7af3a24 100644 --- a/force_bdss/tests/fixtures/test_csv.json +++ b/force_bdss/tests/fixtures/test_csv.json @@ -70,6 +70,16 @@ ] } } + ], + "notification_listeners": [ + { + "id": "force.bdss.enthought.factory.ui_notification", + "model_data": { + "sync_url": "tcp://127.0.0.1:12346", + "pub_url": "tcp://127.0.0.1:12345", + "identifier": "vfdjlkadfkljfsd" + } + } ] } } diff --git a/force_bdss/tests/fixtures/test_null.json b/force_bdss/tests/fixtures/test_null.json index 5b017fbcffee49d68514de79567e8cc46cc9b345..04d2d8b7f438864a7dfe8b277a7f42d78576cdd7 100644 --- a/force_bdss/tests/fixtures/test_null.json +++ b/force_bdss/tests/fixtures/test_null.json @@ -29,6 +29,13 @@ ] } } + ], + "notification_listeners": [ + { + "id": "force.bdss.enthought.factory.null_nl", + "model_data": { + } + } ] } } diff --git a/force_bdss/tests/test_core_evaluation_driver.py b/force_bdss/tests/test_core_evaluation_driver.py index 07ac470fb855681b3afe64339cd6c72cb4049e4a..43101a3361b416e1330b7fd226aaab69cda83877 100644 --- a/force_bdss/tests/test_core_evaluation_driver.py +++ b/force_bdss/tests/test_core_evaluation_driver.py @@ -21,6 +21,14 @@ from force_bdss.mco.base_mco_model import BaseMCOModel from force_bdss.mco.parameters.base_mco_parameter import BaseMCOParameter from force_bdss.mco.parameters.base_mco_parameter_factory import \ BaseMCOParameterFactory +from force_bdss.notification_listeners.base_notification_listener import \ + BaseNotificationListener +from force_bdss.notification_listeners.base_notification_listener_factory \ + import \ + BaseNotificationListenerFactory +from force_bdss.notification_listeners.base_notification_listener_model \ + import \ + BaseNotificationListenerModel from force_bdss.tests import fixtures try: @@ -187,10 +195,37 @@ class NullDataSourceFactory(BaseDataSourceFactory): return NullDataSource(self) +class NullNotificationListener(BaseNotificationListener): + def initialize(self, model): + pass + + def deliver(self, event): + pass + + def finalize(self): + pass + + +class NullNotificationListenerModel(BaseNotificationListenerModel): + pass + + +class NullNotificationListenerFactory(BaseNotificationListenerFactory): + id = factory_id("enthought", "null_nl") + name = "null_nl" + + def create_listener(self): + return NullNotificationListener(self) + + def create_model(self, model_data=None): + return NullNotificationListenerModel(self) + + class DummyFactoryRegistryPlugin(FactoryRegistryPlugin): mco_factories = List() kpi_calculator_factories = List() data_source_factories = List() + notification_listener_factories = List() def mock_factory_registry_plugin(): @@ -201,6 +236,9 @@ def mock_factory_registry_plugin(): NullKPICalculatorFactory(factory_registry_plugin)] factory_registry_plugin.data_source_factories = [ NullDataSourceFactory(factory_registry_plugin)] + factory_registry_plugin.notification_listener_factories = [ + NullNotificationListenerFactory(factory_registry_plugin) + ] return factory_registry_plugin diff --git a/force_bdss/tests/test_core_mco_driver.py b/force_bdss/tests/test_core_mco_driver.py new file mode 100644 index 0000000000000000000000000000000000000000..688f7944604416e69ec586053d44d63005e7f807 --- /dev/null +++ b/force_bdss/tests/test_core_mco_driver.py @@ -0,0 +1,119 @@ +import unittest +from testfixtures import LogCapture + +from force_bdss.mco.events import MCOStartEvent +from force_bdss.notification_listeners.base_notification_listener import \ + BaseNotificationListener +from force_bdss.tests import fixtures +from force_bdss.tests.test_core_evaluation_driver import \ + mock_factory_registry_plugin + +try: + import mock +except ImportError: + from unittest import mock + +from envisage.api import Application + +from force_bdss.core_mco_driver import CoreMCODriver + + +class TestCoreMCODriver(unittest.TestCase): + def setUp(self): + self.mock_factory_registry_plugin = mock_factory_registry_plugin() + application = mock.Mock(spec=Application) + application.get_plugin = mock.Mock( + return_value=self.mock_factory_registry_plugin + ) + application.workflow_filepath = fixtures.get("test_null.json") + self.mock_application = application + + def test_initialization(self): + driver = CoreMCODriver( + application=self.mock_application, + ) + driver.application_started() + + def test_stopping(self): + driver = CoreMCODriver( + application=self.mock_application, + ) + driver.application_started() + driver.application_stopping() + + def test_listeners(self): + driver = CoreMCODriver( + application=self.mock_application, + ) + self.assertEqual(len(driver.listeners), 1) + + def test_event_handling(self): + driver = CoreMCODriver( + application=self.mock_application, + ) + listener = driver.listeners[0] + mock_deliver = mock.Mock() + listener.__dict__["deliver"] = mock_deliver + event = MCOStartEvent() + driver.mco.event = event + self.assertTrue(mock_deliver.call_args[0][0], event) + + def test_listener_init_exception(self): + driver = CoreMCODriver( + application=self.mock_application, + ) + registry = self.mock_factory_registry_plugin + factory = registry.notification_listener_factories[0] + mock_create_listener = mock.Mock() + mock_listener = mock.Mock(spec=BaseNotificationListener) + mock_create_listener.return_value = mock_listener + mock_listener.initialize = mock.Mock() + mock_listener.initialize.side_effect = Exception() + factory.__dict__["create_listener"] = mock_create_listener + with LogCapture() as capture: + listeners = driver.listeners + + capture.check( + ("force_bdss.core_mco_driver", + "ERROR", + "Failed to create or initialize listener with id " + "force.bdss.enthought.factory.null_nl: ")) + + self.assertEqual(len(listeners), 0) + + def test_listener_delivery_exception(self): + driver = CoreMCODriver( + application=self.mock_application, + ) + listener = driver.listeners[0] + mock_deliver = mock.Mock() + listener.__dict__["deliver"] = mock_deliver + mock_deliver.side_effect = Exception() + with LogCapture() as capture: + driver.mco.event = MCOStartEvent() + self.assertTrue(mock_deliver.called) + + capture.check( + ("force_bdss.core_mco_driver", + "ERROR", + "Exception while delivering to listener " + "NullNotificationListener: ")) + + def test_finalize_error(self): + driver = CoreMCODriver( + application=self.mock_application, + ) + driver.application_started() + + listener = driver.listeners[0] + mock_finalize = mock.Mock() + listener.__dict__["finalize"] = mock_finalize + mock_finalize.side_effect = Exception() + + with LogCapture() as capture: + driver.application_stopping() + capture.check( + ("force_bdss.core_mco_driver", + "ERROR", + "Exception while finalizing listener " + "NullNotificationListener: ")) diff --git a/force_bdss/tests/test_factory_registry_plugin.py b/force_bdss/tests/test_factory_registry_plugin.py index 1d001d7fe7a507c258a26180972dd500866d3cd2..3a90f7b144d7d05a64daa0ea9525719a6c5a0de4 100644 --- a/force_bdss/tests/test_factory_registry_plugin.py +++ b/force_bdss/tests/test_factory_registry_plugin.py @@ -5,6 +5,8 @@ from force_bdss.base_extension_plugin import ( from force_bdss.ids import factory_id, mco_parameter_id from force_bdss.mco.parameters.base_mco_parameter_factory import \ BaseMCOParameterFactory +from force_bdss.notification_listeners.i_notification_listener_factory import \ + INotificationListenerFactory try: import mock @@ -61,6 +63,10 @@ class MySuperPlugin(BaseExtensionPlugin): mock.Mock(spec=IKPICalculatorFactory, id=factory_id("enthought", "kpi3"))] + def _notification_listener_factories_default(self): + return [mock.Mock(spec=INotificationListenerFactory, + id=factory_id("enthought", "nl1"))] + class TestFactoryRegistryWithContent(unittest.TestCase): def setUp(self): @@ -89,6 +95,12 @@ class TestFactoryRegistryWithContent(unittest.TestCase): self.assertEqual(self.plugin.kpi_calculator_factory_by_id(id).id, id) + for entry in ["nl1"]: + id = factory_id("enthought", entry) + self.assertEqual( + self.plugin.notification_listener_factory_by_id(id).id, + id) + with self.assertRaises(KeyError): self.plugin.mco_factory_by_id( factory_id("enthought", "foo")) @@ -114,6 +126,11 @@ class TestFactoryRegistryWithContent(unittest.TestCase): factory_id("enthought", "foo") ) + with self.assertRaises(KeyError): + self.plugin.notification_listener_factory_by_id( + factory_id("enthought", "foo") + ) + if __name__ == '__main__': unittest.main() diff --git a/force_bdss/tests/test_local_traits.py b/force_bdss/tests/test_local_traits.py index d61e6d2c89520653d65431a0030ad89b362f18be..d66a863206a887c90073feec2ce0b749cb4c83a9 100644 --- a/force_bdss/tests/test_local_traits.py +++ b/force_bdss/tests/test_local_traits.py @@ -1,12 +1,13 @@ import unittest from traits.api import HasStrictTraits, TraitError -from force_bdss.local_traits import Identifier, CUBAType +from force_bdss.local_traits import Identifier, CUBAType, ZMQSocketURL class Traited(HasStrictTraits): val = Identifier() cuba = CUBAType() + socket_url = ZMQSocketURL() class TestLocalTraits(unittest.TestCase): @@ -25,3 +26,22 @@ class TestLocalTraits(unittest.TestCase): c = Traited() c.cuba = "PRESSURE" self.assertEqual(c.cuba, "PRESSURE") + + def test_zmq_socket_url(self): + c = Traited() + + for working in ["tcp://127.0.0.1:12345", + "tcp://255.255.255.255:65535", + "tcp://1.1.1.1:65535"]: + c.socket_url = working + self.assertEqual(c.socket_url, working) + + for broken in ["tcp://270.0.0.1:12345", + "tcp://0.270.0.1:12345", + "tcp://0.0.270.1:12345", + "tcp://0.0.0.270:12345", + "url://255.255.255.255:65535", + "whatever", + "tcp://1.1.1.1:100000"]: + with self.assertRaises(TraitError): + c.socket_url = broken diff --git a/force_bdss/tests/utils.py b/force_bdss/tests/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..a91dafd7f9869721562b94bd726cd37e4408e267 --- /dev/null +++ b/force_bdss/tests/utils.py @@ -0,0 +1,14 @@ +import sys +from contextlib import contextmanager +from six import StringIO + + +@contextmanager +def captured_output(): + new_out, new_err = StringIO(), StringIO() + old_out, old_err = sys.stdout, sys.stderr + try: + sys.stdout, sys.stderr = new_out, new_err + yield sys.stdout, sys.stderr + finally: + sys.stdout, sys.stderr = old_out, old_err diff --git a/requirements/requirements.txt b/requirements/requirements.txt index cfc27054ea4f3f6acf0be5267e8bf9a3809d9881..7f6b07c7b8c7244591f43a88cfedfd6c80dbda3c 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -2,3 +2,4 @@ envisage==4.6.0 click==6.7 six==1.10.0 stevedore==1.24.0 +pyzmq==16.0.2 diff --git a/utils/zmq_ui_server.py b/utils/zmq_ui_server.py new file mode 100644 index 0000000000000000000000000000000000000000..d249f506cedce8e7112ba3bb52d95246842170bf --- /dev/null +++ b/utils/zmq_ui_server.py @@ -0,0 +1,44 @@ +import zmq + +# Socket to talk to server +context = zmq.Context() +sub_socket = context.socket(zmq.SUB) +sub_socket.bind("tcp://*:12345") +sub_socket.setsockopt(zmq.SUBSCRIBE, "".encode("utf-8")) +sub_socket.setsockopt(zmq.LINGER, 0) + +sync_socket = context.socket(zmq.REP) +sync_socket.setsockopt(zmq.LINGER, 0) +sync_socket.bind("tcp://*:12346") + +poller = zmq.Poller() +poller.register(sub_socket) +poller.register(sync_socket) + +WAITING = 0 +RECEIVING = 1 + +state = WAITING + +while True: + events = dict(poller.poll()) + if sync_socket in events: + data = sync_socket.recv_string() + print("received ", data) + if data.startswith("HELLO\n"): + sync_socket.send_string(data) + state = RECEIVING + elif data.startswith("GOODBYE\n"): + sync_socket.send_string(data) + state = WAITING + else: + print("unknown request", data) + + if sub_socket in events: + if state == RECEIVING: + string = sub_socket.recv_string() + split_data = string.split("\n") + print(split_data) + else: + print("data while waiting. discarding") + string = sub_socket.recv_string()