Skip to content
Snippets Groups Projects
Commit 9a843095 authored by Stefano Borini's avatar Stefano Borini
Browse files

Changed name and introduced the driver

parent 2137db7f
No related branches found
No related tags found
1 merge request!162Provide ability to run a single data source and check its results [WIP]
......@@ -32,18 +32,18 @@ class BDSSApplication(Application):
#: This entry, if not None, drives the evaluator to run a single
#: data source in the workflow. It accepts input parameters on standard
#: input and returns the output to standard output.
run_datasource = Either(Unicode(), None)
run_data_source = Either(Unicode(), None)
def __init__(self, evaluate, run_datasource, workflow_filepath):
def __init__(self, evaluate, run_data_source, workflow_filepath):
self.evaluate = evaluate
self.run_datasource = run_datasource
self.run_data_source = run_data_source
self.workflow_filepath = workflow_filepath
plugins = [CorePlugin(), FactoryRegistryPlugin()]
if self.run_datasource:
plugins.append(CoreRunDataSourceDriver(
run_datasource=run_datasource
run_data_source=run_data_source
))
elif self.evaluate:
plugins.append(CoreEvaluationDriver())
......
......@@ -13,7 +13,7 @@ push_exception_handler(reraise_exceptions=True)
@click.option("--evaluate",
is_flag=True,
help="Runs the evaluation for a single point.")
@click.option("--run-datasource",
@click.option("--run-data-source",
type=click.STRING,
help="If specified, runs only the specified datasource "
"interactively. This is useful for debugging.")
......@@ -22,7 +22,7 @@ push_exception_handler(reraise_exceptions=True)
help="If specified, the log filename. "
" If unspecified, the log will be written to stdout.")
@click.argument('workflow_filepath', type=click.Path(exists=True))
def run(evaluate, run_datasource, logfile, workflow_filepath):
def run(evaluate, run_data_source, logfile, workflow_filepath):
logging_config = {}
logging_config["level"] = logging.INFO
......@@ -36,7 +36,7 @@ def run(evaluate, run_datasource, logfile, workflow_filepath):
try:
application = BDSSApplication(
evaluate=evaluate,
run_datasource=run_datasource,
run_data_source=run_data_source,
workflow_filepath=workflow_filepath
)
......
import sys
import logging
from traits.api import on_trait_change
from traits.api import on_trait_change, Unicode
from force_bdss.core.data_value import DataValue
from force_bdss.ids import InternalPluginID
from .base_core_driver import BaseCoreDriver
......@@ -15,6 +14,8 @@ class CoreRunDataSourceDriver(BaseCoreDriver):
"""Main plugin that handles the execution of a single data source"""
id = InternalPluginID.CORE_RUN_DATASOURCE_DRIVER_ID
run_data_source = Unicode()
@on_trait_change("application:started")
def application_started(self):
try:
......@@ -23,84 +24,8 @@ class CoreRunDataSourceDriver(BaseCoreDriver):
log.exception("Unable to open workflow file.")
sys.exit(1)
mco_model = workflow.mco
if mco_model is None:
log.info("No MCO defined. Nothing to do. Exiting.")
sys.exit(0)
mco_factory = mco_model.factory
log.info("Creating communicator")
try:
mco_communicator = mco_factory.create_communicator()
except Exception:
log.exception(
"Unable to create communicator from MCO factory '{}' "
"in plugin '{}'. This may indicate a programming "
"error in the plugin".format(
mco_factory.id,
mco_factory.plugin.id))
raise
mco_data_values = _get_data_values_from_mco(
mco_model, mco_communicator)
kpi_results = execute_workflow(workflow, mco_data_values)
mco_communicator.send_to_mco(mco_model, kpi_results)
def execute_workflow(workflow, data_values):
    """Executes the given workflow using the list of data values.

    Parameters
    ----------
    workflow: Workflow
        The workflow whose execution layers are evaluated in order.
    data_values: list of DataValue
        The initial data values (e.g. received from the MCO) made
        available to the first execution layer.

    Returns
    -------
    list of DataValue
        The data values whose names match the KPI names declared in the
        workflow's MCO model.
    """
    # Copy so that the caller's list is not mutated by the layer results.
    available_data_values = data_values[:]
    for index, layer in enumerate(workflow.execution_layers):
        log.info("Computing data layer {}".format(index))
        ds_results = _compute_layer_results(
            available_data_values,
            layer,
        )
        # Each layer's outputs become available to the following layers.
        available_data_values += ds_results

    log.info("Aggregating KPI data")

    # Fixed: removed a dead `kpi_results = []` assignment that was
    # immediately overwritten by the comprehension below.
    kpi_names = [kpi.name for kpi in workflow.mco.kpis]

    kpi_results = [
        dv
        for dv in available_data_values
        if dv.name in kpi_names
    ]

    return kpi_results
def _compute_layer_results(environment_data_values,
layer,
):
"""Helper routine.
Performs the evaluation of a single layer.
At the moment we have a single layer of DataSources followed
by a single layer of KPI calculators.
Parameters
----------
environment_data_values: list
A list of data values to submit to the evaluators.
layer: ExecutionLayer
A list of the models for all the data sources
NOTE: The above parameter is going to go away as soon as we move
to unlimited layers and remove the distinction between data sources
and KPI calculators.
"""
results = []
for model in layer.data_sources:
factory = model.factory
'''factory = model.factory
try:
data_source = factory.create_data_source()
except Exception:
......@@ -211,75 +136,4 @@ def _compute_layer_results(environment_data_values,
log.info("Returned values:")
for idx, dv in enumerate(res):
log.info("{}: {}".format(idx, dv))
# Finally, return all the computed data values from all evaluators,
# properly named.
return results
def _get_data_values_from_mco(model, communicator):
    """Helper method.

    Collects the (ordered) data values produced by the MCO and labels
    each one with the corresponding user-defined parameter name from the
    model. Values left with an empty name are discarded.

    Parameters
    ----------
    model: BaseMCOModel
        The MCO model holding the user-defined variable names.

    communicator: BaseMCOCommunicator
        The communicator that produces the (initially unnamed) data
        values from the MCO.
    """
    received = communicator.receive_from_mco(model)

    log.info("Received data from MCO: \n{}".format(
        "\n".join([str(value) for value in received])))

    num_received = len(received)
    num_expected = len(model.parameters)
    if num_received != num_expected:
        error_txt = ("The number of data values returned by"
                     " the MCO ({} values) does not match the"
                     " number of parameters specified ({} values)."
                     " This is either a MCO plugin error or the workflow"
                     " file is corrupted.").format(
            num_received, num_expected
        )
        log.error(error_txt)
        raise RuntimeError(error_txt)

    # The values arrive unnamed: bind them, in order, to the names the
    # user specified on the model parameters.
    for value, parameter in zip(received, model.parameters):
        value.name = parameter.name

    # Drop any value that ended up without a name.
    return [value for value in received if value.name != ""]
def _bind_data_values(available_data_values,
model_slot_map,
slots):
"""
Given the named data values in the environment, the slots a given
data source expects, and the user-specified names for each of these
slots, returns those data values with the requested names, ordered
in the correct order as specified by the slot map.
"""
passed_data_values = []
lookup_map = {dv.name: dv for dv in available_data_values}
if len(slots) != len(model_slot_map):
raise RuntimeError("The length of the slots is not equal to"
" the length of the slot map. This may"
" indicate a file error.")
try:
for slot, slot_map in zip(slots, model_slot_map):
passed_data_values.append(lookup_map[slot_map.name])
except KeyError:
raise RuntimeError(
"Unable to find requested name '{}' in available "
"data values. Current data value names: {}".format(
slot_map.name,
list(lookup_map.keys())))
return passed_data_values
'''
......@@ -19,6 +19,7 @@ class ExtensionPointID:
class InternalPluginID:
    """Envisage plugin identifiers for the driver plugins shipped with
    force_bdss itself (MCO, evaluation, and single-data-source run)."""
    CORE_MCO_DRIVER_ID = "force.bdss.core.CoreMCODriver"
    CORE_EVALUATION_DRIVER_ID = "force.bdss.core.CoreEvaluationDriver"
    CORE_RUN_DATASOURCE_DRIVER_ID = "force.bdss.core.CoreRunDataSourceDriver"
def factory_id(plugin_id, identifier):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment