Skip to content
Snippets Groups Projects
Commit 0f80e054 authored by Stefano Borini's avatar Stefano Borini
Browse files

better synchronization with the UI

parent 48827729
No related branches found
No related tags found
1 merge request!79Deliver notification info
This commit is part of merge request !79. Comments created here will be created in the context of that merge request.
......@@ -15,6 +15,7 @@ from .io.workflow_reader import (
InvalidFileException
)
log = logging.getLogger(__name__)
CORE_MCO_DRIVER_ID = plugin_id("core", "CoreMCODriver")
......@@ -38,7 +39,7 @@ class CoreMCODriver(BaseCoreDriver):
try:
listener.finalize(None)
except Exception as e:
logging.error(
log.error(
"Failed to finalize "
"listener {}: {}".format(
listener.__class__.__name__, str(e)))
......@@ -57,7 +58,26 @@ class CoreMCODriver(BaseCoreDriver):
@on_trait_change("mco:event")
def _handle_mco_event(self, event):
for listener in self.listeners:
listener.deliver(None, event)
try:
listener.deliver(None, event)
except Exception as e:
log.error(
"Exception while delivering to listener {}: {}".format(
listener.__class__.__name__,
str(e)
))
try:
listener.finalize()
except Exception:
log.error(
"Exception while finalizing listener {}: {}".format(
listener.__class__.__name__,
str(e)
))
pass
self.listeners.remove(listener)
def _listeners_default(self):
listeners = []
......@@ -67,7 +87,7 @@ class CoreMCODriver(BaseCoreDriver):
listener = factory.create_listener()
listener.initialize(None)
except Exception as e:
logging.error(
log.error(
"Failed to create or initialize "
"listener with id {}: {}".format(
factory.id, str(e)))
......
......@@ -11,58 +11,90 @@ from force_bdss.api import (
import zmq
log = logging.getLogger(__name__)
class UINotification(BaseNotificationListener):
"""
Notification engine for the UI. Uses zeromq for the traffic handling.
"""
#: The ZMQ context.
#: The ZMQ context. If None, it means that the service is unavailable.
_context = Instance(zmq.Context)
#: The pubsub socket.
_pub_socket = Instance(zmq.Socket)
#: The synchronization socket to recover already sent information at a
#: later stage
_rep_socket = Instance(zmq.Socket)
#: The synchronization socket to communicate with the server (UI)
_sync_socket = Instance(zmq.Socket)
def initialize(self, model):
self._context = zmq.Context()
#: The cache of messages as they are sent out.
_msg_cache = List()
self._pub_socket = self._context.socket(zmq.PUB)
self._pub_socket.setsockopt(zmq.LINGER, 0)
self._pub_socket.connect("tcp://127.0.0.1:12345")
self._sync_socket = self._context.socket(zmq.REQ)
self._sync_socket.setsockopt(zmq.LINGER, 0)
self._sync_socket.connect("tcp://127.0.0.1:12346")
self._sync_socket.send_string("HELLO 1")
events = self._sync_socket.poll(1000, zmq.POLLIN)
if events == 0:
log.info("Could not connect to UI server after 1000 ms. "
"Continuing without UI notification.")
self._close_and_clear_sockets()
return
recv = self._sync_socket.recv_string()
if recv != "HELLO 1":
log.error(
("Unexpected reply in sync"
" negotiation with UI server. {}".format(recv)))
self._close_and_clear_sockets()
return
def deliver(self, model, event):
try:
data = self._rep_socket.recv(flags=zmq.NOBLOCK)
except zmq.ZMQError as e:
if e.errno == errno.EAGAIN:
data = None
else:
logging.error("Error while receiving data from "
"reply socket: {}".format(str(e)))
if data and data[0:4] == "SYNC".encode("utf-8"):
self._rep_socket.send_multipart(self._msg_cache)
if not self._context:
return
msg = _format_event(event)
if msg is not None:
self._msg_cache.append(msg)
self._pub_socket.send(msg)
self._pub_socket.send(msg)
def initialize(self, model):
print("XX")
self._context = zmq.Context()
self._pub_socket = self._context.socket(zmq.PUB)
self._pub_socket.bind("tcp://*:12345")
def finalize(self, model):
if not self._context:
return
self._rep_socket = self._context.socket(zmq.REP)
self._rep_socket.bind("tcp://*:12346")
self._sync_socket.send_string("GOODBYE 1")
events = self._sync_socket.poll(1000, zmq.POLLIN)
if events == 0:
log.info("Could not close connection to UI server after "
"1000 ms.")
self._close_and_clear_sockets()
return
def finalize(self, model):
self._pub_socket.close()
self._rep_socket.close()
self._context.term()
recv = self._sync_socket.recv_string()
if recv != "GOODBYE 1":
log.error(
("Unexpected reply in goodbye sync"
" negotiation with UI server. {}".format(recv)))
self._close_and_clear_sockets()
def _close_and_clear_sockets(self):
if self._pub_socket:
self._pub_socket.close()
if self._sync_socket:
self._sync_socket.close()
if self._context:
self._context.term()
self._pub_socket = None
self._rep_socket = None
self._sync_socket = None
self._context = None
......
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:12345")
socket.setsockopt(zmq.SUBSCRIBE, "".encode("utf-8"))
send_socket = context.socket(zmq.REQ)
send_socket.connect("tcp://localhost:12346")
send_socket.send("SYNC".encode("utf-8"))
data = send_socket.recv_multipart()
for d in data:
split_data = d.decode("utf-8").split("\n")
print("SYNCED ", split_data)
while True:
string = socket.recv()
split_data = string.decode('utf-8').split("\n")
print(split_data)
import zmq
# Socket to talk to server
context = zmq.Context()
sub_socket = context.socket(zmq.SUB)
sub_socket.bind("tcp://*:12345")
sub_socket.setsockopt(zmq.SUBSCRIBE, "".encode("utf-8"))
sub_socket.setsockopt(zmq.LINGER, 0)
sync_socket = context.socket(zmq.REP)
sync_socket.setsockopt(zmq.LINGER, 0)
sync_socket.bind("tcp://*:12346")
poller = zmq.Poller()
poller.register(sub_socket)
poller.register(sync_socket)
WAITING = 0
RECEIVING = 1
state = WAITING
while True:
events = dict(poller.poll())
if sync_socket in events:
data = sync_socket.recv_string()
print("received ", data)
if data == "HELLO 1":
sync_socket.send_string("HELLO 1")
state = RECEIVING
elif data == "GOODBYE 1":
sync_socket.send_string("GOODBYE 1")
state = WAITING
else:
print("unknown request", data)
if sub_socket in events:
if state == RECEIVING:
string = sub_socket.recv_string()
split_data = string.split("\n")
print(split_data)
else:
print("data while waiting. discarding")
string = sub_socket.recv_string()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment