From 0f80e05456dba99b3091927de873cf1160f807ff Mon Sep 17 00:00:00 2001
From: Stefano Borini <sborini@enthought.com>
Date: Thu, 10 Aug 2017 12:06:55 +0100
Subject: [PATCH] better synchronization with the UI

---
 force_bdss/core_mco_driver.py                 | 26 ++++-
 .../dummy/ui_notification/ui_notification.py  | 96 ++++++++++++-------
 utils/zmq_client.py                           | 20 ----
 utils/zmq_ui_server.py                        | 44 +++++++++
 4 files changed, 131 insertions(+), 55 deletions(-)
 delete mode 100644 utils/zmq_client.py
 create mode 100644 utils/zmq_ui_server.py

diff --git a/force_bdss/core_mco_driver.py b/force_bdss/core_mco_driver.py
index 6e17888..ee22353 100644
--- a/force_bdss/core_mco_driver.py
+++ b/force_bdss/core_mco_driver.py
@@ -15,6 +15,7 @@ from .io.workflow_reader import (
     InvalidFileException
 )
 
+log = logging.getLogger(__name__)
 CORE_MCO_DRIVER_ID = plugin_id("core", "CoreMCODriver")
 
 
@@ -38,7 +39,7 @@ class CoreMCODriver(BaseCoreDriver):
             try:
                 listener.finalize(None)
             except Exception as e:
-                logging.error(
+                log.error(
                     "Failed to finalize "
                     "listener {}: {}".format(
                         listener.__class__.__name__, str(e)))
@@ -57,7 +58,26 @@ class CoreMCODriver(BaseCoreDriver):
     @on_trait_change("mco:event")
     def _handle_mco_event(self, event):
         for listener in self.listeners:
-            listener.deliver(None, event)
+            try:
+                listener.deliver(None, event)
+            except Exception as e:
+                log.error(
+                    "Exception while delivering to listener {}: {}".format(
+                        listener.__class__.__name__,
+                        str(e)
+                    ))
+
+                try:
+                    listener.finalize()
+                except Exception:
+                    log.error(
+                        "Exception while finalizing listener {}: {}".format(
+                            listener.__class__.__name__,
+                            str(e)
+                        ))
+                    pass
+
+                self.listeners.remove(listener)
 
     def _listeners_default(self):
         listeners = []
@@ -67,7 +87,7 @@ class CoreMCODriver(BaseCoreDriver):
                 listener = factory.create_listener()
                 listener.initialize(None)
             except Exception as e:
-                logging.error(
+                log.error(
                     "Failed to create or initialize "
                     "listener with id {}: {}".format(
                         factory.id, str(e)))
diff --git a/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py b/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py
index 32f7764..49349d8 100644
--- a/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py
+++ b/force_bdss/core_plugins/dummy/ui_notification/ui_notification.py
@@ -11,58 +11,90 @@ from force_bdss.api import (
 
 import zmq
 
+log = logging.getLogger(__name__)
+
 
 class UINotification(BaseNotificationListener):
     """
     Notification engine for the UI. Uses zeromq for the traffic handling.
     """
-    #: The ZMQ context.
+    #: The ZMQ context. If None, it means that the service is unavailable.
     _context = Instance(zmq.Context)
 
     #: The pubsub socket.
     _pub_socket = Instance(zmq.Socket)
 
-    #: The synchronization socket to recover already sent information at a
-    #: later stage
-    _rep_socket = Instance(zmq.Socket)
+    #: The synchronization socket to communicate with the server (UI)
+    _sync_socket = Instance(zmq.Socket)
+
+    def initialize(self, model):
+        self._context = zmq.Context()
 
-    #: The cache of messages as they are sent out.
-    _msg_cache = List()
+        self._pub_socket = self._context.socket(zmq.PUB)
+        self._pub_socket.setsockopt(zmq.LINGER, 0)
+        self._pub_socket.connect("tcp://127.0.0.1:12345")
+
+        self._sync_socket = self._context.socket(zmq.REQ)
+        self._sync_socket.setsockopt(zmq.LINGER, 0)
+        self._sync_socket.connect("tcp://127.0.0.1:12346")
+
+        self._sync_socket.send_string("HELLO 1")
+        events = self._sync_socket.poll(1000, zmq.POLLIN)
+        if events == 0:
+            log.info("Could not connect to UI server after 1000 ms. "
+                     "Continuing without UI notification.")
+            self._close_and_clear_sockets()
+            return
+
+        recv = self._sync_socket.recv_string()
+
+        if recv != "HELLO 1":
+            log.error(
+                ("Unexpected reply in sync"
+                 " negotiation with UI server. {}".format(recv)))
+            self._close_and_clear_sockets()
+            return
 
     def deliver(self, model, event):
-        try:
-            data = self._rep_socket.recv(flags=zmq.NOBLOCK)
-        except zmq.ZMQError as e:
-            if e.errno == errno.EAGAIN:
-                data = None
-            else:
-                logging.error("Error while receiving data from "
-                              "reply socket: {}".format(str(e)))
-
-        if data and data[0:4] == "SYNC".encode("utf-8"):
-            self._rep_socket.send_multipart(self._msg_cache)
+        if not self._context:
+            return
 
         msg = _format_event(event)
-        if msg is not None:
-            self._msg_cache.append(msg)
-            self._pub_socket.send(msg)
+        self._pub_socket.send(msg)
 
-    def initialize(self, model):
-        print("XX")
-        self._context = zmq.Context()
-        self._pub_socket = self._context.socket(zmq.PUB)
-        self._pub_socket.bind("tcp://*:12345")
+    def finalize(self, model):
+        if not self._context:
+            return
 
-        self._rep_socket = self._context.socket(zmq.REP)
-        self._rep_socket.bind("tcp://*:12346")
+        self._sync_socket.send_string("GOODBYE 1")
+        events = self._sync_socket.poll(1000, zmq.POLLIN)
+        if events == 0:
+            log.info("Could not close connection to UI server after "
+                     "1000 ms.")
+            self._close_and_clear_sockets()
+            return
 
-    def finalize(self, model):
-        self._pub_socket.close()
-        self._rep_socket.close()
-        self._context.term()
+        recv = self._sync_socket.recv_string()
+
+        if recv != "GOODBYE 1":
+            log.error(
+                ("Unexpected reply in goodbye sync"
+                 " negotiation with UI server. {}".format(recv)))
+
+        self._close_and_clear_sockets()
+
+    def _close_and_clear_sockets(self):
+        if self._pub_socket:
+            self._pub_socket.close()
+
+        if self._sync_socket:
+            self._sync_socket.close()
+
+        if self._context:
+            self._context.term()
 
         self._pub_socket = None
-        self._rep_socket = None
+        self._sync_socket = None
         self._context = None
 
 
diff --git a/utils/zmq_client.py b/utils/zmq_client.py
deleted file mode 100644
index 9ca3d5e..0000000
--- a/utils/zmq_client.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import zmq
-
-# Socket to talk to server
-context = zmq.Context()
-socket = context.socket(zmq.SUB)
-socket.connect("tcp://localhost:12345")
-socket.setsockopt(zmq.SUBSCRIBE, "".encode("utf-8"))
-send_socket = context.socket(zmq.REQ)
-send_socket.connect("tcp://localhost:12346")
-
-send_socket.send("SYNC".encode("utf-8"))
-data = send_socket.recv_multipart()
-for d in data:
-    split_data = d.decode("utf-8").split("\n")
-    print("SYNCED ", split_data)
-
-while True:
-    string = socket.recv()
-    split_data = string.decode('utf-8').split("\n")
-    print(split_data)
diff --git a/utils/zmq_ui_server.py b/utils/zmq_ui_server.py
new file mode 100644
index 0000000..dfc0022
--- /dev/null
+++ b/utils/zmq_ui_server.py
@@ -0,0 +1,44 @@
+import zmq
+
+# Socket to talk to server
+context = zmq.Context()
+sub_socket = context.socket(zmq.SUB)
+sub_socket.bind("tcp://*:12345")
+sub_socket.setsockopt(zmq.SUBSCRIBE, "".encode("utf-8"))
+sub_socket.setsockopt(zmq.LINGER, 0)
+
+sync_socket = context.socket(zmq.REP)
+sync_socket.setsockopt(zmq.LINGER, 0)
+sync_socket.bind("tcp://*:12346")
+
+poller = zmq.Poller()
+poller.register(sub_socket)
+poller.register(sync_socket)
+
+WAITING = 0
+RECEIVING = 1
+
+state = WAITING
+
+while True:
+    events = dict(poller.poll())
+    if sync_socket in events:
+        data = sync_socket.recv_string()
+        print("received ", data)
+        if data == "HELLO 1":
+            sync_socket.send_string("HELLO 1")
+            state = RECEIVING
+        elif data == "GOODBYE 1":
+            sync_socket.send_string("GOODBYE 1")
+            state = WAITING
+        else:
+            print("unknown request", data)
+
+    if sub_socket in events:
+        if state == RECEIVING:
+            string = sub_socket.recv_string()
+            split_data = string.split("\n")
+            print(split_data)
+        else:
+            print("data while waiting. discarding")
+            string = sub_socket.recv_string()
-- 
GitLab