Files
ht-docker-ai/image_support_files/paddleocr_vl_full_server.py

637 lines
20 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
PaddleOCR-VL Full Pipeline API Server (Transformers backend)
Provides REST API for document parsing using:
- PP-DocLayoutV2 for layout detection
- PaddleOCR-VL (transformers) for recognition
- Structured JSON/Markdown output
"""
import os
import io
import re
import base64
import logging
import tempfile
import time
import json
from typing import Optional, List, Union
from pathlib import Path
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from PIL import Image
import torch
# Process-wide logging: INFO and above, timestamped, tagged with logger name.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Bind address and port; both can be overridden through the environment.
SERVER_HOST = os.environ.get('SERVER_HOST', '0.0.0.0')
SERVER_PORT = int(os.environ.get('SERVER_PORT', '8000'))

# Identifier handed to transformers' from_pretrained() calls.
MODEL_NAME = "PaddlePaddle/PaddleOCR-VL"

# Use the GPU when torch reports one, otherwise fall back to CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {DEVICE}")

# Prompt text sent to the VL model, keyed by recognition task.
TASK_PROMPTS = {
    "ocr": "OCR:",
    "table": "Table Recognition:",
    "formula": "Formula Recognition:",
    "chart": "Chart Recognition:",
}

# The FastAPI application that all route decorators below attach to.
app = FastAPI(
    title="PaddleOCR-VL Full Pipeline Server",
    description="Document parsing with PP-DocLayoutV2 + PaddleOCR-VL (transformers)",
    version="1.0.0",
)

# Model singletons; they stay None until the corresponding loader has run.
vl_model = None
vl_processor = None
layout_model = None
def load_vl_model():
    """Load the PaddleOCR-VL processor and model into the module globals.

    Does nothing when ``vl_model`` is already set. On CUDA the model is
    loaded in bfloat16 and moved to the device; on CPU it is loaded in
    float32 with ``low_cpu_mem_usage=True``. The model is put in eval mode.
    """
    global vl_model, vl_processor

    if vl_model is not None:
        return

    logger.info(f"Loading PaddleOCR-VL model: {MODEL_NAME}")

    # Imported here rather than at module level, as in the original code.
    from transformers import AutoModelForCausalLM, AutoProcessor

    vl_processor = AutoProcessor.from_pretrained(MODEL_NAME, trust_remote_code=True)

    on_gpu = DEVICE == "cuda"
    load_kwargs = {"trust_remote_code": True}
    if on_gpu:
        load_kwargs["torch_dtype"] = torch.bfloat16
    else:
        load_kwargs["torch_dtype"] = torch.float32
        load_kwargs["low_cpu_mem_usage"] = True

    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **load_kwargs)
    if on_gpu:
        model = model.to(DEVICE)

    # Publish the global only once the model is fully prepared.
    vl_model = model.eval()
    logger.info("PaddleOCR-VL model loaded successfully")
def load_layout_model():
    """Load paddleocr's LayoutDetection model into ``layout_model``.

    Best effort: any failure (including a missing ``paddleocr`` package) is
    logged and leaves ``layout_model`` as None, which callers treat as
    "VL-only mode". Does nothing when the model is already loaded.
    """
    global layout_model

    if layout_model is None:
        try:
            logger.info("Loading LayoutDetection model (PP-DocLayout_plus-L)...")
            from paddleocr import LayoutDetection

            layout_model = LayoutDetection()
        except Exception as exc:
            # Deliberately broad: layout detection is optional.
            logger.warning(f"Could not load LayoutDetection: {exc}")
            logger.info("Falling back to VL-only mode (no layout detection)")
        else:
            logger.info("LayoutDetection model loaded successfully")
def recognize_element(image: Image.Image, task: str = "ocr") -> str:
    """Recognize a single element using PaddleOCR-VL.

    Args:
        image: The (already cropped) region to recognise.
        task: Key into ``TASK_PROMPTS``; unknown keys fall back to "ocr".

    Returns:
        The decoded model output with the chat transcript prefix removed
        when a role marker is found, otherwise the raw decoded string.
    """
    load_vl_model()

    prompt = TASK_PROMPTS.get(task, TASK_PROMPTS["ocr"])
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": prompt},
            ],
        }
    ]

    inputs = vl_processor.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    if DEVICE == "cuda":
        inputs = {k: v.to(DEVICE) for k, v in inputs.items()}

    with torch.inference_mode():
        outputs = vl_model.generate(
            **inputs,
            max_new_tokens=4096,
            do_sample=False,
            use_cache=True,
        )

    response = vl_processor.batch_decode(outputs, skip_special_tokens=True)[0]

    # The decoded text has the shape "User: <prompt>\nAssistant: <content>".
    # Cut at the FIRST marker: the recognised text itself may contain
    # "Assistant:" (e.g. a job title), and cutting at the last occurrence
    # would silently drop everything before it.
    _, marker, reply = response.partition("Assistant:")
    if marker:
        return reply.strip()

    # Fallback for any other capitalisation of the role marker.
    pieces = re.split(r'assistant:', response, maxsplit=1, flags=re.IGNORECASE)
    if len(pieces) > 1:
        return pieces[1].strip()

    return response
def detect_layout(image: Image.Image) -> List[dict]:
    """Detect layout regions in the image.

    Args:
        image: The full page image.

    Returns:
        A list of ``{"type", "bbox", "score"}`` dicts sorted by the top edge
        of the bbox. When no layout model is available, or the model reports
        no boxes, a single "text" region covering the whole image is returned.
    """
    load_layout_model()

    whole_page = [{
        "type": "text",
        "bbox": [0, 0, image.width, image.height],
        "score": 1.0,
    }]
    if layout_model is None:
        # No layout model: treat the page as one text region.
        return whole_page

    # The layout model is given a file path, so the image goes through a temp
    # file. The descriptor is closed right away and everything after creation
    # runs under try/finally, so the file is removed even if saving fails.
    fd, tmp_path = tempfile.mkstemp(suffix=".png")
    os.close(fd)
    try:
        image.save(tmp_path, "PNG")
        results = layout_model.predict(tmp_path)

        regions = []
        for res in results:
            # Each result exposes its detections under the 'boxes' key.
            for box in res.get("boxes", []):
                coord = box.get("coordinate", [0, 0, image.width, image.height])
                regions.append({
                    "type": box.get("label", "text"),
                    # float() so the values are plain Python floats.
                    "bbox": [float(c) for c in coord],
                    "score": float(box.get("score", 1.0)),
                })

        # Order regions top to bottom.
        regions.sort(key=lambda r: r["bbox"][1])
        return regions or whole_page
    finally:
        os.unlink(tmp_path)
def process_document(image: Image.Image) -> dict:
    """Process a document through the full pipeline.

    Runs layout detection, then recognises each region with the task chosen
    from its type. A failure in one region is recorded on that region's block
    (``content == ""`` plus an ``error`` message) and does not stop the rest.

    Returns:
        ``{"blocks": [...], "image_size": [width, height]}``.
    """
    logger.info(f"Processing document: {image.size}")

    # Step 1: detect layout
    regions = detect_layout(image)
    logger.info(f"Detected {len(regions)} layout regions")

    width, height = image.size

    # Step 2: recognise each region
    blocks = []
    for i, region in enumerate(regions):
        region_type = region["type"].lower()
        bbox = region["bbox"]

        # Pick the recognition task from the region type.
        if "table" in region_type:
            task = "table"
        elif "formula" in region_type or "math" in region_type:
            task = "formula"
        elif "chart" in region_type or "figure" in region_type:
            task = "chart"
        else:
            task = "ocr"

        try:
            # Cropping is inside the try so a malformed box only fails this
            # region. Coordinates are clamped to the image so a box that
            # overshoots the page is trimmed instead of padded.
            x1, y1, x2, y2 = [int(c) for c in bbox]
            x1, x2 = max(0, x1), min(width, x2)
            y1, y2 = max(0, y1), min(height, y2)
            if x2 <= x1 or y2 <= y1:
                raise ValueError(f"Empty region after clamping to image bounds: {bbox}")

            region_image = image.crop((x1, y1, x2, y2))
            content = recognize_element(region_image, task)
            blocks.append({
                "index": i,
                "type": region_type,
                "bbox": bbox,
                "content": content,
                "task": task,
            })
            logger.info(f"  Region {i} ({region_type}): {len(content)} chars")
        except Exception as e:
            logger.error(f"  Region {i} error: {e}")
            blocks.append({
                "index": i,
                "type": region_type,
                "bbox": bbox,
                "content": "",
                "task": task,
                "error": str(e),
            })

    return {"blocks": blocks, "image_size": list(image.size)}
def result_to_markdown(result: dict) -> str:
    """Render a pipeline result as Markdown with structural hints.

    Formatting rules, checked in this order for every non-empty block:
    - type contains "table": content prefixed with **[TABLE]**
    - type contains "title": rendered as a ``#`` heading
    - type contains "formula" or "math": wrapped in ``$$`` fences
    - type contains "figure" or "chart": rendered as *[Figure: ...]*
    - block starts in the top 15% of the page: bolded
    - block ends in the bottom 15% of the page: preceded by ``---``
    - anything else: emitted unchanged

    Blocks are joined with a blank line. The page height comes from
    ``result["image_size"][1]`` and defaults to 1000 when absent.
    """
    page_height = result.get("image_size", [0, 1000])[1]
    rendered = []

    for entry in result.get("blocks", []):
        kind = entry.get("type", "text").lower()
        text = entry.get("content", "").strip()
        box = entry.get("bbox", [])

        if not text:
            continue

        # Vertical extent of the block; a missing bbox counts as y = 0.
        top = box[1] if box and len(box) > 1 else 0
        bottom = box[3] if box and len(box) > 3 else top
        in_header_zone = top < page_height * 0.15
        in_footer_zone = bottom > page_height * 0.85

        if "table" in kind:
            piece = f"\n**[TABLE]**\n{text}\n"
        elif "title" in kind:
            piece = f"# {text}"
        elif "formula" in kind or "math" in kind:
            piece = f"\n$$\n{text}\n$$\n"
        elif "figure" in kind or "chart" in kind:
            piece = f"*[Figure: {text}]*"
        elif in_header_zone:
            piece = f"**{text}**"
        elif in_footer_zone:
            piece = f"---\n{text}"
        else:
            piece = text

        rendered.append(piece)

    return "\n\n".join(rendered)
def parse_markdown_table(content: str) -> str:
    """Convert table content to an HTML table.

    Handles:
    - PaddleOCR-VL markup (detected by ``<fcel>`` or ``<nl>``), delegated to
      ``parse_paddleocr_table``
    - Pipe-delimited tables: ``| Header | Header |``
    - Separator rows: ``|---|---|`` (skipped; rows after one are body rows)

    The first data row becomes the ``<thead>`` unless a separator row came
    before it. Empty cells between pipes are kept so columns stay aligned;
    only the empty strings produced by leading/trailing pipes are removed.
    Cell text is HTML-escaped. Content that is not a table is returned
    escaped inside ``<pre>``.
    """
    from html import escape  # local import keeps this block self-contained

    content_stripped = content.strip()

    # PaddleOCR-VL table markup (<fcel>, <lcel>, <ecel>, <nl>)
    if '<fcel>' in content_stripped or '<nl>' in content_stripped:
        return parse_paddleocr_table(content_stripped)

    fallback = f'<pre>{escape(content, quote=False)}</pre>'
    if '|' not in content_stripped:
        return fallback

    header_html = ''
    body_rows = []
    expect_header = True
    for line in content_stripped.split('\n'):
        line = line.strip()
        if '|' not in line:
            continue

        # Separator rows (|---|---|) end the header section.
        if re.match(r'^[\|\s\-:]+$', line):
            expect_header = False
            continue

        cells = [c.strip() for c in line.split('|')]
        # Drop only the artefacts of the outer pipes, never interior cells.
        if line.startswith('|'):
            del cells[0]
        if line.endswith('|') and cells:
            del cells[-1]

        if expect_header:
            row = ''.join(f'<th>{escape(c, quote=False)}</th>' for c in cells)
            header_html = f'<thead><tr>{row}</tr></thead>'
            expect_header = False
        else:
            row = ''.join(f'<td>{escape(c, quote=False)}</td>' for c in cells)
            body_rows.append(f'<tr>{row}</tr>')

    if not header_html and not body_rows:
        # Only separator rows were found.
        return fallback

    body = f'<tbody>{"".join(body_rows)}</tbody>' if body_rows else ''
    return f'<table>{header_html}{body}</table>'
def parse_paddleocr_table(content: str) -> str:
    """Convert PaddleOCR-VL table markup to an HTML table.

    Rows are separated by ``<nl>``. Within a row every ``<fcel>``, ``<lcel>``
    or ``<ecel>`` marker starts a cell whose text is whatever follows it up
    to the next marker. A marker with no text yields an empty cell, so
    columns stay aligned. Rows containing no text at all are skipped. The
    first remaining row becomes the ``<thead>``. Cell text is HTML-escaped.

    Example input:
        <fcel>Header1<lcel>Header2<nl><fcel>Value1<lcel>Value2<nl>

    Returns:
        ``<table>...</table>``, or the escaped input inside ``<pre>`` when no
        row could be extracted.
    """
    from html import escape  # local import keeps this block self-contained

    header_html = ''
    body_rows = []
    for raw_row in content.split('<nl>'):
        raw_row = raw_row.strip()
        if not raw_row:
            continue

        # The capturing group makes re.split return
        # [text, marker, text, marker, text, ...]: index 0 is any text before
        # the first marker, every later even index is the text of one cell.
        tokens = re.split(r'(<fcel>|<lcel>|<ecel>)', raw_row)
        cells = [text.strip() for text in tokens[2::2]]
        leading = tokens[0].strip()
        if leading:
            cells.insert(0, leading)

        if not any(cells):
            continue

        if not header_html and not body_rows:
            row = ''.join(f'<th>{escape(c, quote=False)}</th>' for c in cells)
            header_html = f'<thead><tr>{row}</tr></thead>'
        else:
            row = ''.join(f'<td>{escape(c, quote=False)}</td>' for c in cells)
            body_rows.append(f'<tr>{row}</tr>')

    if not header_html:
        return f'<pre>{escape(content, quote=False)}</pre>'

    body = f'<tbody>{"".join(body_rows)}</tbody>' if body_rows else ''
    return f'<table>{header_html}{body}</table>'
def result_to_html(result: dict) -> str:
    """Convert a pipeline result to semantic HTML.

    Each non-empty block becomes one element carrying ``data-type`` (the
    block type) and ``data-y`` (top edge divided by the page height, two
    decimals). The element is chosen in this order:
    - type contains "table": ``<section class="table-region">`` holding the
      markup returned by ``parse_markdown_table`` (inserted as returned)
    - type contains "title": ``<h1>``
    - type contains "formula" or "math": ``<div class="formula"><code>``
    - type contains "figure" or "chart": ``<figure><figcaption>``
    - data-y below 0.15: ``<header><strong>``
    - data-y above 0.85: ``<footer>``
    - anything else: ``<p>``

    Recognised text and the block type are HTML-escaped because they come
    from the processed document. The page height comes from
    ``result["image_size"][1]`` (default 1000); a height of 0 makes every
    ``data-y`` 0 instead of raising ZeroDivisionError.
    """
    from html import escape  # local import keeps this block self-contained

    image_height = result.get("image_size", [0, 1000])[1]
    parts = ['<!DOCTYPE html><html><body>']

    for block in result.get("blocks", []):
        block_type = block.get("type", "text").lower()
        content = block.get("content", "").strip()
        bbox = block.get("bbox", [])

        if not content:
            continue

        # Relative vertical position of the block's top edge.
        if image_height and bbox and len(bbox) > 1:
            y_pos = bbox[1] / image_height
        else:
            y_pos = 0
        data_attrs = f'data-type="{escape(block_type, quote=True)}" data-y="{y_pos:.2f}"'

        if "table" in block_type:
            # The table parser works on the raw text and builds its own markup.
            table_html = parse_markdown_table(content)
            parts.append(f'<section {data_attrs} class="table-region">{table_html}</section>')
            continue

        text = escape(content, quote=False)
        if "title" in block_type:
            parts.append(f'<h1 {data_attrs}>{text}</h1>')
        elif "formula" in block_type or "math" in block_type:
            parts.append(f'<div {data_attrs} class="formula"><code>{text}</code></div>')
        elif "figure" in block_type or "chart" in block_type:
            parts.append(f'<figure {data_attrs}><figcaption>{text}</figcaption></figure>')
        elif y_pos < 0.15:
            parts.append(f'<header {data_attrs}><strong>{text}</strong></header>')
        elif y_pos > 0.85:
            parts.append(f'<footer {data_attrs}>{text}</footer>')
        else:
            parts.append(f'<p {data_attrs}>{text}</p>')

    parts.append('</body></html>')
    return '\n'.join(parts)
# Request/Response models
class ParseRequest(BaseModel):
    # Body of POST /parse.
    # `image` is base64 text; decode_image() also accepts a "data:" URL.
    image: str
    # "markdown" and "html" select those renderers; any other value makes the
    # endpoint return the raw block structure.
    output_format: Optional[str] = "json"
class ParseResponse(BaseModel):
    # Envelope returned by POST /parse for both success and failure.
    success: bool
    # The output format that was requested.
    format: str
    # The rendered document, or an empty dict when processing failed.
    result: Union[dict, str]
    # Seconds spent processing; 0 on failure.
    processing_time: float
    # Error message, set only when success is False.
    error: Optional[str] = None
def decode_image(image_source: str) -> Image.Image:
    """Turn a base64 string or a ``data:`` URL into an RGB PIL image.

    For a ``data:`` URL only the part after the first comma is decoded.
    """
    encoded = image_source
    if image_source.startswith("data:"):
        # "data:<mime>;base64,<payload>" -> keep the payload only.
        _, encoded = image_source.split(",", 1)

    raw_bytes = base64.b64decode(encoded)
    buffer = io.BytesIO(raw_bytes)
    return Image.open(buffer).convert("RGB")
@app.on_event("startup")
async def startup_event():
    """Warm up both models when the application starts.

    A failure is logged and swallowed so the server still comes up.
    """
    logger.info("Starting PaddleOCR-VL Full Pipeline Server...")
    try:
        for loader in (load_vl_model, load_layout_model):
            loader()
    except Exception as exc:
        logger.error(f"Failed to pre-load models: {exc}")
    else:
        logger.info("Models loaded successfully")
@app.get("/health")
async def health_check():
    """Report service status and which models are currently loaded.

    ``status`` is "healthy" once the VL model is loaded, "loading" before.
    """
    vl_ready = vl_model is not None
    layout_ready = layout_model is not None
    return {
        "status": "healthy" if vl_ready else "loading",
        "service": "PaddleOCR-VL Full Pipeline (Transformers)",
        "device": DEVICE,
        "vl_model_loaded": vl_ready,
        "layout_model_loaded": layout_ready,
    }
@app.get("/formats")
async def supported_formats():
    """Return the static lists of output formats, image formats and capabilities."""
    output_formats = ["json", "markdown", "html"]
    image_formats = ["PNG", "JPEG", "WebP", "BMP", "GIF", "TIFF"]
    capabilities = [
        "Layout detection (PP-DocLayoutV2)",
        "Text recognition (OCR)",
        "Table recognition",
        "Formula recognition (LaTeX)",
        "Chart recognition",
        "Multi-language support (109 languages)",
    ]
    return {
        "output_formats": output_formats,
        "image_formats": image_formats,
        "capabilities": capabilities,
    }
@app.post("/parse", response_model=ParseResponse)
async def parse_document_endpoint(request: ParseRequest):
    """Parse a document image and return structured output.

    The result is ``{"markdown": ...}`` or ``{"html": ...}`` for those
    formats and the raw block structure for any other format. Failures are
    reported in the response body (``success=False`` with ``error`` set)
    rather than raised.
    """
    # output_format is Optional, but ParseResponse.format must be a str.
    # Normalise once so an explicit null cannot make building the response
    # fail, including inside the error handler below.
    output_format = request.output_format or "json"

    try:
        start_time = time.time()

        image = decode_image(request.image)
        result = process_document(image)

        if output_format == "markdown":
            output = {"markdown": result_to_markdown(result)}
        elif output_format == "html":
            output = {"html": result_to_html(result)}
        else:
            output = result

        elapsed = time.time() - start_time
        logger.info(f"Processing complete in {elapsed:.2f}s")

        return ParseResponse(
            success=True,
            format=output_format,
            result=output,
            processing_time=elapsed,
        )
    except Exception as e:
        logger.error(f"Error processing document: {e}", exc_info=True)
        return ParseResponse(
            success=False,
            format=output_format,
            result={},
            processing_time=0,
            error=str(e),
        )
@app.post("/v1/chat/completions")
async def chat_completions(request: dict):
    """OpenAI-compatible chat completions endpoint.

    Takes the first ``image_url`` part of the most recent user message, runs
    the document pipeline on it and returns the rendered document as the
    assistant message. ``output_format`` in the request body selects
    "markdown", "html" or (default) pretty-printed JSON.

    Raises:
        HTTPException 400: no image in the latest user message, or the image
            payload cannot be decoded.
        HTTPException 500: any other failure.
    """
    try:
        messages = request.get("messages", [])
        output_format = request.get("output_format", "json")

        # Only the most recent user message is inspected.
        image = None
        for msg in reversed(messages):
            if msg.get("role") != "user":
                continue
            content = msg.get("content", [])
            if isinstance(content, list):
                for item in content:
                    # Skip parts that are not dicts instead of crashing on them.
                    if isinstance(item, dict) and item.get("type") == "image_url":
                        url = item.get("image_url", {}).get("url", "")
                        try:
                            image = decode_image(url)
                        except (ValueError, OSError) as err:
                            # A bad payload is the client's fault, so 400.
                            # NOTE(review): assumes PIL's "cannot identify
                            # image" error is an OSError subclass - confirm
                            # for the pinned Pillow version.
                            raise HTTPException(
                                status_code=400,
                                detail=f"Invalid image data: {err}",
                            ) from err
                        break
            break

        if image is None:
            raise HTTPException(status_code=400, detail="No image provided")

        start_time = time.time()
        result = process_document(image)

        if output_format == "markdown":
            content = result_to_markdown(result)
        elif output_format == "html":
            content = result_to_html(result)
        else:
            content = json.dumps(result, ensure_ascii=False, indent=2)

        elapsed = time.time() - start_time

        return {
            "id": f"chatcmpl-{int(time.time()*1000)}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": "paddleocr-vl-full",
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": content},
                "finish_reason": "stop",
            }],
            # Token counts are rough placeholders, not real tokenizer counts.
            "usage": {
                "prompt_tokens": 100,
                "completion_tokens": len(content) // 4,
                "total_tokens": 100 + len(content) // 4,
            },
            "processing_time": elapsed,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in chat completions: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Script entry point: serve the app on the configured host and port.
    # uvicorn is imported here, so it is only needed when run as a script.
    from uvicorn import run as serve

    serve(app, host=SERVER_HOST, port=SERVER_PORT)