Skip to content
Snippets Groups Projects
Commit f816ba15 authored by Stefano Borini's avatar Stefano Borini
Browse files

Removed ZMQSocketURL as it's now only living in the UI

parent 3fde61c3
No related branches found
No related tags found
1 merge request!92Removed ZMQSocketURL as it's now only living in the UI
......@@ -28,4 +28,4 @@ from .notification_listeners.base_notification_listener import BaseNotificationL
from .notification_listeners.base_notification_listener_factory import BaseNotificationListenerFactory # noqa
from .notification_listeners.base_notification_listener_model import BaseNotificationListenerModel # noqa
from .local_traits import (ZMQSocketURL, Identifier) # noqa
from .local_traits import Identifier # noqa
import re
from traits.api import Regex, BaseStr, String
from traits.api import Regex, String
#: Used for variable names, but allow also empty string as it's the default
#: case and it will be present if the workflow is saved before actually
#: specifying the value.
Identifier = Regex(regex="(^[^\d\W]\w*\Z|^\Z)")
class ZMQSocketURL(BaseStr):
def validate(self, object, name, value):
super(ZMQSocketURL, self).validate(object, name, value)
m = re.match(
"tcp://(\\d{1,3})\.(\\d{1,3})\.(\\d{1,3})\.(\\d{1,3}):(\\d+)",
value)
if m is None:
self.error(object, name, value)
a, b, c, d, port = m.groups()
if not all(map(lambda x: 0 <= int(x) <= 255, (a, b, c, d))):
self.error(object, name, value)
if not (1 <= int(port) <= 65535):
self.error(object, name, value)
return value
#: Identifies a CUBA type with its key. At the moment a String with
#: no validation, but will come later.
CUBAType = String()
import unittest
from traits.api import HasStrictTraits, TraitError
from force_bdss.local_traits import Identifier, CUBAType, ZMQSocketURL
from force_bdss.local_traits import Identifier, CUBAType
class Traited(HasStrictTraits):
val = Identifier()
cuba = CUBAType()
socket_url = ZMQSocketURL()
class TestLocalTraits(unittest.TestCase):
......@@ -26,22 +25,3 @@ class TestLocalTraits(unittest.TestCase):
c = Traited()
c.cuba = "PRESSURE"
self.assertEqual(c.cuba, "PRESSURE")
def test_zmq_socket_url(self):
c = Traited()
for working in ["tcp://127.0.0.1:12345",
"tcp://255.255.255.255:65535",
"tcp://1.1.1.1:65535"]:
c.socket_url = working
self.assertEqual(c.socket_url, working)
for broken in ["tcp://270.0.0.1:12345",
"tcp://0.270.0.1:12345",
"tcp://0.0.270.1:12345",
"tcp://0.0.0.270:12345",
"url://255.255.255.255:65535",
"whatever",
"tcp://1.1.1.1:100000"]:
with self.assertRaises(TraitError):
c.socket_url = broken
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment