Newer
Older
import logging
from force_bdss.core.data_value import DataValue
log = logging.getLogger(__name__)
def execute_workflow(workflow, data_values):
"""Executes the given workflow using the list of data values.
Returns a list of data values for the KPI results
Parameters
----------
workflow: Workflow
The instance of the workflow
data_values: List
The data values that the MCO generally provides.
Returns
-------
list: A list of DataValues containing the KPI results.
"""
available_data_values = data_values[:]
for index, layer in enumerate(workflow.execution_layers):
log.info("Computing data layer {}".format(index))
ds_results = execute_layer(layer, available_data_values)
available_data_values += ds_results
log.info("Aggregating KPI data")
kpi_results = []
kpi_names = [kpi.name for kpi in workflow.mco.kpis]
kpi_results = [
dv
for dv in available_data_values
if dv.name in kpi_names
]
return kpi_results
def execute_layer(layer, environment_data_values):
"""Helper routine.
Performs the evaluation of a single layer.
At the moment we have a single layer of DataSources followed
by a single layer of KPI calculators.
Parameters
----------
layer: ExecutionLayer
A list of the models for all the data sources
environment_data_values: list
A list of data values to submit to the evaluators.
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
NOTE: The above parameter is going to go away as soon as we move
to unlimited layers and remove the distinction between data sources
and KPI calculators.
"""
results = []
for model in layer.data_sources:
factory = model.factory
try:
data_source = factory.create_data_source()
except Exception:
log.exception(
"Unable to create data source from factory '{}' "
"in plugin '{}'. This may indicate a programming "
"error in the plugin".format(
factory.id,
factory.plugin.id))
raise
# Get the slots for this data source. These must be matched to
# the appropriate values in the environment data values.
# Matching is by position.
in_slots, out_slots = data_source.slots(model)
# Binding performs the extraction of the specified data values
# satisfying the above input slots from the environment data values
# considering what the user specified in terms of names (which is
# in the model input slot info
# The resulting data are the ones picked by name from the
# environment data values, and in the appropriate ordering as
# needed by the input slots.
passed_data_values = _bind_data_values(
environment_data_values,
model.input_slot_info,
in_slots)
# execute data source, passing only relevant data values.
log.info("Evaluating for Data Source {}".format(
factory.name))
log.info("Passed values:")
for idx, dv in enumerate(passed_data_values):
log.info("{}: {}".format(idx, dv))
try:
res = data_source.run(model, passed_data_values)
except Exception:
log.exception(
"Evaluation could not be performed. "
"Run method raised exception.")
raise
if not isinstance(res, list):
error_txt = (
"The run method of data source {} must return a list."
" It returned instead {}. Fix the run() method to return"
" the appropriate entity.".format(
factory.name,
type(res)
))
log.error(error_txt)
raise RuntimeError(error_txt)
if len(res) != len(out_slots):
error_txt = (
"The number of data values ({} values) returned"
" by '{}' does not match the number"
" of output slots it specifies ({} values)."
" This is likely a plugin error.").format(
len(res), factory.name, len(out_slots)
)
log.error(error_txt)
raise RuntimeError(error_txt)
if len(res) != len(model.output_slot_info):
error_txt = (
"The number of data values ({} values) returned"
" by '{}' does not match the number"
" of user-defined names specified ({} values)."
" This is either a plugin error or a file"
" error.").format(
len(res),
factory.name,
len(model.output_slot_info)
)
log.error(error_txt)
raise RuntimeError(error_txt)
for idx, dv in enumerate(res):
if not isinstance(dv, DataValue):
error_txt = (
"The result list returned by DataSource {} contains"
" an entry that is not a DataValue. An entry of type"
" {} was instead found in position {}."
" Fix the DataSource.run() method"
" to return the appropriate entity.".format(
factory.name,
type(dv),
idx
)
)
log.error(error_txt)
raise RuntimeError(error_txt)
# At this point, the returned data values are unnamed.
# Add the names as specified by the user.
for dv, output_slot_info in zip(res, model.output_slot_info):
dv.name = output_slot_info.name
# If the name was not specified, simply discard the value,
# because apparently the user is not interested in it.
res = [r for r in res if r.name != ""]
results.extend(res)
log.info("Returned values:")
for idx, dv in enumerate(res):
log.info("{}: {}".format(idx, dv))
# Finally, return all the computed data values from all evaluators,
# properly named.
return results
def _bind_data_values(available_data_values,
model_slot_map,
slots):
"""
Given the named data values in the environment, the slots a given
data source expects, and the user-specified names for each of these
slots, returns those data values with the requested names, ordered
in the correct order as specified by the slot map.
"""
passed_data_values = []
lookup_map = {dv.name: dv for dv in available_data_values}
if len(slots) != len(model_slot_map):
raise RuntimeError("The length of the slots is not equal to"
" the length of the slot map. This may"
" indicate a file error.")
try:
for slot, slot_map in zip(slots, model_slot_map):
passed_data_values.append(lookup_map[slot_map.name])
except KeyError:
raise RuntimeError(
"Unable to find requested name '{}' in available "
"data values. Current data value names: {}".format(
slot_map.name,
list(lookup_map.keys())))
return passed_data_values