Inference
Overview
After training, a policy is deployed as a policy server process that:
- Receives an observation bundle (cameras + joint positions) packed as an Arrow IPC file
- Runs the model
- Returns a JSON action chunk (a sequence of 16-DOF joint positions)
The robot runtime is a Dora dataflow.
Dataflow Architecture
Hardware
arm_right (250 Hz) ──┐
arm_left (250 Hz) ──┤
camera_wrist_right ──┤ observer ──► quittable-observer
camera_wrist_left ──┤ (30 Hz) │
camera_head ──┤ observation (Arrow IPC)
camera_ceiling ──┘ │
▼
┌─────────────────────────┐
│ policy-server │ ← YOU WRITE THIS
│ (local or docker) │
└─────────────────────────┘
│
actions JSON
│
▼
actions-executor
The Policy Server Contract
This is the what one needs to implement when adapting to a new model.
Transport
The node connects to a UNIX socket (path set via $SOCKET). The Dora node
dora-openarm-local-policy-server handles the socket I/O for you; your
model code lives in the process it launches.
For local development, the server listens on the socket:
SOCKET=/dev/shm/policy-server.socket dora run dataflow-inference.yaml --uv
Observation Input
Each request arrives as a JSON line over the socket:
{
"name": "inference",
"data_path": "/dev/shm/obs_12345.arrow",
"metadata": {
"timestamp": 1716000000123456789,
"camera_head.height": 600,
"camera_head.width": 960,
"camera_ceiling.height": 600,
"camera_ceiling.width": 960,
"camera_wrist_right.height": 600,
"camera_wrist_right.width": 960,
"camera_wrist_left.height": 600,
"camera_wrist_left.width": 960
}
}
Open the Arrow IPC file and parse it:
import pyarrow as pa
import numpy as np
with pa.OSFile(request["data_path"], "rb") as f:
with pa.ipc.open_file(f) as reader:
observations = reader.get_batch(0).to_struct_array()
last = observations[-1]
metadata = request["metadata"]
# Camera frames — shape (H, W, 3), uint8
def read_camera(name):
return (
last[name].values.to_numpy(zero_copy_only=False)
.reshape(metadata[f"{name}.height"], metadata[f"{name}.width"], 3)
)
frames = {
"head": read_camera("camera_head"),
"ceiling": read_camera("camera_ceiling"),
"right_wrist": read_camera("camera_wrist_right"),
"left_wrist": read_camera("camera_wrist_left"),
}
# Joint positions — float32, shape (16,)
# Layout: right_arm[7] | right_gripper[1] | left_arm[7] | left_gripper[1]
pos_dim = len(last["position"])
qpos = (
last["position"].values.to_numpy(zero_copy_only=False)
.reshape(pos_dim)
.astype(np.float32)
)
Action Output
Write a single JSON line back to the socket:
{
"interval": 33333333,
"cutoff_hz": 5,
"positions": [
[q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, q10, q11, q12, q13, q14, q15],
...
]
}
| Field | Type | Meaning |
|---|---|---|
interval | int (ns) | Time between consecutive position steps. int(1e9 / 30) ≈ 33 ms for 30 Hz |
cutoff_hz | number, optional | Execute only the first N steps of the chunk before the next inference for receding horizon control |
positions | List[List[float]] length T | Each inner list is 16 floats: right_arm[7] + right_gripper[1] + left_arm[7] + left_gripper[1] |
If you want to skip inference this tick (e.g. rate-limiting), return an empty positions list:
{ "positions": [] }
We will provide more full-fledged policy server and dataflow examples soon.
Dataflow YAML — Key Nodes
The three nodes you care about most in the YAML. Everything else (ticks, cameras, arms) is boilerplate you copy verbatim.
nodes:
# --- observer: collects arms + cameras into one Arrow IPC bundle ---
- id: observer
path: dora-openarm-observer
inputs:
tick: quittable-tick-observer/tick # 30 Hz gate
arm_right: arm-right/position
arm_left: arm-left/position
camera_wrist_right: camera-wrist-right/image
camera_wrist_left: camera-wrist-left/image
camera_head: camera-head/image
camera_ceiling: camera-ceiling/image
phase_classifier_result: phase-classifier/result
outputs:
- observation # Arrow IPC file path + metadata, written to /dev/shm
# --- quittable-observer: gate controlled by controller ---
- id: quittable-observer
path: dora-openarm-quitter
inputs:
command: controller/command
observation: observer/observation
outputs:
- observation
# --- policy-server: YOUR node ---
- id: policy-server
path: dora-openarm-local-policy-server
env:
SOCKET: /dev/shm/policy-server.socket # must match your running server
inputs:
observation: quittable-observer/observation
outputs:
- actions # JSON: {interval, cutoff_hz, positions: [[16D], ...]}
# --- actions-executor: unpacks the chunk and drives the arms ---
- id: actions-executor
path: dora-openarm-actions-executor
args: "--upsample-on --filter-on"
inputs:
actions: policy-server/actions
outputs:
- move_position_right # → arm-right
- move_position_left # → arm-left
The observation output of quittable-observer is what your server receives as
request["data_path"]. The actions output is what you write back via the socket.
Running
Prerequisites
uv venv .venv-inference --python=3.12
. .venv-inference/bin/activate
uv pip install "dora-rs-cli==0.5.0" "dora-rs==0.5.0"
dora build inference/dataflow-inference.yaml --uv
Local policy server (debug)
Start the policy server first, then run the dataflow:
SOCKET=/dev/shm/policy-server.socket \
dora run inference/dataflow-inference.yaml --uv