-
Notifications
You must be signed in to change notification settings - Fork 498
feat(droid): Add daft.datasets.droid
#7089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
18d28ea
1a90010
c4eccd0
c06e38a
74421d7
6a5587a
a6ac034
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| from daft.datasets.common_crawl import common_crawl | ||
| from daft.datasets import droid | ||
|
|
||
| __all__ = ["common_crawl"] | ||
| __all__ = ["common_crawl", "droid"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| import daft | ||
| from daft.api_annotations import PublicAPI | ||
| from daft.datatype import DataType | ||
| from daft.expressions import col, lit | ||
| from daft.functions import ( | ||
| file, | ||
| format, | ||
| regexp_replace, | ||
| unnest, | ||
| video_file, | ||
| ) | ||
| from daft.io import GCSConfig, IOConfig | ||
|
|
||
| if TYPE_CHECKING: | ||
| from daft.dataframe import DataFrame | ||
|
|
||
|
|
||
| _METADATA_DTYPE = DataType.struct( | ||
| { | ||
| "uuid": DataType.string, | ||
| "lab": DataType.string, | ||
| "user": DataType.string, | ||
| "user_id": DataType.string, | ||
| "date": DataType.date, | ||
| "timestamp": DataType.string, | ||
| "hdf5_path": DataType.string, | ||
| "building": DataType.string, | ||
| "scene_id": DataType.int64, | ||
| "success": DataType.bool, | ||
| "robot_serial": DataType.string, | ||
| "r2d2_version": DataType.string, | ||
| "current_task": DataType.string, | ||
| "trajectory_length": DataType.int64, | ||
| "wrist_cam_serial": DataType.string, | ||
| "ext1_cam_serial": DataType.string, | ||
| "ext2_cam_serial": DataType.string, | ||
| "wrist_cam_extrinsics": DataType.list(DataType.float64), | ||
| "ext1_cam_extrinsics": DataType.list(DataType.float64), | ||
| "ext2_cam_extrinsics": DataType.list(DataType.float64), | ||
| "wrist_svo_path": DataType.string, | ||
| "wrist_mp4_path": DataType.string, | ||
| "ext1_svo_path": DataType.string, | ||
| "ext1_mp4_path": DataType.string, | ||
| "ext2_svo_path": DataType.string, | ||
| "ext2_mp4_path": DataType.string, | ||
| "left_mp4_path": DataType.string, | ||
| "right_mp4_path": DataType.string, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| @PublicAPI | ||
| def raw( | ||
| # By default, use the official public GCS bucket | ||
| path: str = "gs://gresearch/robotics/droid_raw", | ||
| io_config: IOConfig | None = None, | ||
| # TODO: Add support for stereo videos | ||
| # include_stereo: bool = False, | ||
| # TODO: Add support for SVO camera recordings | ||
| ) -> DataFrame: | ||
| r"""Load the raw DROID robotics dataset as a lazy episode-level DataFrame. | ||
|
|
||
| This function discovers episodes by globbing ``metadata_*.json`` files under the | ||
| provided dataset root, reads the episode metadata, and attaches lazy file references | ||
| to the per-episode trajectory HDF5 file and MP4 camera recordings. | ||
|
|
||
| Each row corresponds to one DROID episode with the following layout on disk: | ||
|
|
||
| episode/ | ||
| |---- metadata_<episode_id>.json # Episode metadata like building ID, data collector ID etc. | ||
| |---- trajectory.h5 # All low-dimensional information like action and proprioception trajectories. | ||
| |---- recordings/ | ||
| |---- MP4/ | ||
| |---- <camera_serial>.mp4 | ||
| |---- <camera_serial>-stereo.mp4 # Optional stereo views. | ||
| |---- SVO/ | ||
| |---- <camera_serial>.svo # Raw ZED SVO file with encoded camera recording information (contains some additional metadata) | ||
|
|
||
| Args: | ||
| path: Root path to the raw DROID dataset. Defaults to the official public | ||
| GCS release at `gs://gresearch/robotics/droid_raw`. Also supports | ||
| local paths and other remote object stores. | ||
| io_config: IO configuration for accessing remote storage. | ||
|
|
||
| Returns: | ||
| A DataFrame with one row per episode. Metadata fields from each episode's JSON | ||
| file are stored in the `metadata` struct column, along with: | ||
|
|
||
| - `episode_dir`: path to the episode directory | ||
| - `metadata.*`: metadata fields parsed from the metadata JSON file | ||
| - `trajectory`: lazy `daft.File` reference to the trajectory HDF5 file | ||
| - `wrist_video`: lazy `daft.VideoFile` reference to the wrist camera MP4 file | ||
| - `ext1_video`: lazy `daft.VideoFile` reference to the external camera 1 MP4 file | ||
| Often the left camera feed. | ||
| - `ext2_video`: lazy `daft.VideoFile` reference to the external camera 2 MP4 file | ||
| Often the right camera feed. | ||
|
|
||
| Examples: | ||
| >>> import daft | ||
| >>> df = daft.datasets.droid.raw() # doctest: +SKIP | ||
| >>> df.select("episode_dir", "ext1_video").show() # doctest: +SKIP | ||
| """ | ||
| # Configure IO config with anonymous access to the public GCS bucket | ||
| if io_config is None: | ||
| io_config = IOConfig(gcs=GCSConfig(anonymous=True)) | ||
|
|
||
| episodes = ( | ||
| daft.from_glob_path(f"{path.rstrip('/')}/**/metadata_*.json", io_config=io_config) | ||
| .select( | ||
| col("path") | ||
| .download(io_config=io_config) | ||
| .cast(DataType.string()) | ||
| .try_deserialize("json", _METADATA_DTYPE) | ||
| .alias("metadata"), | ||
| regexp_replace(col("path"), r"/metadata_[^/]+\.json$", "").alias("episode_dir"), | ||
| ) | ||
| .select(unnest(col("metadata")), "episode_dir") | ||
| ) | ||
|
|
||
| # Create a file column for the trajectory HDF5 file | ||
| episodes = episodes.with_column( | ||
| "trajectory", | ||
| file(format("{}/{}", col("episode_dir"), lit("trajectory.h5")), io_config=io_config), | ||
| ) | ||
|
|
||
| # Create VideoFile columns for MP4 camera recordings | ||
| episodes = ( | ||
| episodes.with_column( | ||
| "wrist_video", | ||
| video_file(format("{}/{}.mp4", col("episode_dir"), col("wrist_cam_serial")), io_config=io_config), | ||
| ) | ||
| .with_column( | ||
| "ext1_video", | ||
| video_file(format("{}/{}.mp4", col("episode_dir"), col("ext1_cam_serial")), io_config=io_config), | ||
| ) | ||
| .with_column( | ||
| "ext2_video", | ||
| video_file(format("{}/{}.mp4", col("episode_dir"), col("ext2_cam_serial")), io_config=io_config), | ||
| ) | ||
| ) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The paths are constructed as |
||
|
|
||
| return episodes | ||
|
|
||
|
|
||
| # TODO: Add a custom expression to read & parse the trajectory HDF5 file | ||
|
|
||
| __all__ = [ | ||
| "raw", | ||
| ] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import pytest | ||
|
|
||
| import daft | ||
| from daft import DataType, MediaType | ||
| from daft.expressions import col | ||
|
|
||
| pytestmark = pytest.mark.integration() | ||
|
|
||
| DROID_RAW_GCS_PREFIX = "gs://gresearch/robotics/droid_raw" | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def droid_raw_df(): | ||
| return daft.datasets.droid.raw() | ||
|
|
||
|
|
||
| def test_droid_discovers_episodes_and_metadata(droid_raw_df) -> None: | ||
| result = droid_raw_df.select("uuid", "building", "success").limit(1).to_pydict() | ||
|
|
||
| assert len(result["uuid"]) == 1 | ||
| assert isinstance(result["uuid"][0], str) and result["uuid"][0] | ||
| assert isinstance(result["building"][0], str) and result["building"][0] | ||
| assert isinstance(result["success"][0], bool) | ||
|
|
||
|
|
||
| def test_droid_unnests_metadata_columns(droid_raw_df) -> None: | ||
| schema = {field.name: field.dtype for field in droid_raw_df.schema()} | ||
| assert "building" in schema | ||
| assert "success" in schema | ||
| assert "metadata" not in schema | ||
| assert schema["building"] == DataType.string | ||
| assert schema["success"] == DataType.bool | ||
|
|
||
|
|
||
| def test_droid_adds_trajectory_and_video_file_columns(droid_raw_df) -> None: | ||
| schema = {field.name: field.dtype for field in droid_raw_df.schema()} | ||
| assert schema["trajectory"] == DataType.file() | ||
| assert schema["wrist_video"] == DataType.file(MediaType.video()) | ||
| assert schema["ext1_video"] == DataType.file(MediaType.video()) | ||
| assert schema["ext2_video"] == DataType.file(MediaType.video()) | ||
|
|
||
| result = ( | ||
| droid_raw_df.select( | ||
| "episode_dir", | ||
| "wrist_cam_serial", | ||
| "ext1_cam_serial", | ||
| "ext2_cam_serial", | ||
| col("trajectory").file_path().alias("trajectory_path"), | ||
| col("wrist_video").file_path().alias("wrist_video_path"), | ||
| col("ext1_video").file_path().alias("ext1_video_path"), | ||
| col("ext2_video").file_path().alias("ext2_video_path"), | ||
| ) | ||
| .limit(1) | ||
| .to_pydict() | ||
| ) | ||
|
|
||
| episode_dir = result["episode_dir"][0] | ||
| assert episode_dir.startswith(f"{DROID_RAW_GCS_PREFIX}/") | ||
|
|
||
| trajectory_path = result["trajectory_path"][0] | ||
| assert trajectory_path == f"{episode_dir}/trajectory.h5" | ||
|
|
||
| assert result["wrist_video_path"][0] == f"{episode_dir}/{result['wrist_cam_serial'][0]}.mp4" | ||
| assert result["ext1_video_path"][0] == f"{episode_dir}/{result['ext1_cam_serial'][0]}.mp4" | ||
| assert result["ext2_video_path"][0] == f"{episode_dir}/{result['ext2_cam_serial'][0]}.mp4" | ||
|
Comment on lines
+59
to
+67
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. considering this is an evergreen dataset and the path can be provided, I'm not as worried. We aren't testing that the data is there, we're testing that we can read the dataset format. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
io_configisNonethe function createsIOConfig(gcs=GCSConfig(anonymous=True))and passes it to every IO operation. This is correct for the default GCS path, but if a caller supplies a non-GCSpath(e.g.,s3://or a local path) without an explicitio_config, the GCS anonymous config is silently attached to S3 or other operations. S3 calls with a GCS config object will likely error or bypass credential resolution unexpectedly. Consider only defaulting to anonymous GCS when the path starts withgs://, leavingio_config=Nonefor other schemes.