Newer
Older
from stevedore import extension
from stevedore.exception import NoMatches
from envisage.api import Application
from envisage.core_plugin import CorePlugin
from force_bdss.core_evaluation_driver import CoreEvaluationDriver
from force_bdss.core_mco_driver import CoreMCODriver
from traits.api import Unicode, Bool, Instance
from force_bdss.workspecs.workflow import Workflow
class BDSSApplication(Application):
"""Main application for the BDSS.
"""
id = "force_bdss.bdss_application"
#: The path of the workflow file to open
workflow_filepath = Unicode()
#: Deserialized content of the workflow file.
workflow = Instance(Workflow)
#: This flags signals to the application not to execute and orchestrate
#: the MCO, but instead to perform a single evaluation under the
#: coordination of the MCO itself. See design notes for more details.
evaluate = Bool()
def __init__(self, evaluate, workflow_filepath):
self.evaluate = evaluate
self.workflow_filepath = workflow_filepath
plugins = [CorePlugin()]
if self.evaluate:
plugins.append(CoreEvaluationDriver())
else:
plugins.append(CoreMCODriver())
mgr = extension.ExtensionManager(
namespace='force.bdss.extensions',
invoke_on_load=True
)
def import_extensions(ext):
print("Found extension {}".format(ext.name))
plugins.append(ext.obj)
try:
mgr.map(import_extensions)
except NoMatches:
print("No extensions found")
super().__init__(plugins=plugins)
def _workflow_default(self):
with open(self.workflow_filepath) as f:
return Workflow.from_json(json.load(f))