#!/usr/bin/env python3
import os, sys, json, time, re
from pathlib import Path
from typing import Dict, Any, List

from tqdm import tqdm
from google.cloud import documentai_v1 as documentai
from google.cloud import storage
from google.api_core.operation import Operation

# ---- Config from env ----
PROJECT_ID = os.environ.get("DOC_AI_PROJECT_ID")
LOCATION = os.environ.get("DOC_AI_LOCATION", "us")
PROCESSOR_ID = os.environ.get("DOC_AI_PROCESSOR_ID")
BUCKET_NAME = os.environ.get("DOC_AI_BUCKET")

if not (PROJECT_ID and PROCESSOR_ID and BUCKET_NAME and os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")):
    print("Missing env. Set GOOGLE_APPLICATION_CREDENTIALS, DOC_AI_PROJECT_ID, DOC_AI_LOCATION, DOC_AI_PROCESSOR_ID, DOC_AI_BUCKET")
    sys.exit(1)

# ---- Clients ----
doc_client = documentai.DocumentProcessorServiceClient()
storage_client = storage.Client()


def processor_name():
    return f"projects/{PROJECT_ID}/locations/{LOCATION}/processors/{PROCESSOR_ID}"


# ---------- Slimmer (keeps only what you need for Markdown) ----------
def _slice(full: str, anchor: Dict[str, Any]) -> str:
    # Pull the substring(s) referenced by a textAnchor out of the full document text
    out = []
    for seg in (anchor or {}).get("textSegments", []):
        s = int(seg.get("startIndex", 0) or 0)
        e = int(seg.get("endIndex", 0) or 0)
        out.append(full[s:e])
    return "".join(out)


def _clean(s: str) -> str:
    s = (s.replace("\ufb01", "fi").replace("\ufb02", "fl")  # fi/fl ligatures
         .replace("\u00A0", " ")
         .replace("“", '"').replace("”", '"').replace("’", "'"))
    s = re.sub(r"[ \t]+", " ", s)
    s = re.sub(r"\s*\n\s*", " ", s)
    return s.strip()


def slim_document(doc: Dict[str, Any]) -> List[Dict[str, Any]]:
    # doc may already be {"document": {...}} in batch output
    d = doc.get("document", doc)
    text = d.get("text", "")
    pages = d.get("pages", [])
    items = []
    consumed = []

    def overlaps(anchor):
        for seg in (anchor or {}).get("textSegments", []):
            s = int(seg.get("startIndex", 0) or 0)
            e = int(seg.get("endIndex", 0) or 0)
            for cs, ce in consumed:
                if not (e <= cs or s >= ce):
                    return True
        return False

    def mark(anchor):
        for seg in (anchor or {}).get("textSegments", []):
            s = int(seg.get("startIndex", 0) or 0)
            e = int(seg.get("endIndex", 0) or 0)
            consumed.append((s, e))

    # tables first (avoid dup text)
    for p in pages:
        for t in p.get("tables", []) or []:
            ta = (t.get("layout") or {}).get("textAnchor", {})
            if overlaps(ta):
                continue
            rows = []
            for r in t.get("headerRows", []) or []:
                row = [_clean(_slice(text, (c.get("layout") or {}).get("textAnchor", {}))) for c in r.get("cells", [])]
                rows.append(row)
            for r in t.get("bodyRows", []) or []:
                row = [_clean(_slice(text, (c.get("layout") or {}).get("textAnchor", {}))) for c in r.get("cells", [])]
                rows.append(row)
            if rows:
                items.append({"type": "table", "rows": rows})
            mark(ta)

    # blocks/paragraphs/lines
    for p in pages:
        containers = p.get("paragraphs") or p.get("blocks") or p.get("lines") or []
        for obj in containers:
            ta = (obj.get("layout") or {}).get("textAnchor", {})
            if not ta or overlaps(ta):
                continue
            t = _clean(_slice(text, ta))
            if not t:
                continue
            # drop isolated page numbers/punctuation
            if re.fullmatch(r"[\d\W]{1,4}", t):
                continue
            # naive heading guess (no style info in your JSON)
            if (len(t) <= 20 and t.isupper()) or t.startswith("CHAPTER"):
                items.append({"type": "h1", "text": t})
            elif len(t) <= 80 and t.isupper():
                items.append({"type": "h2", "text": t})
            # lists
            elif re.match(r"^(\u2022|[-*])\s+\S", t):
                items.append({"type": "ul", "text": re.sub(r"^(\u2022|[-*])\s+", "", t)})
            elif re.match(r"^\(?\d+[\).]\s+\S", t):
                items.append({"type": "ol", "text": re.sub(r"^\(?\d+[\).]\s+", "", t)})
            else:
                items.append({"type": "p", "text": t})
            mark(ta)
        items.append({"type": "pagebreak"})

    return items
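
# Illustrative sketch (not called anywhere in this script): one possible way to
# render the slim items above into Markdown. The "type" values mirror what
# slim_document() emits; the function name and the "---" page-break marker are
# assumptions for the example, not part of the pipeline.
def slim_items_to_markdown(items: List[Dict[str, Any]]) -> str:
    lines: List[str] = []
    for it in items:
        kind = it.get("type")
        if kind == "h1":
            lines.append(f"# {it['text']}")
        elif kind == "h2":
            lines.append(f"## {it['text']}")
        elif kind == "ul":
            lines.append(f"- {it['text']}")
        elif kind == "ol":
            lines.append(f"1. {it['text']}")
        elif kind == "p":
            lines.append(it["text"])
        elif kind == "table":
            rows = it.get("rows", [])
            if rows:
                header, *body = rows
                lines.append("| " + " | ".join(header) + " |")
                lines.append("| " + " | ".join("---" for _ in header) + " |")
                for r in body:
                    lines.append("| " + " | ".join(r) + " |")
        elif kind == "pagebreak":
            lines.append("---")
        lines.append("")
    return "\n".join(lines)
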
# ---------- Sync (fast) ----------
def process_sync(pdf_bytes: bytes) -> Dict[str, Any]:
    name = processor_name()
    raw = documentai.RawDocument(content=pdf_bytes, mime_type="application/pdf")
    req = documentai.ProcessRequest(name=name, raw_document=raw)
    res = doc_client.process_document(request=req)
    return document_to_json(res.document)


def document_to_json(doc: documentai.Document) -> Dict[str, Any]:
    # Convert protobuf to a JSON-compatible dict (camelCase keys, matching the slimmer)
    from google.protobuf.json_format import MessageToDict
    return MessageToDict(doc._pb, preserving_proto_field_name=False)


# ---------- Batch fallback ----------
def gcs_upload(local: Path, gcs_uri: str):
    # gcs_uri like gs://bucket/path/file.pdf
    assert gcs_uri.startswith("gs://")
    _, _, bucket_path = gcs_uri.partition("gs://")
    bucket_name, _, obj_path = bucket_path.partition("/")
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(obj_path)
    blob.upload_from_filename(str(local))


def start_batch(gcs_in_prefix: str, gcs_out_prefix: str) -> Operation:
    name = processor_name()
    input_cfg = documentai.BatchDocumentsInputConfig(
        gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=gcs_in_prefix)
    )
    out_cfg = documentai.DocumentOutputConfig(
        gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri=gcs_out_prefix
        )
    )
    req = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_cfg,
        document_output_config=out_cfg
    )
    op = doc_client.batch_process_documents(request=req)
    return op


def wait_and_grab_output(gcs_out_prefix: str) -> Dict[str, Any]:
    # Locate the Document JSON under the output prefix and return it as a dict
    assert gcs_out_prefix.startswith("gs://")
    _, _, tail = gcs_out_prefix.partition("gs://")
    bucket_name, _, prefix = tail.partition("/")
    bucket = storage_client.bucket(bucket_name)
    # Batch writes into a run-specific subfolder; list everything under the prefix
    blobs = list(storage_client.list_blobs(bucket, prefix=prefix))
    json_blobs = [b for b in blobs if b.name.lower().endswith(".json")]
    if not json_blobs:
        raise RuntimeError("No JSON found in output bucket.")
    # Pick the largest JSON (usually the doc-level one; very large PDFs may be sharded across multiple files)
    json_blobs.sort(key=lambda b: b.size or 0, reverse=True)
    data = json_blobs[0].download_as_bytes()
    return json.loads(data.decode("utf-8"))


# ---------- Orchestrator ----------
def process_pdf_to_jsons(pdf_path: Path):
    folder = pdf_path.parent
    stem = pdf_path.stem
    out_full = folder / f"{stem}.documentai.json"
    out_slim = folder / f"{stem}.slim.json"

    # Try sync first
    try:
        pdf_bytes = pdf_path.read_bytes()
        doc_json = process_sync(pdf_bytes)  # raises on size/page limits
        with open(out_full, "w", encoding="utf-8") as f:
            json.dump(doc_json, f, ensure_ascii=False)
        items = slim_document(doc_json)
        with open(out_slim, "w", encoding="utf-8") as f:
            json.dump(items, f, ensure_ascii=False)
        print(f"✔ Sync OK → {out_full.name}, {out_slim.name}")
        return
    except Exception as e:
        print(f"Sync failed (will try batch): {e}")

    # Batch fallback
    in_uri = f"gs://{BUCKET_NAME}/docai_input/{stem}.pdf"
    out_uri = f"gs://{BUCKET_NAME}/docai_output/{stem}/"
    print(f"Uploading to {in_uri} …")
    gcs_upload(pdf_path, in_uri)
    print("Starting batch job …")
    # NOTE: the input prefix covers everything under docai_input/, not just this file
    op = start_batch(gcs_in_prefix=in_uri.rsplit("/", 1)[0] + "/", gcs_out_prefix=out_uri)
    for _ in tqdm(range(300), desc="Waiting (batch)", unit="s"):
        if op.done():
            break
        time.sleep(1)
    if not op.done():
        raise RuntimeError("Batch timeout; check Cloud Console > Document AI Operations.")
    if op.exception():
        raise RuntimeError(f"Batch failed: {op.exception()}")

    doc_json = wait_and_grab_output(out_uri)
    with open(out_full, "w", encoding="utf-8") as f:
        json.dump(doc_json, f, ensure_ascii=False)
    items = slim_document(doc_json)
    with open(out_slim, "w", encoding="utf-8") as f:
        json.dump(items, f, ensure_ascii=False)
    print(f"✔ Batch OK → {out_full.name}, {out_slim.name}")


# ---------- CLI ----------
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: docai_pdf_to_json.py /path/to/file.pdf [more.pdf …]")
        sys.exit(1)
    for p in sys.argv[1:]:
        path = Path(p).expanduser().resolve()
        if path.suffix.lower() != ".pdf":
            print(f"Skip (not PDF): {path}")
            continue
        process_pdf_to_jsons(path)
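
# Example run (assuming the env vars above are exported and GOOGLE_APPLICATION_CREDENTIALS
# points at a service-account key; "report.pdf" is just a placeholder filename):
#
#   python docai_pdf_to_json.py ~/docs/report.pdf
#
# which should write report.documentai.json (full Document AI output) and
# report.slim.json (the slimmed items) next to the PDF.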