OpenArm Dataset API
API Reference for OpenArm Dataset.
Table of Contents
- Dataset
- Camera (camera frames)
- Frame
- Sample
- Sampler
- Metadata
- Equipment / Embodiments / Perceptions
- Embodiment, OpenArm, OpenArmCellLifter
- Frequencies
- LeRobot v2.1 conversion (lerobot_v21)
- Command-line tools
Dataset
Top-level entry point. A Dataset is a thin handle to a directory tree on
disk; nothing is read eagerly except metadata.yaml.
Dataset.__init__
Dataset(
path: str | os.PathLike,
meta: Metadata | None = None,
camera_names: list[str] | None = None,
)
| Parameter | Description |
|---|---|
| path | Dataset root (the directory containing metadata.yaml and episodes/). |
| meta | Optional pre-loaded Metadata. Defaults to Metadata(path/"metadata.yaml"). |
| camera_names | Optional whitelist of cameras to expose. Defaults to all in metadata. |
>>> import openarm_dataset
>>> ds = openarm_dataset.Dataset("tests/fixture/dataset_0.3.0")
Dataset.num_episodes
Property. Number of episodes declared in metadata.
>>> ds.num_episodes
2
Dataset.camera_names
Property. List of camera stream names, in declaration order. Either the
override passed to __init__ or list(meta.equipment.perceptions.cameras).
>>> ds.camera_names
['wrist_left', 'wrist_right', 'ceiling', 'head']
Dataset.meta
Attribute. The Metadata instance backing this dataset.
>>> ds.meta.version
'0.3.0'
>>> ds.meta.tasks
[{'prompt': 'Run test.', 'description': 'Longer task description if need.'}]
Dataset.root_path
Attribute. pathlib.Path of the dataset root.
Dataset.load_obs
Dataset.load_obs(
episode_index: int,
use_unixtime: bool = False,
cutoff: float | None = None,
) -> dict[str, pandas.DataFrame]
Loads all observation signals for one episode. Returns a dict keyed by
{embodiment}/{component}/{attribute} (or {embodiment}/{attribute} for
component-less embodiments such as the cell lifter).
For 0.3.0 datasets the arm state.parquet file is exploded into three DataFrames
per arm side (qpos, qvel, qtorque); older layouts expose only qpos.
| Parameter | Description |
|---|---|
| episode_index | Zero-based index into meta.episodes. |
| use_unixtime | If True, DataFrame index is float64 Unix seconds; otherwise datetime64[ns]. |
| cutoff | Low-pass cutoff (Hz) for a 4th-order Butterworth filter. If None, falls back to the value set by set_smoothing. |
Each DataFrame's columns are the embodiment's joint names. For OpenArm that
is ('joint1', …, 'joint7', 'gripper'); for OpenArmCellLifter it is
('elevation',).
>>> obs = ds.load_obs(0)
>>> sorted(obs.keys())
['arms/left/qpos', 'arms/left/qtorque', 'arms/left/qvel',
'arms/right/qpos', 'arms/right/qtorque', 'arms/right/qvel',
'lifter/elevation']
>>> df = obs["arms/right/qpos"]
>>> df.shape
(746, 8)
>>> list(df.columns)
['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6', 'joint7', 'gripper']
>>> df.index.dtype
dtype('<M8[ns]') # datetime64[ns]
>>> ds.load_action(0, use_unixtime=True)["arms/right/qpos"].index.dtype
dtype('float64')
>>> obs["lifter/elevation"].columns.tolist()
['elevation']
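The key layout described above can be taken apart with plain string handling. The following is a minimal sketch and not part of openarm_dataset: SignalKey and split_signal_key are hypothetical names, and the code assumes every key has either two or three /-separated parts, as the text states.

```python
from typing import NamedTuple, Optional


class SignalKey(NamedTuple):
    """Hypothetical container for the parts of a load_obs / load_action key."""

    embodiment: str
    component: Optional[str]  # None for component-less embodiments
    attribute: str


def split_signal_key(key: str) -> SignalKey:
    """Split 'arms/left/qpos' or 'lifter/elevation' into named parts."""
    parts = key.split("/")
    if len(parts) == 3:
        return SignalKey(parts[0], parts[1], parts[2])
    if len(parts) == 2:
        return SignalKey(parts[0], None, parts[1])
    raise ValueError(f"Unexpected signal key: {key!r}")
```

This is handy for grouping the returned DataFrames by embodiment or by arm side without hard-coding key strings.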
Dataset.load_action
Same signature and return type as load_obs, but reads from the action/
subtree. In 0.3.0 the only arm attribute persisted as action is qpos.
>>> sorted(ds.load_action(0).keys())
['arms/left/qpos', 'arms/right/qpos', 'lifter/elevation']
Dataset.load_cameras
Dataset.load_cameras(episode_index: int) -> dict[str, Camera]
Returns one Camera per stream in
Dataset.camera_names.
>>> cams = ds.load_cameras(0)
>>> sorted(cams)
['ceiling', 'head', 'wrist_left', 'wrist_right']
Dataset.load_camera
Dataset.load_camera(name: str, episode_index: int) -> Camera
Returns the Camera for a single stream of the given episode. Raises KeyError if name is not in Dataset.camera_names.
>>> head = ds.load_camera("head", 0)
>>> head.num_frames
3
Dataset.sample
Dataset.sample(hz: float, episode_index: int) -> list[Sample]
Resamples obs/action/cameras onto a uniform grid at hz Hz, clipped to the
intersection of all streams' time spans, and returns a list of
Sample objects (one per timestep).
For each modality, one element is selected by timestamp via np.searchsorted.
Because np.searchsorted returns an insertion index rather than the index of
the previous-or-equal element, a sample taken at t aligns with the first
element whose timestamp is at or just above t, not with the last one at or
before t.
>>> samples = ds.sample(hz=30, episode_index=0)
>>> len(samples)
2
>>> samples[0]
Sample(timestamp=1772010251.6202147)
>>> samples[0].obs["arms/right/qpos"].dtype, samples[0].obs["arms/right/qpos"].shape
(dtype('float32'), (8,))
>>> samples[0].cameras["head"] # cameras are Frame objects, not arrays
<openarm_dataset.camera.Frame object at 0x...>
>>> samples[0].cameras["head"].load().shape # decode on demand
(600, 960, 3)
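The "uniform grid clipped to the intersection of all streams' time spans" can be pictured with a small standalone sketch. This is an illustration under assumptions, not the library's code: uniform_grid is a hypothetical helper, each stream is assumed to be a non-empty sorted list of Unix-second timestamps, and whether the real sampler includes the last grid point is not stated in this reference.

```python
import math
from typing import Dict, List, Sequence


def uniform_grid(streams: Dict[str, Sequence[float]], hz: float) -> List[float]:
    """Return grid timestamps at `hz` Hz inside the span shared by all streams."""
    if hz <= 0:
        raise ValueError("hz must be positive")
    # The shared span starts at the latest first timestamp and ends at the
    # earliest last timestamp.
    start = max(ts[0] for ts in streams.values())
    end = min(ts[-1] for ts in streams.values())
    if end < start:
        return []  # the streams do not overlap
    # Multiply by k instead of accumulating 1/hz to limit floating-point drift.
    steps = math.floor((end - start) * hz)
    return [start + k / hz for k in range(steps + 1)]
```

With streams spanning [0.0, 1.5] and [0.25, 1.25] and hz=2, the sketch yields 0.25, 0.75 and 1.25.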
Dataset.set_smoothing
Dataset.set_smoothing(cutoff: float) -> None
Sets a default low-pass cutoff used by subsequent load_obs / load_action
calls when no explicit cutoff argument is passed. Also picked up by
Dataset.write(format="lerobot_v2.1").
>>> ds.set_smoothing(1.0)
Dataset.validate
Dataset.validate(on_error: Callable[[str], None] | None = None) -> bool
Walks every per-episode parquet file (obs and action) and reports any null values via Parquet column statistics. The current implementation only checks for nulls; it does not check schemas, monotonicity, or file presence.
| Parameter | Description |
|---|---|
| on_error | Callback invoked with one human-readable error string per violation. |
| returns | True if no errors were reported, False otherwise. |
>>> errors = []
>>> ds.validate(on_error=errors.append)
True
>>> errors
[]
Dataset.write
Dataset.write(
output: str | os.PathLike,
format: str | None = None,
**options,
) -> None
Persists the dataset to output. format is one of:
| format | Behavior |
|---|---|
| None or "openarm" | Write as the current OpenArm format (currently 0.3.0). Upgrades legacy layouts in place. No extra options. |
| "lerobot_v2.1" | Convert and write LeRobot v2.1 layout (parquet + MP4 videos + meta). See LeRobot section for **options. |
Any other value raises ValueError("Unsupported format: …").
>>> import tempfile, os
>>> out = tempfile.mkdtemp()
>>> ds.write(out) # native openarm format
>>> sorted(os.listdir(out))
['episodes', 'metadata.yaml']
Camera (camera frames)
openarm_dataset.camera.Camera
A Camera is a lazy view of one camera stream's JPEG directory. It indexes
files at construction time by listing the directory and sorting filenames
lexicographically (filenames are nanosecond timestamps, so lexicographic
order matches chronological order).
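The claim that lexicographic order matches chronological order holds as long as all filename stems have the same number of digits. The sketch below illustrates this with standard-library calls only; sort_frames and is_chronological are hypothetical helpers, and the short stems in the counter-example are made up for illustration.

```python
from pathlib import Path
from typing import Iterable, List


def sort_frames(paths: Iterable[Path]) -> List[Path]:
    """Sort frame paths lexicographically by filename."""
    return sorted(paths, key=lambda p: p.name)


def is_chronological(paths: List[Path]) -> bool:
    """Check that the integer nanosecond stems are in non-decreasing order."""
    stamps = [int(p.stem) for p in paths]
    return stamps == sorted(stamps)


# Equal-width stems: lexicographic order equals numeric order.
equal_width = sort_frames(
    [Path("1772010251629774985.jpeg"), Path("1772010251618790832.jpeg")]
)

# Made-up stems of different widths: "1000" sorts before "999" as text.
mixed_width = sort_frames([Path("999.jpeg"), Path("1000.jpeg")])
```

Here is_chronological(equal_width) is True while is_chronological(mixed_width) is False, which is why fixed-width nanosecond stems matter.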
Constructor
Camera(name: str, base_path: str | os.PathLike)
You almost never construct this directly; use Dataset.load_camera /
Dataset.load_cameras.
Attributes
| Attribute | Type | Description |
|---|---|---|
| name | str | Stream name (e.g. "head"). |
| base_path | pathlib.Path | Directory holding the JPEGs. |
| all_files | list[pathlib.Path] | All JPEG paths, sorted. |
Camera.num_frames
Property; equivalent to len(self.all_files).
>>> head = ds.load_camera("head", 0)
>>> head.num_frames
3
>>> head.base_path
PosixPath('tests/fixture/dataset_0.3.0/episodes/0/cameras/head')
>>> head.all_files[0].name
'1772010251618790832.jpeg'
Camera.get_frame
Camera.get_frame(index: int) -> Frame
Returns a Frame wrapping self.all_files[index]. Raises IndexError for
out-of-range indices.
>>> head.get_frame(0)
<openarm_dataset.camera.Frame object at 0x...>
>>> head.get_frame(0).timestamp
1772010251.6187909
Camera.frames
Camera.frames() -> Iterator[Frame]
Generator yielding one Frame per file in order.
>>> [f.timestamp for f in head.frames()]
[1772010251.6187909, 1772010251.629775, 1772010251.6634612]
Camera.load_timestamps
Camera.load_timestamps() -> list[float]
Returns the Unix-second timestamp for every frame, in the same order as
all_files. Timestamps are decoded from the filenames ({ns}.jpeg).
>>> head.load_timestamps()
[1772010251.6187909, 1772010251.629775, 1772010251.6634612]
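Decoding a timestamp from a {ns}.jpeg filename amounts to parsing the stem as an integer and dividing by 1e9. A minimal sketch, assuming the stem is always a plain decimal nanosecond count; timestamp_from_path is a hypothetical name, not a function of the library.

```python
from pathlib import Path
from typing import Union


def timestamp_from_path(path: Union[str, Path]) -> float:
    """Convert '<nanoseconds>.jpeg' into Unix seconds as a float."""
    nanoseconds = int(Path(path).stem)  # raises ValueError on non-numeric stems
    return nanoseconds / 1e9
```

Note that a float64 cannot hold all 19 digits, so sub-microsecond detail is lost in the conversion; keep the integer stem if exact nanoseconds are needed.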
Frame
openarm_dataset.camera.Frame
A lightweight handle to a single image file. Decoding only happens on
.load() or .show().
| Attribute / method | Type / signature | Description |
|---|---|---|
| path | os.PathLike | JPEG path. |
| timestamp | float | Unix seconds derived from the filename stem (ns / 1e9). |
| load() | () -> numpy.ndarray | Decode to an (H, W, 3) uint8 ndarray (RGB). |
| show() | () -> None | Open the image with PIL's default viewer. |
| __eq__ | (other) -> bool | Equal iff other.path == self.path. |
>>> f = head.get_frame(0)
>>> f.path.name
'1772010251618790832.jpeg'
>>> f.timestamp
1772010251.6187909
>>> img = f.load()
>>> img.shape, img.dtype
((600, 960, 3), dtype('uint8'))
Sample
openarm_dataset.sampler.Sample
A single resampled timestep. Sample implements collections.abc.Mapping
over the four fixed keys "timestamp" | "obs" | "action" | "cameras", so it
can be both attribute- and dict-accessed.
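To show how a class can be both attribute- and dict-accessed, here is a minimal sketch of a collections.abc.Mapping over four fixed keys. MiniSample is a hypothetical stand-in written for illustration; the real Sample may be implemented differently.

```python
from collections.abc import Mapping


class MiniSample(Mapping):
    """Hypothetical stand-in: a read-only mapping over four fixed keys."""

    _KEYS = ("timestamp", "obs", "action", "cameras")

    def __init__(self, timestamp, obs, action, cameras):
        self.timestamp = timestamp
        self.obs = obs
        self.action = action
        self.cameras = cameras

    def __getitem__(self, key):
        # Only the fixed keys are valid; everything else is a KeyError.
        if key not in self._KEYS:
            raise KeyError(key)
        return getattr(self, key)

    def __iter__(self):
        return iter(self._KEYS)

    def __len__(self):
        return len(self._KEYS)

    def __repr__(self):
        return f"MiniSample(timestamp={self.timestamp!r})"
```

Implementing these three methods is enough for Mapping to supply keys(), items(), in checks, and ** unpacking.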
Constructor
Sample(
timestamp: float,
obs: dict[str, numpy.ndarray],
action: dict[str, numpy.ndarray],
cameras: dict[str, Frame],
)
You normally get these from Dataset.sample.
Properties
| Property | Type | Description |
|---|---|---|
| timestamp | float | Unix seconds of this sample on the resampling grid. |
| obs | dict[str, numpy.ndarray] | 1-D float32 arrays of joint values, keyed like load_obs. |
| action | dict[str, numpy.ndarray] | Same shape/dtype as obs, keyed like load_action. |
| cameras | dict[str, Frame] | One Frame per camera stream (not decoded). |
Per-modality vectors carry the embodiment's joint values in the same order
as embodiment.joints (e.g. joint1…joint7, gripper for OpenArm). For each
camera, the selected Frame is the one with the smallest timestamp
>= the sample's timestamp (via np.searchsorted).
>>> s = ds.sample(hz=30, episode_index=0)[0]
>>> s
Sample(timestamp=1772010251.6202147)
>>> list(s.keys()) # Mapping interface
['timestamp', 'obs', 'action', 'cameras']
>>> s["timestamp"] == s.timestamp
True
>>> s.obs["arms/right/qpos"].shape, s.obs["arms/right/qpos"].dtype
((8,), dtype('float32'))
>>> s.cameras["head"].path.name
'1772010251629774985.jpeg'
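The "smallest timestamp >= the sample's timestamp" rule can be reproduced with the standard library: bisect.bisect_left returns the same index as np.searchsorted with its default side="left". The sketch below uses the head-camera timestamps and sample timestamp from the examples in this reference; the helper names and the clamping at the array ends are assumptions made for illustration.

```python
from bisect import bisect_left, bisect_right
from typing import Sequence

frame_times = [1772010251.6187909, 1772010251.629775, 1772010251.6634612]
sample_time = 1772010251.6202147


def first_at_or_after(times: Sequence[float], t: float) -> int:
    """Index of the first element >= t (clamped to the last index)."""
    return min(bisect_left(times, t), len(times) - 1)


def previous_or_equal(times: Sequence[float], t: float) -> int:
    """Index of the last element <= t (clamped to the first index)."""
    return max(bisect_right(times, t) - 1, 0)


selected = first_at_or_after(frame_times, sample_time)      # second frame
alternative = previous_or_equal(frame_times, sample_time)   # first frame
```

The first helper picks the frame at 1772010251.629775, matching the 1772010251629774985.jpeg file shown in the example, whereas a previous-or-equal lookup would have picked the earlier frame.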
Sampler
openarm_dataset.sampler.Sampler
The class behind Dataset.sample. It is stateless; you usually do not need
to instantiate it directly.
Sampler().sample(
dataset: Dataset,
episode_index: int,
hz: float,
) -> Iterator[Sample]
Dataset.sample materializes the iterator into a list. Note the argument
order differs between Sampler.sample(dataset, episode_index, hz) and
Dataset.sample(hz, episode_index).
>>> from openarm_dataset.sampler import Sampler
>>> list(Sampler().sample(ds, episode_index=0, hz=30))[0].timestamp
1772010251.6202147
Metadata
openarm_dataset.metadata.Metadata
Wraps a metadata.yaml file and (for unversioned datasets) an
episodes.jsonl sibling. All properties read from self.data, which is the
parsed YAML dict.
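The episodes.jsonl sibling used by unversioned datasets is a JSON Lines file, that is, one JSON object per line. The following sketch reads such a file with the standard library only. read_jsonl is a hypothetical helper, and how the library actually merges the records into its parsed data is not shown here.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Union


def read_jsonl(path: Union[str, Path]) -> List[Dict[str, Any]]:
    """Parse a JSON Lines file into a list of dicts, skipping blank lines."""
    records = []
    with Path(path).open(encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records
```

A caller could then assign the result to an "episodes" entry of a metadata dict, which is the shape the episodes property exposes.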
Constructor
Metadata(path: str | os.PathLike)
For legacy ("unversioned") datasets, the constructor merges the contents of
<path-dir>/episodes.jsonl into self.data["episodes"].
Properties
| Property | Type | Notes |
|---|---|---|
| version | str or None | None indicates a pre-versioned dataset. |
| operator | str | |
| operation_type | str | Defaults to "teleop". |
| location | str | |
| tasks | list[dict] | Each task has prompt and description. |
| episodes | list[dict] | Each episode has id, success, task_index. |
| num_episodes | int | len(self.episodes). |
| equipment | Equipment | Rebuilt on each access. |
| frequencies | Frequencies | |
>>> ds.meta.version, ds.meta.operator, ds.meta.location
('0.3.0', 'Tester', 'Test')
>>> ds.meta.tasks
[{'prompt': 'Run test.', 'description': 'Longer task description if need.'}]
>>> ds.meta.episodes
[{'id': '0', 'success': False, 'task_index': 0},
{'id': '3', 'success': True, 'task_index': 0}]
>>> ds.meta.num_episodes
2
Method
Metadata.write
Metadata.write(output: str | os.PathLike) -> None
Writes a normalized 0.3.0 metadata.yaml to the directory output,
upgrading legacy field names (equipment_id/equipment_version →
id/version; left_wrist/right_wrist → wrist_left/wrist_right).
Creates the directory if needed.
>>> import tempfile
>>> outdir = tempfile.mkdtemp()
>>> ds.meta.write(outdir)
>>> sorted(__import__('os').listdir(outdir))
['metadata.yaml']
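The field renames listed above can be expressed as a simple key mapping. This is only a sketch of the idea: rename_keys and the two rename tables are hypothetical, and the code assumes the legacy names appear as keys of a flat dict, which this reference does not confirm.

```python
from typing import Any, Dict

# Legacy name -> 0.3.0 name, as listed in the description of Metadata.write.
EQUIPMENT_RENAMES = {"equipment_id": "id", "equipment_version": "version"}
CAMERA_RENAMES = {"left_wrist": "wrist_left", "right_wrist": "wrist_right"}


def rename_keys(data: Dict[str, Any], renames: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of `data` with legacy keys replaced; other keys are kept."""
    return {renames.get(key, key): value for key, value in data.items()}
```

Because the function returns a new dict, the legacy metadata stays untouched until the upgraded copy is written out.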
Equipment / Embodiments / Perceptions
Equipment
Equipment(data: dict)
| Attribute | Type | Source key in YAML |
|---|---|---|
| id | str | equipment.id |
| version | str | equipment.version |
| embodiments | Embodiments | equipment.embodiments |
| perceptions | Perceptions | equipment.perceptions |
>>> eq = ds.meta.equipment
>>> eq.id, eq.version
('Test', '1.0')
Embodiments
A Mapping[str, Embodiment] keyed by embodiment name as declared in YAML.
>>> list(ds.meta.equipment.embodiments)
['arms', 'lifter']
>>> ds.meta.equipment.embodiments["arms"].id
'OpenArm'
Perceptions
| Attribute | Type | Notes |
|---|---|---|
| cameras | dict[str, metadata.Camera] | One entry per declared camera. |
metadata.Camera is a tiny holder with a .name and the raw YAML dict; it
is not the same class as camera.Camera used for frames.
>>> list(ds.meta.equipment.perceptions.cameras)
['wrist_left', 'wrist_right', 'ceiling', 'head']
Embodiment, OpenArm, OpenArmCellLifter
Embodiment is the base class; concrete subclasses are returned by
Embodiments based on the id field in YAML.
| Subclass | id | components | attributes | joints |
|---|---|---|---|---|
| OpenArm | "OpenArm" | ("right", "left") | ("qpos",) | ("joint1", … "joint7", "gripper") |
| OpenArmCellLifter | "OpenArmCellLifter" | () | ("elevation",) | ("elevation",) |
All subclasses also expose .name (the YAML key), .id, .version.
Note: attributes describes the action layout. For 0.3.0 obs, arm
state.parquet is exploded into qpos/qvel/qtorque regardless of the
declared attributes tuple — this is hard-coded in Dataset._load_embodiment_value.
Unknown ids raise ValueError.
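The id-based dispatch and the way components and attributes combine into signal keys can be sketched as follows. EmbodimentSpec, SPECS, spec_for and action_keys are hypothetical names built from the table above; the error message text is also an assumption, and the library's own classes may be organized differently.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class EmbodimentSpec:
    """Hypothetical record of the per-id layout listed in the table."""

    components: Tuple[str, ...]
    attributes: Tuple[str, ...]


SPECS = {
    "OpenArm": EmbodimentSpec(("right", "left"), ("qpos",)),
    "OpenArmCellLifter": EmbodimentSpec((), ("elevation",)),
}


def spec_for(embodiment_id: str) -> EmbodimentSpec:
    """Look up a spec by id; unknown ids raise ValueError."""
    try:
        return SPECS[embodiment_id]
    except KeyError:
        raise ValueError(f"Unknown embodiment id: {embodiment_id!r}") from None


def action_keys(name: str, embodiment_id: str) -> List[str]:
    """Build '{name}/{component}/{attribute}' or '{name}/{attribute}' keys."""
    spec = spec_for(embodiment_id)
    if not spec.components:
        return [f"{name}/{attr}" for attr in spec.attributes]
    return [f"{name}/{comp}/{attr}" for comp in spec.components for attr in spec.attributes]
```

For the fixture's embodiments (arms with id OpenArm, lifter with id OpenArmCellLifter), the combined and sorted keys match the load_action output shown earlier in this reference.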
Frequencies
Frequencies(data: dict)
A passive holder for the frequencies section of metadata.yaml. Each
attribute is just the corresponding nested dict (or {} if absent).
| Attribute | Type | YAML key |
|---|---|---|
| action | dict | frequencies.action |
| cameras | dict | frequencies.cameras |
| obs | dict | frequencies.obs |
>>> ds.meta.frequencies.cameras
{'ceiling': 30.303030303030305, 'head': 30.303030303030305,
'wrist_left': 30.303030303030305, 'wrist_right': 30.303030303030305}
>>> ds.meta.frequencies.action
{'arms': {'left': 250.0, 'right': 250.0}, 'lifter': 250.0}
LeRobot v2.1 conversion (lerobot_v21)
openarm_dataset.lerobot_v21.to_lerobotv21
Conversion entry point used by Dataset.write(format="lerobot_v2.1", **opts)
and the openarm-dataset-convert CLI. Requires ffmpeg on PATH for
video encoding; raises RuntimeError if not found.
to_lerobotv21(
dataset: Dataset,
output_dir: str | pathlib.Path,
fps: int = 30,
train_split: float = 0.8,
smoothing_cutoff: float = 1.0,
success_only: bool = False,
) -> None
| Parameter | Default | Description |
|---|---|---|
| fps | 30 | Resampling rate for both states and video. Must be > 0; otherwise ValueError. |
| train_split | 0.8 | Fraction of episodes assigned to train; remainder to val. Must be in [0, 1] or ValueError. |
| smoothing_cutoff | 1.0 | Low-pass cutoff (Hz) applied to obs/action before sampling; calls dataset.set_smoothing internally. |
| success_only | False | If True, drop episodes where meta.episodes[i]["success"] is falsy. Re-indexes episodes/tasks. |
If, after filtering, no episodes remain, raises ValueError("No episodes to write.").
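The parameter checks and the success_only filter described above can be sketched in a few lines. check_options and select_episodes are hypothetical helpers; apart from "No episodes to write.", the error messages are made up, and the re-indexing step is left out because its details are not documented here.

```python
from typing import Any, Dict, List


def check_options(fps: int, train_split: float) -> None:
    """Reject out-of-range options the way the table describes."""
    if fps <= 0:
        raise ValueError("fps must be > 0")
    if not 0 <= train_split <= 1:
        raise ValueError("train_split must be in [0, 1]")


def select_episodes(episodes: List[Dict[str, Any]], success_only: bool) -> List[Dict[str, Any]]:
    """Optionally drop episodes whose 'success' value is falsy."""
    kept = [ep for ep in episodes if ep["success"]] if success_only else list(episodes)
    if not kept:
        raise ValueError("No episodes to write.")
    return kept
```

Applied to the fixture's two episodes with success_only=True, only the episode with id '3' remains.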
The output layout produced under output_dir/:
- data/chunk-XXX/episode_XXXXXX.parquet
- videos/chunk-XXX/observation.images.<cam>/episode_XXXXXX.mp4
- meta/info.json, meta/episodes.jsonl, meta/episodes_stats.jsonl, meta/tasks.jsonl, meta/stats.json
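The XXX and XXXXXX placeholders in the layout suggest zero-padded chunk and episode numbers. The sketch below builds such paths under two assumptions that this reference does not state: three- and six-digit padding as implied by the placeholders, and a chunk size of 1000 episodes. episode_paths is a hypothetical helper.

```python
from typing import Dict, Iterable, Tuple

CHUNK_SIZE = 1000  # assumption: episodes per chunk directory


def episode_paths(
    episode_index: int, cameras: Iterable[str], chunk_size: int = CHUNK_SIZE
) -> Tuple[str, Dict[str, str]]:
    """Return the parquet path and per-camera video paths for one episode."""
    chunk = episode_index // chunk_size
    data_path = f"data/chunk-{chunk:03d}/episode_{episode_index:06d}.parquet"
    video_paths = {
        cam: f"videos/chunk-{chunk:03d}/observation.images.{cam}/episode_{episode_index:06d}.mp4"
        for cam in cameras
    }
    return data_path, video_paths
```

For episode 0 with a head camera this gives data/chunk-000/episode_000000.parquet and videos/chunk-000/observation.images.head/episode_000000.mp4.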
Command-line tools
Installed via pyproject.toml's [project.scripts]:
openarm-dataset-validate
openarm-dataset-validate <input>
Calls Dataset(input).validate(on_error=lambda e: print(e, file=sys.stderr))
and exits with status 1 if any errors are reported.
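The same behavior can be reproduced from Python when a script needs the exit status without spawning the CLI. This is a sketch, not the tool's source: validate_and_report is a hypothetical wrapper that only relies on the validate(on_error=...) signature documented for Dataset.

```python
import sys
from typing import List, TextIO


def validate_and_report(dataset, stream: TextIO = sys.stderr) -> int:
    """Run dataset.validate, echo each error to `stream`, return an exit status."""
    errors: List[str] = []

    def on_error(message: str) -> None:
        errors.append(message)
        print(message, file=stream)

    ok = dataset.validate(on_error=on_error)
    # Status 1 if any error was reported, 0 otherwise.
    return 0 if ok and not errors else 1
```

A script would typically end with sys.exit(validate_and_report(ds)) so that shell pipelines can react to failures.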
openarm-dataset-convert
openarm-dataset-convert <input> <output>
[--format {openarm,lerobot_v2.1}]
[--fps INT] # default 30 (lerobot only)
[--smoothing-cutoff FLOAT] # default 1.0 (lerobot only)
[--train-split FLOAT] # default 0.8 (lerobot only)
[--success-only] # default False (lerobot only)
Implementation: openarm_dataset.convert.main. --format is always passed to Dataset.write; --fps / --smoothing-cutoff / --train-split / --success-only are forwarded as **options only when --format lerobot_v2.1.
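The forwarding rule can be illustrated with a small argparse sketch. It mirrors the flags and defaults listed above but is not the code of openarm_dataset.convert.main; build_parser and write_options are hypothetical names, and the default of None for --format is an assumption.

```python
import argparse
from typing import Any, Dict

LEROBOT_ONLY = ("fps", "smoothing_cutoff", "train_split", "success_only")


def build_parser() -> argparse.ArgumentParser:
    """Parser with the flags listed in the usage text."""
    parser = argparse.ArgumentParser(prog="openarm-dataset-convert")
    parser.add_argument("input")
    parser.add_argument("output")
    parser.add_argument("--format", choices=["openarm", "lerobot_v2.1"], default=None)
    parser.add_argument("--fps", type=int, default=30)
    parser.add_argument("--smoothing-cutoff", type=float, default=1.0)
    parser.add_argument("--train-split", type=float, default=0.8)
    parser.add_argument("--success-only", action="store_true")
    return parser


def write_options(args: argparse.Namespace) -> Dict[str, Any]:
    """Return the **options to forward: empty unless the format is lerobot_v2.1."""
    if args.format != "lerobot_v2.1":
        return {}
    return {name: getattr(args, name) for name in LEROBOT_ONLY}
```

A caller would then invoke Dataset(args.input).write(args.output, format=args.format, **write_options(args)), so the native format never receives options it does not accept.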