diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 4d39d474c47..b5286f8d56c 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -2612,22 +2612,26 @@ class PyQueryMetadata: output_schema: PySchema unoptimized_plan: str runner: str - ray_dashboard_url: str | None + dashboard_url: str | None entrypoint: str | None python_version: str | None daft_version: str | None - ray_version: str | None + runner_version: str | None + distributed: bool + task_events_enabled: bool | None def __init__( self, output_schema: PySchema, unoptimized_plan: str, runner: str, - ray_dashboard_url: str | None = None, + dashboard_url: str | None = None, entrypoint: str | None = None, python_version: str | None = None, daft_version: str | None = None, - ray_version: str | None = None, + runner_version: str | None = None, + distributed: bool = False, + task_events_enabled: bool | None = None, ) -> None: ... class PyQueryResult: diff --git a/daft/runners/native_runner.py b/daft/runners/native_runner.py index 1eb82d4581a..4f261498b29 100644 --- a/daft/runners/native_runner.py +++ b/daft/runners/native_runner.py @@ -109,7 +109,6 @@ def run_iter( output_schema._schema, builder.repr_json(), "Native (Swordfish)", - ray_dashboard_url=None, entrypoint=entrypoint, python_version=python_version, daft_version=daft_version, diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index a0caa3c3905..1f8130b9eba 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -602,6 +602,8 @@ def run_iter( entrypoint = "python " + " ".join(sys.argv) dashboard_url = os.environ.get("DAFT_DASHBOARD_URL") + task_events_raw = os.environ.get("DAFT_TASK_EVENTS_ENABLED") + task_events_enabled = task_events_raw.strip().lower() in ("1", "true") if task_events_raw is not None else False # Log Dashboard URL if configured if dashboard_url: @@ -637,11 +639,13 @@ def run_iter( output_schema._schema, unoptimized_plan_json, "Ray (Flotilla)", - ray_dashboard_url, - entrypoint, + dashboard_url=ray_dashboard_url, + entrypoint=entrypoint, python_version=platform.python_version(), daft_version=daft.get_version(), - ray_version=ray.__version__, + runner_version=ray.__version__, + distributed=True, + task_events_enabled=task_events_enabled, ), ) ctx._notify_optimization_start(query_id) diff --git a/daft/subscribers/event_log.py b/daft/subscribers/event_log.py index e182efa83d6..4d15c0800eb 100644 --- a/daft/subscribers/event_log.py +++ b/daft/subscribers/event_log.py @@ -195,8 +195,8 @@ def on_query_started(self, event: QueryStarted) -> None: "daft_version": event.metadata.daft_version, "python_version": event.metadata.python_version, } - if event.metadata.ray_version is not None: - payload["runner_version"] = event.metadata.ray_version + if event.metadata.runner_version is not None: + payload["runner_version"] = event.metadata.runner_version self._write_event( event.query_id, diff --git a/src/daft-context/src/python.rs b/src/daft-context/src/python.rs index 01b7fe2c443..e3a1d7704c2 100644 --- a/src/daft-context/src/python.rs +++ b/src/daft-context/src/python.rs @@ -7,7 +7,7 @@ use pyo3::prelude::*; use crate::{ DaftContext, subscribers, - subscribers::{QueryMetadata, QueryResult}, + subscribers::{QueryMetadata, QueryResult, RunnerInfo}, }; #[pyclass(frozen, from_py_object)] @@ -17,27 +17,33 @@ pub struct PyQueryMetadata(pub(crate) Arc); #[pymethods] impl PyQueryMetadata { #[new] - #[pyo3(signature = (output_schema, unoptimized_plan, runner, ray_dashboard_url=None, entrypoint=None, python_version=None, daft_version=None, ray_version=None))] + #[pyo3(signature = (output_schema, unoptimized_plan, runner, dashboard_url=None, entrypoint=None, python_version=None, daft_version=None, runner_version=None, distributed=false, task_events_enabled=None))] #[allow(clippy::too_many_arguments)] fn __new__( output_schema: PySchema, unoptimized_plan: &str, runner: &str, - ray_dashboard_url: Option, + dashboard_url: Option, entrypoint: Option, python_version: Option, daft_version: Option, - ray_version: Option, + runner_version: Option, + distributed: bool, + task_events_enabled: Option, ) -> Self { Self(Arc::new(QueryMetadata { output_schema: output_schema.into(), unoptimized_plan: unoptimized_plan.into(), - runner: runner.into(), - ray_dashboard_url, + runner: RunnerInfo { + name: runner.to_string(), + version: runner_version, + distributed, + dashboard_url, + task_events_enabled, + }, entrypoint, python_version, daft_version, - ray_version, })) } #[getter] @@ -50,11 +56,11 @@ impl PyQueryMetadata { } #[getter] pub fn runner(&self) -> String { - self.0.runner.clone() + self.0.runner.name.clone() } #[getter] - pub fn ray_dashboard_url(&self) -> Option { - self.0.ray_dashboard_url.clone() + pub fn dashboard_url(&self) -> Option { + self.0.runner.dashboard_url.clone() } #[getter] pub fn entrypoint(&self) -> Option { @@ -69,8 +75,16 @@ impl PyQueryMetadata { self.0.daft_version.clone() } #[getter] - pub fn ray_version(&self) -> Option { - self.0.ray_version.clone() + pub fn runner_version(&self) -> Option { + self.0.runner.version.clone() + } + #[getter] + pub fn distributed(&self) -> bool { + self.0.runner.distributed + } + #[getter] + pub fn task_events_enabled(&self) -> Option { + self.0.runner.task_events_enabled } } diff --git a/src/daft-context/src/subscribers/dashboard.rs b/src/daft-context/src/subscribers/dashboard.rs index 9b55f8cc6bc..ab6b0d79e00 100644 --- a/src/daft-context/src/subscribers/dashboard.rs +++ b/src/daft-context/src/subscribers/dashboard.rs @@ -253,19 +253,22 @@ impl DashboardSubscriber { if self.is_worker() { return Ok(()); } - self.enqueue_json( format!("engine/query/{}/start", query_id), "query_start", &daft_dashboard::engine::StartQueryArgs { start_sec: secs_from_epoch(), unoptimized_plan: metadata.unoptimized_plan.clone(), - runner: Some(metadata.runner.clone()), - ray_dashboard_url: metadata.ray_dashboard_url.clone(), + runner: daft_dashboard::engine::RunnerInfo { + name: metadata.runner.name.clone(), + version: metadata.runner.version.clone(), + distributed: metadata.runner.distributed, + dashboard_url: metadata.runner.dashboard_url.clone(), + task_events_enabled: metadata.runner.task_events_enabled, + }, entrypoint: metadata.entrypoint.clone(), python_version: metadata.python_version.clone(), daft_version: metadata.daft_version.clone(), - ray_version: metadata.ray_version.clone(), }, ); Ok(()) diff --git a/src/daft-context/src/subscribers/debug.rs b/src/daft-context/src/subscribers/debug.rs index 2c4484634b8..d4897946073 100644 --- a/src/daft-context/src/subscribers/debug.rs +++ b/src/daft-context/src/subscribers/debug.rs @@ -23,7 +23,7 @@ impl DebugSubscriber { eprintln!( "query_start query_id={} runner={} unoptimized_plan=\n{}", event.header.query_id, - event.metadata.runner, + event.metadata.runner.name, event.metadata.unoptimized_plan.as_ref() ); Ok(()) diff --git a/src/daft-context/src/subscribers/mod.rs b/src/daft-context/src/subscribers/mod.rs index 41ad3a7a37b..79ce5c9033e 100644 --- a/src/daft-context/src/subscribers/mod.rs +++ b/src/daft-context/src/subscribers/mod.rs @@ -17,16 +17,23 @@ pub use events::Event; use crate::subscribers::events::EventHeader; +#[derive(Debug, Clone)] +pub struct RunnerInfo { + pub name: String, + pub version: Option, + pub distributed: bool, + pub dashboard_url: Option, + pub task_events_enabled: Option, +} + #[derive(Debug)] pub struct QueryMetadata { pub output_schema: SchemaRef, pub unoptimized_plan: QueryPlan, - pub runner: String, - pub ray_dashboard_url: Option, + pub runner: RunnerInfo, pub entrypoint: Option, pub python_version: Option, pub daft_version: Option, - pub ray_version: Option, } #[derive(Debug, Clone)] diff --git a/src/daft-dashboard/frontend/src/app/queries/page.tsx b/src/daft-dashboard/frontend/src/app/queries/page.tsx index 26a4e4d710a..718eea7e3c8 100644 --- a/src/daft-dashboard/frontend/src/app/queries/page.tsx +++ b/src/daft-dashboard/frontend/src/app/queries/page.tsx @@ -116,33 +116,30 @@ const columns = [ cell: info => toHumanReadableDate(info.getValue()), sortingFn: "basic", }), - columnHelper.accessor("runner", { + columnHelper.accessor(row => row.runner.name, { + id: "runner", header: "Engine", cell: info => getEngineName(info.getValue()), sortingFn: "alphanumeric", }), - // @ts-ignore - columnHelper.accessor("ray_dashboard_url", { - header: "Ray UI", + columnHelper.display({ + id: "dashboard_url", + header: "Dashboard", cell: info => { - let url = info.getValue(); - if (url) { - if (!url.startsWith("http://") && !url.startsWith("https://")) { - url = "http://" + url; - } - return ( - e.stopPropagation()} - > - Open Ray UI - - ); - } - return null; + const url = info.row.original.runner.dashboard_url; + if (!url) return null; + const href = url.startsWith("http://") || url.startsWith("https://") ? url : `http://${url}`; + return ( + e.stopPropagation()} + > + Open Dashboard + + ); }, enableSorting: false, }) diff --git a/src/daft-dashboard/frontend/src/app/query/page.tsx b/src/daft-dashboard/frontend/src/app/query/page.tsx index 577fffc724d..b8f5212a47b 100644 --- a/src/daft-dashboard/frontend/src/app/query/page.tsx +++ b/src/daft-dashboard/frontend/src/app/query/page.tsx @@ -25,23 +25,41 @@ const TAB_TRIGGER_CLS = "rounded-none bg-transparent px-4 py-2.5 text-sm font-me const MetaField = ({ label, value, + rows, href, mono, title, align = "left", + wrap = false, }: { label: string; - value: string; + value?: string; + /** Stacked label→value lines; takes precedence over `value`. */ + rows?: { label: string; value: string }[]; href?: string; mono?: boolean; title?: string; align?: "left" | "right"; + wrap?: boolean; }) => (

{label}

- {href ? ( + {rows ? ( +
+ {rows.map((r) => ( +

+ {r.label} + {r.value} +

+ ))} +
+ ) : href ? ( ) : (

{value} @@ -69,8 +87,7 @@ function QueryPageInner() { const debug = useMemo(() => searchParams.has("debug"), [searchParams]); const [query, setQuery] = useState(null); - const engine = query ? getEngineName(query.runner) : null; - const isFlotilla = engine === "Flotilla"; + const canShowTasksPanel = query?.runner.distributed === true; // Tab + tasks-sidebar state is URL-driven so deep links survive reload. // - `tab`: active tab id @@ -244,28 +261,35 @@ function QueryPageInner() {

{/* Row 1 */} - + {/* Row 2 */} - + - {query.ray_dashboard_url ? ( + {query.runner.dashboard_url ? ( ) : (
)} - {(query.python_version || query.ray_version) ? ( + {(query.python_version || query.runner.version) ? ( "Ray"; the Engine cell already shows "Flotilla". + label: query.runner.name.split(" (")[0], + value: query.runner.version, + }, + ].filter(Boolean) as { label: string; value: string }[]} /> ) : (
@@ -320,16 +344,17 @@ function QueryPageInner() { exec_state={query.state as ExecutingState} highlightedNodeId={nodeFilter} hoveredNodeIds={hoveredNodeIds} - onViewTasks={isFlotilla ? handleViewTasksForNode : undefined} - tasksOpen={isFlotilla && tasksOpen} - onOpenTasks={isFlotilla ? handleOpenTasks : undefined} + onViewTasks={canShowTasksPanel ? handleViewTasksForNode : undefined} + tasksOpen={canShowTasksPanel && tasksOpen} + onOpenTasks={canShowTasksPanel ? handleOpenTasks : undefined} queryStatus={query.state.status} />
- {isFlotilla && tasksOpen && queryId && ( + {canShowTasksPanel && tasksOpen && queryId && (
{rows.length === 0 && activeTasks.length === 0 ? ( -
-

- {nodeFilter != null - ? "No tasks for this filter." - : allRows.length === 0 - ? "No tasks reported yet." - : "No tasks match."} -

+
+ {nodeFilter != null ? ( +

No tasks for this filter.

+ ) : allRows.length > 0 ? ( +

No tasks match.

+ ) : runner?.task_events_enabled === false ? ( + <> +

No task events reported for this query.

+

+ Task reporting may be disabled. Set{" "} + DAFT_TASK_EVENTS_ENABLED=1{" "} + and rerun the query to see task-level details. +

+ + ) : queryActive ? ( +

No tasks reported yet.

+ ) : ( +

No task events were received for this query.

+ )}
) : ( <> diff --git a/src/daft-dashboard/frontend/src/app/query/types.ts b/src/daft-dashboard/frontend/src/app/query/types.ts index 5fa36473028..65257dd809d 100644 --- a/src/daft-dashboard/frontend/src/app/query/types.ts +++ b/src/daft-dashboard/frontend/src/app/query/types.ts @@ -206,16 +206,22 @@ export type QueryState = marked_dead_sec: number; }; +export type RunnerInfo = { + name: string; + version?: string | null; + distributed: boolean; + dashboard_url?: string | null; + task_events_enabled?: boolean | null; +}; + export type QueryInfo = { id: string; start_sec: number; last_heartbeat_sec: number; unoptimized_plan: string; - runner: string; - ray_dashboard_url?: string; + runner: RunnerInfo; entrypoint?: string; python_version?: string; daft_version?: string; - ray_version?: string; state: QueryState; }; diff --git a/src/daft-dashboard/frontend/src/hooks/use-queries.ts b/src/daft-dashboard/frontend/src/hooks/use-queries.ts index b715ec686b4..84bc7dec5f9 100644 --- a/src/daft-dashboard/frontend/src/hooks/use-queries.ts +++ b/src/daft-dashboard/frontend/src/hooks/use-queries.ts @@ -1,5 +1,7 @@ import { createContext, useContext } from "react"; +import type { RunnerInfo } from "@/app/query/types"; + export type PendingStatus = { status: "Pending"; start_sec: number; @@ -69,12 +71,10 @@ export type QuerySummary = { id: string; start_sec: number; status: QueryStatus; - runner: string; - ray_dashboard_url?: string; + runner: RunnerInfo; entrypoint?: string; python_version?: string; daft_version?: string; - ray_version?: string; }; export type QuerySummaryMap = { [_: string]: QuerySummary }; diff --git a/src/daft-dashboard/src/engine.rs b/src/daft-dashboard/src/engine.rs index e86957e8ed4..6fcdfbe5e74 100644 --- a/src/daft-dashboard/src/engine.rs +++ b/src/daft-dashboard/src/engine.rs @@ -33,17 +33,25 @@ use crate::state::{ QueryInfo, QueryState, TaskStatus, TaskStore, }; +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RunnerInfo { + pub name: String, + pub version: Option, + #[serde(default)] + pub distributed: bool, + pub dashboard_url: Option, + pub task_events_enabled: Option, +} + #[derive(Clone, Deserialize, Serialize)] #[cfg_attr(debug_assertions, derive(Debug))] pub struct StartQueryArgs { pub start_sec: f64, pub unoptimized_plan: QueryPlan, - pub runner: Option, - pub ray_dashboard_url: Option, + pub runner: RunnerInfo, pub entrypoint: Option, pub python_version: Option, pub daft_version: Option, - pub ray_version: Option, } async fn query_start( @@ -68,14 +76,10 @@ pub(crate) fn apply_query_start( start_sec: args.start_sec, last_heartbeat_sec: secs_from_epoch(), unoptimized_plan: args.unoptimized_plan, - runner: args - .runner - .unwrap_or_else(|| "Native (Swordfish)".to_string()), - ray_dashboard_url: args.ray_dashboard_url, + runner: args.runner, entrypoint: args.entrypoint, python_version: args.python_version, daft_version: args.daft_version, - ray_version: args.ray_version, state: QueryState::Pending, }; @@ -1139,7 +1143,9 @@ pub(crate) fn routes() -> Router> { mod eviction_tests { use std::sync::Arc; - use super::{DashboardState, MAX_QUERIES_RETAINED, QueryInfo, QueryState, evict_old_queries}; + use super::{ + DashboardState, MAX_QUERIES_RETAINED, QueryInfo, QueryState, RunnerInfo, evict_old_queries, + }; fn insert_query(state: &DashboardState, id: &str, start_sec: f64, active: bool) { let qid: common_metrics::QueryID = Arc::from(id); @@ -1161,12 +1167,16 @@ mod eviction_tests { start_sec, last_heartbeat_sec: start_sec, unoptimized_plan: Arc::from(""), - runner: "test".to_string(), - ray_dashboard_url: None, + runner: RunnerInfo { + name: "test".to_string(), + version: None, + distributed: false, + dashboard_url: None, + task_events_enabled: None, + }, entrypoint: None, python_version: None, daft_version: None, - ray_version: None, state: query_state, }, ); @@ -1235,12 +1245,16 @@ mod eviction_tests { start_sec: 0.0, last_heartbeat_sec: 0.0, unoptimized_plan: Arc::from(""), - runner: "test".to_string(), - ray_dashboard_url: None, + runner: RunnerInfo { + name: "test".to_string(), + version: None, + distributed: false, + dashboard_url: None, + task_events_enabled: None, + }, entrypoint: None, python_version: None, daft_version: None, - ray_version: None, state: QueryState::Finalizing { plan_info: PlanInfo { plan_start_sec: 0.0, diff --git a/src/daft-dashboard/src/import.rs b/src/daft-dashboard/src/import.rs index a4d8a17edfa..70f4058e79b 100644 --- a/src/daft-dashboard/src/import.rs +++ b/src/daft-dashboard/src/import.rs @@ -15,9 +15,9 @@ use common_metrics::Stat; use crate::{ engine::{ ExecEmitStatsArgsRecv, ExecEndArgs, ExecStartArgs, FinalizeArgs, PlanEndArgs, - PlanStartArgs, StartQueryArgs, apply_emit_stats, apply_exec_end, apply_exec_start, - apply_operator_end, apply_operator_start, apply_plan_end, apply_plan_start, - apply_query_end, apply_query_start, + PlanStartArgs, RunnerInfo, StartQueryArgs, apply_emit_stats, apply_exec_end, + apply_exec_start, apply_operator_end, apply_operator_start, apply_plan_end, + apply_plan_start, apply_query_end, apply_query_start, }, events::{Event, MetricValue}, state::DashboardState, @@ -133,11 +133,18 @@ fn import_event(event: &Event, state: &DashboardState) -> Result<(), EventLogErr StartQueryArgs { start_sec: e.timestamp, unoptimized_plan: e.plan.clone().into(), - runner: e.runner.clone(), - ray_dashboard_url: e.dashboard_url.clone(), + runner: RunnerInfo { + name: e + .runner + .clone() + .unwrap_or_else(|| "Native (Swordfish)".to_string()), + version: e.runner_version.clone(), + distributed: false, + dashboard_url: e.dashboard_url.clone(), + task_events_enabled: None, + }, entrypoint: e.entrypoint.clone(), daft_version: e.daft_version.clone(), - ray_version: e.runner_version.clone(), python_version: e.python_version.clone(), }, ), diff --git a/src/daft-dashboard/src/state.rs b/src/daft-dashboard/src/state.rs index 5fc7d021aad..edb7123e96e 100644 --- a/src/daft-dashboard/src/state.rs +++ b/src/daft-dashboard/src/state.rs @@ -13,7 +13,7 @@ use serde::Serialize; use tokio::sync::{broadcast, watch}; use uuid::Uuid; -use crate::engine::TaskStatsEntry; +use crate::engine::{RunnerInfo, TaskStatsEntry}; #[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub(crate) enum OperatorStatus { @@ -696,17 +696,13 @@ pub(crate) struct QuerySummary { pub id: QueryID, pub start_sec: f64, pub status: QueryStatus, - pub runner: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub ray_dashboard_url: Option, + pub runner: RunnerInfo, #[serde(skip_serializing_if = "Option::is_none")] pub entrypoint: Option, #[serde(skip_serializing_if = "Option::is_none")] pub python_version: Option, #[serde(skip_serializing_if = "Option::is_none")] pub daft_version: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub ray_version: Option, } #[derive(Debug, Clone, Serialize)] @@ -778,12 +774,10 @@ pub(crate) struct QueryInfo { pub start_sec: f64, pub last_heartbeat_sec: f64, pub unoptimized_plan: QueryPlan, - pub runner: String, - pub ray_dashboard_url: Option, + pub runner: RunnerInfo, pub entrypoint: Option, pub python_version: Option, pub daft_version: Option, - pub ray_version: Option, pub state: QueryState, } @@ -836,11 +830,9 @@ impl QueryInfo { start_sec: self.start_sec, status: self.status(), runner: self.runner.clone(), - ray_dashboard_url: self.ray_dashboard_url.clone(), entrypoint: self.entrypoint.clone(), python_version: self.python_version.clone(), daft_version: self.daft_version.clone(), - ray_version: self.ray_version.clone(), } } } diff --git a/tests/test_events.py b/tests/test_events.py index 304ffc7111c..05fa7ba5e8c 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -43,7 +43,7 @@ def _make_query_metadata() -> PyQueryMetadata: df.schema()._schema, df._builder.repr_json(), "Native (Swordfish)", - ray_dashboard_url=None, + dashboard_url=None, entrypoint="pytest", )