Skip to content
Snippets Groups Projects

Conv arch

Merged Lisa-Marie Fenner requested to merge 60-conv-arch into 64-improve-online-execution-v2
7 files
+ 177
16
Compare changes
  • Side-by-side
  • Inline
Files
7
@@ -16,7 +16,7 @@ from typing import Any, Iterable
from opt_tools import SysVarSet, Traj, Batch, tree_map_relaxed
from ros_datahandler import get_bags_iter
from ros_datahandler import JointPosData, ForceData, JointState, WrenchStamped
from dataclass_creator.franka_dataclass import FrankaData
from dataclass_creator.franka_dataclass import FrankaData, URData
def min_max_normalize_data(data: jnp.ndarray):
@@ -115,15 +115,20 @@ def load_dataset(root='experiments/example_data/train', name: str = 'SUCCESS') -
Returns:
FormattedDataclass: data containing the modalities corresponding to the attributes initialized for RosDatahandler
"""
bags = glob.glob(join(root, f'*{name}*.bag'))
if 'SUCCESS' in name or 'FAILURE' in name:
bags = glob.glob(join(root, f'*{name}*')) # should give the folders which contain metadata.yaml and the .db3 files
else:
bags = glob.glob(join(root, f'*{name}*.bag'))
assert len(bags) > 0, f"I expected bags! No bags on {root}"
bags = [Path(bag) for bag in bags]
print(f'Containing # {len(bags)} rosbags, named {name}')
# init_urdata = URData(jt = JointPosData('/joint_states', JointState),
# force = ForceData('/force_torque_sensor_broadcaster/wrench', WrenchStamped))
init_frankadata = FrankaData(jt = JointPosData('/joint_states', JointState),
force = ForceData('/franka_state_controller/F_ext', WrenchStamped))
force = ForceData('/franka_state_controller/F_ext', WrenchStamped))
# @kevin 2.7.24: data is now a list of Trajs; leaves are np arrays for each bags
data = get_bags_iter(bags, init_frankadata, sync_stream='force')
@@ -144,13 +149,18 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
#curr_dir = Path(os.path.dirname(os.path.abspath(__file__)))
#root = os.fspath(Path(curr_dir.parent.parent, 'data/test').resolve())
root = 'experiments/01_faston_converging/data/with_search_strategy/test'
data_anom = load_dataset(root=root, name='FAILURE')
data_nom = load_dataset(root=root, name='110_SUCCESS')
visualize_anomaly_bags(data_anom[0], data_nom[0], "040_FAILURE")
visualize_anomaly_bags(data_anom[1], data_nom[0], "050_FAILURE")
visualize_anomaly_bags(data_anom[2], data_nom[0],"214_FAILURE")
visualise_bags(data_anom[0], "040_FAILURE")
visualise_bags(data_anom[1], "050_FAILURE")
visualise_bags(data_anom[2], "214_FAILURE")
# root = 'experiments/01_faston_converging/data/with_search_strategy/test'
# root = 'experiments/03_kraftverlaeufe_klipsen'
# data = load_dataset(root=root, name='rosbag')
# data_anom = load_dataset(root=root, name='FAILURE')
# data_nom = load_dataset(root=root, name='110_SUCCESS')
# visualize_anomaly_bags(data_anom[0], data_nom[0], "040_FAILURE")
# visualize_anomaly_bags(data_anom[1], data_nom[0], "050_FAILURE")
# visualize_anomaly_bags(data_anom[2], data_nom[0],"214_FAILURE")
# visualise_bags(data_anom[0], "040_FAILURE")
# visualise_bags(data_anom[1], "050_FAILURE")
# visualise_bags(data_anom[2], "214_FAILURE")
plt.show()
Loading