Skip to content
Snippets Groups Projects
Commit 67b4fc31 authored by Stefano Borini's avatar Stefano Borini
Browse files

Separated MCO and evaluation in two different plugins

parent 6ac89a1b
No related branches found
No related tags found
1 merge request!14Separated MCO and evaluation in two different plugins
from envisage.extension_point import ExtensionPoint
from envisage.plugin import Plugin
from traits.api import List
from force_bdss.data_sources.i_data_source_bundle import (
IDataSourceBundle)
from force_bdss.kpi.i_kpi_calculator_bundle import IKPICalculatorBundle
from force_bdss.mco.i_multi_criteria_optimizer_bundle import (
IMultiCriteriaOptimizerBundle)
class BaseCoreDriver(Plugin):
"""Main plugin that handles the execution of the MCO
or the evaluation.
"""
# Note: we are forced to declare these extensions points here instead
# of the application object, and this is why we have to use this plugin.
# It is a workaround to an envisage bug that does not find the extension
# points if declared on the application.
#: A List of the available Multi Criteria Optimizers.
#: This will be populated by MCO plugins.
mco_bundles = ExtensionPoint(
List(IMultiCriteriaOptimizerBundle),
id='force.bdss.mco.bundles')
#: A list of the available Data Sources.
#: It will be populated by plugins.
data_source_bundles = ExtensionPoint(
List(IDataSourceBundle),
id='force.bdss.data_sources.bundles')
#: A list of the available Key Performance Indicator calculators.
#: It will be populated by plugins.
kpi_calculator_bundles = ExtensionPoint(
List(IKPICalculatorBundle),
id='force.bdss.kpi_calculators.bundles')
def _data_source_bundle_by_name(self, name):
for ds in self.data_source_bundles:
if ds.name == name:
return ds
raise Exception("Requested data source {} but don't know "
"to find it.".format(name))
def _kpi_calculator_bundle_by_name(self, name):
for kpic in self.kpi_calculator_bundles:
if kpic.name == name:
return kpic
raise Exception(
"Requested kpi calculator {} but don't know "
"to find it.".format(name))
def _mco_bundle_by_name(self, name):
for mco in self.mco_bundles:
if mco.name == name:
return mco
raise Exception("Requested MCO {} but it's not available"
"to compute it.".format(name))
import json
from stevedore import extension
from stevedore.exception import NoMatches
from envisage.api import Application
from envisage.core_plugin import CorePlugin
from force_bdss.core_evaluation_driver import CoreEvaluationDriver
from force_bdss.core_mco_driver import CoreMCODriver
from traits.api import Unicode, Bool, Instance
from force_bdss.workspecs.workflow import Workflow
......@@ -22,6 +29,34 @@ class BDSSApplication(Application):
#: coordination of the MCO itself. See design notes for more details.
evaluate = Bool()
def __init__(self, evaluate, workflow_filepath):
self.evaluate = evaluate
self.workflow_filepath = workflow_filepath
plugins = [CorePlugin()]
if self.evaluate:
plugins.append(CoreEvaluationDriver())
else:
plugins.append(CoreMCODriver())
mgr = extension.ExtensionManager(
namespace='force.bdss.extensions',
invoke_on_load=True
)
def import_extensions(ext):
print("Found extension {}".format(ext.name))
plugins.append(ext.obj)
try:
mgr.map(import_extensions)
except NoMatches:
print("No extensions found")
super().__init__(plugins=plugins)
def _workflow_default(self):
with open(self.workflow_filepath) as f:
return Workflow.from_json(json.load(f))
import click
from stevedore import extension
from stevedore.exception import NoMatches
from envisage.core_plugin import CorePlugin
from force_bdss.bdss_application import BDSSApplication
from force_bdss.core_mco_driver import CoreMCODriver
@click.command()
......@@ -12,27 +8,7 @@ from force_bdss.core_mco_driver import CoreMCODriver
@click.argument('workflow_filepath', type=click.Path(exists=True))
def run(evaluate, workflow_filepath):
plugins = [
CorePlugin(),
CoreMCODriver(),
]
mgr = extension.ExtensionManager(
namespace='force.bdss.extensions',
invoke_on_load=True
)
def import_extensions(ext):
print("Found extension {}".format(ext.name))
plugins.append(ext.obj)
try:
mgr.map(import_extensions)
except NoMatches:
print("No extensions found")
application = BDSSApplication(
plugins=plugins,
evaluate=evaluate,
workflow_filepath=workflow_filepath
)
......
from traits.has_traits import on_trait_change
from force_bdss.base_core_driver import BaseCoreDriver
class CoreEvaluationDriver(BaseCoreDriver):
"""Main plugin that handles the execution of the MCO
or the evaluation.
"""
@on_trait_change("application:started")
def application_started(self):
workflow = self.application.workflow
mco_data = workflow.multi_criteria_optimizer
mco_bundle = self._mco_bundle_by_name(mco_data.name)
mco_model = mco_bundle.create_model(mco_data.model_data)
mco_communicator = mco_bundle.create_communicator(
self.application,
mco_model)
parameters = mco_communicator.receive_from_mco()
ds_results = []
for requested_ds in workflow.data_sources:
ds_bundle = self._data_source_bundle_by_name(
requested_ds.name)
ds_model = ds_bundle.create_model(requested_ds.model_data)
data_source = ds_bundle.create_data_source(
self.application, ds_model)
ds_results.append(data_source.run(parameters))
kpi_results = []
for requested_kpic in workflow.kpi_calculators:
kpic_bundle = self._kpi_calculator_bundle_by_name(
requested_kpic.name)
ds_model = kpic_bundle.create_model(
requested_kpic.model_data)
kpi_calculator = kpic_bundle.create_data_source(
self.application, ds_model)
kpi_results.append(kpi_calculator.run(ds_results))
mco_communicator.send_to_mco(kpi_results)
from envisage.extension_point import ExtensionPoint
from envisage.plugin import Plugin
from traits.has_traits import on_trait_change
from traits.trait_types import List
from traits.api import on_trait_change
from force_bdss.data_sources.i_data_source_bundle import (
IDataSourceBundle)
from force_bdss.kpi.i_kpi_calculator_bundle import IKPICalculatorBundle
from force_bdss.mco.i_multi_criteria_optimizer_bundle import (
IMultiCriteriaOptimizerBundle)
from force_bdss.base_core_driver import BaseCoreDriver
class CoreMCODriver(Plugin):
class CoreMCODriver(BaseCoreDriver):
"""Main plugin that handles the execution of the MCO
or the evaluation.
"""
# Note: we are forced to declare these extensions points here instead
# of the application object, and this is why we have to use this plugin.
# It is a workaround to an envisage bug that does not find the extension
# points if declared on the application.
#: A List of the available Multi Criteria Optimizers.
#: This will be populated by MCO plugins.
mco_bundles = ExtensionPoint(
List(IMultiCriteriaOptimizerBundle),
id='force.bdss.mco.bundles')
#: A list of the available Data Sources.
#: It will be populated by plugins.
data_source_bundles = ExtensionPoint(
List(IDataSourceBundle),
id='force.bdss.data_sources.bundles')
#: A list of the available Key Performance Indicator calculators.
#: It will be populated by plugins.
kpi_calculator_bundles = ExtensionPoint(
List(IKPICalculatorBundle),
id='force.bdss.kpi_calculators.bundles')
@on_trait_change("application:started")
def application_started(self):
workflow = self.application.workflow
......@@ -46,58 +16,5 @@ class CoreMCODriver(Plugin):
mco_bundle = self._mco_bundle_by_name(mco_data.name)
mco_model = mco_bundle.create_model(mco_data.model_data)
mco = mco_bundle.create_optimizer(self.application, mco_model)
mco_communicator = mco_bundle.create_communicator(
self.application,
mco_model)
if not self.application.evaluate:
mco.run()
return
parameters = mco_communicator.receive_from_mco()
ds_results = []
for requested_ds in workflow.data_sources:
ds_bundle = self._data_source_bundle_by_name(
requested_ds.name)
ds_model = ds_bundle.create_model(requested_ds.model_data)
data_source = ds_bundle.create_data_source(
self.application, ds_model)
ds_results.append(data_source.run(parameters))
kpi_results = []
for requested_kpic in workflow.kpi_calculators:
kpic_bundle = self._kpi_calculator_bundle_by_name(
requested_kpic.name)
ds_model = kpic_bundle.create_model(
requested_kpic.model_data)
kpi_calculator = kpic_bundle.create_data_source(
self.application, ds_model)
kpi_results.append(kpi_calculator.run(ds_results))
mco_communicator.send_to_mco(kpi_results)
def _data_source_bundle_by_name(self, name):
for ds in self.data_source_bundles:
if ds.name == name:
return ds
raise Exception("Requested data source {} but don't know "
"to find it.".format(name))
def _kpi_calculator_bundle_by_name(self, name):
for kpic in self.kpi_calculator_bundles:
if kpic.name == name:
return kpic
raise Exception(
"Requested kpi calculator {} but don't know "
"to find it.".format(name))
def _mco_bundle_by_name(self, name):
for mco in self.mco_bundles:
if mco.name == name:
return mco
raise Exception("Requested MCO {} but it's not available"
"to compute it.".format(name))
mco.run()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment