diff --git a/README.md b/README.md index aa2f58bb8..2af24a93b 100644 --- a/README.md +++ b/README.md @@ -287,6 +287,107 @@ docker build -t markitdown:latest . docker run --rm -i markitdown:latest < ~/your-file.pdf > output.md ``` +### gRPC + +MarkItDown includes a built-in gRPC server and client, available via the `[grpc]` optional dependency: + +```sh +pip install 'markitdown[grpc]' +``` + +> [!IMPORTANT] +> The gRPC server is unauthenticated and performs I/O with the privileges of the server process: requests can reference server-side file paths and URIs. Bind to localhost (the default) unless the network path is otherwise secured, and review the [Security Considerations](#security-considerations) section before deploying it. + +**Start the server:** + +```sh +markitdown-grpc --bind-address 127.0.0.1:50051 +``` + +The server registers the standard [gRPC health checking](https://grpc.io/docs/guides/health-checking/) and [server reflection](https://grpc.io/docs/guides/reflection/) services, so it works out of the box with Kubernetes health probes and tools like `grpcurl`. + +**CLI client** — send a convert request to the running server: + +```sh +# Convert a local file +markitdown-grpc-client path/to/file.pdf + +# Convert a remote URI +markitdown-grpc-client --uri https://example.com/page.html + +# Pipe content from stdin (provide an extension hint so the server can detect the format) +cat file.docx | markitdown-grpc-client -x .docx + +# Use the streaming RPC and save output to a file +markitdown-grpc-client --stream path/to/file.pdf -o output.md + +# Connect to a non-default address +markitdown-grpc-client --address 10.0.0.5:50051 path/to/file.pdf +``` + +**Python client:** + +```python +from markitdown.grpc import MarkItDownClient + +# Unary convert +with MarkItDownClient("127.0.0.1:50051") as client: + result = client.convert(local_path="path/to/file.pdf") + print(result.markdown) + +# Convert a remote URI +with MarkItDownClient() as client: + result = client.convert(uri="https://example.com/page.html") + print(result.markdown) + +# Convert raw bytes +with MarkItDownClient() as client: + with open("file.docx", "rb") as f: + data = f.read() + result = client.convert(content=data, extension=".docx") + print(result.markdown) + +# Streaming convert — reassemble markdown from chunks +with MarkItDownClient() as client: + parts = [] + for event in client.convert_stream(local_path="path/to/file.pdf"): + if event.HasField("markdown_chunk"): + parts.append(event.markdown_chunk.markdown) + markdown = "".join(parts) + print(markdown) + +# Structured document streaming — receive typed elements (headings, +# paragraphs, tables, lists, code blocks, images, ...) so downstream +# systems can process document structure without re-parsing Markdown +with MarkItDownClient() as client: + for event in client.convert_document_stream(local_path="path/to/file.pdf"): + if not event.HasField("element"): + continue + element = event.element + kind = element.WhichOneof("kind") + if kind == "heading": + print(f"H{element.heading.level}: {element.heading.text}") + elif kind == "table": + print(f"table with {len(element.table.rows)} rows") + elif kind == "image": + print(f"image: {element.image.url} (alt: {element.image.alt_text})") +``` + +Both streaming RPCs deliver results as ordered events (`started`, then content, then `completed`). By default, the conversion completes server-side before streaming begins; streaming reduces time-to-first-byte on the wire and keeps individual messages small. + +**Experimental incremental conversion:** pass `incremental=True` (or set `streaming_options.experimental_incremental` in the proto) to stream results *while the document is still converting* — one fragment per PDF page or PPTX slide. On a 120-page PDF this cuts time-to-first-chunk from seconds to milliseconds. Unsupported formats fall back to whole-document conversion transparently. + +```python +with MarkItDownClient() as client: + for event in client.convert_document_stream( + local_path="path/to/big.pdf", incremental=True + ): + if event.HasField("element"): + ... # elements arrive as each page is processed +``` + +Incremental output is identical to whole-document conversion for PPTX and for PDFs containing tables/forms; pure-prose PDFs may differ slightly in whitespace (the standard converter re-extracts those in a single pass). Incremental conversion is skipped when Azure backends or plugins are configured. + ## Contributing This project welcomes contributions and suggestions. Most contributions require you to agree to a diff --git a/packages/markitdown/README.md b/packages/markitdown/README.md index bedcba183..4b5ab06d7 100644 --- a/packages/markitdown/README.md +++ b/packages/markitdown/README.md @@ -42,6 +42,24 @@ result = md.convert("test.xlsx") print(result.text_content) ``` +### gRPC API + +Install the gRPC extra first: `pip install 'markitdown[grpc]'` + +- Protobuf definition: `proto/markitdown/v1/markitdown.proto` +- Server entrypoint: `markitdown-grpc --bind-address 127.0.0.1:50051` +- Stub regeneration: `./scripts/regenerate-grpc.sh` + +Three RPCs are available: + +- `Convert` returns the full Markdown in a single response. +- `ConvertStream` returns the Markdown as an ordered stream of chunks. +- `ConvertDocumentStream` returns the document as an ordered stream of structured elements (headings, paragraphs, tables, lists, code blocks, images, ...). + +Both streaming RPCs support EXPERIMENTAL incremental conversion (`streaming_options.experimental_incremental`): PDF and PPTX results stream as each page or slide is processed, backed by the `markitdown.streaming` package. + +The server is unauthenticated and performs I/O with the privileges of the server process; bind to localhost unless the network path is otherwise secured. See [Security Considerations](https://github.com/microsoft/markitdown#security-considerations). + ### More Information For more information, and full documentation, see the project [README.md](https://github.com/microsoft/markitdown) on GitHub. diff --git a/packages/markitdown/proto/markitdown/v1/markitdown.proto b/packages/markitdown/proto/markitdown/v1/markitdown.proto new file mode 100644 index 000000000..5829b0d0e --- /dev/null +++ b/packages/markitdown/proto/markitdown/v1/markitdown.proto @@ -0,0 +1,355 @@ +syntax = "proto3"; + +package markitdown.v1; + +// MarkItDownService converts documents (PDF, Office, HTML, images, audio, +// and more) to Markdown suitable for LLM and text-analysis pipelines. +// +// Security note: the server performs I/O with the privileges of the server +// process. `Source.local_path` and `Source.uri` are resolved server-side, so +// requests must come from trusted callers only. See the project README's +// "Security Considerations" section. +service MarkItDownService { + // Convert performs a conversion and returns the complete Markdown + // result in a single response message. + rpc Convert(ConvertRequest) returns (ConvertResponse); + + // ConvertStream performs a conversion and delivers the Markdown result + // as an ordered stream of chunks. The conversion itself completes before + // chunking begins; streaming reduces time-to-first-byte on the wire and + // keeps individual messages small, but does not stream partial conversion + // progress. + rpc ConvertStream(ConvertStreamRequest) returns (stream ConvertStreamResponse); + + // ConvertDocumentStream performs a conversion and delivers the result as + // an ordered stream of structured document elements (headings, paragraphs, + // tables, lists, code blocks, images, ...). This lets downstream consumers + // process document structure incrementally without re-parsing Markdown. + // + // Segmentation is performed on the converted Markdown and is best-effort: + // constructs that do not match a known block type are delivered as + // paragraphs. + rpc ConvertDocumentStream(ConvertDocumentStreamRequest) returns (stream ConvertDocumentStreamResponse); +} + +// Request for the unary Convert RPC. +message ConvertRequest { + // The document to convert. Required. + Source source = 1; + // Options controlling Markdown generation. + ConversionOptions conversion_options = 2; + // Options controlling how the conversion backend is configured. + ServiceOptions service_options = 3; +} + +// Request for the chunked ConvertStream RPC. +message ConvertStreamRequest { + // The document to convert. Required. + Source source = 1; + // Options controlling Markdown generation. + ConversionOptions conversion_options = 2; + // Options controlling how the conversion backend is configured. + ServiceOptions service_options = 3; + // Options controlling chunked delivery. + StreamingOptions streaming_options = 4; +} + +// Request for the structured ConvertDocumentStream RPC. +message ConvertDocumentStreamRequest { + // The document to convert. Required. + Source source = 1; + // Options controlling Markdown generation. + ConversionOptions conversion_options = 2; + // Options controlling how the conversion backend is configured. + ServiceOptions service_options = 3; + // Options controlling streaming delivery. markdown_chunk_size_bytes is + // ignored by this RPC; see experimental_incremental. + StreamingOptions streaming_options = 4; +} + +// Source identifies the document to convert. Exactly one input must be set. +message Source { + oneof input { + // Path to a file on the server's filesystem. The server reads this path + // with its own privileges; only expose to trusted callers. + string local_path = 1; + // An http:, https:, file:, or data: URI fetched by the server. + string uri = 2; + // Raw document bytes supplied inline. Subject to the server's maximum + // receive message size (100 MiB by default; see + // --max-receive-message-bytes). + bytes content = 3; + } + + // Optional hints that help the server detect the document format. + StreamInfo stream_info = 4; +} + +// StreamInfo carries optional metadata hints about the document. Mirrors +// markitdown.StreamInfo in the Python API. +message StreamInfo { + // MIME type, e.g. "application/pdf". + optional string mimetype = 1; + // File extension including the leading dot, e.g. ".pdf". + optional string extension = 2; + // Character set, e.g. "utf-8". Meaningful for text formats only. + optional string charset = 3; + // Original filename, if known. + optional string filename = 4; + // Original local path, if known. + optional string local_path = 5; + // Original URL, if known. + optional string url = 6; +} + +// ConversionOptions control Markdown generation. +message ConversionOptions { + // Keep data: URIs (e.g. base64-encoded images) in the output instead of + // stripping them. Defaults to false. + optional bool keep_data_uris = 1; +} + +// ServiceOptions control how the MarkItDown conversion backend is +// configured for a request. +message ServiceOptions { + // Enable the built-in converters. Defaults to true. + optional bool enable_builtins = 1; + // Enable 3rd-party plugin converters. Defaults to false. + optional bool enable_plugins = 2; + // If set, route supported formats through Azure Document Intelligence. + DocumentIntelligenceOptions document_intelligence = 3; + // If set, route supported formats through Azure Content Understanding. + ContentUnderstandingOptions content_understanding = 4; +} + +// Configuration for Azure Document Intelligence. +message DocumentIntelligenceOptions { + // Azure Document Intelligence endpoint URL. Required. + string endpoint = 1; +} + +// Configuration for Azure Content Understanding. +message ContentUnderstandingOptions { + // Azure Content Understanding endpoint URL. Required. + string endpoint = 1; + // Analyzer to use. Defaults to the service's prebuilt document analyzer. + optional string analyzer_id = 2; + // File types to route through Content Understanding. Empty means the + // server-side default set. + repeated ContentUnderstandingFileType file_types = 3; +} + +// File types supported by the Azure Content Understanding converter. +enum ContentUnderstandingFileType { + CONTENT_UNDERSTANDING_FILE_TYPE_UNSPECIFIED = 0; + CONTENT_UNDERSTANDING_FILE_TYPE_PDF = 1; + CONTENT_UNDERSTANDING_FILE_TYPE_DOCX = 2; + CONTENT_UNDERSTANDING_FILE_TYPE_PPTX = 3; + CONTENT_UNDERSTANDING_FILE_TYPE_XLSX = 4; + CONTENT_UNDERSTANDING_FILE_TYPE_HTML = 5; + CONTENT_UNDERSTANDING_FILE_TYPE_TXT = 6; + CONTENT_UNDERSTANDING_FILE_TYPE_MD = 7; + CONTENT_UNDERSTANDING_FILE_TYPE_RTF = 8; + CONTENT_UNDERSTANDING_FILE_TYPE_XML = 9; + CONTENT_UNDERSTANDING_FILE_TYPE_EML = 10; + CONTENT_UNDERSTANDING_FILE_TYPE_MSG = 11; + CONTENT_UNDERSTANDING_FILE_TYPE_JPEG = 12; + CONTENT_UNDERSTANDING_FILE_TYPE_PNG = 13; + CONTENT_UNDERSTANDING_FILE_TYPE_BMP = 14; + CONTENT_UNDERSTANDING_FILE_TYPE_TIFF = 15; + CONTENT_UNDERSTANDING_FILE_TYPE_HEIF = 16; + CONTENT_UNDERSTANDING_FILE_TYPE_MP4 = 17; + CONTENT_UNDERSTANDING_FILE_TYPE_M4V = 18; + CONTENT_UNDERSTANDING_FILE_TYPE_MOV = 19; + CONTENT_UNDERSTANDING_FILE_TYPE_AVI = 20; + CONTENT_UNDERSTANDING_FILE_TYPE_MKV = 21; + CONTENT_UNDERSTANDING_FILE_TYPE_WEBM = 22; + CONTENT_UNDERSTANDING_FILE_TYPE_FLV = 23; + CONTENT_UNDERSTANDING_FILE_TYPE_WMV = 24; + CONTENT_UNDERSTANDING_FILE_TYPE_WAV = 25; + CONTENT_UNDERSTANDING_FILE_TYPE_MP3 = 26; + CONTENT_UNDERSTANDING_FILE_TYPE_M4A = 27; + CONTENT_UNDERSTANDING_FILE_TYPE_FLAC = 28; + CONTENT_UNDERSTANDING_FILE_TYPE_OGG = 29; + CONTENT_UNDERSTANDING_FILE_TYPE_AAC = 30; + CONTENT_UNDERSTANDING_FILE_TYPE_WMA = 31; +} + +// StreamingOptions control streaming delivery for ConvertStream and +// ConvertDocumentStream. +message StreamingOptions { + // Maximum size of each Markdown chunk in UTF-8 code units. Must be + // greater than zero when set. Defaults to 4096. Used by ConvertStream + // only. + optional uint32 markdown_chunk_size_bytes = 1; + + // EXPERIMENTAL: when true, supported formats (currently PDF and PPTX) + // are converted incrementally — results stream as each page or slide is + // processed, instead of after the whole document converts. Unsupported + // formats fall back to whole-document conversion transparently. + // + // Incremental conversion is skipped when Azure Document Intelligence or + // Content Understanding is configured, or when plugins are enabled, + // since those paths may route the document elsewhere. + // + // Output caveat for PDFs without table/form content: the standard + // converter re-extracts such documents in a single pass for better + // spacing, so incremental output may differ slightly in whitespace. + optional bool experimental_incremental = 2; +} + +// Response for the unary Convert RPC. +message ConvertResponse { + // The conversion result. + ConversionResult result = 1; +} + +// The complete result of a conversion. +message ConversionResult { + // The converted document as Markdown. + string markdown = 1; + // Document title, when one could be extracted. + optional string title = 2; +} + +// A single event in the ConvertStream response stream. Events arrive in +// order: exactly one `started`, zero or more `markdown_chunk`, exactly one +// `completed`. +message ConvertStreamResponse { + oneof event { + ConversionStarted started = 1; + MarkdownChunk markdown_chunk = 2; + ConversionCompleted completed = 3; + } +} + +// A single event in the ConvertDocumentStream response stream. Events +// arrive in order: exactly one `started`, zero or more `element`, exactly +// one `completed`. +message ConvertDocumentStreamResponse { + oneof event { + ConversionStarted started = 1; + DocumentElement element = 2; + DocumentStreamCompleted completed = 3; + } +} + +// Signals that the request was accepted and conversion is starting. +message ConversionStarted { + // Which Source input was provided: "local_path", "uri", or "content". + string source_kind = 1; +} + +// A contiguous piece of the converted Markdown. +message MarkdownChunk { + // Zero-based position of this chunk in the stream. + uint32 chunk_index = 1; + // The Markdown text of this chunk. + string markdown = 2; + // True for the final chunk. + bool is_last = 3; +} + +// Terminal event for ConvertStream. +message ConversionCompleted { + // Total number of markdown_chunk events that were sent. + uint32 total_chunks = 1; + // Document title, when one could be extracted. + optional string title = 2; +} + +// Terminal event for ConvertDocumentStream. +message DocumentStreamCompleted { + // Total number of element events that were sent. + uint32 total_elements = 1; + // Document title, when one could be extracted. + optional string title = 2; +} + +// A structured block of the converted document. +message DocumentElement { + // Zero-based position of this element in the document. + uint32 element_index = 1; + + oneof kind { + Heading heading = 2; + Paragraph paragraph = 3; + Table table = 4; + ListBlock list = 5; + CodeBlock code_block = 6; + Image image = 7; + BlockQuote block_quote = 8; + HorizontalRule horizontal_rule = 9; + } +} + +// An ATX heading (`#` through `######`). +message Heading { + // Heading level, 1 (highest) through 6. + uint32 level = 1; + // Heading text with the leading `#` markers removed. + string text = 2; +} + +// A paragraph of text. Also the fallback for content that does not match +// any other block type. +message Paragraph { + // Paragraph text, possibly containing inline Markdown. + string text = 1; +} + +// A pipe-delimited Markdown table. +message Table { + // The original Markdown for the whole table, including the delimiter row. + string markdown = 1; + // Parsed cell text. The first row is the header row; the delimiter row is + // omitted. + repeated TableRow rows = 2; +} + +// A single row of a Table. +message TableRow { + // Cell text in column order, with surrounding whitespace trimmed. + repeated string cells = 1; +} + +// A bulleted or numbered list. +message ListBlock { + // The original Markdown for the whole list, including nested items. + string markdown = 1; + // True for ordered (numbered) lists. + bool ordered = 2; + // Top-level item text with list markers removed. Nested items and + // continuation lines remain part of their parent item's text, with their + // own markers preserved. + repeated string items = 3; +} + +// A fenced code block. +message CodeBlock { + // The info string following the opening fence, e.g. "python". Empty when + // no language was specified. + string language = 1; + // The code with the fences removed. + string code = 2; +} + +// An image that appears as its own block (`![alt](url "title")`). +message Image { + // Alternative text. + string alt_text = 1; + // Image URL. May be a data: URI when + // ConversionOptions.keep_data_uris is true. + string url = 2; + // Optional image title. + optional string title = 3; +} + +// A block quote (`>` prefixed lines). +message BlockQuote { + // The quoted text with the `>` markers removed. + string text = 1; +} + +// A thematic break (`---`, `***`, or `___`). +message HorizontalRule {} diff --git a/packages/markitdown/pyproject.toml b/packages/markitdown/pyproject.toml index d4c20a402..22fd5ef79 100644 --- a/packages/markitdown/pyproject.toml +++ b/packages/markitdown/pyproject.toml @@ -49,6 +49,10 @@ all = [ "azure-ai-documentintelligence", "azure-ai-contentunderstanding>=1.2.0b1", "azure-identity", + "grpcio>=1.81.0", + "grpcio-health-checking>=1.81.0", + "grpcio-reflection>=1.81.0", + "protobuf>=6.33.5", ] pptx = ["python-pptx"] docx = ["mammoth~=1.11.0", "lxml"] @@ -61,6 +65,12 @@ youtube-transcription = ["youtube-transcript-api"] az-doc-intel = ["azure-ai-documentintelligence", "azure-identity"] # >=1.2.0b1 required for to_llm_input() helper used by ContentUnderstandingConverter az-content-understanding = ["azure-ai-contentunderstanding>=1.2.0b1", "azure-identity"] +grpc = [ + "grpcio>=1.81.0", + "grpcio-health-checking>=1.81.0", + "grpcio-reflection>=1.81.0", + "protobuf>=6.33.5", +] [project.urls] Documentation = "https://github.com/microsoft/markitdown#readme" @@ -72,6 +82,8 @@ path = "src/markitdown/__about__.py" [project.scripts] markitdown = "markitdown.__main__:main" +markitdown-grpc = "markitdown.grpc.server:main" +markitdown-grpc-client = "markitdown.grpc.client:main" [tool.hatch.envs.default] features = ["all"] @@ -112,4 +124,4 @@ exclude_lines = [ ] [tool.hatch.build.targets.sdist] -only-include = ["src/markitdown"] +only-include = ["src/markitdown", "proto", "scripts"] diff --git a/packages/markitdown/scripts/regenerate-grpc.sh b/packages/markitdown/scripts/regenerate-grpc.sh new file mode 100755 index 000000000..05987b35a --- /dev/null +++ b/packages/markitdown/scripts/regenerate-grpc.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# Regenerates the checked-in gRPC stubs from proto/markitdown/v1/markitdown.proto. +# +# Usage: +# pip install "grpcio-tools>=1.81.0" +# ./scripts/regenerate-grpc.sh +# +# The proto package is `markitdown.v1`, but the generated Python modules live +# inside the `markitdown.grpc.v1` package so they ship with the library. The +# import rewrite below accounts for that difference. +set -euo pipefail + +cd "$(dirname "$0")/.." + +OUT_DIR="src/markitdown/grpc" + +python3 -m grpc_tools.protoc \ + --proto_path=proto \ + --python_out="${OUT_DIR}" \ + --grpc_python_out="${OUT_DIR}" \ + proto/markitdown/v1/markitdown.proto + +# protoc emits to /markitdown/v1; relocate into the grpc/v1 package. +mv "${OUT_DIR}/markitdown/v1/markitdown_pb2.py" "${OUT_DIR}/v1/markitdown_pb2.py" +mv "${OUT_DIR}/markitdown/v1/markitdown_pb2_grpc.py" "${OUT_DIR}/v1/markitdown_pb2_grpc.py" +rm -rf "${OUT_DIR}/markitdown" + +# Rewrite the absolute import to match the package layout. +sed -i.bak \ + 's/^from markitdown\.v1 import markitdown_pb2/from markitdown.grpc.v1 import markitdown_pb2/' \ + "${OUT_DIR}/v1/markitdown_pb2_grpc.py" +rm -f "${OUT_DIR}/v1/markitdown_pb2_grpc.py.bak" + +# Keep formatting consistent with the repo's pre-commit hooks. +if command -v black >/dev/null 2>&1; then + black -q "${OUT_DIR}/v1/markitdown_pb2.py" "${OUT_DIR}/v1/markitdown_pb2_grpc.py" +fi + +echo "Regenerated gRPC stubs in ${OUT_DIR}/v1/" diff --git a/packages/markitdown/src/markitdown/converters/_pptx_converter.py b/packages/markitdown/src/markitdown/converters/_pptx_converter.py index 360f17706..267b92d22 100644 --- a/packages/markitdown/src/markitdown/converters/_pptx_converter.py +++ b/packages/markitdown/src/markitdown/converters/_pptx_converter.py @@ -81,123 +81,128 @@ def convert( # Perform the conversion presentation = pptx.Presentation(file_stream) md_content = "" - slide_num = 0 - for slide in presentation.slides: - slide_num += 1 - - md_content += f"\n\n\n" - - title = slide.shapes.title - - def get_shape_content(shape, **kwargs): - nonlocal md_content - # Pictures - if self._is_picture(shape): - # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069 - - llm_description = "" - alt_text = "" - - # Potentially generate a description using an LLM - llm_client = kwargs.get("llm_client") - llm_model = kwargs.get("llm_model") - if llm_client is not None and llm_model is not None: - # Prepare a file_stream and stream_info for the image data - image_filename = shape.image.filename - image_extension = None - if image_filename: - image_extension = os.path.splitext(image_filename)[1] - image_stream_info = StreamInfo( - mimetype=shape.image.content_type, - extension=image_extension, - filename=image_filename, - ) + for slide_num, slide in enumerate(presentation.slides, start=1): + md_content += "\n\n" + self._convert_slide(slide, slide_num, **kwargs) + + return DocumentConverterResult(markdown=md_content.strip()) + + def _convert_slide(self, slide, slide_num: int, **kwargs: Any) -> str: + """Convert a single slide to Markdown.""" + md_content = f"\n" + + title = slide.shapes.title + + def get_shape_content(shape, **kwargs): + nonlocal md_content + # Pictures + if self._is_picture(shape): + # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069 + + llm_description = "" + alt_text = "" + + # Potentially generate a description using an LLM + llm_client = kwargs.get("llm_client") + llm_model = kwargs.get("llm_model") + if llm_client is not None and llm_model is not None: + # Prepare a file_stream and stream_info for the image data + image_filename = shape.image.filename + image_extension = None + if image_filename: + image_extension = os.path.splitext(image_filename)[1] + image_stream_info = StreamInfo( + mimetype=shape.image.content_type, + extension=image_extension, + filename=image_filename, + ) + + image_stream = io.BytesIO(shape.image.blob) - image_stream = io.BytesIO(shape.image.blob) - - # Caption the image - try: - llm_description = llm_caption( - image_stream, - image_stream_info, - client=llm_client, - model=llm_model, - prompt=kwargs.get("llm_prompt"), - ) - except Exception: - # Unable to generate a description - pass - - # Also grab any description embedded in the deck + # Caption the image try: - alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "") + llm_description = llm_caption( + image_stream, + image_stream_info, + client=llm_client, + model=llm_model, + prompt=kwargs.get("llm_prompt"), + ) except Exception: - # Unable to get alt text + # Unable to generate a description pass - # Prepare the alt, escaping any special characters - alt_text = "\n".join([llm_description, alt_text]) or shape.name - alt_text = re.sub(r"[\r\n\[\]]", " ", alt_text) - alt_text = re.sub(r"\s+", " ", alt_text).strip() - - # If keep_data_uris is True, use base64 encoding for images - if kwargs.get("keep_data_uris", False): - blob = shape.image.blob - content_type = shape.image.content_type or "image/png" - b64_string = base64.b64encode(blob).decode("utf-8") - md_content += f"\n![{alt_text}](data:{content_type};base64,{b64_string})\n" - else: - # A placeholder name - filename = re.sub(r"\W", "", shape.name) + ".jpg" - md_content += "\n![" + alt_text + "](" + filename + ")\n" - - # Tables - if self._is_table(shape): - md_content += self._convert_table_to_markdown(shape.table, **kwargs) - - # Charts - if shape.has_chart: - md_content += self._convert_chart_to_markdown(shape.chart) - - # Text areas - elif shape.has_text_frame: - if shape == title: - md_content += "# " + shape.text.lstrip() + "\n" - else: - md_content += shape.text + "\n" - - # Group Shapes - if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.GROUP: - sorted_shapes = sorted( - shape.shapes, - key=lambda x: ( - float("-inf") if not x.top else x.top, - float("-inf") if not x.left else x.left, - ), + # Also grab any description embedded in the deck + try: + alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "") + except Exception: + # Unable to get alt text + pass + + # Prepare the alt, escaping any special characters + alt_text = "\n".join([llm_description, alt_text]) or shape.name + alt_text = re.sub(r"[\r\n\[\]]", " ", alt_text) + alt_text = re.sub(r"\s+", " ", alt_text).strip() + + # If keep_data_uris is True, use base64 encoding for images + if kwargs.get("keep_data_uris", False): + blob = shape.image.blob + content_type = shape.image.content_type or "image/png" + b64_string = base64.b64encode(blob).decode("utf-8") + md_content += ( + f"\n![{alt_text}](data:{content_type};base64,{b64_string})\n" ) - for subshape in sorted_shapes: - get_shape_content(subshape, **kwargs) - - sorted_shapes = sorted( - slide.shapes, - key=lambda x: ( - float("-inf") if not x.top else x.top, - float("-inf") if not x.left else x.left, - ), - ) - for shape in sorted_shapes: - get_shape_content(shape, **kwargs) + else: + # A placeholder name + filename = re.sub(r"\W", "", shape.name) + ".jpg" + md_content += "\n![" + alt_text + "](" + filename + ")\n" + + # Tables + if self._is_table(shape): + md_content += self._convert_table_to_markdown(shape.table, **kwargs) + + # Charts + if shape.has_chart: + md_content += self._convert_chart_to_markdown(shape.chart) + + # Text areas + elif shape.has_text_frame: + if shape == title: + md_content += "# " + shape.text.lstrip() + "\n" + else: + md_content += shape.text + "\n" + + # Group Shapes + if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.GROUP: + sorted_shapes = sorted( + shape.shapes, + key=lambda x: ( + float("-inf") if not x.top else x.top, + float("-inf") if not x.left else x.left, + ), + ) + for subshape in sorted_shapes: + get_shape_content(subshape, **kwargs) + + sorted_shapes = sorted( + slide.shapes, + key=lambda x: ( + float("-inf") if not x.top else x.top, + float("-inf") if not x.left else x.left, + ), + ) + for shape in sorted_shapes: + get_shape_content(shape, **kwargs) - md_content = md_content.strip() + md_content = md_content.strip() - if slide.has_notes_slide: - md_content += "\n\n### Notes:\n" - notes_frame = slide.notes_slide.notes_text_frame - if notes_frame is not None: - md_content += notes_frame.text - md_content = md_content.strip() + if slide.has_notes_slide: + md_content += "\n\n### Notes:\n" + notes_frame = slide.notes_slide.notes_text_frame + if notes_frame is not None: + md_content += notes_frame.text + md_content = md_content.strip() - return DocumentConverterResult(markdown=md_content.strip()) + return md_content.strip() def _is_picture(self, shape): if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE: diff --git a/packages/markitdown/src/markitdown/grpc/__init__.py b/packages/markitdown/src/markitdown/grpc/__init__.py new file mode 100644 index 000000000..cc81b224d --- /dev/null +++ b/packages/markitdown/src/markitdown/grpc/__init__.py @@ -0,0 +1,13 @@ +try: + import grpc # noqa: F401 +except ImportError as exc: + raise ImportError( + "The markitdown gRPC server and client require the optional [grpc] " + "dependencies. Install them with:\n\n" + " pip install 'markitdown[grpc]'\n" + ) from exc + +from .client import MarkItDownClient +from .server import MarkItDownServiceServicer, serve + +__all__ = ["MarkItDownClient", "MarkItDownServiceServicer", "serve"] diff --git a/packages/markitdown/src/markitdown/grpc/_segmenter.py b/packages/markitdown/src/markitdown/grpc/_segmenter.py new file mode 100644 index 000000000..803288b9d --- /dev/null +++ b/packages/markitdown/src/markitdown/grpc/_segmenter.py @@ -0,0 +1,285 @@ +"""Best-effort segmentation of Markdown into structured document blocks. + +This module backs the ConvertDocumentStream RPC. It performs a line-based +pass over converted Markdown and groups lines into typed blocks (headings, +paragraphs, tables, lists, code blocks, images, block quotes, and horizontal +rules). It is intentionally conservative: anything that does not match a +known block type is emitted as a paragraph, so no content is ever dropped. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Iterator, List, Optional, Union + +_HEADING_RE = re.compile(r"^(#{1,6})\s+(.*?)\s*#*\s*$") +_FENCE_RE = re.compile(r"^(```+|~~~+)\s*(\S*)\s*$") +_TABLE_ROW_RE = re.compile(r"^\s*\|.*\|\s*$") +_TABLE_DELIMITER_RE = re.compile(r"^\s*\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?\s*$") +_UNORDERED_ITEM_RE = re.compile(r"^(\s*)[-*+]\s+(.*)$") +_ORDERED_ITEM_RE = re.compile(r"^(\s*)\d{1,9}[.)]\s+(.*)$") +_BLOCK_QUOTE_RE = re.compile(r"^\s*>\s?(.*)$") +_HORIZONTAL_RULE_RE = re.compile(r"^\s*((\*\s*){3,}|(-\s*){3,}|(_\s*){3,})$") +_IMAGE_BLOCK_RE = re.compile( + r"""^!\[(?P[^\]]*)\]\(\s*(?P<[^>]*>|[^\s)]+)(?:\s+"(?P[^"]*)")?\s*\)$""" +) + + +@dataclass +class HeadingBlock: + level: int + text: str + + +@dataclass +class ParagraphBlock: + text: str + + +@dataclass +class TableBlock: + markdown: str + rows: List[List[str]] = field(default_factory=list) + + +@dataclass +class ListBlock: + markdown: str + ordered: bool + items: List[str] = field(default_factory=list) + + +@dataclass +class CodeBlock: + language: str + code: str + + +@dataclass +class ImageBlock: + alt_text: str + url: str + title: Optional[str] = None + + +@dataclass +class BlockQuoteBlock: + text: str + + +@dataclass +class HorizontalRuleBlock: + pass + + +DocumentBlock = Union[ + HeadingBlock, + ParagraphBlock, + TableBlock, + ListBlock, + CodeBlock, + ImageBlock, + BlockQuoteBlock, + HorizontalRuleBlock, +] + + +def segment_markdown(markdown: str) -> Iterator[DocumentBlock]: + """Yield typed blocks for the given Markdown, in document order.""" + lines = markdown.split("\n") + index = 0 + total = len(lines) + + while index < total: + line = lines[index] + + if not line.strip(): + index += 1 + continue + + fence_match = _FENCE_RE.match(line) + if fence_match: + block, index = _consume_code_block(lines, index, fence_match) + yield block + continue + + heading_match = _HEADING_RE.match(line) + if heading_match: + yield HeadingBlock( + level=len(heading_match.group(1)), text=heading_match.group(2) + ) + index += 1 + continue + + if _HORIZONTAL_RULE_RE.match(line): + yield HorizontalRuleBlock() + index += 1 + continue + + if _TABLE_ROW_RE.match(line) and _is_table_start(lines, index): + block, index = _consume_table(lines, index) + yield block + continue + + if _BLOCK_QUOTE_RE.match(line): + block, index = _consume_block_quote(lines, index) + yield block + continue + + list_match = _UNORDERED_ITEM_RE.match(line) or _ORDERED_ITEM_RE.match(line) + if list_match and not list_match.group(1): + block, index = _consume_list(lines, index) + yield block + continue + + image_match = _IMAGE_BLOCK_RE.match(line.strip()) + if image_match and _is_block_end(lines, index + 1): + url = image_match.group("url") + if url.startswith("<") and url.endswith(">"): + url = url[1:-1] + yield ImageBlock( + alt_text=image_match.group("alt"), + url=url, + title=image_match.group("title"), + ) + index += 1 + continue + + block, index = _consume_paragraph(lines, index) + yield block + + +def _is_block_end(lines: List[str], index: int) -> bool: + return index >= len(lines) or not lines[index].strip() + + +def _is_table_start(lines: List[str], index: int) -> bool: + return index + 1 < len(lines) and bool(_TABLE_DELIMITER_RE.match(lines[index + 1])) + + +def _consume_code_block( + lines: List[str], index: int, fence_match: "re.Match[str]" +) -> tuple[CodeBlock, int]: + fence = fence_match.group(1) + language = fence_match.group(2) + code_lines: List[str] = [] + index += 1 + while index < len(lines): + if lines[index].strip().startswith(fence[0] * 3): + index += 1 + break + code_lines.append(lines[index]) + index += 1 + return CodeBlock(language=language, code="\n".join(code_lines)), index + + +def _split_table_row(line: str) -> List[str]: + stripped = line.strip() + if stripped.startswith("|"): + stripped = stripped[1:] + if stripped.endswith("|"): + stripped = stripped[:-1] + # Split on pipes that are not escaped with a backslash. + cells = re.split(r"(?<!\\)\|", stripped) + return [cell.replace("\\|", "|").strip() for cell in cells] + + +def _consume_table(lines: List[str], index: int) -> tuple[TableBlock, int]: + start = index + rows: List[List[str]] = [_split_table_row(lines[index])] + index += 2 # Skip past the header and delimiter rows. + while index < len(lines) and _TABLE_ROW_RE.match(lines[index]): + rows.append(_split_table_row(lines[index])) + index += 1 + markdown = "\n".join(lines[start:index]) + return TableBlock(markdown=markdown, rows=rows), index + + +def _consume_block_quote(lines: List[str], index: int) -> tuple[BlockQuoteBlock, int]: + quoted: List[str] = [] + while index < len(lines): + match = _BLOCK_QUOTE_RE.match(lines[index]) + if not match: + break + quoted.append(match.group(1)) + index += 1 + return BlockQuoteBlock(text="\n".join(quoted)), index + + +def _consume_list(lines: List[str], index: int) -> tuple[ListBlock, int]: + start = index + ordered = _ORDERED_ITEM_RE.match(lines[index]) is not None + + items: List[str] = [] + current_item: List[str] = [] + + while index < len(lines): + line = lines[index] + if not line.strip(): + # A blank line ends the list unless it is followed by indented + # continuation or another top-level item of the same list type. + next_index = index + 1 + if next_index < len(lines) and ( + _is_same_type_item(lines[next_index], ordered) + or _is_indented(lines[next_index]) + ): + current_item.append("") + index += 1 + continue + break + + item_match = _UNORDERED_ITEM_RE.match(line) or _ORDERED_ITEM_RE.match(line) + if item_match and not item_match.group(1): + # An adjacent top-level list of the other type starts a new block. + if (_ORDERED_ITEM_RE.match(line) is not None) != ordered: + break + if current_item: + items.append("\n".join(current_item).rstrip()) + current_item = [item_match.group(2)] + elif item_match or _is_indented(line): + # Nested item or indented continuation stays with its parent. + current_item.append(line.strip()) + else: + break + index += 1 + + if current_item: + items.append("\n".join(current_item).rstrip()) + + markdown = "\n".join(lines[start:index]).rstrip() + return ListBlock(markdown=markdown, ordered=ordered, items=items), index + + +def _is_same_type_item(line: str, ordered: bool) -> bool: + match = _UNORDERED_ITEM_RE.match(line) or _ORDERED_ITEM_RE.match(line) + if match is None or match.group(1): + return False + return (_ORDERED_ITEM_RE.match(line) is not None) == ordered + + +def _is_indented(line: str) -> bool: + return line.startswith((" ", "\t", " ")) + + +def _consume_paragraph(lines: List[str], index: int) -> tuple[ParagraphBlock, int]: + paragraph: List[str] = [] + while index < len(lines): + line = lines[index] + if not line.strip(): + break + # Stop if a new structured block begins mid-paragraph. + if ( + _HEADING_RE.match(line) + or _FENCE_RE.match(line) + or _HORIZONTAL_RULE_RE.match(line) + or (_TABLE_ROW_RE.match(line) and _is_table_start(lines, index)) + or _BLOCK_QUOTE_RE.match(line) + ) and paragraph: + break + item_match = _UNORDERED_ITEM_RE.match(line) or _ORDERED_ITEM_RE.match(line) + if item_match and not item_match.group(1) and paragraph: + break + paragraph.append(line) + index += 1 + return ParagraphBlock(text="\n".join(paragraph).strip()), index diff --git a/packages/markitdown/src/markitdown/grpc/client.py b/packages/markitdown/src/markitdown/grpc/client.py new file mode 100644 index 000000000..6d0990039 --- /dev/null +++ b/packages/markitdown/src/markitdown/grpc/client.py @@ -0,0 +1,326 @@ +from __future__ import annotations + +import argparse +import sys +from typing import Iterator + +import grpc + +from .v1 import markitdown_pb2, markitdown_pb2_grpc + +# Matches the server default so large documents and results round-trip. +DEFAULT_MAX_MESSAGE_BYTES = 100 * 1024 * 1024 + + +class MarkItDownClient: + """A simple gRPC client for the MarkItDown service.""" + + def __init__( + self, + address: str = "127.0.0.1:50051", + channel: grpc.Channel | None = None, + max_message_bytes: int = DEFAULT_MAX_MESSAGE_BYTES, + ) -> None: + """Create a client. + + Args: + address: host:port of the server. Ignored when `channel` is given. + channel: An externally managed channel to use instead of creating + one. Message size limits are then the caller's responsibility. + max_message_bytes: Send/receive message size limit for the + internally created channel. Defaults to 100 MiB to match the + server. + """ + if channel is not None: + self._channel = channel + self._owns_channel = False + else: + self._channel = grpc.insecure_channel( + address, + options=[ + ("grpc.max_receive_message_length", max_message_bytes), + ("grpc.max_send_message_length", max_message_bytes), + ], + ) + self._owns_channel = True + self._stub = markitdown_pb2_grpc.MarkItDownServiceStub(self._channel) + + def close(self) -> None: + if self._owns_channel: + self._channel.close() + + def __enter__(self) -> MarkItDownClient: + return self + + def __exit__(self, *args: object) -> None: + self.close() + + def convert( + self, + *, + local_path: str | None = None, + uri: str | None = None, + content: bytes | None = None, + mimetype: str | None = None, + extension: str | None = None, + charset: str | None = None, + keep_data_uris: bool | None = None, + ) -> markitdown_pb2.ConversionResult: + """Convert a document and return the full result.""" + request = _build_convert_request( + local_path=local_path, + uri=uri, + content=content, + mimetype=mimetype, + extension=extension, + charset=charset, + keep_data_uris=keep_data_uris, + ) + response: markitdown_pb2.ConvertResponse = self._stub.Convert(request) + return response.result + + def convert_stream( + self, + *, + local_path: str | None = None, + uri: str | None = None, + content: bytes | None = None, + mimetype: str | None = None, + extension: str | None = None, + charset: str | None = None, + keep_data_uris: bool | None = None, + chunk_size_bytes: int | None = None, + incremental: bool | None = None, + ) -> Iterator[markitdown_pb2.ConvertStreamResponse]: + """Convert a document and yield streaming response events. + + Set `incremental=True` to opt into EXPERIMENTAL incremental + conversion: supported formats (PDF, PPTX) stream chunks as each + page or slide converts, instead of after the whole document. + """ + source = _build_source( + local_path=local_path, + uri=uri, + content=content, + mimetype=mimetype, + extension=extension, + charset=charset, + ) + conversion_options = markitdown_pb2.ConversionOptions() + if keep_data_uris is not None: + conversion_options.keep_data_uris = keep_data_uris + + streaming_options = markitdown_pb2.StreamingOptions() + if chunk_size_bytes is not None: + streaming_options.markdown_chunk_size_bytes = chunk_size_bytes + if incremental is not None: + streaming_options.experimental_incremental = incremental + + request = markitdown_pb2.ConvertStreamRequest( + source=source, + conversion_options=conversion_options, + streaming_options=streaming_options, + ) + yield from self._stub.ConvertStream(request) + + def convert_document_stream( + self, + *, + local_path: str | None = None, + uri: str | None = None, + content: bytes | None = None, + mimetype: str | None = None, + extension: str | None = None, + charset: str | None = None, + keep_data_uris: bool | None = None, + incremental: bool | None = None, + ) -> Iterator[markitdown_pb2.ConvertDocumentStreamResponse]: + """Convert a document and yield structured document elements. + + Events arrive in order: one `started`, zero or more `element` + (headings, paragraphs, tables, lists, code blocks, images, ...), + then one `completed`. + + Set `incremental=True` to opt into EXPERIMENTAL incremental + conversion: supported formats (PDF, PPTX) stream elements as each + page or slide converts, instead of after the whole document. + """ + source = _build_source( + local_path=local_path, + uri=uri, + content=content, + mimetype=mimetype, + extension=extension, + charset=charset, + ) + conversion_options = markitdown_pb2.ConversionOptions() + if keep_data_uris is not None: + conversion_options.keep_data_uris = keep_data_uris + + streaming_options = markitdown_pb2.StreamingOptions() + if incremental is not None: + streaming_options.experimental_incremental = incremental + + request = markitdown_pb2.ConvertDocumentStreamRequest( + source=source, + conversion_options=conversion_options, + streaming_options=streaming_options, + ) + yield from self._stub.ConvertDocumentStream(request) + + +def _build_source( + *, + local_path: str | None, + uri: str | None, + content: bytes | None, + mimetype: str | None, + extension: str | None, + charset: str | None, +) -> markitdown_pb2.Source: + stream_info_kwargs: dict[str, str] = {} + if mimetype is not None: + stream_info_kwargs["mimetype"] = mimetype + if extension is not None: + stream_info_kwargs["extension"] = extension + if charset is not None: + stream_info_kwargs["charset"] = charset + stream_info = markitdown_pb2.StreamInfo(**stream_info_kwargs) + + if local_path is not None: + return markitdown_pb2.Source(local_path=local_path, stream_info=stream_info) + if uri is not None: + return markitdown_pb2.Source(uri=uri, stream_info=stream_info) + if content is not None: + return markitdown_pb2.Source(content=content, stream_info=stream_info) + raise ValueError("One of local_path, uri, or content must be provided.") + + +def _build_convert_request( + *, + local_path: str | None, + uri: str | None, + content: bytes | None, + mimetype: str | None, + extension: str | None, + charset: str | None, + keep_data_uris: bool | None, +) -> markitdown_pb2.ConvertRequest: + source = _build_source( + local_path=local_path, + uri=uri, + content=content, + mimetype=mimetype, + extension=extension, + charset=charset, + ) + conversion_options = markitdown_pb2.ConversionOptions() + if keep_data_uris is not None: + conversion_options.keep_data_uris = keep_data_uris + + return markitdown_pb2.ConvertRequest( + source=source, + conversion_options=conversion_options, + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Send a convert request to a running MarkItDown gRPC server.", + prog="markitdown-grpc-client", + ) + parser.add_argument( + "--address", + default="127.0.0.1:50051", + help="Address of the gRPC server (default: 127.0.0.1:50051).", + ) + parser.add_argument( + "-o", + "--output", + help="Output file. If not provided, output is written to stdout.", + ) + parser.add_argument( + "-x", + "--extension", + help="Hint about the file extension (e.g. .pdf).", + ) + parser.add_argument( + "-m", + "--mime-type", + help="Hint about the file MIME type.", + ) + parser.add_argument( + "-c", + "--charset", + help="Hint about the file charset (e.g. UTF-8).", + ) + parser.add_argument( + "--stream", + action="store_true", + help="Use the streaming ConvertStream RPC instead of the unary Convert RPC.", + ) + + source_group = parser.add_mutually_exclusive_group() + source_group.add_argument( + "--uri", + help="Remote URI to convert.", + ) + source_group.add_argument( + "filename", + nargs="?", + help="Local file path to convert. Reads from stdin if omitted.", + ) + + args = parser.parse_args() + + extension = args.extension + if extension and not extension.startswith("."): + extension = "." + extension + + with MarkItDownClient(address=args.address) as client: + if args.uri: + kwargs = dict( + uri=args.uri, + mimetype=args.mime_type, + extension=extension, + charset=args.charset, + ) + elif args.filename: + kwargs = dict( + local_path=args.filename, + mimetype=args.mime_type, + extension=extension, + charset=args.charset, + ) + else: + data = sys.stdin.buffer.read() + kwargs = dict( + content=data, + mimetype=args.mime_type, + extension=extension, + charset=args.charset, + ) + + if args.stream: + markdown_parts: list[str] = [] + for event in client.convert_stream(**kwargs): # type: ignore[arg-type] + if event.HasField("markdown_chunk"): + markdown_parts.append(event.markdown_chunk.markdown) + markdown = "".join(markdown_parts) + else: + result = client.convert(**kwargs) # type: ignore[arg-type] + markdown = result.markdown + + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + f.write(markdown) + else: + try: + sys.stdout.reconfigure(errors="replace") + except AttributeError: + pass # stdout replaced by a non-reconfigurable stream + print(markdown) + + +if __name__ == "__main__": + main() diff --git a/packages/markitdown/src/markitdown/grpc/server.py b/packages/markitdown/src/markitdown/grpc/server.py new file mode 100644 index 000000000..5157ceef8 --- /dev/null +++ b/packages/markitdown/src/markitdown/grpc/server.py @@ -0,0 +1,592 @@ +from __future__ import annotations + +import argparse +import io +import sys +from concurrent import futures +from typing import Iterable, Iterator, NoReturn + +import grpc + +from markitdown import ( + DocumentConverterResult, + FileConversionException, + MarkItDown, + MarkItDownException, + MissingDependencyException, + StreamInfo, + UnsupportedFormatException, +) +from markitdown.converters import ContentUnderstandingFileType +from markitdown.streaming import StreamingConverterController + +from . import _segmenter +from .v1 import markitdown_pb2, markitdown_pb2_grpc + +_DEFAULT_MARKDOWN_CHUNK_SIZE_BYTES = 4096 + +# Generous default so large documents (big PDFs, Office files with embedded +# media) can be sent inline via Source.content. Operators can lower this with +# --max-receive-message-bytes when exposing the server more broadly. +DEFAULT_MAX_MESSAGE_BYTES = 100 * 1024 * 1024 + +_CU_FILE_TYPE_MAP: dict[int, ContentUnderstandingFileType] = { + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_PDF: ContentUnderstandingFileType.PDF, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_DOCX: ContentUnderstandingFileType.DOCX, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_PPTX: ContentUnderstandingFileType.PPTX, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_XLSX: ContentUnderstandingFileType.XLSX, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_HTML: ContentUnderstandingFileType.HTML, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_TXT: ContentUnderstandingFileType.TXT, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_MD: ContentUnderstandingFileType.MD, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_RTF: ContentUnderstandingFileType.RTF, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_XML: ContentUnderstandingFileType.XML, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_EML: ContentUnderstandingFileType.EML, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_MSG: ContentUnderstandingFileType.MSG, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_JPEG: ContentUnderstandingFileType.JPEG, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_PNG: ContentUnderstandingFileType.PNG, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_BMP: ContentUnderstandingFileType.BMP, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_TIFF: ContentUnderstandingFileType.TIFF, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_HEIF: ContentUnderstandingFileType.HEIF, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_MP4: ContentUnderstandingFileType.MP4, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_M4V: ContentUnderstandingFileType.M4V, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_MOV: ContentUnderstandingFileType.MOV, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_AVI: ContentUnderstandingFileType.AVI, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_MKV: ContentUnderstandingFileType.MKV, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_WEBM: ContentUnderstandingFileType.WEBM, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_FLV: ContentUnderstandingFileType.FLV, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_WMV: ContentUnderstandingFileType.WMV, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_WAV: ContentUnderstandingFileType.WAV, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_MP3: ContentUnderstandingFileType.MP3, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_M4A: ContentUnderstandingFileType.M4A, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_FLAC: ContentUnderstandingFileType.FLAC, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_OGG: ContentUnderstandingFileType.OGG, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_AAC: ContentUnderstandingFileType.AAC, + markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_WMA: ContentUnderstandingFileType.WMA, +} + + +def _abort_from_exception( + context: grpc.ServicerContext, exc: BaseException +) -> NoReturn: + if isinstance(exc, FileNotFoundError): + context.abort(grpc.StatusCode.NOT_FOUND, str(exc)) + if isinstance(exc, UnsupportedFormatException): + context.abort(grpc.StatusCode.FAILED_PRECONDITION, str(exc)) + if isinstance(exc, MissingDependencyException): + context.abort(grpc.StatusCode.FAILED_PRECONDITION, str(exc)) + if isinstance(exc, FileConversionException): + context.abort(grpc.StatusCode.INTERNAL, str(exc)) + if isinstance(exc, MarkItDownException): + context.abort(grpc.StatusCode.INTERNAL, str(exc)) + if isinstance(exc, ValueError): + context.abort(grpc.StatusCode.INVALID_ARGUMENT, str(exc)) + if isinstance(exc, OSError) and exc.errno == 2: + context.abort(grpc.StatusCode.NOT_FOUND, str(exc)) + raise exc + + +class MarkItDownServiceServicer(markitdown_pb2_grpc.MarkItDownServiceServicer): + def __init__(self) -> None: + self._streaming_controller = StreamingConverterController() + + def Convert( + self, request: markitdown_pb2.ConvertRequest, context: grpc.ServicerContext + ) -> markitdown_pb2.ConvertResponse: + try: + conversion_result = self._convert_request(request, context) + except Exception as exc: + _abort_from_exception(context, exc) + return markitdown_pb2.ConvertResponse( + result=self._to_proto_result(conversion_result) + ) + + def ConvertStream( + self, + request: markitdown_pb2.ConvertStreamRequest, + context: grpc.ServicerContext, + ) -> Iterator[markitdown_pb2.ConvertStreamResponse]: + source_kind = request.source.WhichOneof("input") + if source_kind is None: + context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + "source.input is required and must set one of local_path, uri, or content.", + ) + + chunk_size = _resolve_chunk_size(request, context) + + yield markitdown_pb2.ConvertStreamResponse( + started=markitdown_pb2.ConversionStarted(source_kind=source_kind) + ) + + title: str | None = None + try: + fragments = self._try_incremental_fragments(request) + if fragments is not None: + chunk_count = 0 + for markdown_chunk, is_last in _iter_incremental_chunks( + fragments, chunk_size + ): + yield markitdown_pb2.ConvertStreamResponse( + markdown_chunk=markitdown_pb2.MarkdownChunk( + chunk_index=chunk_count, + markdown=markdown_chunk, + is_last=is_last, + ) + ) + chunk_count += 1 + total_chunks = chunk_count + else: + conversion_result = self._convert_request(request, context) + title = conversion_result.title + chunks = list(_chunk_markdown(conversion_result.markdown, chunk_size)) + if not chunks: + chunks = [""] + for chunk_index, markdown_chunk in enumerate(chunks): + yield markitdown_pb2.ConvertStreamResponse( + markdown_chunk=markitdown_pb2.MarkdownChunk( + chunk_index=chunk_index, + markdown=markdown_chunk, + is_last=chunk_index == len(chunks) - 1, + ) + ) + total_chunks = len(chunks) + except Exception as exc: + _abort_from_exception(context, exc) + + completed = markitdown_pb2.ConversionCompleted(total_chunks=total_chunks) + if title: + completed.title = title + yield markitdown_pb2.ConvertStreamResponse(completed=completed) + + def ConvertDocumentStream( + self, + request: markitdown_pb2.ConvertDocumentStreamRequest, + context: grpc.ServicerContext, + ) -> Iterator[markitdown_pb2.ConvertDocumentStreamResponse]: + source_kind = request.source.WhichOneof("input") + if source_kind is None: + context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + "source.input is required and must set one of local_path, uri, or content.", + ) + + yield markitdown_pb2.ConvertDocumentStreamResponse( + started=markitdown_pb2.ConversionStarted(source_kind=source_kind) + ) + + title: str | None = None + element_index = 0 + try: + fragments = self._try_incremental_fragments(request) + if fragments is not None: + # Fragments are separated by blank lines, so blocks never + # span fragments and per-fragment segmentation matches + # whole-document segmentation. + for fragment in fragments: + for block in _segmenter.segment_markdown(fragment): + yield markitdown_pb2.ConvertDocumentStreamResponse( + element=_to_proto_element(block, element_index) + ) + element_index += 1 + else: + conversion_result = self._convert_request(request, context) + title = conversion_result.title + for block in _segmenter.segment_markdown(conversion_result.markdown): + yield markitdown_pb2.ConvertDocumentStreamResponse( + element=_to_proto_element(block, element_index) + ) + element_index += 1 + except Exception as exc: + _abort_from_exception(context, exc) + + completed = markitdown_pb2.DocumentStreamCompleted(total_elements=element_index) + if title: + completed.title = title + yield markitdown_pb2.ConvertDocumentStreamResponse(completed=completed) + + def _try_incremental_fragments( + self, + request: ( + markitdown_pb2.ConvertStreamRequest + | markitdown_pb2.ConvertDocumentStreamRequest + ), + ) -> Iterator[str] | None: + """Return an incremental fragment iterator, or None to use the + whole-document conversion path. + + Incremental conversion is experimental and opt-in via + streaming_options.experimental_incremental. It only applies to + inline content and local paths with a format a streaming converter + accepts, and is skipped when service options route conversion + elsewhere (Azure backends, plugins). + """ + if not ( + request.HasField("streaming_options") + and request.streaming_options.experimental_incremental + ): + return None + + service_options = request.service_options + if ( + service_options.HasField("document_intelligence") + or service_options.HasField("content_understanding") + or ( + service_options.HasField("enable_plugins") + and service_options.enable_plugins + ) + or ( + service_options.HasField("enable_builtins") + and not service_options.enable_builtins + ) + ): + return None + + source_kind = request.source.WhichOneof("input") + if source_kind == "content": + file_stream: io.BufferedIOBase = io.BytesIO(request.source.content) + elif source_kind == "local_path": + file_stream = open(request.source.local_path, "rb") + else: + return None # URIs use the standard path, which handles fetching. + + stream_info = _to_stream_info(request.source.stream_info) or StreamInfo() + kwargs: dict[str, object] = {} + if request.conversion_options.HasField("keep_data_uris"): + kwargs["keep_data_uris"] = request.conversion_options.keep_data_uris + + fragments = self._streaming_controller.iter_markdown( + file_stream, stream_info, **kwargs + ) + if fragments is None: + file_stream.close() + return None + + def _closing() -> Iterator[str]: + try: + yield from fragments + finally: + file_stream.close() + + return _closing() + + def _convert_request( + self, + request: ( + markitdown_pb2.ConvertRequest + | markitdown_pb2.ConvertStreamRequest + | markitdown_pb2.ConvertDocumentStreamRequest + ), + context: grpc.ServicerContext, + ) -> DocumentConverterResult: + source_kind = request.source.WhichOneof("input") + if source_kind is None: + context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + "source.input is required and must set one of local_path, uri, or content.", + ) + + markitdown_client = _create_markitdown(request.service_options) + convert_kwargs = _build_convert_kwargs( + request.conversion_options, request.source + ) + + if source_kind == "local_path": + return markitdown_client.convert_local( + request.source.local_path, **convert_kwargs + ) + if source_kind == "uri": + return markitdown_client.convert_uri(request.source.uri, **convert_kwargs) + + assert source_kind == "content" + return markitdown_client.convert_stream( + io.BytesIO(request.source.content), **convert_kwargs + ) + + @staticmethod + def _to_proto_result( + conversion_result: DocumentConverterResult, + ) -> markitdown_pb2.ConversionResult: + proto_result = markitdown_pb2.ConversionResult( + markdown=conversion_result.markdown + ) + if conversion_result.title: + proto_result.title = conversion_result.title + return proto_result + + +def _build_convert_kwargs( + conversion_options: markitdown_pb2.ConversionOptions, + source: markitdown_pb2.Source, +) -> dict[str, object]: + kwargs: dict[str, object] = {} + if conversion_options.HasField("keep_data_uris"): + kwargs["keep_data_uris"] = conversion_options.keep_data_uris + + stream_info = _to_stream_info(source.stream_info) + if stream_info is not None: + kwargs["stream_info"] = stream_info + return kwargs + + +def _to_stream_info(stream_info: markitdown_pb2.StreamInfo) -> StreamInfo | None: + values: dict[str, str] = {} + + if stream_info.HasField("mimetype"): + values["mimetype"] = stream_info.mimetype + if stream_info.HasField("extension"): + values["extension"] = stream_info.extension + if stream_info.HasField("charset"): + values["charset"] = stream_info.charset + if stream_info.HasField("filename"): + values["filename"] = stream_info.filename + if stream_info.HasField("local_path"): + values["local_path"] = stream_info.local_path + if stream_info.HasField("url"): + values["url"] = stream_info.url + + if not values: + return None + return StreamInfo(**values) + + +def _create_markitdown(service_options: markitdown_pb2.ServiceOptions) -> MarkItDown: + kwargs: dict[str, object] = {} + + if service_options.HasField("enable_builtins"): + kwargs["enable_builtins"] = service_options.enable_builtins + if service_options.HasField("enable_plugins"): + kwargs["enable_plugins"] = service_options.enable_plugins + + if service_options.HasField("document_intelligence"): + kwargs["docintel_endpoint"] = service_options.document_intelligence.endpoint + + if service_options.HasField("content_understanding"): + cu_options = service_options.content_understanding + kwargs["cu_endpoint"] = cu_options.endpoint + if cu_options.HasField("analyzer_id"): + kwargs["cu_analyzer_id"] = cu_options.analyzer_id + if cu_options.file_types: + kwargs["cu_file_types"] = _to_cu_file_types(cu_options.file_types) + + return MarkItDown(**kwargs) + + +def _to_cu_file_types( + file_types: Iterable[int], +) -> list[ContentUnderstandingFileType]: + converted: list[ContentUnderstandingFileType] = [] + for file_type in file_types: + if file_type == markitdown_pb2.CONTENT_UNDERSTANDING_FILE_TYPE_UNSPECIFIED: + continue + mapped = _CU_FILE_TYPE_MAP.get(file_type) + if mapped is None: + raise ValueError( + f"Unknown content_understanding.file_types value: {file_type}" + ) + converted.append(mapped) + return converted + + +def _resolve_chunk_size( + request: markitdown_pb2.ConvertStreamRequest, + context: grpc.ServicerContext, +) -> int: + chunk_size = _DEFAULT_MARKDOWN_CHUNK_SIZE_BYTES + if request.HasField("streaming_options") and request.streaming_options.HasField( + "markdown_chunk_size_bytes" + ): + chunk_size = request.streaming_options.markdown_chunk_size_bytes + if chunk_size == 0: + context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + "streaming_options.markdown_chunk_size_bytes must be greater than zero.", + ) + return int(chunk_size) + + +def _warn_if_non_local_bind(bind_address: str) -> None: + host = bind_address.rsplit(":", maxsplit=1)[0] + if host.startswith("[") and host.endswith("]"): + host = host[1:-1] + if host not in ("127.0.0.1", "localhost", "::1"): + print( + "\n" + "WARNING: The gRPC server is being bound to a non-localhost interface " + f"({host}).\n" + "This exposes the server to other machines on the network or Internet.\n" + "The server has NO authentication and runs with your user's privileges.\n" + "Any process or user that can reach this interface can read files and\n" + "fetch network resources accessible to this user.\n" + "Only proceed if you understand the security implications.\n", + file=sys.stderr, + ) + + +def _chunk_markdown(markdown: str, chunk_size: int) -> Iterator[str]: + if not markdown: + return + + start = 0 + while start < len(markdown): + end = min(start + chunk_size, len(markdown)) + yield markdown[start:end] + start = end + + +def _iter_incremental_chunks( + fragments: Iterator[str], chunk_size: int +) -> Iterator[tuple[str, bool]]: + """Re-chunk incremental markdown fragments into (chunk, is_last) pairs. + + Joins fragments with a blank line (matching whole-document conversion + output) and emits fixed-size chunks as soon as they fill, holding back + a partial tail so the final chunk can be flagged is_last without + waiting for the whole document. + """ + buffer = "" + first = True + for fragment in fragments: + if not first: + buffer += "\n\n" + first = False + buffer += fragment + while len(buffer) > chunk_size: + yield buffer[:chunk_size], False + buffer = buffer[chunk_size:] + yield buffer, True + + +def _to_proto_element( + block: _segmenter.DocumentBlock, element_index: int +) -> markitdown_pb2.DocumentElement: + element = markitdown_pb2.DocumentElement(element_index=element_index) + + if isinstance(block, _segmenter.HeadingBlock): + element.heading.level = block.level + element.heading.text = block.text + elif isinstance(block, _segmenter.TableBlock): + element.table.markdown = block.markdown + for row in block.rows: + element.table.rows.add(cells=row) + elif isinstance(block, _segmenter.ListBlock): + element.list.markdown = block.markdown + element.list.ordered = block.ordered + element.list.items.extend(block.items) + elif isinstance(block, _segmenter.CodeBlock): + element.code_block.language = block.language + element.code_block.code = block.code + elif isinstance(block, _segmenter.ImageBlock): + element.image.alt_text = block.alt_text + element.image.url = block.url + if block.title is not None: + element.image.title = block.title + elif isinstance(block, _segmenter.BlockQuoteBlock): + element.block_quote.text = block.text + elif isinstance(block, _segmenter.HorizontalRuleBlock): + element.horizontal_rule.SetInParent() + else: + assert isinstance(block, _segmenter.ParagraphBlock) + element.paragraph.text = block.text + + return element + + +def _enable_health_and_reflection(grpc_server: grpc.Server) -> None: + """Register standard health and reflection services when available. + + Both packages ship with the markitdown[grpc] extra; the guards keep the + server usable in minimal environments where only grpcio is installed. + """ + service_names = [ + markitdown_pb2.DESCRIPTOR.services_by_name["MarkItDownService"].full_name, + ] + + try: + from grpc_health.v1 import health, health_pb2, health_pb2_grpc + + health_servicer = health.HealthServicer() + health_pb2_grpc.add_HealthServicer_to_server(health_servicer, grpc_server) + for service_name in [*service_names, ""]: + health_servicer.set(service_name, health_pb2.HealthCheckResponse.SERVING) + service_names.append(health.SERVICE_NAME) + except ImportError: + pass + + try: + from grpc_reflection.v1alpha import reflection + + reflection.enable_server_reflection( + [*service_names, reflection.SERVICE_NAME], grpc_server + ) + except ImportError: + pass + + +def serve( + bind_address: str = "127.0.0.1:50051", + max_workers: int = 10, + max_receive_message_bytes: int = DEFAULT_MAX_MESSAGE_BYTES, +) -> grpc.Server: + """Start a MarkItDown gRPC server and return it. + + Args: + bind_address: host:port to listen on. The server is insecure + (no TLS, no authentication); bind to localhost unless the + network path is otherwise secured. + max_workers: Maximum worker threads for handling requests. + max_receive_message_bytes: Upper bound for incoming request size, + which limits inline `Source.content` payloads. Defaults to + 100 MiB. + """ + options = [ + ("grpc.max_receive_message_length", max_receive_message_bytes), + ("grpc.max_send_message_length", max_receive_message_bytes), + ] + + grpc_server = grpc.server( + futures.ThreadPoolExecutor(max_workers=max_workers), options=options + ) + markitdown_pb2_grpc.add_MarkItDownServiceServicer_to_server( + MarkItDownServiceServicer(), grpc_server + ) + _enable_health_and_reflection(grpc_server) + grpc_server.add_insecure_port(bind_address) + grpc_server.start() + return grpc_server + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run the MarkItDown gRPC server.") + parser.add_argument( + "--bind-address", + default="127.0.0.1:50051", + help="Address the gRPC server listens on.", + ) + parser.add_argument( + "--max-workers", + type=int, + default=10, + help="Maximum worker threads for handling requests.", + ) + parser.add_argument( + "--max-receive-message-bytes", + type=int, + default=DEFAULT_MAX_MESSAGE_BYTES, + help=( + "Maximum size of incoming request messages in bytes, which bounds " + "inline Source.content payloads (default: 100 MiB)." + ), + ) + args = parser.parse_args() + + _warn_if_non_local_bind(args.bind_address) + grpc_server = serve( + bind_address=args.bind_address, + max_workers=args.max_workers, + max_receive_message_bytes=args.max_receive_message_bytes, + ) + grpc_server.wait_for_termination() + + +if __name__ == "__main__": + main() diff --git a/packages/markitdown/src/markitdown/grpc/v1/__init__.py b/packages/markitdown/src/markitdown/grpc/v1/__init__.py new file mode 100644 index 000000000..8a6d9949a --- /dev/null +++ b/packages/markitdown/src/markitdown/grpc/v1/__init__.py @@ -0,0 +1,3 @@ +from . import markitdown_pb2, markitdown_pb2_grpc + +__all__ = ["markitdown_pb2", "markitdown_pb2_grpc"] diff --git a/packages/markitdown/src/markitdown/grpc/v1/markitdown_pb2.py b/packages/markitdown/src/markitdown/grpc/v1/markitdown_pb2.py new file mode 100644 index 000000000..913fb4fe4 --- /dev/null +++ b/packages/markitdown/src/markitdown/grpc/v1/markitdown_pb2.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: markitdown/v1/markitdown.proto +# Protobuf Python Version: 6.33.5 +"""Generated protocol buffer code.""" + +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, 6, 33, 5, "", "markitdown/v1/markitdown.proto" +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x1emarkitdown/v1/markitdown.proto\x12\rmarkitdown.v1"\xad\x01\n\x0e\x43onvertRequest\x12%\n\x06source\x18\x01 \x01(\x0b\x32\x15.markitdown.v1.Source\x12<\n\x12\x63onversion_options\x18\x02 \x01(\x0b\x32 .markitdown.v1.ConversionOptions\x12\x36\n\x0fservice_options\x18\x03 \x01(\x0b\x32\x1d.markitdown.v1.ServiceOptions"\xef\x01\n\x14\x43onvertStreamRequest\x12%\n\x06source\x18\x01 \x01(\x0b\x32\x15.markitdown.v1.Source\x12<\n\x12\x63onversion_options\x18\x02 \x01(\x0b\x32 .markitdown.v1.ConversionOptions\x12\x36\n\x0fservice_options\x18\x03 \x01(\x0b\x32\x1d.markitdown.v1.ServiceOptions\x12:\n\x11streaming_options\x18\x04 \x01(\x0b\x32\x1f.markitdown.v1.StreamingOptions"\xf7\x01\n\x1c\x43onvertDocumentStreamRequest\x12%\n\x06source\x18\x01 \x01(\x0b\x32\x15.markitdown.v1.Source\x12<\n\x12\x63onversion_options\x18\x02 \x01(\x0b\x32 .markitdown.v1.ConversionOptions\x12\x36\n\x0fservice_options\x18\x03 \x01(\x0b\x32\x1d.markitdown.v1.ServiceOptions\x12:\n\x11streaming_options\x18\x04 \x01(\x0b\x32\x1f.markitdown.v1.StreamingOptions"y\n\x06Source\x12\x14\n\nlocal_path\x18\x01 \x01(\tH\x00\x12\r\n\x03uri\x18\x02 \x01(\tH\x00\x12\x11\n\x07\x63ontent\x18\x03 \x01(\x0cH\x00\x12.\n\x0bstream_info\x18\x04 \x01(\x0b\x32\x19.markitdown.v1.StreamInfoB\x07\n\x05input"\xde\x01\n\nStreamInfo\x12\x15\n\x08mimetype\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x16\n\textension\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07\x63harset\x18\x03 \x01(\tH\x02\x88\x01\x01\x12\x15\n\x08\x66ilename\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x17\n\nlocal_path\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x10\n\x03url\x18\x06 \x01(\tH\x05\x88\x01\x01\x42\x0b\n\t_mimetypeB\x0c\n\n_extensionB\n\n\x08_charsetB\x0b\n\t_filenameB\r\n\x0b_local_pathB\x06\n\x04_url"C\n\x11\x43onversionOptions\x12\x1b\n\x0ekeep_data_uris\x18\x01 \x01(\x08H\x00\x88\x01\x01\x42\x11\n\x0f_keep_data_uris"\x88\x02\n\x0eServiceOptions\x12\x1c\n\x0f\x65nable_builtins\x18\x01 \x01(\x08H\x00\x88\x01\x01\x12\x1b\n\x0e\x65nable_plugins\x18\x02 \x01(\x08H\x01\x88\x01\x01\x12I\n\x15\x64ocument_intelligence\x18\x03 \x01(\x0b\x32*.markitdown.v1.DocumentIntelligenceOptions\x12I\n\x15\x63ontent_understanding\x18\x04 \x01(\x0b\x32*.markitdown.v1.ContentUnderstandingOptionsB\x12\n\x10_enable_builtinsB\x11\n\x0f_enable_plugins"/\n\x1b\x44ocumentIntelligenceOptions\x12\x10\n\x08\x65ndpoint\x18\x01 \x01(\t"\x9a\x01\n\x1b\x43ontentUnderstandingOptions\x12\x10\n\x08\x65ndpoint\x18\x01 \x01(\t\x12\x18\n\x0b\x61nalyzer_id\x18\x02 \x01(\tH\x00\x88\x01\x01\x12?\n\nfile_types\x18\x03 \x03(\x0e\x32+.markitdown.v1.ContentUnderstandingFileTypeB\x0e\n\x0c_analyzer_id"\x9c\x01\n\x10StreamingOptions\x12&\n\x19markdown_chunk_size_bytes\x18\x01 \x01(\rH\x00\x88\x01\x01\x12%\n\x18\x65xperimental_incremental\x18\x02 \x01(\x08H\x01\x88\x01\x01\x42\x1c\n\x1a_markdown_chunk_size_bytesB\x1b\n\x19_experimental_incremental"B\n\x0f\x43onvertResponse\x12/\n\x06result\x18\x01 \x01(\x0b\x32\x1f.markitdown.v1.ConversionResult"B\n\x10\x43onversionResult\x12\x10\n\x08markdown\x18\x01 \x01(\t\x12\x12\n\x05title\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_title"\xc6\x01\n\x15\x43onvertStreamResponse\x12\x33\n\x07started\x18\x01 \x01(\x0b\x32 .markitdown.v1.ConversionStartedH\x00\x12\x36\n\x0emarkdown_chunk\x18\x02 \x01(\x0b\x32\x1c.markitdown.v1.MarkdownChunkH\x00\x12\x37\n\tcompleted\x18\x03 \x01(\x0b\x32".markitdown.v1.ConversionCompletedH\x00\x42\x07\n\x05\x65vent"\xcd\x01\n\x1d\x43onvertDocumentStreamResponse\x12\x33\n\x07started\x18\x01 \x01(\x0b\x32 .markitdown.v1.ConversionStartedH\x00\x12\x31\n\x07\x65lement\x18\x02 \x01(\x0b\x32\x1e.markitdown.v1.DocumentElementH\x00\x12;\n\tcompleted\x18\x03 \x01(\x0b\x32&.markitdown.v1.DocumentStreamCompletedH\x00\x42\x07\n\x05\x65vent"(\n\x11\x43onversionStarted\x12\x13\n\x0bsource_kind\x18\x01 \x01(\t"G\n\rMarkdownChunk\x12\x13\n\x0b\x63hunk_index\x18\x01 \x01(\r\x12\x10\n\x08markdown\x18\x02 \x01(\t\x12\x0f\n\x07is_last\x18\x03 \x01(\x08"I\n\x13\x43onversionCompleted\x12\x14\n\x0ctotal_chunks\x18\x01 \x01(\r\x12\x12\n\x05title\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_title"O\n\x17\x44ocumentStreamCompleted\x12\x16\n\x0etotal_elements\x18\x01 \x01(\r\x12\x12\n\x05title\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_title"\x9e\x03\n\x0f\x44ocumentElement\x12\x15\n\relement_index\x18\x01 \x01(\r\x12)\n\x07heading\x18\x02 \x01(\x0b\x32\x16.markitdown.v1.HeadingH\x00\x12-\n\tparagraph\x18\x03 \x01(\x0b\x32\x18.markitdown.v1.ParagraphH\x00\x12%\n\x05table\x18\x04 \x01(\x0b\x32\x14.markitdown.v1.TableH\x00\x12(\n\x04list\x18\x05 \x01(\x0b\x32\x18.markitdown.v1.ListBlockH\x00\x12.\n\ncode_block\x18\x06 \x01(\x0b\x32\x18.markitdown.v1.CodeBlockH\x00\x12%\n\x05image\x18\x07 \x01(\x0b\x32\x14.markitdown.v1.ImageH\x00\x12\x30\n\x0b\x62lock_quote\x18\x08 \x01(\x0b\x32\x19.markitdown.v1.BlockQuoteH\x00\x12\x38\n\x0fhorizontal_rule\x18\t \x01(\x0b\x32\x1d.markitdown.v1.HorizontalRuleH\x00\x42\x06\n\x04kind"&\n\x07Heading\x12\r\n\x05level\x18\x01 \x01(\r\x12\x0c\n\x04text\x18\x02 \x01(\t"\x19\n\tParagraph\x12\x0c\n\x04text\x18\x01 \x01(\t"@\n\x05Table\x12\x10\n\x08markdown\x18\x01 \x01(\t\x12%\n\x04rows\x18\x02 \x03(\x0b\x32\x17.markitdown.v1.TableRow"\x19\n\x08TableRow\x12\r\n\x05\x63\x65lls\x18\x01 \x03(\t"=\n\tListBlock\x12\x10\n\x08markdown\x18\x01 \x01(\t\x12\x0f\n\x07ordered\x18\x02 \x01(\x08\x12\r\n\x05items\x18\x03 \x03(\t"+\n\tCodeBlock\x12\x10\n\x08language\x18\x01 \x01(\t\x12\x0c\n\x04\x63ode\x18\x02 \x01(\t"D\n\x05Image\x12\x10\n\x08\x61lt_text\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x12\n\x05title\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_title"\x1a\n\nBlockQuote\x12\x0c\n\x04text\x18\x01 \x01(\t"\x10\n\x0eHorizontalRule*\xce\n\n\x1c\x43ontentUnderstandingFileType\x12/\n+CONTENT_UNDERSTANDING_FILE_TYPE_UNSPECIFIED\x10\x00\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_PDF\x10\x01\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_DOCX\x10\x02\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_PPTX\x10\x03\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_XLSX\x10\x04\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_HTML\x10\x05\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_TXT\x10\x06\x12&\n"CONTENT_UNDERSTANDING_FILE_TYPE_MD\x10\x07\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_RTF\x10\x08\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_XML\x10\t\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_EML\x10\n\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_MSG\x10\x0b\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_JPEG\x10\x0c\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_PNG\x10\r\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_BMP\x10\x0e\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_TIFF\x10\x0f\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_HEIF\x10\x10\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_MP4\x10\x11\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_M4V\x10\x12\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_MOV\x10\x13\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_AVI\x10\x14\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_MKV\x10\x15\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_WEBM\x10\x16\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_FLV\x10\x17\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_WMV\x10\x18\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_WAV\x10\x19\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_MP3\x10\x1a\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_M4A\x10\x1b\x12(\n$CONTENT_UNDERSTANDING_FILE_TYPE_FLAC\x10\x1c\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_OGG\x10\x1d\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_AAC\x10\x1e\x12\'\n#CONTENT_UNDERSTANDING_FILE_TYPE_WMA\x10\x1f\x32\xb1\x02\n\x11MarkItDownService\x12H\n\x07\x43onvert\x12\x1d.markitdown.v1.ConvertRequest\x1a\x1e.markitdown.v1.ConvertResponse\x12\\\n\rConvertStream\x12#.markitdown.v1.ConvertStreamRequest\x1a$.markitdown.v1.ConvertStreamResponse0\x01\x12t\n\x15\x43onvertDocumentStream\x12+.markitdown.v1.ConvertDocumentStreamRequest\x1a,.markitdown.v1.ConvertDocumentStreamResponse0\x01\x62\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "markitdown.v1.markitdown_pb2", _globals +) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals["_CONTENTUNDERSTANDINGFILETYPE"]._serialized_start = 3384 + _globals["_CONTENTUNDERSTANDINGFILETYPE"]._serialized_end = 4742 + _globals["_CONVERTREQUEST"]._serialized_start = 50 + _globals["_CONVERTREQUEST"]._serialized_end = 223 + _globals["_CONVERTSTREAMREQUEST"]._serialized_start = 226 + _globals["_CONVERTSTREAMREQUEST"]._serialized_end = 465 + _globals["_CONVERTDOCUMENTSTREAMREQUEST"]._serialized_start = 468 + _globals["_CONVERTDOCUMENTSTREAMREQUEST"]._serialized_end = 715 + _globals["_SOURCE"]._serialized_start = 717 + _globals["_SOURCE"]._serialized_end = 838 + _globals["_STREAMINFO"]._serialized_start = 841 + _globals["_STREAMINFO"]._serialized_end = 1063 + _globals["_CONVERSIONOPTIONS"]._serialized_start = 1065 + _globals["_CONVERSIONOPTIONS"]._serialized_end = 1132 + _globals["_SERVICEOPTIONS"]._serialized_start = 1135 + _globals["_SERVICEOPTIONS"]._serialized_end = 1399 + _globals["_DOCUMENTINTELLIGENCEOPTIONS"]._serialized_start = 1401 + _globals["_DOCUMENTINTELLIGENCEOPTIONS"]._serialized_end = 1448 + _globals["_CONTENTUNDERSTANDINGOPTIONS"]._serialized_start = 1451 + _globals["_CONTENTUNDERSTANDINGOPTIONS"]._serialized_end = 1605 + _globals["_STREAMINGOPTIONS"]._serialized_start = 1608 + _globals["_STREAMINGOPTIONS"]._serialized_end = 1764 + _globals["_CONVERTRESPONSE"]._serialized_start = 1766 + _globals["_CONVERTRESPONSE"]._serialized_end = 1832 + _globals["_CONVERSIONRESULT"]._serialized_start = 1834 + _globals["_CONVERSIONRESULT"]._serialized_end = 1900 + _globals["_CONVERTSTREAMRESPONSE"]._serialized_start = 1903 + _globals["_CONVERTSTREAMRESPONSE"]._serialized_end = 2101 + _globals["_CONVERTDOCUMENTSTREAMRESPONSE"]._serialized_start = 2104 + _globals["_CONVERTDOCUMENTSTREAMRESPONSE"]._serialized_end = 2309 + _globals["_CONVERSIONSTARTED"]._serialized_start = 2311 + _globals["_CONVERSIONSTARTED"]._serialized_end = 2351 + _globals["_MARKDOWNCHUNK"]._serialized_start = 2353 + _globals["_MARKDOWNCHUNK"]._serialized_end = 2424 + _globals["_CONVERSIONCOMPLETED"]._serialized_start = 2426 + _globals["_CONVERSIONCOMPLETED"]._serialized_end = 2499 + _globals["_DOCUMENTSTREAMCOMPLETED"]._serialized_start = 2501 + _globals["_DOCUMENTSTREAMCOMPLETED"]._serialized_end = 2580 + _globals["_DOCUMENTELEMENT"]._serialized_start = 2583 + _globals["_DOCUMENTELEMENT"]._serialized_end = 2997 + _globals["_HEADING"]._serialized_start = 2999 + _globals["_HEADING"]._serialized_end = 3037 + _globals["_PARAGRAPH"]._serialized_start = 3039 + _globals["_PARAGRAPH"]._serialized_end = 3064 + _globals["_TABLE"]._serialized_start = 3066 + _globals["_TABLE"]._serialized_end = 3130 + _globals["_TABLEROW"]._serialized_start = 3132 + _globals["_TABLEROW"]._serialized_end = 3157 + _globals["_LISTBLOCK"]._serialized_start = 3159 + _globals["_LISTBLOCK"]._serialized_end = 3220 + _globals["_CODEBLOCK"]._serialized_start = 3222 + _globals["_CODEBLOCK"]._serialized_end = 3265 + _globals["_IMAGE"]._serialized_start = 3267 + _globals["_IMAGE"]._serialized_end = 3335 + _globals["_BLOCKQUOTE"]._serialized_start = 3337 + _globals["_BLOCKQUOTE"]._serialized_end = 3363 + _globals["_HORIZONTALRULE"]._serialized_start = 3365 + _globals["_HORIZONTALRULE"]._serialized_end = 3381 + _globals["_MARKITDOWNSERVICE"]._serialized_start = 4745 + _globals["_MARKITDOWNSERVICE"]._serialized_end = 5050 +# @@protoc_insertion_point(module_scope) diff --git a/packages/markitdown/src/markitdown/grpc/v1/markitdown_pb2_grpc.py b/packages/markitdown/src/markitdown/grpc/v1/markitdown_pb2_grpc.py new file mode 100644 index 000000000..e77b68b14 --- /dev/null +++ b/packages/markitdown/src/markitdown/grpc/v1/markitdown_pb2_grpc.py @@ -0,0 +1,238 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" + +import grpc +import warnings + +from markitdown.grpc.v1 import markitdown_pb2 as markitdown_dot_v1_dot_markitdown__pb2 + +GRPC_GENERATED_VERSION = "1.81.0" +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + + _version_not_supported = first_version_is_lower( + GRPC_VERSION, GRPC_GENERATED_VERSION + ) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f"The grpc package installed is at version {GRPC_VERSION}," + + " but the generated code in markitdown/v1/markitdown_pb2_grpc.py depends on" + + f" grpcio>={GRPC_GENERATED_VERSION}." + + f" Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}" + + f" or downgrade your generated code using grpcio-tools<={GRPC_VERSION}." + ) + + +class MarkItDownServiceStub: + """MarkItDownService converts documents (PDF, Office, HTML, images, audio, + and more) to Markdown suitable for LLM and text-analysis pipelines. + + Security note: the server performs I/O with the privileges of the server + process. `Source.local_path` and `Source.uri` are resolved server-side, so + requests must come from trusted callers only. See the project README's + "Security Considerations" section. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.Convert = channel.unary_unary( + "/markitdown.v1.MarkItDownService/Convert", + request_serializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertRequest.SerializeToString, + response_deserializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertResponse.FromString, + _registered_method=True, + ) + self.ConvertStream = channel.unary_stream( + "/markitdown.v1.MarkItDownService/ConvertStream", + request_serializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertStreamRequest.SerializeToString, + response_deserializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertStreamResponse.FromString, + _registered_method=True, + ) + self.ConvertDocumentStream = channel.unary_stream( + "/markitdown.v1.MarkItDownService/ConvertDocumentStream", + request_serializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertDocumentStreamRequest.SerializeToString, + response_deserializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertDocumentStreamResponse.FromString, + _registered_method=True, + ) + + +class MarkItDownServiceServicer: + """MarkItDownService converts documents (PDF, Office, HTML, images, audio, + and more) to Markdown suitable for LLM and text-analysis pipelines. + + Security note: the server performs I/O with the privileges of the server + process. `Source.local_path` and `Source.uri` are resolved server-side, so + requests must come from trusted callers only. See the project README's + "Security Considerations" section. + """ + + def Convert(self, request, context): + """Convert performs a conversion and returns the complete Markdown + result in a single response message. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ConvertStream(self, request, context): + """ConvertStream performs a conversion and delivers the Markdown result + as an ordered stream of chunks. The conversion itself completes before + chunking begins; streaming reduces time-to-first-byte on the wire and + keeps individual messages small, but does not stream partial conversion + progress. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def ConvertDocumentStream(self, request, context): + """ConvertDocumentStream performs a conversion and delivers the result as + an ordered stream of structured document elements (headings, paragraphs, + tables, lists, code blocks, images, ...). This lets downstream consumers + process document structure incrementally without re-parsing Markdown. + + Segmentation is performed on the converted Markdown and is best-effort: + constructs that do not match a known block type are delivered as + paragraphs. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_MarkItDownServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + "Convert": grpc.unary_unary_rpc_method_handler( + servicer.Convert, + request_deserializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertRequest.FromString, + response_serializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertResponse.SerializeToString, + ), + "ConvertStream": grpc.unary_stream_rpc_method_handler( + servicer.ConvertStream, + request_deserializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertStreamRequest.FromString, + response_serializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertStreamResponse.SerializeToString, + ), + "ConvertDocumentStream": grpc.unary_stream_rpc_method_handler( + servicer.ConvertDocumentStream, + request_deserializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertDocumentStreamRequest.FromString, + response_serializer=markitdown_dot_v1_dot_markitdown__pb2.ConvertDocumentStreamResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "markitdown.v1.MarkItDownService", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers( + "markitdown.v1.MarkItDownService", rpc_method_handlers + ) + + +# This class is part of an EXPERIMENTAL API. +class MarkItDownService: + """MarkItDownService converts documents (PDF, Office, HTML, images, audio, + and more) to Markdown suitable for LLM and text-analysis pipelines. + + Security note: the server performs I/O with the privileges of the server + process. `Source.local_path` and `Source.uri` are resolved server-side, so + requests must come from trusted callers only. See the project README's + "Security Considerations" section. + """ + + @staticmethod + def Convert( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/markitdown.v1.MarkItDownService/Convert", + markitdown_dot_v1_dot_markitdown__pb2.ConvertRequest.SerializeToString, + markitdown_dot_v1_dot_markitdown__pb2.ConvertResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True, + ) + + @staticmethod + def ConvertStream( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/markitdown.v1.MarkItDownService/ConvertStream", + markitdown_dot_v1_dot_markitdown__pb2.ConvertStreamRequest.SerializeToString, + markitdown_dot_v1_dot_markitdown__pb2.ConvertStreamResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True, + ) + + @staticmethod + def ConvertDocumentStream( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_stream( + request, + target, + "/markitdown.v1.MarkItDownService/ConvertDocumentStream", + markitdown_dot_v1_dot_markitdown__pb2.ConvertDocumentStreamRequest.SerializeToString, + markitdown_dot_v1_dot_markitdown__pb2.ConvertDocumentStreamResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True, + ) diff --git a/packages/markitdown/src/markitdown/streaming/__init__.py b/packages/markitdown/src/markitdown/streaming/__init__.py new file mode 100644 index 000000000..174526286 --- /dev/null +++ b/packages/markitdown/src/markitdown/streaming/__init__.py @@ -0,0 +1,27 @@ +"""EXPERIMENTAL: incremental (streaming) document conversion. + +This package provides converters that yield Markdown fragments while the +source document is still being processed — one fragment per PDF page or +PPTX slide — instead of returning the complete document at once. It reuses +the extraction logic of the standard converters but drives it with its own +controller, so the stable `DocumentConverter` contract (and the plugin API +built on it) is untouched. + +The full document is equivalent to joining the stripped fragments with a +blank line. See `PdfStreamingConverter` for the documented output +differences vs the standard PDF converter. + +This API is experimental: names and behavior may change between releases. +""" + +from ._base import StreamingDocumentConverter +from ._controller import StreamingConverterController +from ._pdf_streaming_converter import PdfStreamingConverter +from ._pptx_streaming_converter import PptxStreamingConverter + +__all__ = [ + "StreamingDocumentConverter", + "StreamingConverterController", + "PdfStreamingConverter", + "PptxStreamingConverter", +] diff --git a/packages/markitdown/src/markitdown/streaming/_base.py b/packages/markitdown/src/markitdown/streaming/_base.py new file mode 100644 index 000000000..b4d34a3cb --- /dev/null +++ b/packages/markitdown/src/markitdown/streaming/_base.py @@ -0,0 +1,52 @@ +"""Base class for experimental streaming converters.""" + +from __future__ import annotations + +from typing import Any, BinaryIO, Iterator + +from .._stream_info import StreamInfo + + +class StreamingDocumentConverter: + """EXPERIMENTAL: Abstract superclass of incremental document converters. + + Unlike :class:`markitdown.DocumentConverter`, which returns the complete + Markdown in one result, a streaming converter yields Markdown fragments + (e.g. one per page or slide) as the source document is processed. The + full document is equivalent to joining the stripped fragments with a + blank line (``"\\n\\n"``). + + This API is experimental and may change between releases. + """ + + def accepts( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> bool: + """Return True if this converter can stream the given document. + + Implementations may peek at `file_stream` (e.g. to check magic + bytes) but MUST reset the stream position before returning. + """ + raise NotImplementedError( + f"{type(self).__name__} must implement the accepts() method." + ) + + def iter_markdown( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> Iterator[str]: + """Yield Markdown fragments in document order. + + Raises: + - FileConversionException: when the document cannot be converted. + - MissingDependencyException: when a required optional dependency + is not installed. + """ + raise NotImplementedError( + f"{type(self).__name__} must implement the iter_markdown() method." + ) diff --git a/packages/markitdown/src/markitdown/streaming/_controller.py b/packages/markitdown/src/markitdown/streaming/_controller.py new file mode 100644 index 000000000..43688dc39 --- /dev/null +++ b/packages/markitdown/src/markitdown/streaming/_controller.py @@ -0,0 +1,75 @@ +"""EXPERIMENTAL: dispatch controller for streaming converters.""" + +from __future__ import annotations + +import re +from typing import Any, BinaryIO, Iterable, Iterator, List, Optional + +from .._stream_info import StreamInfo +from ._base import StreamingDocumentConverter +from ._pdf_streaming_converter import PdfStreamingConverter +from ._pptx_streaming_converter import PptxStreamingConverter + + +class StreamingConverterController: + """EXPERIMENTAL: Routes documents to a streaming converter when one + supports the format, mirroring how `MarkItDown` routes to standard + converters. + + Unlike `MarkItDown`, there is no mid-stream fallback: once a streaming + converter starts yielding fragments, output has already been delivered, + so a later failure surfaces as an error rather than a retry with another + converter. Callers that need fallback behavior should check + `converter_for()` first and use the standard conversion path when it + returns None. + """ + + def __init__( + self, converters: Optional[List[StreamingDocumentConverter]] = None + ) -> None: + if converters is None: + converters = [PdfStreamingConverter(), PptxStreamingConverter()] + self._converters = list(converters) + + def converter_for( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> Optional[StreamingDocumentConverter]: + """Return the first converter that accepts the document, or None.""" + for converter in self._converters: + if converter.accepts(file_stream, stream_info, **kwargs): + return converter + return None + + def iter_markdown( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> Optional[Iterator[str]]: + """Stream the document if a converter supports it, else return None. + + Fragments are normalized the same way `MarkItDown._convert` + normalizes complete results, so joining the fragments with a blank + line reproduces the standard conversion output. + """ + converter = self.converter_for(file_stream, stream_info, **kwargs) + if converter is None: + return None + return _normalized(converter.iter_markdown(file_stream, stream_info, **kwargs)) + + +def _normalized(fragments: Iterable[str]) -> Iterator[str]: + """Apply MarkItDown's result normalization to each fragment. + + Mirrors the post-processing in `MarkItDown._convert`: trailing + whitespace is removed from every line, and runs of three or more + newlines collapse to two. Empty fragments are dropped. + """ + for fragment in fragments: + fragment = "\n".join(line.rstrip() for line in re.split(r"\r?\n", fragment)) + fragment = re.sub(r"\n{3,}", "\n\n", fragment).strip() + if fragment: + yield fragment diff --git a/packages/markitdown/src/markitdown/streaming/_pdf_streaming_converter.py b/packages/markitdown/src/markitdown/streaming/_pdf_streaming_converter.py new file mode 100644 index 000000000..e3eeede30 --- /dev/null +++ b/packages/markitdown/src/markitdown/streaming/_pdf_streaming_converter.py @@ -0,0 +1,93 @@ +"""EXPERIMENTAL: page-by-page streaming conversion for PDF documents.""" + +from __future__ import annotations + +import io +import sys +from typing import Any, BinaryIO, Iterator + +from .._exceptions import MISSING_DEPENDENCY_MESSAGE, MissingDependencyException +from .._stream_info import StreamInfo +from ..converters._pdf_converter import ( + ACCEPTED_FILE_EXTENSIONS, + ACCEPTED_MIME_TYPE_PREFIXES, + _extract_form_content_from_words, + _merge_partial_numbering_lines, +) +from ._base import StreamingDocumentConverter + +_dependency_exc_info = None +try: + import pdfplumber +except ImportError: + _dependency_exc_info = sys.exc_info() + +_PDF_MAGIC = b"%PDF-" + + +class PdfStreamingConverter(StreamingDocumentConverter): + """EXPERIMENTAL: Converts PDFs to Markdown one page at a time. + + Reuses the per-page extraction logic of the standard + :class:`markitdown.converters.PdfConverter` (form/table detection via + pdfplumber, plain-text extraction otherwise). Output differences vs the + standard converter: + + - Pure-prose PDFs: the standard converter re-extracts the whole document + with pdfminer for better spacing; the streaming converter keeps the + per-page pdfplumber text so pages can be yielded as they are read. + - MasterFormat partial-numbering merging is applied per page, so a + numbering item split exactly across a page boundary is not merged. + """ + + def accepts( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> bool: + mimetype = (stream_info.mimetype or "").lower() + extension = (stream_info.extension or "").lower() + + hinted = extension in ACCEPTED_FILE_EXTENSIONS or any( + mimetype.startswith(prefix) for prefix in ACCEPTED_MIME_TYPE_PREFIXES + ) + if not hinted: + return False + + # Verify the magic bytes so mislabeled content falls back to the + # standard conversion path (which re-detects the actual format). + cur_pos = file_stream.tell() + magic = file_stream.read(len(_PDF_MAGIC)) + file_stream.seek(cur_pos) + return magic == _PDF_MAGIC + + def iter_markdown( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> Iterator[str]: + if _dependency_exc_info is not None: + raise MissingDependencyException( + MISSING_DEPENDENCY_MESSAGE.format( + converter=type(self).__name__, + extension=".pdf", + feature="pdf", + ) + ) from _dependency_exc_info[1].with_traceback( + _dependency_exc_info[2] + ) # type: ignore[union-attr] + + pdf_bytes = io.BytesIO(file_stream.read()) + + with pdfplumber.open(pdf_bytes) as pdf: + for page in pdf.pages: + page_content = _extract_form_content_from_words(page) + if page_content is None: + page_content = page.extract_text() or "" + page.close() # Free cached page data immediately + + page_content = _merge_partial_numbering_lines(page_content).strip() + if page_content: + yield page_content diff --git a/packages/markitdown/src/markitdown/streaming/_pptx_streaming_converter.py b/packages/markitdown/src/markitdown/streaming/_pptx_streaming_converter.py new file mode 100644 index 000000000..e7ebffd56 --- /dev/null +++ b/packages/markitdown/src/markitdown/streaming/_pptx_streaming_converter.py @@ -0,0 +1,79 @@ +"""EXPERIMENTAL: slide-by-slide streaming conversion for PPTX documents.""" + +from __future__ import annotations + +import sys +from typing import Any, BinaryIO, Iterator + +from .._exceptions import MISSING_DEPENDENCY_MESSAGE, MissingDependencyException +from .._stream_info import StreamInfo +from ..converters._pptx_converter import ( + ACCEPTED_FILE_EXTENSIONS, + ACCEPTED_MIME_TYPE_PREFIXES, + PptxConverter, +) +from ._base import StreamingDocumentConverter + +_dependency_exc_info = None +try: + import pptx +except ImportError: + _dependency_exc_info = sys.exc_info() + +_ZIP_MAGIC = b"PK\x03\x04" + + +class PptxStreamingConverter(StreamingDocumentConverter): + """EXPERIMENTAL: Converts PPTX presentations to Markdown one slide at a + time. + + Delegates per-slide conversion to the standard + :class:`markitdown.converters.PptxConverter`, so each fragment matches + the standard converter's output for that slide exactly. + """ + + def __init__(self) -> None: + self._converter = PptxConverter() + + def accepts( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> bool: + mimetype = (stream_info.mimetype or "").lower() + extension = (stream_info.extension or "").lower() + + hinted = extension in ACCEPTED_FILE_EXTENSIONS or any( + mimetype.startswith(prefix) for prefix in ACCEPTED_MIME_TYPE_PREFIXES + ) + if not hinted: + return False + + # PPTX files are ZIP archives; verify the magic bytes so mislabeled + # content falls back to the standard conversion path. + cur_pos = file_stream.tell() + magic = file_stream.read(len(_ZIP_MAGIC)) + file_stream.seek(cur_pos) + return magic == _ZIP_MAGIC + + def iter_markdown( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, + ) -> Iterator[str]: + if _dependency_exc_info is not None: + raise MissingDependencyException( + MISSING_DEPENDENCY_MESSAGE.format( + converter=type(self).__name__, + extension=".pptx", + feature="pptx", + ) + ) from _dependency_exc_info[1].with_traceback( + _dependency_exc_info[2] + ) # type: ignore[union-attr] + + presentation = pptx.Presentation(file_stream) + for slide_num, slide in enumerate(presentation.slides, start=1): + yield self._converter._convert_slide(slide, slide_num, **kwargs) diff --git a/packages/markitdown/tests/test_grpc_segmenter.py b/packages/markitdown/tests/test_grpc_segmenter.py new file mode 100644 index 000000000..3863d8cab --- /dev/null +++ b/packages/markitdown/tests/test_grpc_segmenter.py @@ -0,0 +1,167 @@ +from markitdown.grpc._segmenter import ( + BlockQuoteBlock, + CodeBlock, + HeadingBlock, + HorizontalRuleBlock, + ImageBlock, + ListBlock, + ParagraphBlock, + TableBlock, + segment_markdown, +) + +SAMPLE_DOCUMENT = """# Quarterly Report + +An **overview** paragraph spanning +two lines. + +## Sales + +| Region | Revenue | +| ------ | ------- | +| East | 100 | +| West | 200 | + +- First item +- Second item + with continuation + +1. Step one +2. Step two + +```python +print("hello") +``` + +> A wise quote +> on two lines. + +![Chart](https://example.com/chart.png "Q3 chart") + +--- + +Closing remarks. +""" + + +def test_segments_full_document_in_order(): + blocks = list(segment_markdown(SAMPLE_DOCUMENT)) + kinds = [type(block).__name__ for block in blocks] + assert kinds == [ + "HeadingBlock", + "ParagraphBlock", + "HeadingBlock", + "TableBlock", + "ListBlock", + "ListBlock", + "CodeBlock", + "BlockQuoteBlock", + "ImageBlock", + "HorizontalRuleBlock", + "ParagraphBlock", + ] + + +def test_heading_levels_and_text(): + blocks = list(segment_markdown("# Title\n\n### Sub")) + assert blocks == [ + HeadingBlock(level=1, text="Title"), + HeadingBlock(level=3, text="Sub"), + ] + + +def test_table_rows_parsed_without_delimiter_row(): + markdown = "| A | B |\n| - | - |\n| 1 | 2 |" + (table,) = segment_markdown(markdown) + assert isinstance(table, TableBlock) + assert table.rows == [["A", "B"], ["1", "2"]] + assert table.markdown == markdown + + +def test_table_with_escaped_pipe(): + (table,) = segment_markdown("| A\\|B | C |\n| --- | --- |\n| 1 | 2 |") + assert table.rows[0] == ["A|B", "C"] + + +def test_unordered_list_items(): + (lst,) = segment_markdown("- one\n- two\n- three") + assert isinstance(lst, ListBlock) + assert not lst.ordered + assert lst.items == ["one", "two", "three"] + + +def test_ordered_list_items(): + (lst,) = segment_markdown("1. one\n2. two") + assert lst.ordered + assert lst.items == ["one", "two"] + + +def test_nested_list_stays_with_parent_item(): + (lst,) = segment_markdown("- parent\n - child\n- sibling") + assert lst.items == ["parent\n- child", "sibling"] + + +def test_adjacent_lists_of_different_type_are_separate_blocks(): + blocks = list(segment_markdown("- bullet\n1. number")) + assert [type(b).__name__ for b in blocks] == ["ListBlock", "ListBlock"] + assert not blocks[0].ordered + assert blocks[1].ordered + + +def test_code_block_language_and_content(): + (code,) = segment_markdown('```python\nprint("x")\n```') + assert code == CodeBlock(language="python", code='print("x")') + + +def test_unterminated_code_block_consumes_remainder(): + (code,) = segment_markdown("```\nline1\nline2") + assert code.code == "line1\nline2" + + +def test_image_block_with_title(): + (image,) = segment_markdown('![Alt text](https://example.com/a.png "The title")') + assert image == ImageBlock( + alt_text="Alt text", url="https://example.com/a.png", title="The title" + ) + + +def test_image_inside_paragraph_is_not_extracted(): + (paragraph,) = segment_markdown("See ![icon](i.png) inline.") + assert isinstance(paragraph, ParagraphBlock) + + +def test_block_quote_text(): + (quote,) = segment_markdown("> line one\n> line two") + assert quote == BlockQuoteBlock(text="line one\nline two") + + +def test_horizontal_rule_variants(): + blocks = list(segment_markdown("---\n\n***\n\n___")) + assert all(isinstance(block, HorizontalRuleBlock) for block in blocks) + + +def test_table_like_line_without_delimiter_is_paragraph(): + (paragraph,) = segment_markdown("| not | a table |") + assert isinstance(paragraph, ParagraphBlock) + + +def test_empty_markdown_yields_nothing(): + assert list(segment_markdown("")) == [] + + +def test_whitespace_only_markdown_yields_nothing(): + assert list(segment_markdown("\n \n\t\n")) == [] + + +def test_no_content_dropped(): + """Every non-blank source line must appear in some block.""" + blocks = list(segment_markdown(SAMPLE_DOCUMENT)) + combined = "\n".join( + getattr(block, "text", "") + + getattr(block, "markdown", "") + + getattr(block, "code", "") + + getattr(block, "alt_text", "") + for block in blocks + ) + for token in ("Quarterly Report", "overview", "East", "Step one", "hello", "wise"): + assert token in combined diff --git a/packages/markitdown/tests/test_grpc_server.py b/packages/markitdown/tests/test_grpc_server.py new file mode 100644 index 000000000..4919568e4 --- /dev/null +++ b/packages/markitdown/tests/test_grpc_server.py @@ -0,0 +1,361 @@ +from __future__ import annotations + +from concurrent import futures +from pathlib import Path + +import grpc +import pytest + +from markitdown.grpc.server import ( + MarkItDownServiceServicer, + _enable_health_and_reflection, +) +from markitdown.grpc.v1 import markitdown_pb2, markitdown_pb2_grpc + + +@pytest.fixture +def grpc_channel(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=2)) + markitdown_pb2_grpc.add_MarkItDownServiceServicer_to_server( + MarkItDownServiceServicer(), server + ) + _enable_health_and_reflection(server) + port = server.add_insecure_port("127.0.0.1:0") + server.start() + channel = grpc.insecure_channel(f"127.0.0.1:{port}") + try: + yield channel + finally: + channel.close() + server.stop(grace=None) + + +@pytest.fixture +def grpc_client(grpc_channel): + return markitdown_pb2_grpc.MarkItDownServiceStub(grpc_channel) + + +def test_convert_local_file(grpc_client, tmp_path: Path): + sample_path = tmp_path / "sample.txt" + sample_path.write_text("hello\ngrpc\n", encoding="utf-8") + + response = grpc_client.Convert( + markitdown_pb2.ConvertRequest( + source=markitdown_pb2.Source( + local_path=str(sample_path), + stream_info=markitdown_pb2.StreamInfo( + extension=".txt", + mimetype="text/plain", + charset="utf-8", + ), + ), + conversion_options=markitdown_pb2.ConversionOptions(keep_data_uris=False), + service_options=markitdown_pb2.ServiceOptions( + enable_builtins=True, enable_plugins=False + ), + ) + ) + + assert "hello" in response.result.markdown + assert "grpc" in response.result.markdown + + +def test_convert_stream_returns_chunk_sequence(grpc_client): + request = markitdown_pb2.ConvertStreamRequest( + source=markitdown_pb2.Source( + content=b"one\ntwo\nthree\n", + stream_info=markitdown_pb2.StreamInfo( + extension=".txt", mimetype="text/plain", charset="utf-8" + ), + ), + conversion_options=markitdown_pb2.ConversionOptions(keep_data_uris=False), + service_options=markitdown_pb2.ServiceOptions(enable_builtins=True), + streaming_options=markitdown_pb2.StreamingOptions(markdown_chunk_size_bytes=4), + ) + + stream = list(grpc_client.ConvertStream(request)) + + assert stream[0].HasField("started") + markdown_events = [ + event.markdown_chunk for event in stream if event.HasField("markdown_chunk") + ] + assert len(markdown_events) > 0 + assert stream[-1].HasField("completed") + assert stream[-1].completed.total_chunks == len(markdown_events) + assert "".join(event.markdown for event in markdown_events).startswith("one") + assert markdown_events[-1].is_last + + +def test_convert_requires_source_oneof(grpc_client): + with pytest.raises(grpc.RpcError) as exc_info: + grpc_client.Convert(markitdown_pb2.ConvertRequest()) + + assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT + + +def test_convert_missing_local_file_returns_not_found(grpc_client): + with pytest.raises(grpc.RpcError) as exc_info: + grpc_client.Convert( + markitdown_pb2.ConvertRequest( + source=markitdown_pb2.Source(local_path="/nonexistent/file.txt") + ) + ) + + assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND + + +def test_convert_invalid_cu_file_type_returns_invalid_argument(grpc_client): + with pytest.raises(grpc.RpcError) as exc_info: + grpc_client.Convert( + markitdown_pb2.ConvertRequest( + source=markitdown_pb2.Source( + content=b"hello", + stream_info=markitdown_pb2.StreamInfo(extension=".txt"), + ), + service_options=markitdown_pb2.ServiceOptions( + content_understanding=markitdown_pb2.ContentUnderstandingOptions( + endpoint="https://example.invalid", + file_types=[999], + ) + ), + ) + ) + + assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT + assert "999" in exc_info.value.details() + + +def test_convert_stream_yields_started_before_chunks(grpc_client): + request = markitdown_pb2.ConvertStreamRequest( + source=markitdown_pb2.Source( + content=b"hello\n", + stream_info=markitdown_pb2.StreamInfo( + extension=".txt", mimetype="text/plain", charset="utf-8" + ), + ), + conversion_options=markitdown_pb2.ConversionOptions(keep_data_uris=False), + service_options=markitdown_pb2.ServiceOptions(enable_builtins=True), + ) + + stream = list(grpc_client.ConvertStream(request)) + + assert stream[0].HasField("started") + assert stream[0].started.source_kind == "content" + assert any(event.HasField("markdown_chunk") for event in stream) + + +def test_convert_stream_zero_chunk_size_returns_invalid_argument(grpc_client): + request = markitdown_pb2.ConvertStreamRequest( + source=markitdown_pb2.Source( + content=b"hello\n", + stream_info=markitdown_pb2.StreamInfo(extension=".txt"), + ), + streaming_options=markitdown_pb2.StreamingOptions(markdown_chunk_size_bytes=0), + ) + + with pytest.raises(grpc.RpcError) as exc_info: + list(grpc_client.ConvertStream(request)) + + assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT + + +def test_convert_document_stream_returns_structured_elements(grpc_client): + markdown_source = ( + b"# Title\n" + b"\n" + b"Intro paragraph.\n" + b"\n" + b"| A | B |\n" + b"| - | - |\n" + b"| 1 | 2 |\n" + b"\n" + b"- alpha\n" + b"- beta\n" + ) + request = markitdown_pb2.ConvertDocumentStreamRequest( + source=markitdown_pb2.Source( + content=markdown_source, + stream_info=markitdown_pb2.StreamInfo( + extension=".md", mimetype="text/markdown", charset="utf-8" + ), + ), + ) + + stream = list(grpc_client.ConvertDocumentStream(request)) + + assert stream[0].HasField("started") + assert stream[-1].HasField("completed") + + elements = [event.element for event in stream if event.HasField("element")] + assert stream[-1].completed.total_elements == len(elements) + assert [element.element_index for element in elements] == list(range(len(elements))) + + kinds = [element.WhichOneof("kind") for element in elements] + assert "heading" in kinds + assert "paragraph" in kinds + assert "table" in kinds + assert "list" in kinds + + heading = next(e.heading for e in elements if e.WhichOneof("kind") == "heading") + assert heading.level == 1 + assert heading.text == "Title" + + table = next(e.table for e in elements if e.WhichOneof("kind") == "table") + assert [list(row.cells) for row in table.rows] == [["A", "B"], ["1", "2"]] + + list_block = next(e.list for e in elements if e.WhichOneof("kind") == "list") + assert not list_block.ordered + assert list(list_block.items) == ["alpha", "beta"] + + +def test_convert_document_stream_requires_source(grpc_client): + with pytest.raises(grpc.RpcError) as exc_info: + list( + grpc_client.ConvertDocumentStream( + markitdown_pb2.ConvertDocumentStreamRequest() + ) + ) + + assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT + + +def test_health_service_reports_serving(grpc_channel): + health_pb2 = pytest.importorskip("grpc_health.v1.health_pb2") + health_pb2_grpc = pytest.importorskip("grpc_health.v1.health_pb2_grpc") + + health_stub = health_pb2_grpc.HealthStub(grpc_channel) + response = health_stub.Check( + health_pb2.HealthCheckRequest(service="markitdown.v1.MarkItDownService") + ) + assert response.status == health_pb2.HealthCheckResponse.SERVING + + +def test_convert_stream_incremental_pdf_streams_pages(grpc_client): + pdf_path = ( + Path(__file__).parent / "test_files" / "REPAIR-2022-INV-001_multipage.pdf" + ) + request = markitdown_pb2.ConvertStreamRequest( + source=markitdown_pb2.Source( + content=pdf_path.read_bytes(), + stream_info=markitdown_pb2.StreamInfo(extension=".pdf"), + ), + streaming_options=markitdown_pb2.StreamingOptions( + markdown_chunk_size_bytes=256, experimental_incremental=True + ), + ) + + stream = list(grpc_client.ConvertStream(request)) + + assert stream[0].HasField("started") + chunks = [ + event.markdown_chunk for event in stream if event.HasField("markdown_chunk") + ] + assert len(chunks) > 1 + assert [chunk.chunk_index for chunk in chunks] == list(range(len(chunks))) + assert chunks[-1].is_last + assert all(not chunk.is_last for chunk in chunks[:-1]) + assert stream[-1].HasField("completed") + assert stream[-1].completed.total_chunks == len(chunks) + + # Incremental reassembly must match whole-document conversion exactly + # for documents with table/form pages. + unary = grpc_client.Convert( + markitdown_pb2.ConvertRequest( + source=markitdown_pb2.Source( + content=pdf_path.read_bytes(), + stream_info=markitdown_pb2.StreamInfo(extension=".pdf"), + ) + ) + ) + assert "".join(chunk.markdown for chunk in chunks) == unary.result.markdown + + +def test_convert_document_stream_incremental_pptx(grpc_client): + pptx_path = Path(__file__).parent / "test_files" / "test.pptx" + request = markitdown_pb2.ConvertDocumentStreamRequest( + source=markitdown_pb2.Source( + content=pptx_path.read_bytes(), + stream_info=markitdown_pb2.StreamInfo(extension=".pptx"), + ), + streaming_options=markitdown_pb2.StreamingOptions( + experimental_incremental=True + ), + ) + + stream = list(grpc_client.ConvertDocumentStream(request)) + + assert stream[0].HasField("started") + assert stream[-1].HasField("completed") + elements = [event.element for event in stream if event.HasField("element")] + assert stream[-1].completed.total_elements == len(elements) + assert [element.element_index for element in elements] == list(range(len(elements))) + + kinds = {element.WhichOneof("kind") for element in elements} + assert "heading" in kinds + + +def test_convert_stream_incremental_falls_back_for_unsupported_format(grpc_client): + request = markitdown_pb2.ConvertStreamRequest( + source=markitdown_pb2.Source( + content=b"plain text\n", + stream_info=markitdown_pb2.StreamInfo(extension=".txt"), + ), + streaming_options=markitdown_pb2.StreamingOptions( + experimental_incremental=True + ), + ) + + stream = list(grpc_client.ConvertStream(request)) + + chunks = [ + event.markdown_chunk.markdown + for event in stream + if event.HasField("markdown_chunk") + ] + assert "".join(chunks).startswith("plain text") + assert stream[-1].HasField("completed") + + +def test_large_document_round_trip(tmp_path: Path): + """Documents larger than gRPC's stock 4 MiB limit round-trip by default.""" + from markitdown.grpc import MarkItDownClient + from markitdown.grpc.server import serve + + import socket + + with socket.socket() as sock: + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + + server = serve(bind_address=f"127.0.0.1:{port}", max_workers=1) + try: + # ~8 MiB of text: above the 4 MiB stock limit in both directions. + large_text = ("lorem ipsum dolor sit amet " * 38 + "\n") * 8192 + content = large_text.encode("utf-8") + assert len(content) > 4 * 1024 * 1024 + + with MarkItDownClient(f"127.0.0.1:{port}") as client: + result = client.convert(content=content, extension=".txt") + + assert len(result.markdown) > 4 * 1024 * 1024 + assert result.markdown.startswith("lorem ipsum") + finally: + server.stop(grace=None) + + +def test_reflection_lists_markitdown_service(grpc_channel): + reflection_pb2 = pytest.importorskip("grpc_reflection.v1alpha.reflection_pb2") + reflection_pb2_grpc = pytest.importorskip( + "grpc_reflection.v1alpha.reflection_pb2_grpc" + ) + + reflection_stub = reflection_pb2_grpc.ServerReflectionStub(grpc_channel) + responses = reflection_stub.ServerReflectionInfo( + iter([reflection_pb2.ServerReflectionRequest(list_services="")]) + ) + services = { + service.name + for response in responses + for service in response.list_services_response.service + } + assert "markitdown.v1.MarkItDownService" in services diff --git a/packages/markitdown/tests/test_streaming_converters.py b/packages/markitdown/tests/test_streaming_converters.py new file mode 100644 index 000000000..eb6002c17 --- /dev/null +++ b/packages/markitdown/tests/test_streaming_converters.py @@ -0,0 +1,124 @@ +import io +from pathlib import Path + +import pytest + +from markitdown import MarkItDown, StreamInfo +from markitdown.streaming import ( + PdfStreamingConverter, + PptxStreamingConverter, + StreamingConverterController, +) + +TEST_FILES_DIR = Path(__file__).parent / "test_files" + +MULTIPAGE_PDF = TEST_FILES_DIR / "REPAIR-2022-INV-001_multipage.pdf" +PROSE_PDF = TEST_FILES_DIR / "test.pdf" +PPTX_FILE = TEST_FILES_DIR / "test.pptx" + + +@pytest.fixture +def controller(): + return StreamingConverterController() + + +def test_pdf_streams_one_fragment_per_content_page(controller): + with open(MULTIPAGE_PDF, "rb") as f: + fragments = list(controller.iter_markdown(f, StreamInfo(extension=".pdf"))) + + assert len(fragments) == 3 + assert all(fragment.strip() for fragment in fragments) + + +def test_pdf_streaming_matches_standard_converter_for_table_documents(controller): + """Documents with form/table pages use the same per-page extraction as + the standard converter, so output is identical.""" + with open(MULTIPAGE_PDF, "rb") as f: + fragments = list(controller.iter_markdown(f, StreamInfo(extension=".pdf"))) + streamed = "\n\n".join(fragments).strip() + + with open(MULTIPAGE_PDF, "rb") as f: + full = ( + MarkItDown() + .convert_stream(f, stream_info=StreamInfo(extension=".pdf")) + .markdown + ) + + assert streamed == full + + +def test_pptx_streams_one_fragment_per_slide(controller): + with open(PPTX_FILE, "rb") as f: + fragments = list(controller.iter_markdown(f, StreamInfo(extension=".pptx"))) + + assert len(fragments) > 1 + for slide_num, fragment in enumerate(fragments, start=1): + assert f"<!-- Slide number: {slide_num} -->" in fragment + + +def test_pptx_streaming_matches_standard_converter_exactly(controller): + with open(PPTX_FILE, "rb") as f: + fragments = list(controller.iter_markdown(f, StreamInfo(extension=".pptx"))) + streamed = "\n\n".join(fragments).strip() + + with open(PPTX_FILE, "rb") as f: + full = ( + MarkItDown() + .convert_stream(f, stream_info=StreamInfo(extension=".pptx")) + .markdown + ) + + assert streamed == full + + +def test_prose_pdf_still_produces_content(controller): + """Prose PDFs stream per-page text (the standard converter re-extracts + them in one pass, so whitespace may differ — content must not).""" + with open(PROSE_PDF, "rb") as f: + fragments = list(controller.iter_markdown(f, StreamInfo(extension=".pdf"))) + + assert fragments + combined = "\n\n".join(fragments) + assert "While there is contemporaneous exploration" in combined + + +def test_unknown_format_returns_none(controller): + stream = io.BytesIO(b"plain text content") + assert controller.iter_markdown(stream, StreamInfo(extension=".txt")) is None + + +def test_mislabeled_pdf_rejected_by_magic_check(controller): + stream = io.BytesIO(b"this is not a pdf") + assert controller.converter_for(stream, StreamInfo(extension=".pdf")) is None + # The accepts() probe must not consume the stream. + assert stream.tell() == 0 + + +def test_mislabeled_pptx_rejected_by_magic_check(controller): + stream = io.BytesIO(b"this is not a zip archive") + assert controller.converter_for(stream, StreamInfo(extension=".pptx")) is None + assert stream.tell() == 0 + + +def test_pdf_accepts_by_mimetype(): + converter = PdfStreamingConverter() + with open(MULTIPAGE_PDF, "rb") as f: + assert converter.accepts(f, StreamInfo(mimetype="application/pdf")) + assert f.tell() == 0 + + +def test_pptx_accepts_by_mimetype(): + converter = PptxStreamingConverter() + with open(PPTX_FILE, "rb") as f: + assert converter.accepts( + f, + StreamInfo( + mimetype="application/vnd.openxmlformats-officedocument.presentationml.presentation" + ), + ) + assert f.tell() == 0 + + +def test_no_hints_returns_none(controller): + with open(MULTIPAGE_PDF, "rb") as f: + assert controller.converter_for(f, StreamInfo()) is None