Skip to content
Snippets Groups Projects
Commit bbbd4bec authored by Stefano Borini's avatar Stefano Borini
Browse files

Changed API and adapted to use the model information.

parent cfb39de8
No related branches found
No related tags found
1 merge request!79Deliver notification info
......@@ -53,7 +53,7 @@ class CoreMCODriver(BaseCoreDriver):
def _handle_mco_event(self, event):
for listener in self.listeners:
try:
listener.deliver(None, event)
listener.deliver(event)
except Exception as e:
log.error(
"Exception while delivering to listener {}: {}".format(
......@@ -66,10 +66,11 @@ class CoreMCODriver(BaseCoreDriver):
def _listeners_default(self):
listeners = []
for factory in self.factory_registry.notification_listener_factories:
for nl_model in self.workflow.notification_listeners:
factory = nl_model.factory
try:
listener = factory.create_listener()
listener.initialize(None)
listener.initialize(nl_model)
except Exception as e:
log.error(
"Failed to create or initialize "
......
......@@ -7,7 +7,7 @@ from force_bdss.api import (
class DummyNotificationListener(BaseNotificationListener):
def deliver(self, model, event):
def deliver(self, event):
if isinstance(event, (MCOStartEvent, MCOFinishEvent)):
print(event.__class__.__name__)
elif isinstance(event, MCOProgressEvent):
......@@ -18,5 +18,5 @@ class DummyNotificationListener(BaseNotificationListener):
def initialize(self, model):
print("Initializing")
def finalize(self, model):
def finalize(self):
print("Finalizing")
import errno
import logging
from traits.api import List, Instance
from traits.api import List, Instance, String
from force_bdss.api import (
BaseNotificationListener,
......@@ -27,18 +26,25 @@ class UINotification(BaseNotificationListener):
#: The synchronization socket to communicate with the server (UI)
_sync_socket = Instance(zmq.Socket)
#: Unique identifier from the UI. To be returned in the protocol.
_identifier = String()
#: The protocol version that this plugin delivers
_proto_version = "1"
def initialize(self, model):
self._context = zmq.Context()
self._pub_socket = self._context.socket(zmq.PUB)
self._pub_socket.setsockopt(zmq.LINGER, 0)
self._pub_socket.connect("tcp://127.0.0.1:12345")
self._pub_socket.connect(model.pub_url)
self._sync_socket = self._context.socket(zmq.REQ)
self._sync_socket.setsockopt(zmq.LINGER, 0)
self._sync_socket.connect("tcp://127.0.0.1:12346")
self._sync_socket.connect(model.sync_url)
self._sync_socket.send_string("HELLO 1")
msg = "HELLO\n{}\n{}".format(self._identifier, self._proto_version)
self._sync_socket.send_string(msg)
events = self._sync_socket.poll(1000, zmq.POLLIN)
if events == 0:
log.info("Could not connect to UI server after 1000 ms. "
......@@ -48,25 +54,26 @@ class UINotification(BaseNotificationListener):
recv = self._sync_socket.recv_string()
if recv != "HELLO 1":
if recv != msg:
log.error(
("Unexpected reply in sync"
" negotiation with UI server. {}".format(recv)))
self._close_and_clear_sockets()
return
def deliver(self, model, event):
def deliver(self, event):
if not self._context:
return
msg = _format_event(event)
self._pub_socket.send(msg)
msg = _format_event(event, self._identifier)
self._pub_socket.send_string(msg)
def finalize(self, model):
def finalize(self):
if not self._context:
return
self._sync_socket.send_string("GOODBYE 1")
msg = "GOODBYE\n{}\n{}".format(self._identifier, self._proto_version)
self._sync_socket.send_string(msg)
events = self._sync_socket.poll(1000, zmq.POLLIN)
if events == 0:
log.info("Could not close connection to UI server after "
......@@ -76,7 +83,7 @@ class UINotification(BaseNotificationListener):
recv = self._sync_socket.recv_string()
if recv != "GOODBYE 1":
if recv != msg:
log.error(
("Unexpected reply in goodbye sync"
" negotiation with UI server. {}".format(recv)))
......@@ -98,7 +105,7 @@ class UINotification(BaseNotificationListener):
self._context = None
def _format_event(event):
def _format_event(event, identifier):
"""Converts the event into a byte sequence to be transferred via zmq"""
if isinstance(event, MCOStartEvent):
data = "MCO_START"
......@@ -106,9 +113,10 @@ def _format_event(event):
data = "MCO_FINISH"
elif isinstance(event, MCOProgressEvent):
data = "MCO_PROGRESS\n{}\n{}".format(
identifier,
" ".join([str(x) for x in event.input]),
" ".join([str(x) for x in event.output]))
else:
return None
return ("EVENT\n{}".format(data)).encode("utf-8")
return "EVENT\n{}\n{}".format(identifier, data)
from force_bdss.api import BaseNotificationListenerModel
from traits.api import String
from force_bdss.api import (
BaseNotificationListenerModel, ZMQSocketURL)
class UINotificationModel(BaseNotificationListenerModel):
pass
#: The socket URL where the UI will be found. Synchronization port.
sync_url = ZMQSocketURL()
#: The socket URL where the UI will be found. PubSub port.
pub_url = ZMQSocketURL()
#: Unique identifier assigned by the UI to recognize the connection.
identifier = String()
......@@ -34,7 +34,7 @@ class BaseNotificationListener(ABCHasStrictTraits):
such as setting up a connection, or opening a file.
"""
def finalize(self, model):
def finalize(self):
"""
Method used to finalize state of the listener.
......@@ -44,13 +44,11 @@ class BaseNotificationListener(ABCHasStrictTraits):
"""
@abc.abstractmethod
def deliver(self, model, event):
def deliver(self, event):
"""Delivers the event to the recipient
Parameters
----------
model:
The model
event: MCOEvent
The event to notify.
"""
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment