diff --git a/pom.xml b/pom.xml index 8eb5254405c..e0fd36ae718 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,7 @@ zeppelin-common zeppelin-client zeppelin-client-examples + zeppelin-mcp zeppelin-web-angular zeppelin-server zeppelin-jupyter diff --git a/zeppelin-mcp/.gitignore b/zeppelin-mcp/.gitignore new file mode 100644 index 00000000000..1cd1708a83e --- /dev/null +++ b/zeppelin-mcp/.gitignore @@ -0,0 +1,7 @@ +.venv/ +venv/ +target/ +*.egg-info/ +__pycache__/ +.pytest_cache/ +.coverage diff --git a/zeppelin-mcp/GETTING_STARTED.md b/zeppelin-mcp/GETTING_STARTED.md new file mode 100644 index 00000000000..b012b1eb2d4 --- /dev/null +++ b/zeppelin-mcp/GETTING_STARTED.md @@ -0,0 +1,280 @@ + + +# Getting Started — Zeppelin MCP Server (experimental) + +This guide takes you from a fresh checkout to an AI coding agent driving your +Apache Zeppelin instance through the [Model Context Protocol](https://modelcontextprotocol.io). + +> **Status: experimental.** There is no published package yet — you build and +> install it from source (below). Distribution through standard channels +> (PyPI, the Zeppelin binary distribution) is planned. The server itself is a +> thin client over Zeppelin's stable [REST API](https://zeppelin.apache.org/docs/latest/usage/rest_api/notebook.html), +> so it works against any reachable Zeppelin server with no server-side changes. + +### Open items (pre-GA) + +These are unresolved and tracked for follow-up — see the CR discussion: + +- **Bundling in binary distributions.** Whether to ship the `zeppelin-mcp` + console script (or a packaged artifact) inside the Zeppelin binary + distribution, vs. publishing to PyPI and letting users `pip install`. Until + decided, this module is build-from-source only. +- **Standard distribution channel** (PyPI release cadence, versioning). +- **Semantic search over notebooks as agent knowledge.** Optionally index + notebook/paragraph text plus interpreter metadata so an agent can discover + relevant notebooks by meaning (e.g. a `search_notebooks` tool) and build + richer context, rather than only operating on an explicitly selected note. + Not implemented today; the server currently exposes only direct REST-backed + operations. + +--- + +## 1. Prerequisites + +| Requirement | Notes | +| --- | --- | +| **Python 3.10+** | `python3 --version`. The server and its console script run on this. | +| **A running Zeppelin** | Local (`bin/zeppelin-daemon.sh start` → `http://localhost:8080`) or any reachable instance. | +| **An MCP-capable agent** | Claude Code, Kiro, Codex, Cursor, or any MCP client. | +| **git** | To clone the repo (if you haven't already). | + +Confirm Zeppelin is reachable before wiring up any agent: + +```bash +curl -s http://localhost:8080/api/notebook | head -c 200 +``` + +A JSON response (even an empty list) means you're good. A connection error +means start Zeppelin first. + +--- + +## 2. Build & install from source + +The server lives in the `zeppelin-mcp/` module. Install it into an isolated +virtualenv so the `zeppelin-mcp` console script lands on a predictable path. + +```bash +cd zeppelin-mcp + +# Create and activate an isolated environment +python3 -m venv .venv +source .venv/bin/activate # Windows: .venv\Scripts\activate + +# Install the package (editable, so source edits take effect immediately) +pip install -e . + +# Verify the console script is installed and resolves +which zeppelin-mcp +``` + +`which zeppelin-mcp` prints an **absolute path** like +`/path/to/zeppelin/zeppelin-mcp/.venv/bin/zeppelin-mcp`. **Copy this path** — +some agents (notably Codex) need the absolute path because they don't inherit +your shell's activated venv. + +> **Why a venv?** The console script is only on `PATH` while the venv is active. +> Agents launch the server as a subprocess and usually do **not** have your venv +> activated, so pointing them at the absolute path is the reliable option. + +### Optional: run from the Maven reactor + +The module is wired into the Zeppelin build. This creates a venv under +`target/venv` and runs the test suite (it does not install onto your `PATH`): + +```bash +# from the repo root +./mvnw test -pl zeppelin-mcp +# skip the Python tests like every other module: +./mvnw clean package -pl zeppelin-mcp -DskipTests +``` + +--- + +## 3. Smoke-test the server + +Before involving an agent, confirm the server starts and can reach Zeppelin. +It speaks MCP over **stdio**, so a bare launch will sit waiting for a client on +stdin — that's expected. Send it nothing and Ctrl-C; a clean start with no +import/connection error is the signal you want: + +```bash +ZEPPELIN_URL=http://localhost:8080 zeppelin-mcp +# (no output, waiting on stdin) -> press Ctrl-C +``` + +If you see a `ModuleNotFoundError`, the venv isn't active or the install +failed — revisit step 2. If you want a fuller end-to-end check, wire up an +agent (step 5) and ask it to list interpreters. + +--- + +## 4. Configuration (environment variables) + +The server is configured entirely through environment variables — there are no +config files of its own. + +| Variable | Default | Description | +| --- | --- | --- | +| `ZEPPELIN_URL` | *(required)* | Base server URL, e.g. `http://localhost:8080`. | +| `ZEPPELIN_AUTH` | `none` | `none`, `basic`, or `header`. | +| `ZEPPELIN_USER` | — | Username for `basic` auth. | +| `ZEPPELIN_PASSWORD` | — | Password for `basic` auth. | +| `ZEPPELIN_AUTH_HEADER` | `Authorization` | Header name for `header` auth. | +| `ZEPPELIN_AUTH_HEADER_VALUE` | — | Header value for `header` auth (e.g. `Bearer ...`). | +| `ZEPPELIN_TIMEOUT_SECONDS` | `600` | Per-request timeout (long enough for Spark/SQL jobs). | +| `ZEPPELIN_VERIFY_TLS` | `true` | Set `false` for self-signed dev servers. | +| `ZEPPELIN_NOTEBOOK` | — | Optional note id or URL to pre-select on startup. | + +For local anonymous Zeppelin, `ZEPPELIN_URL` alone is enough. For a +Shiro-secured server use `ZEPPELIN_AUTH=basic` with user/password; for an +SSO/reverse-proxied server use `ZEPPELIN_AUTH=header`. + +--- + +## 5. Connect an MCP client + +The server is launched as a subprocess by the agent. In every example, replace +`zeppelin-mcp` with the **absolute path** from step 2 if the bare command isn't +on the agent's `PATH`. + +### Claude Code + +```bash +claude mcp add zeppelin \ + --env ZEPPELIN_URL=http://localhost:8080 \ + -- /path/to/zeppelin/zeppelin-mcp/.venv/bin/zeppelin-mcp +``` + +### Kiro + +Kiro reads MCP config from `.kiro/settings/mcp.json` (workspace) or +`~/.kiro/settings/mcp.json` (user-level): + +```json +{ + "mcpServers": { + "zeppelin": { + "command": "/path/to/zeppelin/zeppelin-mcp/.venv/bin/zeppelin-mcp", + "args": [], + "env": { + "ZEPPELIN_URL": "http://localhost:8080" + }, + "disabled": false, + "autoApprove": [] + } + } +} +``` + +### Codex (OpenAI Codex CLI) + +Codex reads MCP servers from `~/.codex/config.toml`: + +```toml +[mcp_servers.zeppelin] +command = "/path/to/zeppelin/zeppelin-mcp/.venv/bin/zeppelin-mcp" +args = [] +env = { ZEPPELIN_URL = "http://localhost:8080" } +``` + +### Generic MCP client (`mcp.json` / `claude_desktop_config.json` / Cursor) + +```json +{ + "mcpServers": { + "zeppelin": { + "command": "/path/to/zeppelin/zeppelin-mcp/.venv/bin/zeppelin-mcp", + "env": { + "ZEPPELIN_URL": "http://localhost:8080", + "ZEPPELIN_AUTH": "basic", + "ZEPPELIN_USER": "admin", + "ZEPPELIN_PASSWORD": "password1" + } + } + } +} +``` + +> **Compatibility note.** Any spec-compliant MCP client works — the server has +> no client-specific code; it just runs a stdio MCP server. Only the location +> and format of the config differ between agents. After editing a config file, +> restart the agent (or reload its MCP servers) so it picks up the change. + +--- + +## 6. First run — verify end to end + +Ask your agent something like: + +> "List the Zeppelin interpreters, then create a notebook called `mcp-smoke`, +> run `%md # hello from MCP` in it, and read it back." + +Under the hood the agent calls `list_interpreters` → `create_notebook` → +`run_code` → `read_notebook`. If the markdown paragraph renders, your wiring is +correct. + +### Available tools + +| Tool | What it does | +| --- | --- | +| `set_notebook(notebook)` | Select a notebook (bare id or pasted URL); returns its rendered contents. | +| `create_notebook(name, default_interpreter_group="")` | Create a notebook and select it. | +| `run_code(code, interpreter="")` | Add a paragraph, run it synchronously, return its output. | +| `read_notebook()` | Render the selected notebook (paragraph ids, code, truncated output). | +| `delete_paragraph(paragraph_id)` | Delete a paragraph by id. | +| `clear_output()` | Clear all paragraph output, keeping the code. | +| `list_interpreters()` | List interpreter settings (id, name, group). | +| `restart_interpreter(setting_id, scope_to_notebook=False)` | Restart a hung/broken interpreter. | +| `write_summary(summary_markdown, title="")` | Insert a Markdown summary as the first paragraph; optionally rename the note. | + +A typical agent loop: `set_notebook` (or `create_notebook`) → one or more +`run_code` calls → `read_notebook` to inspect results → `write_summary` with the +agent's own data-driven findings. + +--- + +## 7. Troubleshooting + +| Symptom | Likely cause & fix | +| --- | --- | +| Agent reports the server failed to start | The `command` path is wrong or the venv isn't installed. Use the absolute path from `which zeppelin-mcp` (step 2). | +| `ModuleNotFoundError: mcp` (or `httpx`) | `pip install -e .` wasn't run in the active venv. Reinstall (step 2). | +| Tools return connection/timeout errors | `ZEPPELIN_URL` is unreachable from where the agent runs, or Zeppelin is down. Re-run the `curl` check in step 1. | +| `401`/`403` from tools | Zeppelin requires auth. Set `ZEPPELIN_AUTH=basic` (+ user/password) or `header`. | +| TLS errors against a dev server | Set `ZEPPELIN_VERIFY_TLS=false`. | +| Long Spark/SQL paragraph times out | Raise `ZEPPELIN_TIMEOUT_SECONDS`. | +| `python3: command not found` during Maven build | Set `-Dpython.executable=python` or skip Python tests with `-Dpython.test.skip=true`. | + +--- + +## 8. Developing on the server + +```bash +cd zeppelin-mcp +source .venv/bin/activate +pip install -e '.[test]' +pytest # unit suite + coverage +``` + +Tests run entirely against an in-memory fake Zeppelin (an `httpx.MockTransport` +handler in `tests/conftest.py`) — no live server needed. + +See [`README.md`](README.md) for the module reference and architecture notes +(`client.py` owns REST calls, `context.py` renders results, `server.py` wires +tools to session state). diff --git a/zeppelin-mcp/README.md b/zeppelin-mcp/README.md new file mode 100644 index 00000000000..2277a2574be --- /dev/null +++ b/zeppelin-mcp/README.md @@ -0,0 +1,127 @@ + + +# Zeppelin MCP Server + +A [Model Context Protocol](https://modelcontextprotocol.io) (MCP) server that lets +AI coding agents — Claude Code, Kiro, Cursor, and any other MCP client — drive an +Apache Zeppelin instance: select or create a notebook, run paragraphs in any +interpreter, read results, clean up paragraphs, restart a stuck interpreter, and +write a summary back into the notebook. + +It talks to Zeppelin over the standard [REST API](https://zeppelin.apache.org/docs/latest/usage/rest_api/notebook.html), +so it works against any reachable Zeppelin server without modifying it. + +## Tools + +| Tool | What it does | +| --- | --- | +| `set_notebook(notebook)` | Select a notebook (bare id or pasted URL); returns its rendered contents. | +| `create_notebook(name, default_interpreter_group="")` | Create a notebook and select it. | +| `run_code(code, interpreter="")` | Add a paragraph, run it synchronously, return its output. | +| `read_notebook()` | Render the selected notebook (paragraph ids, code, truncated output). | +| `delete_paragraph(paragraph_id)` | Delete a paragraph by id. | +| `clear_output()` | Clear all paragraph output, keeping the code. | +| `list_interpreters()` | List interpreter settings (id, name, group). | +| `restart_interpreter(setting_id, scope_to_notebook=False)` | Restart a hung/broken interpreter. | +| `write_summary(summary_markdown, title="")` | Insert a Markdown summary as the first paragraph; optionally rename the note. | + +A typical agent loop: `set_notebook` (or `create_notebook`) → one or more +`run_code` calls → `read_notebook` to inspect results → `write_summary` with the +agent's own data-driven findings. + +## Installation + +Requires Python 3.10+. + +```bash +# From a checkout of this module: +pip install /path/to/zeppelin/zeppelin-mcp + +# Or, once published to PyPI: +pip install zeppelin-mcp +``` + +This installs the `zeppelin-mcp` console script, which runs the server over stdio. + +## Configuration + +The server is configured entirely through environment variables: + +| Variable | Default | Description | +| --- | --- | --- | +| `ZEPPELIN_URL` | *(required)* | Base server URL, e.g. `http://localhost:8080`. | +| `ZEPPELIN_AUTH` | `none` | `none`, `basic`, or `header`. | +| `ZEPPELIN_USER` | — | Username for `basic` auth. | +| `ZEPPELIN_PASSWORD` | — | Password for `basic` auth. | +| `ZEPPELIN_AUTH_HEADER` | `Authorization` | Header name for `header` auth. | +| `ZEPPELIN_AUTH_HEADER_VALUE` | — | Header value for `header` auth (e.g. `Bearer ...`). | +| `ZEPPELIN_TIMEOUT_SECONDS` | `600` | Per-request timeout (long enough for Spark/SQL jobs). | +| `ZEPPELIN_VERIFY_TLS` | `true` | Set `false` for self-signed dev servers. | + +### Authentication modes + +- **`none`** — anonymous Zeppelin (no `shiro.ini` realm). The default for local dev. +- **`basic`** — Shiro form login via `POST /api/login`; the session cookie is reused. +- **`header`** — attaches a fixed header to every request. Use this for + reverse-proxied / SSO deployments where auth is a bearer token or session cookie. + +## Connecting from an MCP client + +### Claude Code + +```bash +claude mcp add zeppelin \ + --env ZEPPELIN_URL=http://localhost:8080 \ + -- zeppelin-mcp +``` + +### Generic MCP client (`mcp.json` / `claude_desktop_config.json`) + +```json +{ + "mcpServers": { + "zeppelin": { + "command": "zeppelin-mcp", + "env": { + "ZEPPELIN_URL": "http://localhost:8080", + "ZEPPELIN_AUTH": "basic", + "ZEPPELIN_USER": "admin", + "ZEPPELIN_PASSWORD": "password1" + } + } + } +} +``` + +## Development + +```bash +cd zeppelin-mcp +python3 -m venv .venv && source .venv/bin/activate +pip install -e '.[test]' +pytest # runs the unit suite with coverage +``` + +The tests run entirely against an in-memory fake Zeppelin (an `httpx.MockTransport` +handler in `tests/conftest.py`) — no live server is needed. + +### Building with Maven + +The module is wired into the Zeppelin reactor build. `mvn test -pl zeppelin-mcp` +creates a virtualenv under `target/venv` and runs pytest. A standard +`./mvnw clean package -DskipTests` skips the Python tests like every other module. diff --git a/zeppelin-mcp/pom.xml b/zeppelin-mcp/pom.xml new file mode 100644 index 00000000000..f11e2718951 --- /dev/null +++ b/zeppelin-mcp/pom.xml @@ -0,0 +1,118 @@ + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.13.0-SNAPSHOT + ../pom.xml + + + zeppelin-mcp + pom + Zeppelin: MCP Server + Model Context Protocol server for operating Zeppelin notebooks from AI agents + + + + python3 + + false + ${project.build.directory}/venv + + + + + + org.codehaus.mojo + exec-maven-plugin + + + + mcp-venv-setup + test-compile + + exec + + + ${python.test.skip} + ${python.executable} + ${project.basedir} + + -m + venv + ${mcp.venv.dir} + + + + + mcp-pip-install + test-compile + + exec + + + ${python.test.skip} + ${mcp.venv.dir}/bin/pip + ${project.basedir} + + install + --quiet + .[test] + + + + + + mcp-pytest + test + + exec + + + ${python.test.skip} + ${mcp.venv.dir}/bin/pytest + ${project.basedir} + + -q + + + + + + + + + + + + skip-python-tests + + + skipTests + + + + true + + + + diff --git a/zeppelin-mcp/pyproject.toml b/zeppelin-mcp/pyproject.toml new file mode 100644 index 00000000000..012a2b2f342 --- /dev/null +++ b/zeppelin-mcp/pyproject.toml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "zeppelin-mcp" +version = "0.13.0.dev0" +description = "Model Context Protocol (MCP) server for Apache Zeppelin notebooks" +readme = "README.md" +requires-python = ">=3.10" +license = { text = "Apache-2.0" } +authors = [{ name = "Apache Zeppelin" }] +keywords = ["zeppelin", "mcp", "notebook", "llm", "agent"] +dependencies = [ + "mcp>=1.2.0", + "httpx>=0.27.0", +] + +[project.optional-dependencies] +test = [ + "pytest>=8.0.0", + "pytest-cov>=5.0.0", +] + +[project.scripts] +zeppelin-mcp = "zeppelin_mcp.server:main" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "--cov=zeppelin_mcp --cov-report=term-missing" diff --git a/zeppelin-mcp/src/zeppelin_mcp/__init__.py b/zeppelin-mcp/src/zeppelin_mcp/__init__.py new file mode 100644 index 00000000000..ef410659188 --- /dev/null +++ b/zeppelin-mcp/src/zeppelin_mcp/__init__.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""MCP server for Apache Zeppelin notebook operations.""" + +from .client import AuthMode, ZeppelinClient, ZeppelinConfig, ZeppelinError + +__all__ = ["AuthMode", "ZeppelinClient", "ZeppelinConfig", "ZeppelinError"] + +__version__ = "0.13.0.dev0" diff --git a/zeppelin-mcp/src/zeppelin_mcp/client.py b/zeppelin-mcp/src/zeppelin_mcp/client.py new file mode 100644 index 00000000000..8175da19fcd --- /dev/null +++ b/zeppelin-mcp/src/zeppelin_mcp/client.py @@ -0,0 +1,285 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Thin synchronous client for the Apache Zeppelin REST API. + +This module wraps the subset of the Zeppelin REST API that the MCP server +needs and is intentionally free of any MCP concepts so it can be unit tested +in isolation. All responses from Zeppelin share the envelope:: + + {"status": "OK", "message": "", "body": } + +``body`` is the only interesting field; :meth:`ZeppelinClient._body` unwraps it +and raises :class:`ZeppelinError` on any non-OK status or transport failure. +""" + +from __future__ import annotations + +import dataclasses +import enum +from typing import Any, Optional + +import httpx + +# Default time budget for a single HTTP request. Synchronous paragraph runs can +# take a while (a Spark job, a slow SQL query), so this is deliberately generous +# and overridable from the environment by the server. +DEFAULT_TIMEOUT_SECONDS = 600.0 + + +class ZeppelinError(RuntimeError): + """Raised when Zeppelin returns a non-OK status or a request fails. + + The original HTTP status code (when available) is preserved on + :attr:`status_code` so callers can distinguish auth failures (401/403) + from missing notebooks (404) without parsing the message string. + """ + + def __init__(self, message: str, status_code: Optional[int] = None): + super().__init__(message) + self.status_code = status_code + + +class AuthMode(str, enum.Enum): + """Supported authentication strategies. + + NONE + Anonymous Zeppelin (no ``shiro.ini`` realm configured). The default. + BASIC + Shiro form login via ``POST /api/login``; the resulting session cookie + is reused for subsequent calls. + HEADER + A pre-supplied header (e.g. ``Authorization: Bearer ...`` or a session + cookie) is attached verbatim. Covers reverse-proxied / SSO deployments + without baking any specific scheme into this client. + """ + + NONE = "none" + BASIC = "basic" + HEADER = "header" + + +@dataclasses.dataclass +class ZeppelinConfig: + """Connection settings for a Zeppelin server.""" + + base_url: str + auth_mode: AuthMode = AuthMode.NONE + username: Optional[str] = None + password: Optional[str] = None + # For AuthMode.HEADER, the literal header name/value to attach to requests. + auth_header_name: str = "Authorization" + auth_header_value: Optional[str] = None + timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS + # When False, TLS certificate verification is disabled (self-signed dev + # servers). Mirrors httpx's ``verify`` argument. + verify_tls: bool = True + + def api_root(self) -> str: + """Return the ``/api`` root with any trailing slash on base_url removed.""" + return f"{self.base_url.rstrip('/')}/api" + + +class ZeppelinClient: + """Synchronous wrapper around the Zeppelin REST API. + + The client owns a single :class:`httpx.Client` (connection pooling, cookie + persistence) and is safe to reuse across many calls. It is *not* designed + for concurrent use from multiple threads. + """ + + def __init__(self, config: ZeppelinConfig, http_client: Optional[httpx.Client] = None): + self._config = config + # Allow injection of a pre-built client (used by tests to mount a mock + # transport); otherwise construct one from the config. + self._http = http_client or httpx.Client( + timeout=config.timeout_seconds, + verify=config.verify_tls, + follow_redirects=True, + ) + self._logged_in = False + + # -- lifecycle --------------------------------------------------------- + + def close(self) -> None: + self._http.close() + + def __enter__(self) -> "ZeppelinClient": + return self + + def __exit__(self, *_exc: object) -> None: + self.close() + + # -- auth -------------------------------------------------------------- + + def _ensure_auth(self) -> None: + """Perform a one-time Shiro login for BASIC auth. + + NONE and HEADER modes need no handshake. For BASIC we POST the + credentials once; httpx persists the returned ``JSESSIONID`` cookie on + the shared client for the lifetime of the process. + """ + if self._config.auth_mode != AuthMode.BASIC or self._logged_in: + return + if not self._config.username or not self._config.password: + raise ZeppelinError("BASIC auth requires both username and password") + resp = self._http.post( + f"{self._config.api_root()}/login", + data={"userName": self._config.username, "password": self._config.password}, + ) + if resp.status_code != httpx.codes.OK: + raise ZeppelinError( + f"Login failed for user '{self._config.username}' " + f"(HTTP {resp.status_code})", + status_code=resp.status_code, + ) + self._logged_in = True + + def _headers(self) -> dict[str, str]: + if self._config.auth_mode == AuthMode.HEADER and self._config.auth_header_value: + return {self._config.auth_header_name: self._config.auth_header_value} + return {} + + # -- transport --------------------------------------------------------- + + def _request(self, method: str, path: str, *, json: Any = None, + params: Optional[dict[str, Any]] = None) -> Any: + """Issue a request to ``/api`` + ``path`` and return the ``body`` field. + + Raises :class:`ZeppelinError` on transport errors, non-2xx responses, + or a JSON envelope whose ``status`` is not ``OK``. + """ + self._ensure_auth() + url = f"{self._config.api_root()}{path}" + try: + resp = self._http.request( + method, url, json=json, params=params, headers=self._headers() + ) + except httpx.HTTPError as exc: # connection refused, DNS, timeout, ... + raise ZeppelinError(f"Request to {url} failed: {exc}") from exc + + if resp.status_code >= 400: + raise ZeppelinError( + f"{method} {path} returned HTTP {resp.status_code}: " + f"{_truncate(resp.text)}", + status_code=resp.status_code, + ) + + # Some endpoints (e.g. login) return an empty body; treat as no payload. + if not resp.content: + return None + try: + envelope = resp.json() + except ValueError as exc: + raise ZeppelinError( + f"{method} {path} returned non-JSON body: {_truncate(resp.text)}" + ) from exc + + status = envelope.get("status") + if status and status != "OK": + raise ZeppelinError( + f"{method} {path} returned status={status}: " + f"{envelope.get('message', '')}", + status_code=resp.status_code, + ) + return envelope.get("body") + + # -- notebook operations ---------------------------------------------- + + def create_note(self, note_path: str, + default_interpreter_group: Optional[str] = None) -> str: + """Create a notebook and return its new note id. + + ``note_path`` is the full path (Zeppelin nests notes by ``/``); the + REST API calls this field ``name`` in older docs but the server reads + ``NewNoteRequest.notePath``. + """ + payload: dict[str, Any] = {"name": note_path} + if default_interpreter_group: + payload["defaultInterpreterGroup"] = default_interpreter_group + body = self._request("POST", "/notebook", json=payload) + if not body: + raise ZeppelinError("Note creation returned no note id") + return str(body) + + def get_note(self, note_id: str) -> dict[str, Any]: + """Return the full note document (includes ``paragraphs``).""" + body = self._request("GET", f"/notebook/{note_id}") + if not isinstance(body, dict): + raise ZeppelinError(f"Note {note_id} not found") + return body + + def rename_note(self, note_id: str, new_name: str) -> None: + self._request("PUT", f"/notebook/{note_id}/rename", json={"name": new_name}) + + def add_paragraph(self, note_id: str, text: str, + index: Optional[int] = None) -> str: + """Append (or insert at ``index``) a paragraph; return its id.""" + payload: dict[str, Any] = {"text": text} + if index is not None: + payload["index"] = index + body = self._request("POST", f"/notebook/{note_id}/paragraph", json=payload) + if not body: + raise ZeppelinError("Paragraph creation returned no paragraph id") + return str(body) + + def update_paragraph_text(self, note_id: str, paragraph_id: str, text: str) -> None: + self._request( + "PUT", f"/notebook/{note_id}/paragraph/{paragraph_id}", json={"text": text} + ) + + def get_paragraph(self, note_id: str, paragraph_id: str) -> dict[str, Any]: + body = self._request("GET", f"/notebook/{note_id}/paragraph/{paragraph_id}") + if not isinstance(body, dict): + raise ZeppelinError(f"Paragraph {paragraph_id} not found") + return body + + def delete_paragraph(self, note_id: str, paragraph_id: str) -> None: + self._request("DELETE", f"/notebook/{note_id}/paragraph/{paragraph_id}") + + def clear_all_output(self, note_id: str) -> None: + self._request("PUT", f"/notebook/{note_id}/clear") + + def run_paragraph_sync(self, note_id: str, paragraph_id: str) -> dict[str, Any]: + """Run a paragraph and block until it finishes. + + Uses Zeppelin's synchronous endpoint ``POST /api/notebook/run/{}/{}``, + whose ``body`` is the ``InterpreterResult`` (``{"code", "msg": [...]}``). + This avoids the submit-then-poll loop the async ``job`` endpoint needs. + """ + body = self._request("POST", f"/notebook/run/{note_id}/{paragraph_id}") + # The sync endpoint returns the InterpreterResult directly. Normalize a + # null body (paragraph produced no output) to an empty result. + return body if isinstance(body, dict) else {"code": "SUCCESS", "msg": []} + + # -- interpreter operations ------------------------------------------- + + def list_interpreter_settings(self) -> list[dict[str, Any]]: + body = self._request("GET", "/interpreter/setting") + return body if isinstance(body, list) else [] + + def restart_interpreter(self, setting_id: str, + note_id: Optional[str] = None) -> None: + """Restart an interpreter setting, optionally scoped to one note.""" + payload = {"noteId": note_id} if note_id else None + self._request("PUT", f"/interpreter/setting/restart/{setting_id}", json=payload) + + +def _truncate(text: str, limit: int = 500) -> str: + """Clamp server error bodies so exception messages stay readable.""" + text = text.strip() + return text if len(text) <= limit else text[:limit] + "..." diff --git a/zeppelin-mcp/src/zeppelin_mcp/context.py b/zeppelin-mcp/src/zeppelin_mcp/context.py new file mode 100644 index 00000000000..9168e6d1f7f --- /dev/null +++ b/zeppelin-mcp/src/zeppelin_mcp/context.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Render a Zeppelin note into compact, agent-readable text. + +The MCP server hands these renderings back to the calling LLM so it can reason +about what is already in a notebook (to write a summary, decide what to delete, +or continue an analysis) without the agent having to fetch and parse the raw +note JSON itself. +""" + +from __future__ import annotations + +from typing import Any + +# Cap result rows per paragraph so a single wide query cannot blow the model's +# context window. The header row is always kept; this bounds the data rows. +MAX_RESULT_ROWS = 20 + +# Result message types that carry no useful text for an LLM (images, HTML +# widgets, Angular bindings). We note their presence but never inline them. +_NON_TEXT_RESULT_TYPES = {"IMG", "HTML", "ANGULAR", "NETWORK"} + + +def render_paragraph(index: int, paragraph: dict[str, Any]) -> str: + """Render one paragraph as a labelled block: id, status, code, and output.""" + pid = paragraph.get("id", "") + status = paragraph.get("status", "") + text = (paragraph.get("text") or "").strip() + + lines = [f"--- Paragraph {index} [id={pid}] (status={status}) ---"] + if text: + lines.append("Code:") + lines.append(text) + + results = paragraph.get("results") or {} + messages = results.get("msg") or [] + rendered_any = False + for msg in messages: + block = _render_result_message(msg) + if block: + lines.append(block) + rendered_any = True + if not rendered_any: + lines.append("(no output)") + + return "\n".join(lines) + + +def _render_result_message(msg: dict[str, Any]) -> str: + """Render a single result message, truncating tabular data to MAX_RESULT_ROWS.""" + msg_type = (msg.get("type") or "TEXT").upper() + data = msg.get("data") or "" + + if msg_type in _NON_TEXT_RESULT_TYPES: + return f"Output [{msg_type}]: (non-text output omitted)" + + if not data: + return "" + + if msg_type == "TABLE": + return _render_table(data) + + # TEXT and anything else: pass through verbatim. + return f"Output:\n{data.rstrip()}" + + +def _render_table(data: str) -> str: + """Render a TSV TABLE result, keeping the header plus up to MAX_RESULT_ROWS rows.""" + rows = [row for row in data.split("\n") if row != ""] + if not rows: + return "Output [TABLE]: (empty)" + + header, body = rows[0], rows[1:] + out = ["Output [TABLE]:", f"Columns: {header}"] + out.extend(body[:MAX_RESULT_ROWS]) + if len(body) > MAX_RESULT_ROWS: + out.append(f"... ({len(body)} total rows, {MAX_RESULT_ROWS} shown)") + return "\n".join(out) + + +def build_context(note: dict[str, Any]) -> str: + """Render a whole note (its name + every paragraph) into one text block.""" + name = note.get("name") or note.get("id") or "" + paragraphs = note.get("paragraphs") or [] + + parts = [f"Notebook: {name}", f"Paragraphs: {len(paragraphs)}", ""] + for index, paragraph in enumerate(paragraphs): + parts.append(render_paragraph(index, paragraph)) + parts.append("") + return "\n".join(parts).rstrip() + "\n" diff --git a/zeppelin-mcp/src/zeppelin_mcp/server.py b/zeppelin-mcp/src/zeppelin_mcp/server.py new file mode 100644 index 00000000000..87df266b929 --- /dev/null +++ b/zeppelin-mcp/src/zeppelin_mcp/server.py @@ -0,0 +1,269 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""MCP server exposing Apache Zeppelin notebook operations to AI agents. + +The server is deliberately thin: every tool maps to one or two REST calls via +:class:`~zeppelin_mcp.client.ZeppelinClient` and returns plain text suitable +for an LLM. Notebook-mutating logic lives in the client; result rendering lives +in :mod:`zeppelin_mcp.context`. This module wires those together, owns the +"currently selected notebook" session state, and reads connection settings +from the environment. + +Environment variables +---------------------- +ZEPPELIN_URL Base server URL, e.g. ``http://localhost:8080`` (required). +ZEPPELIN_AUTH ``none`` (default), ``basic``, or ``header``. +ZEPPELIN_USER Username for ``basic`` auth. +ZEPPELIN_PASSWORD Password for ``basic`` auth. +ZEPPELIN_AUTH_HEADER Header name for ``header`` auth (default ``Authorization``). +ZEPPELIN_AUTH_HEADER_VALUE Header value for ``header`` auth. +ZEPPELIN_TIMEOUT_SECONDS Per-request timeout (default 600). +ZEPPELIN_VERIFY_TLS ``false`` to disable TLS verification (default ``true``). +ZEPPELIN_NOTEBOOK Optional notebook id or URL to pre-select on startup. +""" + +from __future__ import annotations + +import os +import re +from typing import Any, Optional + +from mcp.server.fastmcp import FastMCP + +from .client import AuthMode, ZeppelinClient, ZeppelinConfig, ZeppelinError +from .context import build_context + +mcp = FastMCP("zeppelin") + +# Matches a note id in a Zeppelin notebook URL (``.../#/notebook/``) so a +# user can paste either a bare id or the URL from their browser. +_NOTE_URL_RE = re.compile(r"#/notebook/([^/?#]+)") + +# Lazily-created client + the currently selected note id. Kept module-level so +# all tool invocations in one MCP session share connection state and the +# "current notebook" the user configured. +_state: dict[str, Any] = {"client": None, "note_id": None} + + +# -- configuration ----------------------------------------------------------- + +def _env_bool(name: str, default: bool) -> bool: + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in ("1", "true", "yes", "on") + + +def _config_from_env() -> ZeppelinConfig: + base_url = os.getenv("ZEPPELIN_URL") + if not base_url: + raise ZeppelinError("ZEPPELIN_URL is not set") + auth_mode = AuthMode(os.getenv("ZEPPELIN_AUTH", "none").strip().lower()) + timeout = float(os.getenv("ZEPPELIN_TIMEOUT_SECONDS", "600")) + return ZeppelinConfig( + base_url=base_url, + auth_mode=auth_mode, + username=os.getenv("ZEPPELIN_USER"), + password=os.getenv("ZEPPELIN_PASSWORD"), + auth_header_name=os.getenv("ZEPPELIN_AUTH_HEADER", "Authorization"), + auth_header_value=os.getenv("ZEPPELIN_AUTH_HEADER_VALUE"), + timeout_seconds=timeout, + verify_tls=_env_bool("ZEPPELIN_VERIFY_TLS", True), + ) + + +def _client() -> ZeppelinClient: + """Return the shared client, creating it from the environment on first use.""" + if _state["client"] is None: + _state["client"] = ZeppelinClient(_config_from_env()) + return _state["client"] + + +def _resolve_note_id(note: str) -> str: + """Accept either a bare note id or a full notebook URL and return the id.""" + match = _NOTE_URL_RE.search(note) + return match.group(1) if match else note.strip() + + +def _require_note() -> str: + """Return the selected note id or raise a helpful error if none is set.""" + note_id = _state["note_id"] + if not note_id: + raise ZeppelinError( + "No notebook selected. Call set_notebook(...) or create_notebook(...) first." + ) + return note_id + + +def _format_result(result: dict[str, Any]) -> str: + """Render an InterpreterResult (``{code, msg:[{type, data}]}``) as text.""" + code = result.get("code", "UNKNOWN") + messages = result.get("msg") or [] + rendered = [m.get("data", "") for m in messages if m.get("data")] + body = "\n".join(rendered) if rendered else "(no output)" + return f"[{code}]\n{body}" + + +# -- tools -------------------------------------------------------------------- + +@mcp.tool() +def set_notebook(notebook: str) -> str: + """Select the notebook that subsequent tools operate on. + + Accepts a bare note id (e.g. ``2MBCDQ2PV``) or a full notebook URL pasted + from the browser. Returns the rendered contents of the notebook so the + agent immediately has context (paragraph ids, code, and recent output). + """ + note_id = _resolve_note_id(notebook) + note = _client().get_note(note_id) + _state["note_id"] = note_id + return f"Selected notebook {note_id}.\n\n{build_context(note)}" + + +@mcp.tool() +def create_notebook(name: str, default_interpreter_group: str = "") -> str: + """Create a new notebook and select it. Returns the new note id. + + ``name`` is the notebook path; Zeppelin groups notebooks by ``/`` so a name + like ``analysis/2024-traffic`` nests it under an ``analysis`` folder. + ``default_interpreter_group`` optionally pins the note's default interpreter + (e.g. ``spark``); leave blank to use the server default. + """ + note_id = _client().create_note(name, default_interpreter_group or None) + _state["note_id"] = note_id + return f"Created and selected notebook '{name}' (id={note_id})." + + +@mcp.tool() +def run_code(code: str, interpreter: str = "") -> str: + """Add a paragraph to the selected notebook, run it, and return its output. + + ``interpreter`` is the magic without the ``%`` (e.g. ``sql``, ``python``, + ``md``, ``spark``). If given, it is prepended as ``%interpreter``. If the + code already starts with a ``%`` magic, pass ``interpreter`` empty to keep + that magic as-is. + + The paragraph runs synchronously; this call blocks until the interpreter + finishes (or the configured timeout elapses) and returns the result text. + """ + note_id = _require_note() + client = _client() + + text = code + if interpreter: + magic = interpreter if interpreter.startswith("%") else f"%{interpreter}" + text = f"{magic}\n{code}" + + paragraph_id = client.add_paragraph(note_id, text) + result = client.run_paragraph_sync(note_id, paragraph_id) + return f"Paragraph {paragraph_id} finished.\n\n{_format_result(result)}" + + +@mcp.tool() +def read_notebook() -> str: + """Return the full rendered contents of the selected notebook. + + Useful before writing a summary or deciding which paragraphs to delete: + shows each paragraph's id, status, code, and (truncated) output. + """ + note_id = _require_note() + return build_context(_client().get_note(note_id)) + + +@mcp.tool() +def delete_paragraph(paragraph_id: str) -> str: + """Delete a paragraph from the selected notebook by its id. + + Paragraph ids look like ``paragraph_1778857961597_888823855`` and can be + found via read_notebook / set_notebook output. + """ + note_id = _require_note() + _client().delete_paragraph(note_id, paragraph_id) + return f"Deleted paragraph {paragraph_id}." + + +@mcp.tool() +def clear_output() -> str: + """Clear the output of every paragraph in the selected notebook. + + Removes results but keeps the code, leaving a clean notebook to re-run. + """ + note_id = _require_note() + _client().clear_all_output(note_id) + return f"Cleared all paragraph output in notebook {note_id}." + + +@mcp.tool() +def list_interpreters() -> str: + """List configured interpreter settings (id, name, group). + + Use the ``id`` field with restart_interpreter when an interpreter is stuck. + """ + settings = _client().list_interpreter_settings() + if not settings: + return "No interpreter settings found." + lines = [ + f"- {s.get('name', '?')} (id={s.get('id', '?')}, group={s.get('group', '?')})" + for s in settings + ] + return "Interpreter settings:\n" + "\n".join(lines) + + +@mcp.tool() +def restart_interpreter(setting_id: str, scope_to_notebook: bool = False) -> str: + """Restart an interpreter setting by id (recovers a hung/broken interpreter). + + By default this restarts the interpreter globally. Set + ``scope_to_notebook=True`` to restart it only for the selected notebook + (relevant for per-note scoped interpreters). + """ + note_id = _require_note() if scope_to_notebook else None + _client().restart_interpreter(setting_id, note_id) + scope = f" for notebook {note_id}" if note_id else "" + return f"Restarted interpreter {setting_id}{scope}." + + +@mcp.tool() +def write_summary(summary_markdown: str, title: str = "") -> str: + """Write a Markdown summary paragraph at the top of the selected notebook. + + Call read_notebook first to gather the data, then pass your own + data-driven summary text here (highlight key numbers, trends, and + anomalies rather than restating which queries ran). The summary is inserted + as the first paragraph and rendered immediately. Optionally pass ``title`` + to also rename the notebook. + """ + note_id = _require_note() + client = _client() + if title: + client.rename_note(note_id, title) + + md = f"%md\n## Summary\n{summary_markdown}" + paragraph_id = client.add_paragraph(note_id, md, index=0) + client.run_paragraph_sync(note_id, paragraph_id) + renamed = f" Notebook renamed to '{title}'." if title else "" + return f"Summary written as paragraph {paragraph_id}.{renamed}" + + +def main() -> None: + """Console-script entry point: run the MCP server over stdio.""" + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/zeppelin-mcp/tests/conftest.py b/zeppelin-mcp/tests/conftest.py new file mode 100644 index 00000000000..9265f79a7bb --- /dev/null +++ b/zeppelin-mcp/tests/conftest.py @@ -0,0 +1,211 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Shared pytest fixtures: an in-memory fake Zeppelin REST server. + +Tests run against a :class:`FakeZeppelin` recording handler mounted on an +``httpx.MockTransport``. This exercises the real request building, URL +construction, JSON envelope unwrapping, and error handling in +:class:`~zeppelin_mcp.client.ZeppelinClient` without a live server. +""" + +from __future__ import annotations + +import json +from typing import Any, Callable + +import httpx +import pytest + +from zeppelin_mcp.client import ZeppelinClient, ZeppelinConfig + + +def _ok(body: Any = "") -> httpx.Response: + return httpx.Response(200, json={"status": "OK", "message": "", "body": body}) + + +class FakeZeppelin: + """A minimal in-memory Zeppelin used as an httpx mock transport handler. + + It stores notebooks keyed by id and implements just enough of the REST + contract for the client and server tests. Each instance records every + request it received on :attr:`requests` for assertions. + """ + + def __init__(self) -> None: + self.notes: dict[str, dict[str, Any]] = {} + self.interpreter_settings: list[dict[str, Any]] = [ + {"id": "spark", "name": "spark", "group": "spark"}, + {"id": "md", "name": "md", "group": "md"}, + ] + self.requests: list[httpx.Request] = [] + self.restarted: list[tuple[str, Any]] = [] + self._seq = 0 + # Optional override hook: name -> handler, used to inject failures. + self.overrides: dict[str, Callable[[httpx.Request], httpx.Response]] = {} + + # -- helpers ---------------------------------------------------------- + + def _next_id(self, prefix: str) -> str: + self._seq += 1 + return f"{prefix}_{self._seq}" + + def add_note(self, name: str = "test", paragraphs: list | None = None) -> str: + note_id = self._next_id("note") + self.notes[note_id] = { + "id": note_id, + "name": name, + "paragraphs": paragraphs or [], + } + return note_id + + # -- transport handler ------------------------------------------------ + + def handler(self, request: httpx.Request) -> httpx.Response: + self.requests.append(request) + path = request.url.path + method = request.method + + # /api/login + if path.endswith("/api/login") and method == "POST": + return self.overrides.get("login", lambda _r: _ok())(request) + + # /api/interpreter/setting + if path.endswith("/api/interpreter/setting") and method == "GET": + return _ok(self.interpreter_settings) + + # /api/interpreter/setting/restart/{id} + if "/api/interpreter/setting/restart/" in path and method == "PUT": + setting_id = path.rsplit("/", 1)[-1] + payload = json.loads(request.content) if request.content else None + self.restarted.append((setting_id, payload)) + return _ok() + + # /api/notebook (create) + if path.endswith("/api/notebook") and method == "POST": + payload = json.loads(request.content) + note_id = self.add_note(payload.get("name", "untitled")) + return _ok(note_id) + + # /api/notebook/run/{noteId}/{paragraphId} (synchronous run) + if "/api/notebook/run/" in path and method == "POST": + return self._run_paragraph(path) + + # /api/notebook/{noteId}/rename + if path.endswith("/rename") and method == "PUT": + note_id = path.split("/api/notebook/")[1].split("/")[0] + payload = json.loads(request.content) + self.notes[note_id]["name"] = payload["name"] + return _ok() + + # /api/notebook/{noteId}/clear + if path.endswith("/clear") and method == "PUT": + note_id = path.split("/api/notebook/")[1].split("/")[0] + for para in self.notes[note_id]["paragraphs"]: + para["results"] = {"code": "SUCCESS", "msg": []} + return _ok() + + # /api/notebook/{noteId}/paragraph (insert) + if path.endswith("/paragraph") and method == "POST": + return self._insert_paragraph(path, request) + + # /api/notebook/{noteId}/paragraph/{paragraphId} + if "/paragraph/" in path: + return self._paragraph_crud(path, method, request) + + # /api/notebook/{noteId} (get) + if "/api/notebook/" in path and method == "GET": + note_id = path.rsplit("/", 1)[-1] + note = self.notes.get(note_id) + if note is None: + return httpx.Response(404, text="Note not found") + return _ok(note) + + return httpx.Response(404, text=f"unhandled {method} {path}") + + # -- paragraph operations -------------------------------------------- + + def _insert_paragraph(self, path: str, request: httpx.Request) -> httpx.Response: + note_id = path.split("/api/notebook/")[1].split("/")[0] + payload = json.loads(request.content) + para = { + "id": self._next_id("paragraph"), + "text": payload.get("text", ""), + "status": "READY", + "results": {"code": "SUCCESS", "msg": []}, + } + paragraphs = self.notes[note_id]["paragraphs"] + index = payload.get("index") + if index is None: + paragraphs.append(para) + else: + paragraphs.insert(int(index), para) + return _ok(para["id"]) + + def _paragraph_crud(self, path: str, method: str, + request: httpx.Request) -> httpx.Response: + note_id = path.split("/api/notebook/")[1].split("/")[0] + paragraph_id = path.rsplit("/", 1)[-1] + note = self.notes.get(note_id, {"paragraphs": []}) + para = next( + (p for p in note["paragraphs"] if p["id"] == paragraph_id), None + ) + + if method == "GET": + if para is None: + return httpx.Response(404, text="Paragraph not found") + return _ok(para) + if method == "PUT": + payload = json.loads(request.content) + if para is not None and "text" in payload: + para["text"] = payload["text"] + return _ok() + if method == "DELETE": + note["paragraphs"] = [ + p for p in note["paragraphs"] if p["id"] != paragraph_id + ] + return _ok() + return httpx.Response(404, text=f"unhandled {method} {path}") + + def _run_paragraph(self, path: str) -> httpx.Response: + # path: /api/notebook/run/{noteId}/{paragraphId} + tail = path.split("/api/notebook/run/")[1] + note_id, paragraph_id = tail.split("/") + note = self.notes.get(note_id, {"paragraphs": []}) + para = next( + (p for p in note["paragraphs"] if p["id"] == paragraph_id), None + ) + if para is None: + return httpx.Response(404, text="Paragraph not found") + para["status"] = "FINISHED" + # Echo a deterministic table result so run output can be asserted. + result = {"code": "SUCCESS", "msg": [{"type": "TEXT", "data": "ran: " + para["text"]}]} + para["results"] = result + return _ok(result) + + +@pytest.fixture +def fake() -> FakeZeppelin: + return FakeZeppelin() + + +@pytest.fixture +def client(fake: FakeZeppelin) -> ZeppelinClient: + transport = httpx.MockTransport(fake.handler) + http = httpx.Client(transport=transport, base_url="http://zeppelin.test") + config = ZeppelinConfig(base_url="http://zeppelin.test") + return ZeppelinClient(config, http_client=http) diff --git a/zeppelin-mcp/tests/test_client.py b/zeppelin-mcp/tests/test_client.py new file mode 100644 index 00000000000..535061da847 --- /dev/null +++ b/zeppelin-mcp/tests/test_client.py @@ -0,0 +1,235 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for :class:`zeppelin_mcp.client.ZeppelinClient`.""" + +from __future__ import annotations + +import httpx +import pytest + +from zeppelin_mcp.client import ( + AuthMode, + ZeppelinClient, + ZeppelinConfig, + ZeppelinError, +) + + +def test_api_root_strips_trailing_slash(): + config = ZeppelinConfig(base_url="http://host:8080/") + assert config.api_root() == "http://host:8080/api" + + +def test_create_note_returns_id(client, fake): + note_id = client.create_note("my/note") + assert note_id in fake.notes + assert fake.notes[note_id]["name"] == "my/note" + + +def test_create_note_passes_default_interpreter_group(client, fake): + client.create_note("n", default_interpreter_group="spark") + create_req = [r for r in fake.requests if r.method == "POST"][-1] + assert b"defaultInterpreterGroup" in create_req.content + assert b"spark" in create_req.content + + +def test_get_note_returns_document(client, fake): + note_id = fake.add_note("hello") + note = client.get_note(note_id) + assert note["name"] == "hello" + assert note["paragraphs"] == [] + + +def test_get_note_missing_raises(client): + with pytest.raises(ZeppelinError) as exc: + client.get_note("does-not-exist") + assert exc.value.status_code == 404 + + +def test_add_and_get_paragraph(client, fake): + note_id = fake.add_note() + pid = client.add_paragraph(note_id, "%md\nhello") + para = client.get_paragraph(note_id, pid) + assert para["text"] == "%md\nhello" + + +def test_add_paragraph_at_index(client, fake): + note_id = fake.add_note() + first = client.add_paragraph(note_id, "first") + second = client.add_paragraph(note_id, "inserted", index=0) + paragraphs = client.get_note(note_id)["paragraphs"] + assert [p["id"] for p in paragraphs] == [second, first] + + +def test_update_paragraph_text(client, fake): + note_id = fake.add_note() + pid = client.add_paragraph(note_id, "old") + client.update_paragraph_text(note_id, pid, "new") + assert client.get_paragraph(note_id, pid)["text"] == "new" + + +def test_delete_paragraph(client, fake): + note_id = fake.add_note() + pid = client.add_paragraph(note_id, "doomed") + client.delete_paragraph(note_id, pid) + assert client.get_note(note_id)["paragraphs"] == [] + + +def test_run_paragraph_sync_returns_interpreter_result(client, fake): + note_id = fake.add_note() + pid = client.add_paragraph(note_id, "%md\nhi") + result = client.run_paragraph_sync(note_id, pid) + assert result["code"] == "SUCCESS" + assert result["msg"][0]["data"] == "ran: %md\nhi" + + +def test_rename_note(client, fake): + note_id = fake.add_note("before") + client.rename_note(note_id, "after") + assert fake.notes[note_id]["name"] == "after" + + +def test_clear_all_output(client, fake): + note_id = fake.add_note() + pid = client.add_paragraph(note_id, "x") + client.run_paragraph_sync(note_id, pid) + client.clear_all_output(note_id) + para = client.get_paragraph(note_id, pid) + assert para["results"]["msg"] == [] + + +def test_list_interpreter_settings(client): + settings = client.list_interpreter_settings() + ids = {s["id"] for s in settings} + assert {"spark", "md"} <= ids + + +def test_restart_interpreter_global(client, fake): + client.restart_interpreter("spark") + assert fake.restarted == [("spark", None)] + + +def test_restart_interpreter_scoped_to_note(client, fake): + client.restart_interpreter("spark", note_id="note_42") + assert fake.restarted == [("spark", {"noteId": "note_42"})] + + +# -- auth -------------------------------------------------------------------- + +def test_basic_auth_logs_in_once(fake): + transport = httpx.MockTransport(fake.handler) + http = httpx.Client(transport=transport) + config = ZeppelinConfig( + base_url="http://zeppelin.test", + auth_mode=AuthMode.BASIC, + username="admin", + password="secret", + ) + client = ZeppelinClient(config, http_client=http) + client.create_note("a") + client.create_note("b") + logins = [r for r in fake.requests if r.url.path.endswith("/api/login")] + assert len(logins) == 1 # login performed once and cached + + +def test_basic_auth_requires_credentials(): + config = ZeppelinConfig(base_url="http://zeppelin.test", auth_mode=AuthMode.BASIC) + client = ZeppelinClient(config, http_client=httpx.Client()) + with pytest.raises(ZeppelinError, match="requires both username and password"): + client.create_note("a") + + +def test_basic_auth_login_failure_raises(fake): + fake.overrides["login"] = lambda _r: httpx.Response(401, text="bad creds") + transport = httpx.MockTransport(fake.handler) + http = httpx.Client(transport=transport) + config = ZeppelinConfig( + base_url="http://zeppelin.test", + auth_mode=AuthMode.BASIC, + username="admin", + password="wrong", + ) + client = ZeppelinClient(config, http_client=http) + with pytest.raises(ZeppelinError) as exc: + client.create_note("a") + assert exc.value.status_code == 401 + + +def test_header_auth_attaches_header(fake): + transport = httpx.MockTransport(fake.handler) + http = httpx.Client(transport=transport) + config = ZeppelinConfig( + base_url="http://zeppelin.test", + auth_mode=AuthMode.HEADER, + auth_header_name="Authorization", + auth_header_value="Bearer token123", + ) + client = ZeppelinClient(config, http_client=http) + client.create_note("a") + assert fake.requests[-1].headers["Authorization"] == "Bearer token123" + + +# -- transport / error handling --------------------------------------------- + +def test_non_ok_envelope_raises(): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"status": "NOT_FOUND", "message": "nope"}) + + http = httpx.Client(transport=httpx.MockTransport(handler)) + client = ZeppelinClient(ZeppelinConfig(base_url="http://t"), http_client=http) + with pytest.raises(ZeppelinError, match="status=NOT_FOUND"): + client.get_note("x") + + +def test_http_error_status_raises_with_code(): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(500, text="boom") + + http = httpx.Client(transport=httpx.MockTransport(handler)) + client = ZeppelinClient(ZeppelinConfig(base_url="http://t"), http_client=http) + with pytest.raises(ZeppelinError) as exc: + client.create_note("x") + assert exc.value.status_code == 500 + + +def test_transport_failure_wrapped(): + def handler(_request: httpx.Request) -> httpx.Response: + raise httpx.ConnectError("connection refused") + + http = httpx.Client(transport=httpx.MockTransport(handler)) + client = ZeppelinClient(ZeppelinConfig(base_url="http://t"), http_client=http) + with pytest.raises(ZeppelinError, match="failed"): + client.get_note("x") + + +def test_non_json_body_raises(): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(200, text="not json") + + http = httpx.Client(transport=httpx.MockTransport(handler)) + client = ZeppelinClient(ZeppelinConfig(base_url="http://t"), http_client=http) + with pytest.raises(ZeppelinError, match="non-JSON"): + client.get_note("x") + + +def test_context_manager_closes(fake): + transport = httpx.MockTransport(fake.handler) + http = httpx.Client(transport=transport) + with ZeppelinClient(ZeppelinConfig(base_url="http://t"), http_client=http) as c: + c.create_note("a") + assert http.is_closed diff --git a/zeppelin-mcp/tests/test_context.py b/zeppelin-mcp/tests/test_context.py new file mode 100644 index 00000000000..f090860430e --- /dev/null +++ b/zeppelin-mcp/tests/test_context.py @@ -0,0 +1,124 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for :mod:`zeppelin_mcp.context` rendering.""" + +from __future__ import annotations + +from zeppelin_mcp.context import MAX_RESULT_ROWS, build_context, render_paragraph + + +def _table(rows: int) -> dict: + header = "col_a\tcol_b" + data = "\n".join([header] + [f"{i}\tv{i}" for i in range(rows)]) + return { + "id": "paragraph_1", + "status": "FINISHED", + "text": "%sql\nselect *", + "results": {"code": "SUCCESS", "msg": [{"type": "TABLE", "data": data}]}, + } + + +def test_render_paragraph_includes_id_status_code(): + para = { + "id": "paragraph_9", + "status": "FINISHED", + "text": "%md\n# Title", + "results": {"msg": [{"type": "TEXT", "data": "rendered"}]}, + } + out = render_paragraph(0, para) + assert "id=paragraph_9" in out + assert "status=FINISHED" in out + assert "%md\n# Title" in out + assert "rendered" in out + + +def test_render_paragraph_no_output(): + para = {"id": "p", "status": "READY", "text": "code", "results": {"msg": []}} + assert "(no output)" in render_paragraph(0, para) + + +def test_table_truncated_to_max_rows(): + out = render_paragraph(0, _table(rows=100)) + # Header + MAX_RESULT_ROWS data rows + the truncation notice. + assert "Columns: col_a\tcol_b" in out + assert f"... (100 total rows, {MAX_RESULT_ROWS} shown)" in out + assert out.count("\tv") == MAX_RESULT_ROWS + + +def test_table_under_limit_not_truncated(): + out = render_paragraph(0, _table(rows=3)) + assert "total rows" not in out + assert out.count("\tv") == 3 + + +def test_non_text_output_omitted(): + para = { + "id": "p", + "status": "FINISHED", + "text": "%python\nplot()", + "results": {"msg": [{"type": "IMG", "data": "base64...."}]}, + } + out = render_paragraph(0, para) + assert "non-text output omitted" in out + assert "base64" not in out + + +def test_build_context_lists_all_paragraphs(): + note = { + "id": "note_1", + "name": "Analysis", + "paragraphs": [ + {"id": "p1", "status": "FINISHED", "text": "a", "results": {"msg": []}}, + {"id": "p2", "status": "FINISHED", "text": "b", "results": {"msg": []}}, + ], + } + out = build_context(note) + assert "Notebook: Analysis" in out + assert "Paragraphs: 2" in out + assert "id=p1" in out and "id=p2" in out + + +def test_build_context_empty_notebook(): + out = build_context({"id": "n", "name": "Empty", "paragraphs": []}) + assert "Paragraphs: 0" in out + + +def test_build_context_falls_back_to_id_when_unnamed(): + out = build_context({"id": "note_xyz", "paragraphs": []}) + assert "Notebook: note_xyz" in out + + +def test_empty_table_rendered_as_empty(): + para = { + "id": "p", + "status": "FINISHED", + "text": "%sql\nselect *", + "results": {"msg": [{"type": "TABLE", "data": ""}]}, + } + # Empty data short-circuits to "(no output)" since the message carries no data. + assert "(no output)" in render_paragraph(0, para) + + +def test_table_with_only_blank_lines_is_empty(): + para = { + "id": "p", + "status": "FINISHED", + "text": "x", + "results": {"msg": [{"type": "TABLE", "data": "\n\n"}]}, + } + assert "(empty)" in render_paragraph(0, para) diff --git a/zeppelin-mcp/tests/test_server.py b/zeppelin-mcp/tests/test_server.py new file mode 100644 index 00000000000..7763a63cb93 --- /dev/null +++ b/zeppelin-mcp/tests/test_server.py @@ -0,0 +1,223 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for the MCP tool layer in :mod:`zeppelin_mcp.server`. + +The tools read a shared module-level client/notebook state. Each test injects +the fake-backed client into that state via the ``server_state`` fixture so the +tool bodies exercise the real client against the in-memory Zeppelin. +""" + +from __future__ import annotations + +import pytest + +from zeppelin_mcp import server +from zeppelin_mcp.client import ( + AuthMode, + ZeppelinClient, + ZeppelinConfig, + ZeppelinError, +) + + +@pytest.fixture +def server_state(client, monkeypatch): + """Point the server's shared state at the fake-backed client and reset it.""" + monkeypatch.setitem(server._state, "client", client) + monkeypatch.setitem(server._state, "note_id", None) + return server._state + + +# -- configuration ----------------------------------------------------------- + +def test_config_from_env_requires_url(monkeypatch): + monkeypatch.delenv("ZEPPELIN_URL", raising=False) + with pytest.raises(ZeppelinError, match="ZEPPELIN_URL is not set"): + server._config_from_env() + + +def test_config_from_env_reads_all_fields(monkeypatch): + monkeypatch.setenv("ZEPPELIN_URL", "http://host:8080") + monkeypatch.setenv("ZEPPELIN_AUTH", "basic") + monkeypatch.setenv("ZEPPELIN_USER", "admin") + monkeypatch.setenv("ZEPPELIN_PASSWORD", "pw") + monkeypatch.setenv("ZEPPELIN_TIMEOUT_SECONDS", "42") + monkeypatch.setenv("ZEPPELIN_VERIFY_TLS", "false") + config = server._config_from_env() + assert config.base_url == "http://host:8080" + assert config.auth_mode == AuthMode.BASIC + assert config.username == "admin" + assert config.timeout_seconds == 42 + assert config.verify_tls is False + + +@pytest.mark.parametrize( + "raw,expected", + [("true", True), ("1", True), ("yes", True), ("false", False), ("no", False)], +) +def test_env_bool(monkeypatch, raw, expected): + monkeypatch.setenv("FLAG", raw) + assert server._env_bool("FLAG", default=True) is expected + + +def test_env_bool_default_when_unset(monkeypatch): + monkeypatch.delenv("FLAG", raising=False) + assert server._env_bool("FLAG", default=True) is True + + +# -- note id resolution ------------------------------------------------------ + +@pytest.mark.parametrize( + "value,expected", + [ + ("2MBCDQ2PV", "2MBCDQ2PV"), + (" 2MBCDQ2PV ", "2MBCDQ2PV"), + ("http://host:8080/#/notebook/2MBCDQ2PV", "2MBCDQ2PV"), + ("http://host/#/notebook/2MBCDQ2PV?paragraph=x", "2MBCDQ2PV"), + ], +) +def test_resolve_note_id(value, expected): + assert server._resolve_note_id(value) == expected + + +# -- tool behaviour ---------------------------------------------------------- + +def test_create_notebook_selects_it(server_state, fake): + out = server.create_notebook("My Analysis") + note_id = server_state["note_id"] + assert note_id in fake.notes + assert note_id in out + + +def test_set_notebook_accepts_url_and_returns_context(server_state, fake): + note_id = fake.add_note("Preexisting") + out = server.set_notebook(f"http://host/#/notebook/{note_id}") + assert server_state["note_id"] == note_id + assert "Notebook: Preexisting" in out + + +def test_run_code_prepends_interpreter_magic(server_state, fake): + server.create_notebook("n") + out = server.run_code("select 1", interpreter="sql") + note = fake.notes[server_state["note_id"]] + assert note["paragraphs"][0]["text"] == "%sql\nselect 1" + assert "SUCCESS" in out + + +def test_run_code_keeps_existing_magic(server_state, fake): + server.create_notebook("n") + server.run_code("%md\n# Title", interpreter="") + note = fake.notes[server_state["note_id"]] + assert note["paragraphs"][0]["text"] == "%md\n# Title" + + +def test_run_code_without_notebook_errors(server_state): + with pytest.raises(ZeppelinError, match="No notebook selected"): + server.run_code("select 1", interpreter="sql") + + +def test_read_notebook_renders_current(server_state, fake): + note_id = fake.add_note("Readme", paragraphs=[ + {"id": "p1", "status": "FINISHED", "text": "code", "results": {"msg": []}}, + ]) + server.set_notebook(note_id) + out = server.read_notebook() + assert "Notebook: Readme" in out + assert "id=p1" in out + + +def test_delete_paragraph(server_state, fake): + server.create_notebook("n") + note_id = server_state["note_id"] + server.run_code("x", interpreter="md") + pid = fake.notes[note_id]["paragraphs"][0]["id"] + server.delete_paragraph(pid) + assert fake.notes[note_id]["paragraphs"] == [] + + +def test_clear_output(server_state, fake): + server.create_notebook("n") + note_id = server_state["note_id"] + server.run_code("x", interpreter="md") + server.clear_output() + assert fake.notes[note_id]["paragraphs"][0]["results"]["msg"] == [] + + +def test_list_interpreters(server_state): + out = server.list_interpreters() + assert "spark" in out and "id=spark" in out + + +def test_restart_interpreter_global(server_state, fake): + server.create_notebook("n") + server.restart_interpreter("spark") + assert fake.restarted == [("spark", None)] + + +def test_restart_interpreter_scoped(server_state, fake): + server.create_notebook("n") + note_id = server_state["note_id"] + server.restart_interpreter("spark", scope_to_notebook=True) + assert fake.restarted == [("spark", {"noteId": note_id})] + + +def test_write_summary_inserts_first_paragraph(server_state, fake): + server.create_notebook("n") + note_id = server_state["note_id"] + server.run_code("data", interpreter="sql") + server.write_summary("Revenue up 12% WoW.", title="Weekly Review") + note = fake.notes[note_id] + # Summary is inserted at index 0 and the note is renamed. + assert note["paragraphs"][0]["text"].startswith("%md\n## Summary") + assert "Revenue up 12% WoW." in note["paragraphs"][0]["text"] + assert note["name"] == "Weekly Review" + + +def test_write_summary_without_title_keeps_name(server_state, fake): + server.create_notebook("Original") + note_id = server_state["note_id"] + server.write_summary("done") + assert fake.notes[note_id]["name"] == "Original" + + +def test_list_interpreters_empty(server_state, fake): + fake.interpreter_settings = [] + assert server.list_interpreters() == "No interpreter settings found." + + +def test_client_lazily_created_from_env(monkeypatch, fake): + # Reset shared state so _client() must build a fresh client from the env. + monkeypatch.setitem(server._state, "client", None) + monkeypatch.setitem(server._state, "note_id", None) + monkeypatch.setenv("ZEPPELIN_URL", "http://zeppelin.test") + monkeypatch.setenv("ZEPPELIN_AUTH", "none") + + captured = {} + + def fake_ctor(config): + captured["base_url"] = config.base_url + import httpx + from zeppelin_mcp.client import ZeppelinClient + return ZeppelinClient(config, http_client=httpx.Client( + transport=httpx.MockTransport(fake.handler))) + + monkeypatch.setattr(server, "ZeppelinClient", fake_ctor) + created = server._client() + assert captured["base_url"] == "http://zeppelin.test" + # Subsequent calls reuse the cached client. + assert server._client() is created