Add document converter, seeder data structure, and project wiki
- ai-service/convert.py: converts Office/PDF files to markdown with frontmatter
- database/seeders/data/: folder structure for themas, projects, documents, etc.
- database/seeders/data/raw/: drop zone for Office/PDF files to convert
- wiki/: project architecture, concepts, and knowledge graph documentation
- Remove unused Laravel example tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
331
ai-service/convert.py
Normal file
331
ai-service/convert.py
Normal file
@@ -0,0 +1,331 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Convert raw Office/PDF files into markdown with YAML frontmatter.
|
||||
|
||||
Usage:
|
||||
python convert.py # convert all files in raw/
|
||||
python convert.py path/to/file.docx # convert a single file
|
||||
python convert.py --out documents # override output subfolder
|
||||
|
||||
Reads from: database/seeders/data/raw/
|
||||
Writes to: database/seeders/data/documents/ (default, or specify --out)
|
||||
|
||||
Supported formats: .docx, .pptx, .xlsx, .pdf, .txt, .md, .csv
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import re
|
||||
import sys
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
# ── Extractors ──────────────────────────────────────────────────────
|
||||
|
||||
def extract_docx(path: Path) -> tuple[str, dict]:
    """Convert a Word .docx file to markdown, keeping heading levels.

    Headings 1-3 and "Title" styles become markdown headings; tables are
    rendered as pipe tables.  Returns ``(markdown, metadata)`` where the
    metadata dict may hold "auteur", "titel" and "datum" keys taken from
    the document core properties.
    """
    # Imported lazily so the script still runs for other formats when
    # python-docx is not installed.
    from docx import Document

    document = Document(str(path))
    out = []

    for paragraph in document.paragraphs:
        stripped = paragraph.text.strip()
        if not stripped:
            out.append("")
            continue
        style_name = paragraph.style.name.lower() if paragraph.style else ""
        if "heading 1" in style_name:
            prefix = "# "
        elif "heading 2" in style_name:
            prefix = "## "
        elif "heading 3" in style_name:
            prefix = "### "
        elif "title" in style_name:
            prefix = "# "
        else:
            prefix = ""
        out.append(prefix + stripped)

    # Tables are appended after all paragraphs, one pipe row per table row.
    for table in document.tables:
        out.append("")
        for row_index, row in enumerate(table.rows):
            row_cells = [c.text.strip().replace("\n", " ") for c in row.cells]
            out.append("| " + " | ".join(row_cells) + " |")
            if row_index == 0:
                # Markdown header separator after the first row.
                out.append("| " + " | ".join(["---"] * len(row_cells)) + " |")
        out.append("")

    # Pull author/title/creation date from the docx core properties.
    meta = {}
    core = document.core_properties
    if core.author:
        meta["auteur"] = core.author
    if core.title:
        meta["titel"] = core.title
    if core.created:
        meta["datum"] = core.created.strftime("%Y-%m-%d")

    # Collapse runs of 3+ newlines down to one blank line.
    body = re.sub(r"\n{3,}", "\n\n", "\n".join(out).strip())
    return body, meta
|
||||
|
||||
|
||||
def extract_pptx(path: Path) -> tuple[str, dict]:
    """Extract text from PowerPoint .pptx files, one section per slide.

    Each slide becomes an "## <title>" section, falling back to
    "## Slide <n>" when no title shape is found; tables are rendered as
    markdown pipe tables and speaker notes are appended as a blockquote.
    Returns ``(markdown, metadata)``; metadata is always empty here.
    """
    # Imported lazily so the script still runs for other formats when
    # python-pptx is not installed.
    from pptx import Presentation
    prs = Presentation(str(path))

    lines = []
    for i, slide in enumerate(prs.slides, 1):
        slide_title = ""
        slide_texts = []

        for shape in slide.shapes:
            if shape.has_text_frame:
                for para in shape.text_frame.paragraphs:
                    text = para.text.strip()
                    if text:
                        slide_texts.append(text)
            # A shape whose name contains "title" supplies the slide heading.
            # NOTE(review): its text also lands in slide_texts above, so the
            # title appears twice in the output — confirm that is intended.
            if hasattr(shape, "name") and "title" in shape.name.lower():
                if shape.has_text_frame:
                    title_text = shape.text_frame.text.strip()
                    if title_text:
                        slide_title = title_text

        # Extract tables from slides (second pass over the same shapes).
        for shape in slide.shapes:
            if shape.has_table:
                table = shape.table
                for row_idx, row in enumerate(table.rows):
                    cells = [cell.text.strip().replace("\n", " ") for cell in row.cells]
                    slide_texts.append("| " + " | ".join(cells) + " |")
                    # Markdown header separator after the first row.
                    if row_idx == 0:
                        slide_texts.append("| " + " | ".join(["---"] * len(cells)) + " |")

        heading = slide_title or f"Slide {i}"
        lines.append(f"## {heading}")
        lines.append("")
        lines.extend(slide_texts)
        lines.append("")

        # Slide notes ("Notities" is Dutch for "notes" — output is Dutch).
        if slide.has_notes_slide and slide.notes_slide.notes_text_frame:
            notes = slide.notes_slide.notes_text_frame.text.strip()
            if notes:
                lines.append(f"> Notities: {notes}")
                lines.append("")

    meta = {}
    content = "\n".join(lines).strip()
    # Collapse runs of 3+ newlines down to one blank line.
    content = re.sub(r"\n{3,}", "\n\n", content)
    return content, meta
|
||||
|
||||
|
||||
def extract_xlsx(path: Path) -> tuple[str, dict]:
    """Render an Excel .xlsx workbook as markdown, one section per sheet.

    Each non-empty sheet becomes an "## <sheet name>" heading followed by a
    pipe table of its cell values (formulas are read as computed values).
    Returns ``(markdown, {})`` — no metadata is extracted for spreadsheets.
    """
    # Imported lazily so the script still runs for other formats when
    # openpyxl is not installed.
    from openpyxl import load_workbook

    # data_only=True yields cached formula results instead of formula text.
    workbook = load_workbook(str(path), data_only=True)
    parts = []

    for name in workbook.sheetnames:
        sheet_rows = list(workbook[name].iter_rows(values_only=True))
        if not sheet_rows:
            # Sheet has no cells at all — omit its heading entirely.
            continue

        parts.append(f"## {name}")
        parts.append("")

        for index, raw_row in enumerate(sheet_rows):
            values = ["" if cell is None else str(cell).strip() for cell in raw_row]
            if not any(values):
                # Skip completely empty rows.
                continue
            parts.append("| " + " | ".join(values) + " |")
            if index == 0:
                # Markdown header separator after the first (header) row.
                parts.append("| " + " | ".join(["---"] * len(values)) + " |")

        parts.append("")

    # Collapse runs of 3+ newlines down to one blank line.
    text = re.sub(r"\n{3,}", "\n\n", "\n".join(parts).strip())
    return text, {}
|
||||
|
||||
|
||||
def extract_pdf(path: Path) -> tuple[str, dict]:
    """Extract text from PDF files using PyMuPDF.

    Multi-page documents get an "## Pagina <n>" heading per page
    ("Pagina" is Dutch for "page").  Returns ``(markdown, metadata)``;
    metadata may hold "auteur", "titel" and "datum" keys taken from the
    PDF document properties.

    Fixes over the original:
    - the document handle is now closed in a ``finally`` block, so it no
      longer leaks when text extraction raises mid-way;
    - the creation-date parse validated nothing (string slicing never
      raises IndexError, so the old ``except`` was dead) and could emit
      malformed dates like "2021--"; it now requires 8 leading digits.
    """
    # Imported lazily so the script still runs for other formats when
    # pymupdf is not installed.
    import fitz  # pymupdf

    doc = fitz.open(str(path))
    try:
        lines = []
        for page_num, page in enumerate(doc, 1):
            text = page.get_text("text").strip()
            if text:
                # Only label pages when there is more than one.
                if len(doc) > 1:
                    lines.append(f"## Pagina {page_num}")
                    lines.append("")
                lines.append(text)
                lines.append("")

        meta = {}
        pdf_meta = doc.metadata
        if pdf_meta:
            if pdf_meta.get("author"):
                meta["auteur"] = pdf_meta["author"]
            if pdf_meta.get("title"):
                meta["titel"] = pdf_meta["title"]
            raw = pdf_meta.get("creationDate") or ""
            # PDF date strings look like "D:YYYYMMDDHHmmSS...".
            if raw.startswith("D:"):
                raw = raw[2:]
            # Require a full YYYYMMDD prefix before slicing a date out.
            if re.match(r"\d{8}", raw):
                meta["datum"] = f"{raw[:4]}-{raw[4:6]}-{raw[6:8]}"
    finally:
        # Release the file handle even when extraction fails.
        doc.close()

    content = "\n".join(lines).strip()
    # Collapse runs of 3+ newlines down to one blank line.
    content = re.sub(r"\n{3,}", "\n\n", content)
    return content, meta
|
||||
|
||||
|
||||
def extract_text(path: Path) -> tuple[str, dict]:
    """Read plain text / markdown / csv files.

    Uses chardet (when installed) to sniff the encoding; if chardet is
    missing, or its guess fails to decode, falls back to UTF-8 with
    replacement characters.  Returns ``(content, {})`` — no metadata is
    extracted for plain-text formats.

    Fix over the original: chardet was a hard import, so plain UTF-8
    files failed with ImportError when the optional package was absent.
    """
    raw_bytes = path.read_bytes()

    encoding = "utf-8"
    try:
        import chardet
    except ImportError:
        # chardet is optional here: a UTF-8 attempt with a replacement
        # fallback below still handles most text files sensibly.
        pass
    else:
        detected = chardet.detect(raw_bytes)
        encoding = detected.get("encoding") or "utf-8"

    try:
        content = raw_bytes.decode(encoding)
    except (UnicodeDecodeError, LookupError):
        # Wrong guess, or a codec name Python does not know — degrade
        # gracefully rather than crash the whole conversion run.
        content = raw_bytes.decode("utf-8", errors="replace")

    return content.strip(), {}
|
||||
|
||||
|
||||
# ── File type routing ───────────────────────────────────────────────
|
||||
|
||||
# Maps a lowercase file extension to its extractor callable.  A value of
# None marks a recognised-but-unsupported legacy Office format:
# convert_file prints a hint to re-save it as the modern XML variant.
EXTRACTORS = {
    ".docx": extract_docx,
    ".doc": None,  # needs LibreOffice, warn user
    ".pptx": extract_pptx,
    ".ppt": None,
    ".xlsx": extract_xlsx,
    ".xls": None,
    ".pdf": extract_pdf,
    ".txt": extract_text,
    ".md": extract_text,
    ".csv": extract_text,
}
|
||||
|
||||
|
||||
def slugify(text: str) -> str:
    """Normalise *text* into a lowercase, hyphen-separated filename slug.

    Strips punctuation, turns whitespace/underscores into hyphens,
    collapses repeated hyphens, and caps the result at 80 characters.
    """
    slug = text.strip().lower()
    # Drop everything that is not a word character, whitespace, or hyphen.
    slug = re.sub(r"[^\w\s-]", "", slug)
    # Whitespace and underscores become hyphen separators...
    slug = re.sub(r"[\s_]+", "-", slug)
    # ...and runs of hyphens collapse to a single one.
    slug = re.sub(r"-+", "-", slug)
    return slug[:80].strip("-")
|
||||
|
||||
|
||||
def build_frontmatter(filename: str, meta: dict) -> str:
    """Build YAML frontmatter from extracted metadata + filename.

    Falls back to a title derived from *filename* when the document
    metadata carries none, and to today's date when no creation date was
    extracted.

    Fix over the original: quoted values are now escaped, so a title or
    author containing ``"`` or ``\\`` no longer produces invalid YAML.
    """

    def _quote(value: str) -> str:
        # Escape backslashes first, then double quotes, to form a valid
        # double-quoted YAML scalar.
        return '"' + str(value).replace("\\", "\\\\").replace('"', '\\"') + '"'

    titel = meta.get("titel") or filename.rsplit(".", 1)[0].replace("-", " ").replace("_", " ").title()
    auteur = meta.get("auteur", "")
    datum = meta.get("datum", date.today().isoformat())

    lines = ["---"]
    lines.append(f"titel: {_quote(titel)}")
    if auteur:  # author line is omitted entirely when unknown
        lines.append(f"auteur: {_quote(auteur)}")
    lines.append("type: document")
    lines.append(f"datum: {datum}")
    lines.append('bron: "(unknown)"')
    lines.append("---")
    return "\n".join(lines)
|
||||
|
||||
|
||||
def convert_file(file_path: Path, out_dir: Path) -> Path | None:
    """Convert one source file into a markdown document with frontmatter.

    Prints a one-line status per outcome and returns the path of the
    written markdown file, or None when the file was skipped (unknown or
    legacy format), failed to parse, or contained no extractable text.
    """
    suffix = file_path.suffix.lower()

    # Unknown extension — not convertible at all.
    if suffix not in EXTRACTORS:
        print(f"  SKIP {file_path.name} — unsupported format ({suffix})")
        return None

    # Known-but-legacy extension (.doc/.ppt/.xls) is mapped to None.
    if EXTRACTORS[suffix] is None:
        print(f"  SKIP {file_path.name} — old Office format ({suffix}), save as {suffix}x first")
        return None

    try:
        body, doc_meta = EXTRACTORS[suffix](file_path)
    except Exception as exc:
        print(f"  ERROR {file_path.name} — {exc}")
        return None

    if not body.strip():
        print(f"  EMPTY {file_path.name} — no text extracted")
        return None

    header = build_frontmatter(file_path.name, doc_meta)
    slug = slugify(doc_meta.get("titel", file_path.stem))
    target = out_dir / f"{slug}.md"

    # Avoid overwriting — append hash if collision
    if target.exists():
        digest = hashlib.md5(file_path.name.encode()).hexdigest()[:6]
        target = out_dir / f"{slug}-{digest}.md"

    target.write_text(f"{header}\n\n{body}\n", encoding="utf-8")
    print(f"  OK {file_path.name} → {target.relative_to(out_dir.parent)}")
    return target
|
||||
|
||||
|
||||
# ── CLI ─────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
    """CLI entry point: convert the given files, or everything in raw/."""
    parser = argparse.ArgumentParser(description="Convert Office/PDF files to markdown for seeding")
    parser.add_argument("files", nargs="*", help="Specific files to convert (default: all in raw/)")
    parser.add_argument("--out", default="documents", help="Output subfolder name (default: documents)")
    parser.add_argument("--data-dir", default=None, help="Override data directory path")
    args = parser.parse_args()

    # Resolve paths relative to this script unless --data-dir overrides.
    script_dir = Path(__file__).resolve().parent
    project_root = script_dir.parent
    data_dir = Path(args.data_dir) if args.data_dir else project_root / "database" / "seeders" / "data"
    raw_dir = data_dir / "raw"
    out_dir = data_dir / args.out
    out_dir.mkdir(parents=True, exist_ok=True)

    # Explicit file arguments win; otherwise scan the raw/ drop zone,
    # skipping directories and dotfiles.
    if args.files:
        targets = [Path(name) for name in args.files]
    else:
        if not raw_dir.exists():
            print(f"No raw/ directory at {raw_dir}")
            sys.exit(1)
        targets = sorted(
            entry for entry in raw_dir.iterdir()
            if entry.is_file() and not entry.name.startswith(".")
        )

    if not targets:
        print(f"No files found in {raw_dir}")
        print("Drop .docx, .pptx, .xlsx, .pdf files there and re-run.")
        sys.exit(0)

    print(f"Converting {len(targets)} file(s) → {out_dir.relative_to(project_root)}/\n")

    # convert_file returns a Path on success, None otherwise.
    converted = sum(1 for target in targets if convert_file(target, out_dir))
    print(f"\nDone: {converted}/{len(targets)} files converted.")
|
||||
|
||||
|
||||
# Script entry point — run the CLI only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
|
||||
@@ -9,3 +9,10 @@ numpy>=1.26.0
|
||||
pydantic>=2.0.0
|
||||
python-dotenv>=1.0.0
|
||||
httpx>=0.27.0
|
||||
|
||||
# Document extraction
|
||||
python-docx>=1.1.0
|
||||
python-pptx>=0.6.23
|
||||
openpyxl>=3.1.0
|
||||
pymupdf>=1.24.0
|
||||
chardet>=5.2.0
|
||||
|
||||
Reference in New Issue
Block a user