Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2612,22 +2612,26 @@ class PyQueryMetadata:
output_schema: PySchema
unoptimized_plan: str
runner: str
ray_dashboard_url: str | None
dashboard_url: str | None
entrypoint: str | None
python_version: str | None
daft_version: str | None
ray_version: str | None
runner_version: str | None
distributed: bool
task_events_enabled: bool | None

def __init__(
self,
output_schema: PySchema,
unoptimized_plan: str,
runner: str,
ray_dashboard_url: str | None = None,
dashboard_url: str | None = None,
entrypoint: str | None = None,
python_version: str | None = None,
daft_version: str | None = None,
ray_version: str | None = None,
runner_version: str | None = None,
distributed: bool = False,
task_events_enabled: bool | None = None,
) -> None: ...

class PyQueryResult:
Expand Down
1 change: 0 additions & 1 deletion daft/runners/native_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ def run_iter(
output_schema._schema,
builder.repr_json(),
"Native (Swordfish)",
ray_dashboard_url=None,
entrypoint=entrypoint,
python_version=python_version,
daft_version=daft_version,
Expand Down
10 changes: 7 additions & 3 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ def run_iter(

entrypoint = "python " + " ".join(sys.argv)
dashboard_url = os.environ.get("DAFT_DASHBOARD_URL")
task_events_raw = os.environ.get("DAFT_TASK_EVENTS_ENABLED")
task_events_enabled = task_events_raw.strip().lower() in ("1", "true") if task_events_raw is not None else False

# Log Dashboard URL if configured
if dashboard_url:
Expand Down Expand Up @@ -637,11 +639,13 @@ def run_iter(
output_schema._schema,
unoptimized_plan_json,
"Ray (Flotilla)",
ray_dashboard_url,
entrypoint,
dashboard_url=ray_dashboard_url,
entrypoint=entrypoint,
python_version=platform.python_version(),
daft_version=daft.get_version(),
ray_version=ray.__version__,
runner_version=ray.__version__,
distributed=True,
task_events_enabled=task_events_enabled,
),
)
ctx._notify_optimization_start(query_id)
Expand Down
4 changes: 2 additions & 2 deletions daft/subscribers/event_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ def on_query_started(self, event: QueryStarted) -> None:
"daft_version": event.metadata.daft_version,
"python_version": event.metadata.python_version,
}
if event.metadata.ray_version is not None:
payload["runner_version"] = event.metadata.ray_version
if event.metadata.runner_version is not None:
payload["runner_version"] = event.metadata.runner_version

self._write_event(
event.query_id,
Expand Down
38 changes: 26 additions & 12 deletions src/daft-context/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use pyo3::prelude::*;

use crate::{
DaftContext, subscribers,
subscribers::{QueryMetadata, QueryResult},
subscribers::{QueryMetadata, QueryResult, RunnerInfo},
};

#[pyclass(frozen, from_py_object)]
Expand All @@ -17,27 +17,33 @@ pub struct PyQueryMetadata(pub(crate) Arc<QueryMetadata>);
#[pymethods]
impl PyQueryMetadata {
#[new]
#[pyo3(signature = (output_schema, unoptimized_plan, runner, ray_dashboard_url=None, entrypoint=None, python_version=None, daft_version=None, ray_version=None))]
#[pyo3(signature = (output_schema, unoptimized_plan, runner, dashboard_url=None, entrypoint=None, python_version=None, daft_version=None, runner_version=None, distributed=false, task_events_enabled=None))]
#[allow(clippy::too_many_arguments)]
fn __new__(
output_schema: PySchema,
unoptimized_plan: &str,
runner: &str,
ray_dashboard_url: Option<String>,
dashboard_url: Option<String>,
entrypoint: Option<String>,
python_version: Option<String>,
daft_version: Option<String>,
ray_version: Option<String>,
runner_version: Option<String>,
distributed: bool,
task_events_enabled: Option<bool>,
) -> Self {
Self(Arc::new(QueryMetadata {
output_schema: output_schema.into(),
unoptimized_plan: unoptimized_plan.into(),
runner: runner.into(),
ray_dashboard_url,
runner: RunnerInfo {
name: runner.to_string(),
version: runner_version,
distributed,
dashboard_url,
task_events_enabled,
},
entrypoint,
python_version,
daft_version,
ray_version,
}))
}
#[getter]
Expand All @@ -50,11 +56,11 @@ impl PyQueryMetadata {
}
#[getter]
pub fn runner(&self) -> String {
self.0.runner.clone()
self.0.runner.name.clone()
}
#[getter]
pub fn ray_dashboard_url(&self) -> Option<String> {
self.0.ray_dashboard_url.clone()
pub fn dashboard_url(&self) -> Option<String> {
self.0.runner.dashboard_url.clone()
}
#[getter]
pub fn entrypoint(&self) -> Option<String> {
Expand All @@ -69,8 +75,16 @@ impl PyQueryMetadata {
self.0.daft_version.clone()
}
#[getter]
pub fn ray_version(&self) -> Option<String> {
self.0.ray_version.clone()
pub fn runner_version(&self) -> Option<String> {
self.0.runner.version.clone()
}
#[getter]
pub fn distributed(&self) -> bool {
self.0.runner.distributed
}
#[getter]
pub fn task_events_enabled(&self) -> Option<bool> {
self.0.runner.task_events_enabled
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/daft-context/src/subscribers/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,19 +253,22 @@ impl DashboardSubscriber {
if self.is_worker() {
return Ok(());
}

self.enqueue_json(
format!("engine/query/{}/start", query_id),
"query_start",
&daft_dashboard::engine::StartQueryArgs {
start_sec: secs_from_epoch(),
unoptimized_plan: metadata.unoptimized_plan.clone(),
runner: Some(metadata.runner.clone()),
ray_dashboard_url: metadata.ray_dashboard_url.clone(),
runner: daft_dashboard::engine::RunnerInfo {
name: metadata.runner.name.clone(),
version: metadata.runner.version.clone(),
distributed: metadata.runner.distributed,
dashboard_url: metadata.runner.dashboard_url.clone(),
task_events_enabled: metadata.runner.task_events_enabled,
},
entrypoint: metadata.entrypoint.clone(),
python_version: metadata.python_version.clone(),
daft_version: metadata.daft_version.clone(),
ray_version: metadata.ray_version.clone(),
},
);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/daft-context/src/subscribers/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl DebugSubscriber {
eprintln!(
"query_start query_id={} runner={} unoptimized_plan=\n{}",
event.header.query_id,
event.metadata.runner,
event.metadata.runner.name,
event.metadata.unoptimized_plan.as_ref()
);
Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/daft-context/src/subscribers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@ pub use events::Event;

use crate::subscribers::events::EventHeader;

#[derive(Debug, Clone)]
pub struct RunnerInfo {
pub name: String,
pub version: Option<String>,
pub distributed: bool,
pub dashboard_url: Option<String>,
pub task_events_enabled: Option<bool>,
}

#[derive(Debug)]
pub struct QueryMetadata {
pub output_schema: SchemaRef,
pub unoptimized_plan: QueryPlan,
pub runner: String,
pub ray_dashboard_url: Option<String>,
pub runner: RunnerInfo,
pub entrypoint: Option<String>,
pub python_version: Option<String>,
pub daft_version: Option<String>,
pub ray_version: Option<String>,
}

#[derive(Debug, Clone)]
Expand Down
41 changes: 19 additions & 22 deletions src/daft-dashboard/frontend/src/app/queries/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -116,33 +116,30 @@ const columns = [
cell: info => toHumanReadableDate(info.getValue()),
sortingFn: "basic",
}),
columnHelper.accessor("runner", {
columnHelper.accessor(row => row.runner.name, {
id: "runner",
header: "Engine",
cell: info => getEngineName(info.getValue()),
sortingFn: "alphanumeric",
}),
// @ts-ignore
columnHelper.accessor("ray_dashboard_url", {
header: "Ray UI",
columnHelper.display({
id: "dashboard_url",
header: "Dashboard",
cell: info => {
let url = info.getValue();
if (url) {
if (!url.startsWith("http://") && !url.startsWith("https://")) {
url = "http://" + url;
}
return (
<a
href={url}
target="_blank"
rel="noopener noreferrer"
className="flex items-center gap-1 text-blue-500 hover:underline"
onClick={(e) => e.stopPropagation()}
>
Open Ray UI <ExternalLinkIcon className="h-4 w-4" />
</a>
);
}
return null;
const url = info.row.original.runner.dashboard_url;
if (!url) return null;
const href = url.startsWith("http://") || url.startsWith("https://") ? url : `http://${url}`;
return (
<a
href={href}
target="_blank"
rel="noopener noreferrer"
className="flex items-center gap-1 text-blue-500 hover:underline"
onClick={(e) => e.stopPropagation()}
>
Open Dashboard <ExternalLinkIcon className="h-4 w-4" />
</a>
);
},
enableSorting: false,
})
Expand Down
Loading