diff --git a/force_bdss/core_mco_driver.py b/force_bdss/core_mco_driver.py index c8e8aea2ea83788028f08ed59e3cc444fc579351..a23543704884d3fcab755cd95d31a4c82338c880 100644 --- a/force_bdss/core_mco_driver.py +++ b/force_bdss/core_mco_driver.py @@ -53,7 +53,7 @@ class CoreMCODriver(BaseCoreDriver): def _handle_mco_event(self, event): for listener in self.listeners: try: - listener.deliver(None, event) + listener.deliver(event) except Exception as e: log.error( "Exception while delivering to listener {}: {}".format( @@ -66,10 +66,11 @@ class CoreMCODriver(BaseCoreDriver): def _listeners_default(self): listeners = [] - for factory in self.factory_registry.notification_listener_factories: + for nl_model in self.workflow.notification_listeners: + factory = nl_model.factory try: listener = factory.create_listener() - listener.initialize(None) + listener.initialize(nl_model) except Exception as e: log.error( "Failed to create or initialize " diff --git a/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener.py b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener.py index c40f567ba3a4ac154997cc0f1822ac5ee2e00693..da64656ded8bafa0fdcf522c84a033be2e1c19d0 100644 --- a/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener.py +++ b/force_bdss/core_plugins/dummy/dummy_notification_listener/dummy_notification_listener.py @@ -7,7 +7,7 @@ from force_bdss.api import ( class DummyNotificationListener(BaseNotificationListener): - def deliver(self, model, event): + def deliver(self, event): if isinstance(event, (MCOStartEvent, MCOFinishEvent)): print(event.__class__.__name__) elif isinstance(event, MCOProgressEvent): @@ -18,5 +18,5 @@ class DummyNotificationListener(BaseNotificationListener): def initialize(self, model): print("Initializing") - def finalize(self, model): + def finalize(self): print("Finalizing") diff --git a/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py b/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py index 49349d86857289f5949cd7bcecfc6eb1d6fb2a98..9690d777eba962dfb437ff80dd10f35367b5ba0a 100644 --- a/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py +++ b/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py @@ -1,6 +1,5 @@ -import errno import logging -from traits.api import List, Instance +from traits.api import List, Instance, String from force_bdss.api import ( BaseNotificationListener, @@ -27,18 +26,25 @@ class UINotification(BaseNotificationListener): #: The synchronization socket to communicate with the server (UI) _sync_socket = Instance(zmq.Socket) + #: Unique identifier from the UI. To be returned in the protocol. + _identifier = String() + + #: The protocol version that this plugin delivers + _proto_version = "1" + def initialize(self, model): self._context = zmq.Context() self._pub_socket = self._context.socket(zmq.PUB) self._pub_socket.setsockopt(zmq.LINGER, 0) - self._pub_socket.connect("tcp://127.0.0.1:12345") + self._pub_socket.connect(model.pub_url) self._sync_socket = self._context.socket(zmq.REQ) self._sync_socket.setsockopt(zmq.LINGER, 0) - self._sync_socket.connect("tcp://127.0.0.1:12346") + self._sync_socket.connect(model.sync_url) - self._sync_socket.send_string("HELLO 1") + msg = "HELLO\n{}\n{}".format(self._identifier, self._proto_version) + self._sync_socket.send_string(msg) events = self._sync_socket.poll(1000, zmq.POLLIN) if events == 0: log.info("Could not connect to UI server after 1000 ms. " @@ -48,25 +54,26 @@ class UINotification(BaseNotificationListener): recv = self._sync_socket.recv_string() - if recv != "HELLO 1": + if recv != msg: log.error( ("Unexpected reply in sync" " negotiation with UI server. {}".format(recv))) self._close_and_clear_sockets() return - def deliver(self, model, event): + def deliver(self, event): if not self._context: return - msg = _format_event(event) - self._pub_socket.send(msg) + msg = _format_event(event, self._identifier) + self._pub_socket.send_string(msg) - def finalize(self, model): + def finalize(self): if not self._context: return - self._sync_socket.send_string("GOODBYE 1") + msg = "GOODBYE\n{}\n{}".format(self._identifier, self._proto_version) + self._sync_socket.send_string(msg) events = self._sync_socket.poll(1000, zmq.POLLIN) if events == 0: log.info("Could not close connection to UI server after " @@ -76,7 +83,7 @@ class UINotification(BaseNotificationListener): recv = self._sync_socket.recv_string() - if recv != "GOODBYE 1": + if recv != msg: log.error( ("Unexpected reply in goodbye sync" " negotiation with UI server. {}".format(recv))) @@ -98,7 +105,7 @@ class UINotification(BaseNotificationListener): self._context = None -def _format_event(event): +def _format_event(event, identifier): """Converts the event into a byte sequence to be transferred via zmq""" if isinstance(event, MCOStartEvent): data = "MCO_START" @@ -106,9 +113,10 @@ def _format_event(event): data = "MCO_FINISH" elif isinstance(event, MCOProgressEvent): data = "MCO_PROGRESS\n{}\n{}".format( + identifier, " ".join([str(x) for x in event.input]), " ".join([str(x) for x in event.output])) else: return None - return ("EVENT\n{}".format(data)).encode("utf-8") + return "EVENT\n{}\n{}".format(identifier, data) diff --git a/force_bdss/core_plugins/dummy/ui_notification/ui_notification_model.py b/force_bdss/core_plugins/dummy/ui_notification/ui_notification_model.py index 3f5cc1660bbced9afb96f81fa5877cda11770523..7737cfc95fae26c01f85e04dcf2f15967f1e1efb 100644 --- a/force_bdss/core_plugins/dummy/ui_notification/ui_notification_model.py +++ b/force_bdss/core_plugins/dummy/ui_notification/ui_notification_model.py @@ -1,5 +1,14 @@ -from force_bdss.api import BaseNotificationListenerModel +from traits.api import String +from force_bdss.api import ( + BaseNotificationListenerModel, ZMQSocketURL) class UINotificationModel(BaseNotificationListenerModel): - pass + #: The socket URL where the UI will be found. Synchronization port. + sync_url = ZMQSocketURL() + + #: The socket URL where the UI will be found. PubSub port. + pub_url = ZMQSocketURL() + + #: Unique identifier assigned by the UI to recognize the connection. + identifier = String() diff --git a/force_bdss/notification_listeners/base_notification_listener.py b/force_bdss/notification_listeners/base_notification_listener.py index 64b8243ae001ac658365da2e0aae298027fb98e8..d2b6910d51e894d33db0a3591be48085761e5f0e 100644 --- a/force_bdss/notification_listeners/base_notification_listener.py +++ b/force_bdss/notification_listeners/base_notification_listener.py @@ -34,7 +34,7 @@ class BaseNotificationListener(ABCHasStrictTraits): such as setting up a connection, or opening a file. """ - def finalize(self, model): + def finalize(self): """ Method used to finalize state of the listener. @@ -44,13 +44,11 @@ class BaseNotificationListener(ABCHasStrictTraits): """ @abc.abstractmethod - def deliver(self, model, event): + def deliver(self, event): """Delivers the event to the recipient Parameters ---------- - model: - The model event: MCOEvent The event to notify. """