/** * Bank statement extraction using Nanonets-OCR2-3B + GPT-OSS 20B (sequential two-stage pipeline) * * Stage 1: Nanonets-OCR2-3B converts ALL document pages to markdown (stop after completion) * Stage 2: GPT-OSS 20B extracts structured JSON from saved markdown (after Nanonets stops) * * This approach avoids GPU contention by running services sequentially. */ import { tap, expect } from '@git.zone/tstest/tapbundle'; import * as fs from 'fs'; import * as path from 'path'; import { execSync } from 'child_process'; import * as os from 'os'; import { ensureNanonetsOcr, ensureMiniCpm, isContainerRunning } from './helpers/docker.js'; import { SmartAi } from '@push.rocks/smartai'; import { DualAgentOrchestrator, JsonValidatorTool } from '@push.rocks/smartagent'; const NANONETS_URL = 'http://localhost:8000/v1'; const NANONETS_MODEL = 'nanonets/Nanonets-OCR2-3B'; const OLLAMA_URL = 'http://localhost:11434'; const EXTRACTION_MODEL = 'gpt-oss:20b'; // Temp directory for storing markdown between stages const TEMP_MD_DIR = path.join(os.tmpdir(), 'nanonets-markdown'); // SmartAi instance for Ollama with optimized settings const smartAi = new SmartAi({ ollama: { baseUrl: OLLAMA_URL, model: EXTRACTION_MODEL, defaultOptions: { num_ctx: 32768, // Larger context for long statements + thinking temperature: 0, // Deterministic for JSON extraction }, defaultTimeout: 600000, // 10 minute timeout for large documents }, }); // DualAgentOrchestrator for structured task execution let orchestrator: DualAgentOrchestrator; interface ITransaction { date: string; counterparty: string; amount: number; } interface IImageData { base64: string; width: number; height: number; pageNum: number; } interface ITestCase { name: string; pdfPath: string; jsonPath: string; markdownPath?: string; images?: IImageData[]; } // Nanonets-specific prompt for document OCR to markdown const NANONETS_OCR_PROMPT = `Extract the text from the above document as if you were reading it naturally. Return the tables in html format. Return the equations in LaTeX representation. If there is an image in the document and image caption is not present, add a small description inside tag. Watermarks should be wrapped in brackets. Ex: OFFICIAL COPY. Page numbers should be wrapped in brackets. Ex: 14.`; // JSON extraction prompt for GPT-OSS 20B (sent AFTER the statement text is provided) const JSON_EXTRACTION_PROMPT = `Extract ALL transactions from the bank statement. Return ONLY valid JSON array. WHERE TO FIND DATA: - Transactions are typically in TABLES with columns: Date, Description/Counterparty, Debit, Credit, Balance - Look for rows with actual money movements, NOT header rows or summary totals RULES: 1. date: Convert to YYYY-MM-DD format 2. counterparty: The name/description of who the money went to/from 3. amount: NEGATIVE for debits/withdrawals, POSITIVE for credits/deposits 4. Only include actual transactions, NOT opening/closing balances JSON array only: [{"date":"YYYY-MM-DD","counterparty":"NAME","amount":-25.99}]`; // Constants for smart batching const MAX_VISUAL_TOKENS = 28000; // ~32K context minus prompt/output headroom const PATCH_SIZE = 14; // Qwen2.5-VL uses 14x14 patches /** * Estimate visual tokens for an image based on dimensions */ function estimateVisualTokens(width: number, height: number): number { return Math.ceil((width * height) / (PATCH_SIZE * PATCH_SIZE)); } /** * Process images one page at a time for reliability */ function batchImages(images: IImageData[]): IImageData[][] { // One page per batch for reliable processing return images.map(img => [img]); } /** * Convert PDF to JPEG images using ImageMagick with dimension tracking */ function convertPdfToImages(pdfPath: string): IImageData[] { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'pdf-convert-')); const outputPattern = path.join(tempDir, 'page-%d.jpg'); try { execSync( `convert -density 150 -quality 90 "${pdfPath}" -background white -alpha remove "${outputPattern}"`, { stdio: 'pipe' } ); const files = fs.readdirSync(tempDir).filter((f: string) => f.endsWith('.jpg')).sort(); const images: IImageData[] = []; for (let i = 0; i < files.length; i++) { const file = files[i]; const imagePath = path.join(tempDir, file); const imageData = fs.readFileSync(imagePath); // Get image dimensions using identify command const dimensions = execSync(`identify -format "%w %h" "${imagePath}"`, { encoding: 'utf-8' }).trim(); const [width, height] = dimensions.split(' ').map(Number); images.push({ base64: imageData.toString('base64'), width, height, pageNum: i + 1, }); } return images; } finally { fs.rmSync(tempDir, { recursive: true, force: true }); } } /** * Convert a batch of pages to markdown using Nanonets-OCR-s */ async function convertBatchToMarkdown(batch: IImageData[]): Promise { const startTime = Date.now(); const pageNums = batch.map(img => img.pageNum).join(', '); // Build content array with all images first, then the prompt const content: Array<{ type: string; image_url?: { url: string }; text?: string }> = []; for (const img of batch) { content.push({ type: 'image_url', image_url: { url: `data:image/jpeg;base64,${img.base64}` }, }); } // Add prompt with page separator instruction if multiple pages const promptText = batch.length > 1 ? `${NANONETS_OCR_PROMPT}\n\nPlease clearly separate each page's content with "--- PAGE N ---" markers, where N is the page number starting from ${batch[0].pageNum}.` : NANONETS_OCR_PROMPT; content.push({ type: 'text', text: promptText }); const response = await fetch(`${NANONETS_URL}/chat/completions`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': 'Bearer dummy', }, body: JSON.stringify({ model: NANONETS_MODEL, messages: [{ role: 'user', content, }], max_tokens: 4096 * batch.length, // Scale output tokens with batch size temperature: 0.0, }), signal: AbortSignal.timeout(600000), // 10 minute timeout for OCR }); const elapsed = ((Date.now() - startTime) / 1000).toFixed(1); if (!response.ok) { const errorText = await response.text(); throw new Error(`Nanonets API error: ${response.status} - ${errorText}`); } const data = await response.json(); let responseContent = (data.choices?.[0]?.message?.content || '').trim(); // For single-page batches, add page marker if not present if (batch.length === 1 && !responseContent.includes('--- PAGE')) { responseContent = `--- PAGE ${batch[0].pageNum} ---\n${responseContent}`; } console.log(` Pages [${pageNums}]: ${responseContent.length} chars (${elapsed}s)`); return responseContent; } /** * Convert all pages of a document to markdown using smart batching */ async function convertDocumentToMarkdown(images: IImageData[], docName: string): Promise { const batches = batchImages(images); console.log(` [${docName}] Processing ${images.length} page(s) in ${batches.length} batch(es)...`); const markdownParts: string[] = []; for (let i = 0; i < batches.length; i++) { const batch = batches[i]; const batchTokens = batch.reduce((sum, img) => sum + estimateVisualTokens(img.width, img.height), 0); console.log(` Batch ${i + 1}: ${batch.length} page(s), ~${batchTokens} tokens`); const markdown = await convertBatchToMarkdown(batch); markdownParts.push(markdown); } const fullMarkdown = markdownParts.join('\n\n'); console.log(` [${docName}] Complete: ${fullMarkdown.length} chars total`); return fullMarkdown; } /** * Stop Nanonets container */ function stopNanonets(): void { console.log(' [Docker] Stopping Nanonets container...'); try { execSync('docker stop nanonets-test 2>/dev/null || true', { stdio: 'pipe' }); // Wait for GPU memory to be released execSync('sleep 5', { stdio: 'pipe' }); console.log(' [Docker] Nanonets stopped'); } catch { console.log(' [Docker] Nanonets was not running'); } } /** * Ensure GPT-OSS 20B model is available and warmed up */ async function ensureExtractionModel(): Promise { try { const response = await fetch(`${OLLAMA_URL}/api/tags`); if (response.ok) { const data = await response.json(); const models = data.models || []; if (models.some((m: { name: string }) => m.name === EXTRACTION_MODEL)) { console.log(` [Ollama] Model available: ${EXTRACTION_MODEL}`); return true; } } } catch { return false; } console.log(` [Ollama] Pulling ${EXTRACTION_MODEL}...`); const pullResponse = await fetch(`${OLLAMA_URL}/api/pull`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ name: EXTRACTION_MODEL, stream: false }), }); return pullResponse.ok; } /** * Try to extract valid JSON from a response string */ function tryExtractJson(response: string): unknown[] | null { // Remove thinking tags let clean = response.replace(/[\s\S]*?<\/think>/g, '').trim(); // Try task_complete tags first const completeMatch = clean.match(/([\s\S]*?)<\/task_complete>/); if (completeMatch) { clean = completeMatch[1].trim(); } // Try code block const codeBlockMatch = clean.match(/```(?:json)?\s*([\s\S]*?)```/); const jsonStr = codeBlockMatch ? codeBlockMatch[1].trim() : clean; try { const parsed = JSON.parse(jsonStr); if (Array.isArray(parsed)) return parsed; } catch { // Try to find JSON array const jsonMatch = jsonStr.match(/\[[\s\S]*\]/); if (jsonMatch) { try { const parsed = JSON.parse(sanitizeJson(jsonMatch[0])); if (Array.isArray(parsed)) return parsed; } catch { return null; } } return null; } return null; } /** * Extract transactions from markdown using smartagent DualAgentOrchestrator * Validates JSON and retries if invalid */ async function extractTransactionsFromMarkdown(markdown: string, queryId: string): Promise { const startTime = Date.now(); console.log(` [${queryId}] Statement: ${markdown.length} chars`); // Build the extraction task with document context const taskPrompt = `Extract all transactions from this bank statement document and output ONLY the JSON array: ${markdown} ${JSON_EXTRACTION_PROMPT} Before completing, validate your JSON using the json.validate tool: json validate {"jsonString": "YOUR_JSON_ARRAY_HERE"} Only complete after validation passes. Output the final JSON array in tags.`; try { const result = await orchestrator.run(taskPrompt); const elapsed = ((Date.now() - startTime) / 1000).toFixed(1); console.log(` [${queryId}] Status: ${result.status}, Iterations: ${result.iterations} (${elapsed}s)`); // Try to parse JSON from result let jsonData: unknown[] | null = null; let responseText = result.result || ''; if (result.success && responseText) { jsonData = tryExtractJson(responseText); } // Fallback: try parsing from history if (!jsonData && result.history?.length > 0) { const lastMessage = result.history[result.history.length - 1]; if (lastMessage?.content) { responseText = lastMessage.content; jsonData = tryExtractJson(responseText); } } if (!jsonData) { console.log(` [${queryId}] Failed to parse JSON`); return []; } // Convert to transactions const txs = jsonData.map((tx: any) => ({ date: String(tx.date || ''), counterparty: String(tx.counterparty || tx.description || ''), amount: parseAmount(tx.amount), })); console.log(` [${queryId}] Parsed ${txs.length} transactions`); return txs; } catch (error) { const elapsed = ((Date.now() - startTime) / 1000).toFixed(1); console.log(` [${queryId}] ERROR: ${error} (${elapsed}s)`); throw error; } } /** * Sanitize JSON string */ function sanitizeJson(jsonStr: string): string { let s = jsonStr; s = s.replace(/"amount"\s*:\s*\+/g, '"amount": '); s = s.replace(/:\s*\+(\d)/g, ': $1'); s = s.replace(/"amount"\s*:\s*(-?)(\d{1,3})\.(\d{3})\.(\d{2})\b/g, '"amount": $1$2$3.$4'); s = s.replace(/,\s*([}\]])/g, '$1'); s = s.replace(/"([^"\\]*)\n([^"]*)"/g, '"$1 $2"'); s = s.replace(/"([^"\\]*)\t([^"]*)"/g, '"$1 $2"'); s = s.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F]/g, ' '); return s; } /** * Parse amount from various formats */ function parseAmount(value: unknown): number { if (typeof value === 'number') return value; if (typeof value !== 'string') return 0; let s = value.replace(/[€$£\s]/g, '').replace('−', '-').replace('–', '-'); if (s.includes(',') && s.indexOf(',') > s.lastIndexOf('.')) { s = s.replace(/\./g, '').replace(',', '.'); } else { s = s.replace(/,/g, ''); } return parseFloat(s) || 0; } /** * Parse JSON response into transactions */ function parseJsonResponse(response: string, queryId: string): ITransaction[] { // Remove thinking tags if present let cleanResponse = response.replace(/[\s\S]*?<\/think>/g, '').trim(); // Debug: show what we're working with console.log(` [${queryId}] Response preview: ${cleanResponse.substring(0, 300)}...`); const codeBlockMatch = cleanResponse.match(/```(?:json)?\s*([\s\S]*?)```/); let jsonStr = codeBlockMatch ? codeBlockMatch[1].trim() : cleanResponse; jsonStr = sanitizeJson(jsonStr); try { const parsed = JSON.parse(jsonStr); if (Array.isArray(parsed)) { const txs = parsed.map(tx => ({ date: String(tx.date || ''), counterparty: String(tx.counterparty || tx.description || ''), amount: parseAmount(tx.amount), })); console.log(` [${queryId}] Parsed ${txs.length} transactions`); return txs; } } catch (e) { // Try to find a JSON array in the text const arrayMatch = jsonStr.match(/\[[\s\S]*\]/); if (arrayMatch) { console.log(` [${queryId}] Array match found: ${arrayMatch[0].length} chars`); try { const parsed = JSON.parse(sanitizeJson(arrayMatch[0])); if (Array.isArray(parsed)) { const txs = parsed.map(tx => ({ date: String(tx.date || ''), counterparty: String(tx.counterparty || tx.description || ''), amount: parseAmount(tx.amount), })); console.log(` [${queryId}] Parsed ${txs.length} transactions (array match)`); return txs; } } catch (innerErr) { console.log(` [${queryId}] Array parse error: ${(innerErr as Error).message}`); } } else { console.log(` [${queryId}] No JSON array found in response`); } } console.log(` [${queryId}] PARSE FAILED`); return []; } /** * Extract transactions (single pass) */ async function extractTransactions(markdown: string, docName: string): Promise { console.log(` [${docName}] Extracting...`); const txs = await extractTransactionsFromMarkdown(markdown, docName); console.log(` [${docName}] Extracted ${txs.length} transactions`); return txs; } /** * Compare transactions */ function compareTransactions( extracted: ITransaction[], expected: ITransaction[] ): { matches: number; total: number; errors: string[] } { const errors: string[] = []; let matches = 0; for (let i = 0; i < expected.length; i++) { const exp = expected[i]; const ext = extracted[i]; if (!ext) { errors.push(`Missing tx ${i}: ${exp.date} ${exp.counterparty}`); continue; } const dateMatch = ext.date === exp.date; const amountMatch = Math.abs(ext.amount - exp.amount) < 0.01; if (dateMatch && amountMatch) { matches++; } else { errors.push(`Mismatch ${i}: exp ${exp.date}/${exp.amount}, got ${ext.date}/${ext.amount}`); } } if (extracted.length > expected.length) { errors.push(`Extra transactions: ${extracted.length - expected.length}`); } return { matches, total: expected.length, errors }; } /** * Find all test cases */ function findTestCases(): ITestCase[] { const testDir = path.join(process.cwd(), '.nogit'); if (!fs.existsSync(testDir)) return []; const files = fs.readdirSync(testDir); const testCases: ITestCase[] = []; for (const pdf of files.filter((f: string) => f.endsWith('.pdf'))) { const baseName = pdf.replace('.pdf', ''); const jsonFile = `${baseName}.json`; if (files.includes(jsonFile)) { testCases.push({ name: baseName, pdfPath: path.join(testDir, pdf), jsonPath: path.join(testDir, jsonFile), }); } } return testCases.sort((a, b) => a.name.localeCompare(b.name)); } // ============ TESTS ============ const testCases = findTestCases(); console.log(`\nFound ${testCases.length} bank statement test cases\n`); // Ensure temp directory exists if (!fs.existsSync(TEMP_MD_DIR)) { fs.mkdirSync(TEMP_MD_DIR, { recursive: true }); } // -------- STAGE 1: OCR with Nanonets -------- // Check if all markdown files already exist function allMarkdownFilesExist(): boolean { for (const tc of testCases) { const mdPath = path.join(TEMP_MD_DIR, `${tc.name}.md`); if (!fs.existsSync(mdPath)) { return false; } } return true; } // Track whether we need to run Stage 1 let stage1Needed = !allMarkdownFilesExist(); tap.test('Stage 1: Setup Nanonets', async () => { console.log('\n========== STAGE 1: Nanonets OCR ==========\n'); if (!stage1Needed) { console.log(' [SKIP] All markdown files already exist, skipping Nanonets setup'); return; } const ok = await ensureNanonetsOcr(); expect(ok).toBeTrue(); }); tap.test('Stage 1: Convert all documents to markdown', async () => { if (!stage1Needed) { console.log(' [SKIP] Using existing markdown files from previous run\n'); // Load existing markdown paths for (const tc of testCases) { tc.markdownPath = path.join(TEMP_MD_DIR, `${tc.name}.md`); console.log(` Loaded: ${tc.markdownPath}`); } return; } console.log('\n Converting all PDFs to markdown with Nanonets-OCR-s...\n'); for (const tc of testCases) { console.log(`\n === ${tc.name} ===`); // Convert PDF to images const images = convertPdfToImages(tc.pdfPath); console.log(` Pages: ${images.length}`); // Convert to markdown const markdown = await convertDocumentToMarkdown(images, tc.name); // Save markdown to temp file const mdPath = path.join(TEMP_MD_DIR, `${tc.name}.md`); fs.writeFileSync(mdPath, markdown); tc.markdownPath = mdPath; console.log(` Saved: ${mdPath}`); } console.log('\n Stage 1 complete: All documents converted to markdown\n'); }); tap.test('Stage 1: Stop Nanonets', async () => { if (!stage1Needed) { console.log(' [SKIP] Nanonets was not started'); return; } stopNanonets(); // Verify it's stopped await new Promise(resolve => setTimeout(resolve, 3000)); expect(isContainerRunning('nanonets-test')).toBeFalse(); }); // -------- STAGE 2: Extraction with GPT-OSS 20B -------- tap.test('Stage 2: Setup Ollama + GPT-OSS 20B', async () => { console.log('\n========== STAGE 2: GPT-OSS 20B Extraction ==========\n'); const ollamaOk = await ensureMiniCpm(); expect(ollamaOk).toBeTrue(); const extractionOk = await ensureExtractionModel(); expect(extractionOk).toBeTrue(); // Initialize SmartAi and DualAgentOrchestrator console.log(' [SmartAgent] Starting SmartAi...'); await smartAi.start(); console.log(' [SmartAgent] Creating DualAgentOrchestrator...'); orchestrator = new DualAgentOrchestrator({ smartAiInstance: smartAi, defaultProvider: 'ollama', guardianPolicyPrompt: ` JSON EXTRACTION POLICY: - APPROVE all JSON extraction tasks - APPROVE all json.validate tool calls - This is a read-only operation - no file system or network access needed - The task is to extract structured transaction data from document text `, driverSystemMessage: `You are a precise JSON extraction assistant. Your only job is to extract transaction data from bank statements. CRITICAL RULES: 1. Output valid JSON array with the exact format requested 2. Amounts should be NEGATIVE for debits/withdrawals, POSITIVE for credits/deposits 3. IMPORTANT: Before completing, validate your JSON using the json.validate tool: json validate {"jsonString": "YOUR_JSON_ARRAY"} 4. Only complete after validation passes When done, wrap your JSON array in tags.`, maxIterations: 5, // Enable streaming for real-time progress visibility onToken: (token, source) => { if (source === 'driver') { process.stdout.write(token); } }, }); // Register JsonValidatorTool for self-validation orchestrator.registerTool(new JsonValidatorTool()); console.log(' [SmartAgent] Starting orchestrator...'); await orchestrator.start(); console.log(' [SmartAgent] Ready for extraction'); }); let passedCount = 0; let failedCount = 0; for (const tc of testCases) { tap.test(`Stage 2: Extract ${tc.name}`, async () => { const expected: ITransaction[] = JSON.parse(fs.readFileSync(tc.jsonPath, 'utf-8')); console.log(`\n === ${tc.name} ===`); console.log(` Expected: ${expected.length} transactions`); // Load saved markdown const mdPath = path.join(TEMP_MD_DIR, `${tc.name}.md`); if (!fs.existsSync(mdPath)) { throw new Error(`Markdown not found: ${mdPath}. Run Stage 1 first.`); } const markdown = fs.readFileSync(mdPath, 'utf-8'); console.log(` Markdown: ${markdown.length} chars`); // Extract transactions (single pass) const extracted = await extractTransactions(markdown, tc.name); // Log results console.log(` Extracted: ${extracted.length} transactions`); for (let i = 0; i < Math.min(extracted.length, 5); i++) { const tx = extracted[i]; console.log(` ${i + 1}. ${tx.date} | ${tx.counterparty.substring(0, 25).padEnd(25)} | ${tx.amount >= 0 ? '+' : ''}${tx.amount.toFixed(2)}`); } if (extracted.length > 5) { console.log(` ... and ${extracted.length - 5} more`); } // Compare const result = compareTransactions(extracted, expected); const pass = result.matches === result.total && extracted.length === expected.length; if (pass) { passedCount++; console.log(` Result: PASS (${result.matches}/${result.total})`); } else { failedCount++; console.log(` Result: FAIL (${result.matches}/${result.total})`); result.errors.slice(0, 5).forEach(e => console.log(` - ${e}`)); } expect(result.matches).toEqual(result.total); expect(extracted.length).toEqual(expected.length); }); } tap.test('Summary', async () => { // Cleanup orchestrator and SmartAi if (orchestrator) { console.log('\n [SmartAgent] Stopping orchestrator...'); await orchestrator.stop(); } console.log(' [SmartAgent] Stopping SmartAi...'); await smartAi.stop(); console.log(`\n======================================================`); console.log(` Bank Statement Summary (Nanonets + SmartAgent)`); console.log(`======================================================`); console.log(` Stage 1: Nanonets-OCR-s (document -> markdown)`); console.log(` Stage 2: GPT-OSS 20B + SmartAgent (markdown -> JSON)`); console.log(` Passed: ${passedCount}/${testCases.length}`); console.log(` Failed: ${failedCount}/${testCases.length}`); console.log(`======================================================\n`); // Only cleanup temp files if ALL tests passed if (failedCount === 0 && passedCount === testCases.length) { try { fs.rmSync(TEMP_MD_DIR, { recursive: true, force: true }); console.log(` Cleaned up temp directory: ${TEMP_MD_DIR}\n`); } catch { // Ignore } } else { console.log(` Keeping temp directory for debugging: ${TEMP_MD_DIR}\n`); } }); export default tap.start();