Skip to content
Snippets Groups Projects

Deliver notification info

Merged Adham Hashibon requested to merge deliver-to-ui into master
4 files
+ 131
55
Compare changes
  • Side-by-side
  • Inline
Files
4
@@ -11,58 +11,90 @@ from force_bdss.api import (
import zmq
log = logging.getLogger(__name__)
class UINotification(BaseNotificationListener):
"""
Notification engine for the UI. Uses zeromq for the traffic handling.
"""
#: The ZMQ context.
#: The ZMQ context. If None, it means that the service is unavailable.
_context = Instance(zmq.Context)
#: The pubsub socket.
_pub_socket = Instance(zmq.Socket)
#: The synchronization socket to recover already sent information at a
#: later stage
_rep_socket = Instance(zmq.Socket)
#: The synchronization socket to communicate with the server (UI)
_sync_socket = Instance(zmq.Socket)
def initialize(self, model):
self._context = zmq.Context()
#: The cache of messages as they are sent out.
_msg_cache = List()
self._pub_socket = self._context.socket(zmq.PUB)
self._pub_socket.setsockopt(zmq.LINGER, 0)
self._pub_socket.connect("tcp://127.0.0.1:12345")
self._sync_socket = self._context.socket(zmq.REQ)
self._sync_socket.setsockopt(zmq.LINGER, 0)
self._sync_socket.connect("tcp://127.0.0.1:12346")
self._sync_socket.send_string("HELLO 1")
events = self._sync_socket.poll(1000, zmq.POLLIN)
if events == 0:
log.info("Could not connect to UI server after 1000 ms. "
"Continuing without UI notification.")
self._close_and_clear_sockets()
return
recv = self._sync_socket.recv_string()
if recv != "HELLO 1":
log.error(
("Unexpected reply in sync"
" negotiation with UI server. {}".format(recv)))
self._close_and_clear_sockets()
return
def deliver(self, model, event):
try:
data = self._rep_socket.recv(flags=zmq.NOBLOCK)
except zmq.ZMQError as e:
if e.errno == errno.EAGAIN:
data = None
else:
logging.error("Error while receiving data from "
"reply socket: {}".format(str(e)))
if data and data[0:4] == "SYNC".encode("utf-8"):
self._rep_socket.send_multipart(self._msg_cache)
if not self._context:
return
msg = _format_event(event)
if msg is not None:
self._msg_cache.append(msg)
self._pub_socket.send(msg)
self._pub_socket.send(msg)
def initialize(self, model):
print("XX")
self._context = zmq.Context()
self._pub_socket = self._context.socket(zmq.PUB)
self._pub_socket.bind("tcp://*:12345")
def finalize(self, model):
if not self._context:
return
self._rep_socket = self._context.socket(zmq.REP)
self._rep_socket.bind("tcp://*:12346")
self._sync_socket.send_string("GOODBYE 1")
events = self._sync_socket.poll(1000, zmq.POLLIN)
if events == 0:
log.info("Could not close connection to UI server after "
"1000 ms.")
self._close_and_clear_sockets()
return
def finalize(self, model):
self._pub_socket.close()
self._rep_socket.close()
self._context.term()
recv = self._sync_socket.recv_string()
if recv != "GOODBYE 1":
log.error(
("Unexpected reply in goodbye sync"
" negotiation with UI server. {}".format(recv)))
self._close_and_clear_sockets()
def _close_and_clear_sockets(self):
if self._pub_socket:
self._pub_socket.close()
if self._sync_socket:
self._sync_socket.close()
if self._context:
self._context.term()
self._pub_socket = None
self._rep_socket = None
self._sync_socket = None
self._context = None
Loading