Skip to content
Snippets Groups Projects
Unverified Commit 93a80a59 authored by Mikael Arguedas's avatar Mikael Arguedas
Browse files

explicit argument for API functions instead of argparse structure


Signed-off-by: default avatarMikael Arguedas <mikael.arguedas@gmail.com>
parent e1d01f07
No related branches found
No related tags found
No related merge requests found
......@@ -221,32 +221,29 @@ def create_signed_governance_file(signed_gov_path, gov_path, ca_cert_path, ca_ke
(openssl_executable, gov_path, signed_gov_path, ca_cert_path, ca_key_path))
def create_keystore(args):
root = args.ROOT
print(args)
if not os.path.exists(root):
print('creating directory: %s' % root)
os.makedirs(root, exist_ok=True)
def create_keystore(keystore_path):
if not os.path.exists(keystore_path):
print('creating directory: %s' % keystore_path)
os.makedirs(keystore_path, exist_ok=True)
else:
print('directory already exists: %s' % root)
print('directory already exists: %s' % keystore_path)
ca_conf_path = os.path.join(root, 'ca_conf.cnf')
ca_conf_path = os.path.join(keystore_path, 'ca_conf.cnf')
if not os.path.isfile(ca_conf_path):
print('creating CA file: %s' % ca_conf_path)
create_ca_conf_file(ca_conf_path)
else:
print('found CA conf file, not writing a new one!')
ecdsa_param_path = os.path.join(root, 'ecdsaparam')
ecdsa_param_path = os.path.join(keystore_path, 'ecdsaparam')
if not os.path.isfile(ecdsa_param_path):
print('creating ECDSA param file: %s' % ecdsa_param_path)
create_ecdsa_param_file(ecdsa_param_path)
else:
print('found ECDSA param file, not writing a new one!')
ca_key_path = os.path.join(root, 'ca.key.pem')
ca_cert_path = os.path.join(root, 'ca.cert.pem')
ca_key_path = os.path.join(keystore_path, 'ca.key.pem')
ca_cert_path = os.path.join(keystore_path, 'ca.cert.pem')
if not (os.path.isfile(ca_key_path) and os.path.isfile(ca_cert_path)):
print('creating new CA key/cert pair')
create_ca_key_cert(ecdsa_param_path, ca_conf_path, ca_key_path, ca_cert_path)
......@@ -254,7 +251,7 @@ def create_keystore(args):
print('found CA key and cert, not creating new ones!')
# create governance file
gov_path = os.path.join(root, 'governance.xml')
gov_path = os.path.join(keystore_path, 'governance.xml')
if not os.path.isfile(gov_path):
print('creating governance file: %s' % gov_path)
domain_id = os.getenv('ROS_DOMAIN_ID', '0')
......@@ -263,7 +260,7 @@ def create_keystore(args):
print('found governance file, not creating a new one!')
# sign governance file
signed_gov_path = os.path.join(root, 'governance.p7s')
signed_gov_path = os.path.join(keystore_path, 'governance.p7s')
if not os.path.isfile(signed_gov_path):
print('creating signed governance file: %s' % signed_gov_path)
create_signed_governance_file(signed_gov_path, gov_path, ca_cert_path, ca_key_path)
......@@ -271,18 +268,18 @@ def create_keystore(args):
print('found signed governance file, not creating a new one!')
# create index file
index_path = os.path.join(root, 'index.txt')
index_path = os.path.join(keystore_path, 'index.txt')
if not os.path.isfile(index_path):
with open(index_path, 'a'):
pass
# create serial file
serial_path = os.path.join(root, 'serial')
serial_path = os.path.join(keystore_path, 'serial')
if not os.path.isfile(serial_path):
with open(serial_path, 'w') as f:
f.write('1000')
print('all done! enjoy your keystore in %s' % root)
print('all done! enjoy your keystore in %s' % keystore_path)
print('cheers!')
return True
......@@ -391,54 +388,47 @@ def create_signed_permissions_file(
(openssl_executable, permissions_path, signed_permissions_path, ca_cert_path, ca_key_path))
def create_permission(args):
print(args)
root = args.ROOT
name = args.NAME
policy_file_path = args.POLICY_FILE_PATH
def create_permission(keystore_path, identity, policy_file_path):
domain_id = os.getenv('ROS_DOMAIN_ID', '0')
relative_path = os.path.normpath(name.lstrip('/'))
key_dir = os.path.join(root, relative_path)
relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(keystore_path, relative_path)
print('key_dir %s' % key_dir)
policy_element = get_policy(name, policy_file_path)
policy_element = get_policy(identity, policy_file_path)
permissions_path = os.path.join(key_dir, 'permissions.xml')
create_permission_file(permissions_path, domain_id, policy_element)
signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_ca_cert_path = os.path.join(root, 'ca.cert.pem')
keystore_ca_key_path = os.path.join(root, 'ca.key.pem')
keystore_ca_cert_path = os.path.join(keystore_path, 'ca.cert.pem')
keystore_ca_key_path = os.path.join(keystore_path, 'ca.key.pem')
create_signed_permissions_file(
permissions_path, signed_permissions_path,
keystore_ca_cert_path, keystore_ca_key_path)
return True
def create_key(args):
print(args)
root = args.ROOT
name = args.NAME
if not is_valid_keystore(root):
print('root path is not a valid keystore: %s' % root)
def create_key(keystore_path, identity):
if not is_valid_keystore(keystore_path):
print("'%s' is not a valid keystore " % keystore_path)
return False
if not is_key_name_valid(name):
print('bad character in requested key name: %s' % name)
if not is_key_name_valid(identity):
print("bad character in requested identity: '%s'" % identity)
return False
print('creating key for node name: %s' % name)
print("creating key for identity: '%s'" % identity)
relative_path = os.path.normpath(name.lstrip('/'))
key_dir = os.path.join(root, relative_path)
relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(keystore_path, relative_path)
os.makedirs(key_dir, exist_ok=True)
# copy the CA cert in there
keystore_ca_cert_path = os.path.join(root, 'ca.cert.pem')
keystore_ca_cert_path = os.path.join(keystore_path, 'ca.cert.pem')
dest_identity_ca_cert_path = os.path.join(key_dir, 'identity_ca.cert.pem')
dest_permissions_ca_cert_path = os.path.join(key_dir, 'permissions_ca.cert.pem')
shutil.copyfile(keystore_ca_cert_path, dest_identity_ca_cert_path)
shutil.copyfile(keystore_ca_cert_path, dest_permissions_ca_cert_path)
# copy the governance file in there
keystore_governance_path = os.path.join(root, 'governance.p7s')
keystore_governance_path = os.path.join(keystore_path, 'governance.p7s')
dest_governance_path = os.path.join(key_dir, 'governance.p7s')
shutil.copyfile(keystore_governance_path, dest_governance_path)
......@@ -451,7 +441,7 @@ def create_key(args):
cnf_path = os.path.join(key_dir, 'request.cnf')
if not os.path.isfile(cnf_path):
create_request_file(cnf_path, name)
create_request_file(cnf_path, identity)
else:
print('config file exists, not creating a new one: %s' % cnf_path)
......@@ -460,7 +450,7 @@ def create_key(args):
if not os.path.isfile(key_path) or not os.path.isfile(req_path):
print('creating key and cert request')
create_key_and_cert_req(
root,
keystore_path,
relative_path,
cnf_path,
ecdsa_param_path,
......@@ -471,7 +461,7 @@ def create_key(args):
cert_path = os.path.join(key_dir, 'cert.pem')
if not os.path.isfile(cert_path):
print('creating cert')
create_cert(root, relative_path)
create_cert(keystore_path, relative_path)
else:
print('found cert; not creating a new one!')
......@@ -480,7 +470,7 @@ def create_key(args):
policy_file_path = get_policy_default('policy.xml')
policy_element = get_policy('/default', policy_file_path)
profile_element = policy_element.find('profiles/profile')
ns, node = name.rsplit('/', 1)
ns, node = identity.rsplit('/', 1)
ns = '/' if not ns else ns
profile_element.attrib['ns'] = ns
profile_element.attrib['node'] = node
......@@ -490,7 +480,7 @@ def create_key(args):
create_permission_file(permissions_path, domain_id, policy_element)
signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_ca_key_path = os.path.join(root, 'ca.key.pem')
keystore_ca_key_path = os.path.join(keystore_path, 'ca.key.pem')
create_signed_permissions_file(
permissions_path, signed_permissions_path,
keystore_ca_cert_path, keystore_ca_key_path)
......@@ -498,12 +488,12 @@ def create_key(args):
return True
def list_keys(args):
for name in os.listdir(args.ROOT):
if os.path.isdir(os.path.join(args.ROOT, name)):
def list_keys(keystore_path):
for name in os.listdir(keystore_path):
if os.path.isdir(os.path.join(keystore_path, name)):
print(name)
return True
def distribute_key(args):
def distribute_key(source_keystore_path, taget_keystore_path):
raise NotImplementedError()
......@@ -31,5 +31,5 @@ class CreateKeyVerb(VerbExtension):
parser.add_argument('NAME', help='key name, aka ROS node name')
def main(self, *, args):
success = create_key(args)
success = create_key(args.ROOT, args.NAME)
return 0 if success else 1
......@@ -30,5 +30,5 @@ class CreateKeystoreVerb(VerbExtension):
arg.completer = DirectoriesCompleter()
def main(self, *, args):
success = create_keystore(args)
success = create_keystore(args.ROOT)
return 0 if success else 1
......@@ -41,7 +41,7 @@ class CreatePermissionVerb(VerbExtension):
def main(self, *, args):
try:
success = create_permission(args)
success = create_permission(args.ROOT, args.NAME, args.POLICY_FILE_PATH)
except FileNotFoundError as e:
raise RuntimeError(str(e))
return 0 if success else 1
......@@ -32,5 +32,5 @@ class DistributeKeyVerb(VerbExtension):
arg.completer = DirectoriesCompleter()
def main(self, *, args):
success = distribute_key(args)
success = distribute_key(args.ROOT, args.TARGET)
return 0 if success else 1
......@@ -30,5 +30,5 @@ class ListKeysVerb(VerbExtension):
arg.completer = DirectoriesCompleter()
def main(self, *, args):
success = list_keys(args)
success = list_keys(args.ROOT)
return 0 if success else 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment