Version: 2.0

Inference


Overview

After training, a policy is deployed as a policy server process that:

  1. Receives an observation bundle (cameras + joint positions) packed as an Arrow IPC file
  2. Runs the model
  3. Returns a JSON action chunk (a sequence of 16-DOF joint positions)

The robot runtime is a Dora dataflow.


Dataflow Architecture

Hardware
arm_right (250 Hz)   ──┐
arm_left (250 Hz)    ──┤
camera_wrist_right   ──┤
camera_wrist_left    ──┼──► observer (30 Hz) ──► quittable-observer
camera_head          ──┤                                  │
camera_ceiling       ──┘                       observation (Arrow IPC)
                                                          │
                                                          ▼
                                             ┌─────────────────────────┐
                                             │      policy-server      │ ← YOU WRITE THIS
                                             │    (local or docker)    │
                                             └─────────────────────────┘
                                                          │
                                                    actions JSON
                                                          │
                                                          ▼
                                                  actions-executor

The Policy Server Contract

This is what you need to implement when adapting the pipeline to a new model.

Transport

The node connects to a UNIX socket (path set via $SOCKET). The Dora node dora-openarm-local-policy-server handles the socket I/O for you; your model code lives in the process it launches.

For local development, your server listens on that socket, and the dataflow is started with the same path:

SOCKET=/dev/shm/policy-server.socket dora run dataflow-inference.yaml --uv
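The server side of this transport can be sketched as follows. This is a minimal sketch, not the project's reference server: it assumes newline-delimited UTF-8 JSON in both directions, and the names `make_listener`, `serve_connection`, `skip_handler`, and `main` are hypothetical. The placeholder handler always returns an empty chunk, which is where model code would go.

```python
import json
import os
import socket


def make_listener(path):
    """Bind a UNIX stream socket at `path`, replacing any stale socket file."""
    if os.path.exists(path):
        os.unlink(path)
    listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    listener.bind(path)
    listener.listen(1)
    return listener


def serve_connection(conn, handler):
    """Answer each newline-terminated JSON request with one JSON line."""
    with conn, conn.makefile("rwb") as stream:
        for line in stream:
            if not line.strip():
                continue  # ignore blank lines
            response = handler(json.loads(line))
            stream.write(json.dumps(response).encode("utf-8") + b"\n")
            stream.flush()


def skip_handler(request):
    # Placeholder for model code (assumption): an empty chunk skips this tick.
    return {"positions": []}


def main():
    # Serve connections one at a time on the path given by $SOCKET.
    listener = make_listener(os.environ["SOCKET"])
    while True:
        conn, _ = listener.accept()
        serve_connection(conn, skip_handler)
```

Calling `main()` with `SOCKET` set to the same path as the dataflow starts the loop; swapping `skip_handler` for a function that runs the model is the only change the sketch needs.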

Observation Input

Each request arrives as a JSON line over the socket:

{
  "name": "inference",
  "data_path": "/dev/shm/obs_12345.arrow",
  "metadata": {
    "timestamp": 1716000000123456789,
    "camera_head.height": 600,
    "camera_head.width": 960,
    "camera_ceiling.height": 600,
    "camera_ceiling.width": 960,
    "camera_wrist_right.height": 600,
    "camera_wrist_right.width": 960,
    "camera_wrist_left.height": 600,
    "camera_wrist_left.width": 960
  }
}
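Before touching the Arrow file, it can help to turn the flat `<camera>.height` / `<camera>.width` metadata keys into per-camera shapes. The sketch below does that with the standard library only; `camera_shapes` and `expected_num_bytes` are hypothetical helper names, and the three-channel assumption follows the (H, W, 3) uint8 frame layout used on this page.

```python
def camera_shapes(metadata):
    """Map camera name -> (height, width) from '<camera>.height/.width' keys."""
    shapes = {}
    for key, value in metadata.items():
        name, _, field = key.rpartition(".")
        if name and field == "height":
            width = metadata.get(f"{name}.width")
            if width is None:
                raise KeyError(f"{name}.width")
            shapes[name] = (int(value), int(width))
    return shapes


def expected_num_bytes(shape, channels=3):
    """Size of one uint8 frame with the given (height, width)."""
    height, width = shape
    return height * width * channels
```

Comparing `expected_num_bytes` with the length of the flat camera buffer is a cheap way to catch a mismatched resolution before `reshape` raises.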

Open the Arrow IPC file and parse it:

import pyarrow as pa
import numpy as np

# `request` is the parsed JSON request line shown above.
with pa.OSFile(request["data_path"], "rb") as f:
    with pa.ipc.open_file(f) as reader:
        observations = reader.get_batch(0).to_struct_array()

# Use the most recent observation in the batch.
last = observations[-1]
metadata = request["metadata"]

# Camera frames: shape (H, W, 3), uint8
def read_camera(name):
    return (
        last[name].values.to_numpy(zero_copy_only=False)
        .reshape(metadata[f"{name}.height"], metadata[f"{name}.width"], 3)
    )

frames = {
    "head": read_camera("camera_head"),
    "ceiling": read_camera("camera_ceiling"),
    "right_wrist": read_camera("camera_wrist_right"),
    "left_wrist": read_camera("camera_wrist_left"),
}

# Joint positions: float32, shape (16,)
# Layout: right_arm[7] | right_gripper[1] | left_arm[7] | left_gripper[1]
pos_dim = len(last["position"])
qpos = (
    last["position"].values.to_numpy(zero_copy_only=False)
    .reshape(pos_dim)
    .astype(np.float32)
)
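The 16-element layout in the comment above can be made explicit with a small slicing helper. This is an illustrative sketch: `QPOS_LAYOUT` and `split_qpos` are hypothetical names, and the function works on any sliceable sequence, including the NumPy array `qpos`.

```python
# Layout from this page: right_arm[7] | right_gripper[1] | left_arm[7] | left_gripper[1]
QPOS_LAYOUT = (
    ("right_arm", 7),
    ("right_gripper", 1),
    ("left_arm", 7),
    ("left_gripper", 1),
)


def split_qpos(qpos):
    """Slice a 16-element joint vector into named parts."""
    expected = sum(size for _, size in QPOS_LAYOUT)
    if len(qpos) != expected:
        raise ValueError(f"expected {expected} joint values, got {len(qpos)}")
    parts, start = {}, 0
    for name, size in QPOS_LAYOUT:
        parts[name] = qpos[start:start + size]
        start += size
    return parts
```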

Action Output

Write a single JSON line back to the socket:

{
  "interval": 33333333,
  "cutoff_hz": 5,
  "positions": [
    [q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, q10, q11, q12, q13, q14, q15],
    ...
  ]
}
| Field     | Type                        | Meaning |
|-----------|-----------------------------|---------|
| interval  | int (ns)                    | Time between consecutive position steps. int(1e9 / 30) ≈ 33 ms for 30 Hz |
| cutoff_hz | number, optional            | Execute only the first N steps of the chunk before the next inference for receding horizon control |
| positions | List[List[float]], length T | Each inner list is 16 floats: right_arm[7] + right_gripper[1] + left_arm[7] + left_gripper[1] |
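A response with these fields could be assembled along the following lines. This is a sketch under stated assumptions: `make_action_response` and `encode_line` are hypothetical helpers, the 30 Hz default mirrors the `int(1e9 / 30)` example, and `cutoff_hz` is passed through unchanged when given.

```python
import json

ACTION_DIM = 16  # right_arm[7] + right_gripper[1] + left_arm[7] + left_gripper[1]


def make_action_response(positions, hz=30.0, cutoff_hz=None):
    """Build the action dict from a chunk of T joint-position rows."""
    rows = []
    for step, row in enumerate(positions):
        if len(row) != ACTION_DIM:
            raise ValueError(f"step {step}: expected {ACTION_DIM} values, got {len(row)}")
        # float() also converts NumPy scalars into JSON-serializable floats.
        rows.append([float(q) for q in row])
    response = {"interval": int(1e9 / hz), "positions": rows}
    if cutoff_hz is not None:
        response["cutoff_hz"] = cutoff_hz
    return response


def encode_line(response):
    """Serialize the response as a single newline-terminated JSON line."""
    return json.dumps(response).encode("utf-8") + b"\n"
```

Validating the row length on the server side keeps a malformed chunk from reaching the executor, which is usually easier to debug than a failure further down the dataflow.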

If you want to skip inference this tick (e.g. rate-limiting), return an empty positions list:

{ "positions": [] }
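One way to implement the rate-limiting case is a small gate that tracks when inference last ran and returns the empty chunk otherwise. The sketch below is an assumption about how a server might do this, not part of the project; `InferenceRateLimiter`, `respond`, and the `run_model` callable are hypothetical names.

```python
import time

SKIP_RESPONSE = {"positions": []}


class InferenceRateLimiter:
    """Allow inference at most once every `min_period_s` seconds."""

    def __init__(self, min_period_s, clock=time.monotonic):
        self.min_period_s = min_period_s
        self.clock = clock  # injectable so the gate can be tested without sleeping
        self._last_run = None

    def should_run(self):
        now = self.clock()
        if self._last_run is not None and now - self._last_run < self.min_period_s:
            return False
        self._last_run = now
        return True


def respond(limiter, run_model, request):
    """Run the model if the limiter allows it, otherwise skip this tick."""
    if not limiter.should_run():
        return SKIP_RESPONSE
    return run_model(request)
```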

We will provide more full-fledged policy server and dataflow examples soon.


Dataflow YAML — Key Nodes

These are the four nodes that matter most in the YAML. Everything else (ticks, cameras, arms) is boilerplate that you copy verbatim.

nodes:
  # --- observer: collects arms + cameras into one Arrow IPC bundle ---
  - id: observer
    path: dora-openarm-observer
    inputs:
      tick: quittable-tick-observer/tick # 30 Hz gate
      arm_right: arm-right/position
      arm_left: arm-left/position
      camera_wrist_right: camera-wrist-right/image
      camera_wrist_left: camera-wrist-left/image
      camera_head: camera-head/image
      camera_ceiling: camera-ceiling/image
      phase_classifier_result: phase-classifier/result
    outputs:
      - observation # Arrow IPC file path + metadata, written to /dev/shm

  # --- quittable-observer: gate controlled by controller ---
  - id: quittable-observer
    path: dora-openarm-quitter
    inputs:
      command: controller/command
      observation: observer/observation
    outputs:
      - observation

  # --- policy-server: YOUR node ---
  - id: policy-server
    path: dora-openarm-local-policy-server
    env:
      SOCKET: /dev/shm/policy-server.socket # must match your running server
    inputs:
      observation: quittable-observer/observation
    outputs:
      - actions # JSON: {interval, cutoff_hz, positions: [[16D], ...]}

  # --- actions-executor: unpacks the chunk and drives the arms ---
  - id: actions-executor
    path: dora-openarm-actions-executor
    args: "--upsample-on --filter-on"
    inputs:
      actions: policy-server/actions
    outputs:
      - move_position_right # → arm-right
      - move_position_left # → arm-left

The observation output of quittable-observer is what your server receives as request["data_path"]. The actions output is what you write back via the socket.


Running

Prerequisites

uv venv .venv-inference --python=3.12
. .venv-inference/bin/activate
uv pip install "dora-rs-cli==0.5.0" "dora-rs==0.5.0"
dora build inference/dataflow-inference.yaml --uv

Local policy server (debug)

Start the policy server first, then run the dataflow:

SOCKET=/dev/shm/policy-server.socket \
dora run inference/dataflow-inference.yaml --uv
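Because the server has to be up before the dataflow starts, a launcher script might wait for the socket file to appear first. The following is a minimal sketch of such a check, with hypothetical names `is_unix_socket` and `wait_for_socket`. It only inspects the filesystem, so a stale socket file left by a crashed server would also pass.

```python
import os
import stat
import time


def is_unix_socket(path):
    """Return True if `path` exists and is a UNIX socket file."""
    try:
        return stat.S_ISSOCK(os.stat(path).st_mode)
    except OSError:
        return False


def wait_for_socket(path, timeout_s=10.0, poll_s=0.1):
    """Poll until a socket file exists at `path` or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while not is_unix_socket(path):
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
    return True
```

For example, `wait_for_socket(os.environ["SOCKET"])` could run just before invoking `dora run`, aborting with a clear message if it returns `False`.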