Skip to content
Snippets Groups Projects
execution.py 7.19 KiB
Newer Older
import logging
from force_bdss.core.data_value import DataValue

log = logging.getLogger(__name__)


def execute_workflow(workflow, data_values):
    """Executes the given workflow using the list of data values.
    Returns a list of data values for the KPI results

    Parameters
    ----------
    workflow: Workflow
        The instance of the workflow

    data_values: List
        The data values that the MCO generally provides.

    Returns
    -------
    list: A list of DataValues containing the KPI results.
    """

    available_data_values = data_values[:]
    for index, layer in enumerate(workflow.execution_layers):
        log.info("Computing data layer {}".format(index))
        ds_results = execute_layer(layer, available_data_values)
        available_data_values += ds_results

    log.info("Aggregating KPI data")

    kpi_results = []
    kpi_names = [kpi.name for kpi in workflow.mco.kpis]

    kpi_results = [
        dv
        for dv in available_data_values
        if dv.name in kpi_names
    ]

    return kpi_results


def execute_layer(layer, environment_data_values):
    """Helper routine.
    Performs the evaluation of a single layer.
    At the moment we have a single layer of DataSources followed
    by a single layer of KPI calculators.

    Parameters
    ----------
    layer: ExecutionLayer
        A list of the models for all the data sources

Stefano Borini's avatar
Stefano Borini committed
    environment_data_values: list
        A list of data values to submit to the evaluators.

    NOTE: The above parameter is going to go away as soon as we move
    to unlimited layers and remove the distinction between data sources
    and KPI calculators.
    """
    results = []

    for model in layer.data_sources:
        factory = model.factory
        try:
            data_source = factory.create_data_source()
        except Exception:
            log.exception(
                "Unable to create data source from factory '{}' "
                "in plugin '{}'. This may indicate a programming "
                "error in the plugin".format(
                    factory.id,
                    factory.plugin.id))
            raise

        # Get the slots for this data source. These must be matched to
        # the appropriate values in the environment data values.
        # Matching is by position.
        in_slots, out_slots = data_source.slots(model)

        # Binding performs the extraction of the specified data values
        # satisfying the above input slots from the environment data values
        # considering what the user specified in terms of names (which is
        # in the model input slot info
        # The resulting data are the ones picked by name from the
        # environment data values, and in the appropriate ordering as
        # needed by the input slots.
        passed_data_values = _bind_data_values(
            environment_data_values,
            model.input_slot_info,
            in_slots)

        # execute data source, passing only relevant data values.
        log.info("Evaluating for Data Source {}".format(
            factory.name))
        log.info("Passed values:")
        for idx, dv in enumerate(passed_data_values):
            log.info("{}: {}".format(idx, dv))

        try:
            res = data_source.run(model, passed_data_values)
        except Exception:
            log.exception(
                "Evaluation could not be performed. "
                "Run method raised exception.")
            raise

        if not isinstance(res, list):
            error_txt = (
                "The run method of data source {} must return a list."
                " It returned instead {}. Fix the run() method to return"
                " the appropriate entity.".format(
                    factory.name,
                    type(res)
                ))
            log.error(error_txt)
            raise RuntimeError(error_txt)

        if len(res) != len(out_slots):
            error_txt = (
                "The number of data values ({} values) returned"
                " by '{}' does not match the number"
                " of output slots it specifies ({} values)."
                " This is likely a plugin error.").format(
                len(res), factory.name, len(out_slots)
            )

            log.error(error_txt)
            raise RuntimeError(error_txt)

        if len(res) != len(model.output_slot_info):
            error_txt = (
                "The number of data values ({} values) returned"
                " by '{}' does not match the number"
                " of user-defined names specified ({} values)."
                " This is either a plugin error or a file"
                " error.").format(
                len(res),
                factory.name,
                len(model.output_slot_info)
            )

            log.error(error_txt)
            raise RuntimeError(error_txt)

        for idx, dv in enumerate(res):
            if not isinstance(dv, DataValue):
                error_txt = (
                    "The result list returned by DataSource {} contains"
                    " an entry that is not a DataValue. An entry of type"
                    " {} was instead found in position {}."
                    " Fix the DataSource.run() method"
                    " to return the appropriate entity.".format(
                        factory.name,
                        type(dv),
                        idx
                    )
                )
                log.error(error_txt)
                raise RuntimeError(error_txt)

        # At this point, the returned data values are unnamed.
        # Add the names as specified by the user.
        for dv, output_slot_info in zip(res, model.output_slot_info):
            dv.name = output_slot_info.name

        # If the name was not specified, simply discard the value,
        # because apparently the user is not interested in it.
        res = [r for r in res if r.name != ""]
        results.extend(res)

        log.info("Returned values:")
        for idx, dv in enumerate(res):
            log.info("{}: {}".format(idx, dv))

    # Finally, return all the computed data values from all evaluators,
    # properly named.
    return results


def _bind_data_values(available_data_values,
                      model_slot_map,
                      slots):
    """
    Given the named data values in the environment, the slots a given
    data source expects, and the user-specified names for each of these
    slots, returns those data values with the requested names, ordered
    in the correct order as specified by the slot map.
    """
    passed_data_values = []
    lookup_map = {dv.name: dv for dv in available_data_values}

    if len(slots) != len(model_slot_map):
        raise RuntimeError("The length of the slots is not equal to"
                           " the length of the slot map. This may"
                           " indicate a file error.")

    try:
        for slot, slot_map in zip(slots, model_slot_map):
            passed_data_values.append(lookup_map[slot_map.name])
    except KeyError:
        raise RuntimeError(
            "Unable to find requested name '{}' in available "
            "data values. Current data value names: {}".format(
                slot_map.name,
                list(lookup_map.keys())))

    return passed_data_values