Ingest Internals
This guide explains the internal architecture of the ingest pipeline — the seven execution stages, Cypher MERGE patterns, batching strategy, and error handling.
Architecture Overview
The ingest pipeline is a synchronous, seven-stage linear processor:
```text
corpus JSON files
  → Pydantic validation
  → Stage-ordered FalkorDB writes
  → Run report
```
Key design decisions:
- Synchronous: FalkorDB (Redis-based) processes writes one at a time, so async adds overhead with no throughput benefit
- `ThreadPoolExecutor` for file I/O: parallelizes JSON parsing and Pydantic validation (`max_workers=4`)
- Sequential DB writes: `UNWIND` batches within each pipeline; sequential ordering ensures `MERGE` consistency
- Fully idempotent: every node and edge uses `MERGE` (not `CREATE`), so re-runs are safe
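The split between parallel file I/O and sequential writes can be sketched as follows. This is a minimal sketch that assumes nothing about the real `loader.py`. `parse_and_validate` is a hypothetical stand-in for the Pydantic step (it only checks for an `id` key), and `write` stands in for whatever issues the `UNWIND` batches.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Callable


def parse_and_validate(path: Path) -> dict:
    """Hypothetical stand-in for Pydantic validation: parse JSON, require an 'id'."""
    data = json.loads(path.read_text(encoding="utf-8"))
    if "id" not in data:
        raise ValueError(f"{path}: missing 'id'")
    return data


def ingest_files(paths: list[Path], write: Callable[[dict], None], max_workers: int = 4) -> int:
    # Parsing and validation are independent per file, so they run on a thread pool.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        documents = list(pool.map(parse_and_validate, paths))
    # Writes stay on the calling thread, one after another.
    for doc in documents:
        write(doc)
    return len(documents)
```

Because `ThreadPoolExecutor.map` yields results in input order, the writes happen in a deterministic order even when files finish parsing out of order.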
Execution Stages
- Stage 0: Index creation (no data)
- Stage 1: Book registry load (no DB writes)
- Stage 2: Lexicon
- Stage 3: Scripture Text
- Stage 4: Reference nodes (TG, BD, Index; parallelized)
- Stage 5: Commentary
- Stage 6: Pending resolution
- Report: Summary output
Stage 0: Index Creation
Creates all FalkorDB indices before any data is written. This ensures MERGE operations have index-backed lookups:
```cypher
CREATE INDEX FOR (p:Passage) ON (p.id)
CREATE INDEX FOR (w:Word) ON (w.strongs)
CREATE INDEX FOR (t:TGEntry) ON (t.topicId)
```
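A minimal sketch of Stage 0 follows. It assumes a graph handle whose `query(q, params)` method runs one Cypher statement, which is the shape of the FalkorDB Python client's graph objects. `INDEX_SPECS` mirrors only the three indices above, and the real `db/schema.py` may define more.

```python
from typing import Optional, Protocol


class Graph(Protocol):
    """Anything with a query(q, params) method, such as a FalkorDB graph handle."""

    def query(self, q: str, params: Optional[dict] = None): ...


# (label, variable, property) triples mirroring the three indices above.
INDEX_SPECS = [
    ("Passage", "p", "id"),
    ("Word", "w", "strongs"),
    ("TGEntry", "t", "topicId"),
]


def index_statements(specs=INDEX_SPECS) -> list[str]:
    return [f"CREATE INDEX FOR ({var}:{label}) ON ({var}.{prop})" for label, var, prop in specs]


def create_indices(graph: Graph) -> int:
    # Issue every index statement before any data pipeline runs.
    statements = index_statements()
    for stmt in statements:
        graph.query(stmt)
    return len(statements)
```

Depending on the FalkorDB version, issuing `CREATE INDEX` for an attribute that is already indexed may be rejected. A re-runnable Stage 0 would need to tolerate that case, and this sketch does not attempt to.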
Stage 1: Book Registry Load
Loads data/book_registry.json into memory. The registry maps book IDs to metadata (title, abbreviation, corpus type, chapter count). Used by all subsequent stages for ID validation.
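Here is a sketch of the registry load and the kind of ID check later stages rely on. The JSON shape (a list of objects with `id`, `title`, `abbreviation`, `corpus` and `chapters` keys) and the `Book`/`BookRegistry` types are assumptions, not the actual format of `data/book_registry.json`.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class Book:
    id: str
    title: str
    abbreviation: str
    corpus: str
    chapters: int


class BookRegistry:
    """In-memory map of book ID -> metadata (hypothetical shape)."""

    def __init__(self, books: list[Book]):
        self._books = {b.id: b for b in books}

    @classmethod
    def from_json(cls, path: Path) -> BookRegistry:
        raw = json.loads(path.read_text(encoding="utf-8"))
        return cls([Book(**entry) for entry in raw])

    def is_valid_ref(self, book_id: str, chapter: int) -> bool:
        # Valid if the book exists and the chapter lies within its chapter count.
        book = self._books.get(book_id)
        return book is not None and 1 <= chapter <= book.chapters
```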
Stage 2: Lexicon Pipeline
Reads lexicon/*.json files (Hebrew, Greek, Aramaic). Produces:
- `:Word` nodes (one per Strong's number)
- `DERIVES_FROM` edges (etymology chains via `derivation.roots`)
- `RELATED_TO` edges (cross-referenced entries via `related`)
Must precede Stage 3 so ALIGNED_TO edges can link tokens to existing :Word nodes.
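Lexicon entries might be flattened into `UNWIND` row lists for the three outputs as sketched below. The entry keys `strongs`, `derivation.roots` and `related` come from the list above. The rest of the shape, including the `lemma` field, is an assumption.

```python
def lexicon_rows(entries: list[dict]) -> tuple[list[dict], list[dict], list[dict]]:
    """Split lexicon entries into :Word rows, DERIVES_FROM rows and RELATED_TO rows."""
    words, derives, related = [], [], []
    for e in entries:
        words.append({"strongs": e["strongs"], "lemma": e.get("lemma")})  # 'lemma' is hypothetical
        for root in e.get("derivation", {}).get("roots", []):
            derives.append({"from": e["strongs"], "to": root})
        for other in e.get("related", []):
            related.append({"from": e["strongs"], "to": other})
    return words, derives, related
```

The edge rows should be written only after every `:Word` row. An entry can name a root that appears later in the input, and an edge query that `MATCH`es both endpoints would skip that row if the root did not exist yet.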
Stage 3: Scripture Text Pipeline
Reads corpus/*.json and all testament directories. Produces:
- `:Passage` nodes (one per verse)
- `:Witness` nodes (manuscript evidence)
- `:WordAlignment` nodes (interlinear tokens)
- `HAS_ORIGINAL`, `HAS_WORD`, `ALIGNED_TO`, `CROSS_REF` edges
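This sketch flattens one book of verse text into `:Passage` rows, using the `{bookId}.{chapter}.{verse}` ID format from the Node ID Derivation table. The row keys match the parameters used by the Node MERGE query. The nested input shape (`bookId`, then `chapters` containing `verses`) is an assumption about the corpus files.

```python
def passage_rows(book: dict) -> list[dict]:
    """Flatten {bookId, chapters: [{chapter, verses: [{verse, text}]}]} into row dicts."""
    rows = []
    for ch in book["chapters"]:
        for v in ch["verses"]:
            rows.append({
                "id": f"{book['bookId']}.{ch['chapter']}.{v['verse']}",
                "bookId": book["bookId"],
                "chapter": ch["chapter"],
                "verse": v["verse"],
                "text": v["text"],
            })
    return rows
```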
Stage 4: Reference Nodes
Three independent pipelines run via ThreadPoolExecutor(max_workers=3):
- Topical Guide → `:TGEntry` nodes, `CITES` edges
- Bible Dictionary → `:BDEntry` nodes, `SEE_ALSO_*` edges
- Scripture Index → `:IndexTopic` nodes, `ALSO_CITES` edges
All three complete before Stage 5 begins.
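The fan-out and the barrier before Stage 5 can be sketched with `concurrent.futures`. The zero-argument callables are stand-ins for the Topical Guide, Bible Dictionary and Scripture Index pipelines. Having them return a row count is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def run_reference_stage(pipelines: dict[str, Callable[[], int]]) -> dict[str, int]:
    """Run independent pipelines concurrently; return rows written per pipeline."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = {name: pool.submit(fn) for name, fn in pipelines.items()}
    # Leaving the `with` block waits for every future, so all pipelines have finished here.
    # .result() re-raises any exception raised inside a pipeline.
    return {name: f.result() for name, f in futures.items()}
```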
Stage 5: Commentary
Reads verse-commentary and scholarly-commentary files. Produces:
- `:VerseNote`, `:Commentary`, `:Section` nodes
- `ANNOTATES`, `CITES` edges
Depends on Stage 3 because commentary edges reference :Passage nodes.
Stage 6: Pending Resolution
Promotes :PendingPassage stubs to :Passage where the target now exists. Reports unresolvable references in the run report.
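Stage 6's bookkeeping reduces to a set comparison between stub IDs and real passage IDs. This sketch assumes both ID sets have already been read from the graph, for example with `MATCH (pp:PendingPassage) RETURN pp.id`. How the pipeline then re-points edges from a stub to its `:Passage` is not covered by the sketch.

```python
def resolve_pending(pending_ids: set[str], passage_ids: set[str]) -> tuple[set[str], list[str]]:
    """Split stub IDs into those that now match a :Passage and those that do not."""
    resolved = pending_ids & passage_ids
    unresolved = sorted(pending_ids - passage_ids)  # sorted for a stable run report
    return resolved, unresolved
```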
Cypher MERGE Patterns
All writes use MERGE for idempotency — running the pipeline twice produces the same graph:
Node MERGE
```cypher
UNWIND $rows AS row
MERGE (p:Passage {id: row.id})
SET p.bookId = row.bookId,
    p.chapter = row.chapter,
    p.verse = row.verse,
    p.text = row.text
```
Edge MERGE
```cypher
UNWIND $rows AS row
MATCH (p:Passage {id: row.passage_id})
MATCH (w:Word {strongs: row.strongs})
MERGE (p)-[:ALIGNED_TO]->(w)
```
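The edge query uses `MATCH` for both endpoints, so a row whose `:Passage` or `:Word` does not exist produces no edge and no error. A pre-check like the one below can surface those rows before the write. It is a hypothetical helper, not part of the documented code, and it assumes the sets of already-ingested passage IDs and Strong's numbers are available.

```python
def split_alignment_rows(
    rows: list[dict], passage_ids: set[str], strongs: set[str]
) -> tuple[list[dict], list[dict]]:
    """Return (writable, orphaned) rows for the ALIGNED_TO MERGE query."""
    writable, orphaned = [], []
    for row in rows:
        if row["passage_id"] in passage_ids and row["strongs"] in strongs:
            writable.append(row)
        else:
            orphaned.append(row)  # MATCH would find nothing, so the edge would be skipped silently
    return writable, orphaned
```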
PendingPassage pattern
When a cross-reference targets a passage that hasn't been ingested yet:
```cypher
MERGE (p:PendingPassage {id: $target_id})
```
Stage 6 resolves these by matching against actual :Passage nodes.
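Cross-references can be routed between the two cases by partitioning rows on whether the target passage ID is already known. The query strings below are illustrative, not the constants in `db/cypher.py`. In particular, linking a `CROSS_REF` edge to the stub is an assumption about how pending targets are attached.

```python
# Illustrative query constants (assumptions, not the ones in db/cypher.py).
CROSS_REF_TO_PASSAGE = """
UNWIND $rows AS row
MATCH (s:Passage {id: row.source_id})
MATCH (t:Passage {id: row.target_id})
MERGE (s)-[:CROSS_REF]->(t)
"""

CROSS_REF_TO_PENDING = """
UNWIND $rows AS row
MATCH (s:Passage {id: row.source_id})
MERGE (t:PendingPassage {id: row.target_id})
MERGE (s)-[:CROSS_REF]->(t)
"""


def route_cross_refs(rows: list[dict], ingested_ids: set[str]) -> tuple[list[dict], list[dict]]:
    """Split rows into (direct, pending) by whether the target passage already exists."""
    direct, pending = [], []
    for row in rows:
        (direct if row["target_id"] in ingested_ids else pending).append(row)
    return direct, pending
```

Each list would then be passed to its matching query through the batch writer.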
Batching Strategy
Writes use UNWIND batches to minimize FalkorDB round-trips:
| Parameter | Default | Purpose |
|---|---|---|
| `--node-batch` | 500 | Rows per `UNWIND` for node creation |
| `--edge-batch` | 200 | Rows per `UNWIND` for edge creation |
The batch writer in `ingest/db/batch.py`:
- Splits the row list into chunks of `batch_size`
- Executes one `UNWIND` query per chunk
- Retries transient failures with exponential backoff (up to 3 attempts per chunk)
- Returns the total number of rows processed
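```python
import time

def batch_write(graph, query, rows, batch_size=500, max_retries=3):
    total = 0
    for i in range(0, len(rows), batch_size):
        chunk = rows[i:i + batch_size]
        for attempt in range(max_retries):  # up to max_retries attempts per chunk
            try:
                graph.query(query, {"rows": chunk})  # one UNWIND command per chunk
                break
            except (ConnectionError, TimeoutError):  # assumed transient types; redis-py raises its own
                if attempt == max_retries - 1:
                    raise  # persistent failure: let the stage abort
                time.sleep(0.5 * 2 ** attempt)  # exponential backoff (delays are illustrative)
        total += len(chunk)
    return total
```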
UNWIND batching avoids paying a network round-trip and a query parse for every row: a batch of 500 nodes executes as a single FalkorDB command.
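The saving is easy to quantify. With a batch size of `b`, writing `n` rows takes `ceil(n / b)` commands instead of `n`. The workload sizes below are hypothetical, and the batch sizes are the `--node-batch` and `--edge-batch` defaults.

```python
import math


def round_trips(n_rows: int, batch_size: int) -> int:
    """Number of UNWIND commands needed to write n_rows in chunks of batch_size."""
    return math.ceil(n_rows / batch_size)


# Hypothetical workload: 10,000 nodes and 25,000 edges at the default batch sizes.
print(round_trips(10_000, 500))  # 20 commands instead of 10,000
print(round_trips(25_000, 200))  # 125 commands instead of 25,000
```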
Error Handling
Validation errors
Every source file is validated with Pydantic before any DB write. Files that fail validation are:
- Logged at `ERROR` level with the file path and validation error details
- Skipped (the pipeline continues with remaining files)
- Recorded in the run report's `validation_errors` array
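The log, skip and record behaviour can be sketched as a loop that never lets one bad file stop the rest. Here `validate` stands in for a Pydantic model's validation (Pydantic's `ValidationError` subclasses `ValueError`), `RunReport` is a hypothetical minimal report, and the standard `logging` module replaces the project's structlog setup.

```python
import logging
from dataclasses import dataclass, field
from typing import Callable

log = logging.getLogger("ingest")


@dataclass
class RunReport:
    validation_errors: list[dict] = field(default_factory=list)


def validate_all(files: dict[str, dict], validate: Callable[[dict], object], report: RunReport) -> list[object]:
    """Validate each file; log and record failures, then continue with the rest."""
    valid = []
    for path, data in files.items():
        try:
            valid.append(validate(data))
        except ValueError as exc:
            log.error("validation failed: %s: %s", path, exc)
            report.validation_errors.append({"file": path, "error": str(exc)})
    return valid
```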
Database errors
- Transient failures (connection resets, timeouts) trigger automatic retry with exponential backoff (up to 3 attempts)
- Persistent failures abort the current pipeline stage and are recorded in the report
- The pipeline continues to subsequent stages if possible
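"Continue if possible" can be read as skipping any stage whose prerequisites did not complete. The sketch below encodes that reading with explicit dependencies. The documented dependencies are Stage 3 on Stage 2 and Stage 5 on Stage 3. The `Stage` and `StageReport` types are hypothetical, not the structures in `runner.py` or `report.py`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Stage:
    name: str
    run: Callable[[], None]
    depends_on: tuple[str, ...] = ()


@dataclass
class StageReport:
    completed: list[str] = field(default_factory=list)
    failed: dict[str, str] = field(default_factory=dict)
    skipped: list[str] = field(default_factory=list)


def run_stages(stages: list[Stage]) -> StageReport:
    report = StageReport()
    for stage in stages:  # stages are listed in execution order
        if any(dep not in report.completed for dep in stage.depends_on):
            report.skipped.append(stage.name)  # a prerequisite failed or was skipped
            continue
        try:
            stage.run()
            report.completed.append(stage.name)
        except Exception as exc:  # persistent failure: record it and move on
            report.failed[stage.name] = str(exc)
    return report
```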
Unresolvable references
Cross-references to passages that don't exist in the corpus are:
- Created as `:PendingPassage` stubs during ingestion
- Attempted for resolution in Stage 6
- Reported as unresolvable in the run report if no matching `:Passage` exists
Node ID Derivation
Node IDs are deterministically derived from the source data:
| Node Type | ID Format | Example |
|---|---|---|
| Passage | {bookId}.{chapter}.{verse} | gen.1.1 |
| Word | Strong's number | H0430 |
| TGEntry | tg:{slug} | tg:angels |
| BDEntry | bd:{slug} | bd:aaron |
| IndexTopic | idx:{slug} | idx:faith |
Deterministic IDs enable MERGE idempotency — the same source data always produces the same node ID.
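Here is a sketch of derivation functions that produce the formats in the table. Only the output formats come from the table. The Strong's normalization (upper-case prefix, zero-padding to four digits, inferred from `H0430`) and the slug rules are assumptions about `ids.py`.

```python
import re


def passage_id(book_id: str, chapter: int, verse: int) -> str:
    return f"{book_id}.{chapter}.{verse}"


def strongs_id(raw: str) -> str:
    """Normalize 'H430', 'h0430', ... to 'H0430' (4-digit padding assumed from the example)."""
    m = re.fullmatch(r"([HGhg])0*(\d{1,4})", raw.strip())
    if m is None:
        raise ValueError(f"not a Strong's number: {raw!r}")
    return f"{m.group(1).upper()}{int(m.group(2)):04d}"


def slug(text: str) -> str:
    # Lower-case, then collapse each run of non-alphanumerics into one hyphen (assumed rule).
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")


def tg_id(topic: str) -> str:
    return f"tg:{slug(topic)}"


def bd_id(entry: str) -> str:
    return f"bd:{slug(entry)}"


def index_topic_id(topic: str) -> str:
    return f"idx:{slug(topic)}"
```

The same input always yields the same string, which is the property `MERGE` idempotency depends on.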
Project Layout
```text
services/ingest/src/ingest/
├── main.py # CLI entry point (Click)
├── runner.py # Stage orchestrator
├── logger.py # structlog configuration
├── models/ # Pydantic v2 models (one file per schema family)
├── db/
│ ├── client.py # FalkorDB connection management
│ ├── schema.py # Index creation (Stage 0)
│ ├── batch.py # UNWIND batch helpers
│ └── cypher.py # All MERGE query constants
├── registry.py # Book ID registry
├── ids.py # Node ID derivation functions
├── pipelines/
│ ├── base.py # BasePipeline ABC
│ ├── lexicon.py
│ ├── scripture_text.py
│ ├── topical_guide.py
│ ├── bible_dictionary.py
│ ├── scripture_index.py
│ ├── verse_commentary.py
│ └── scholarly.py
├── pending.py # Stage 6: pending reference resolution
├── report.py # Run report accumulation
└── loader.py # File discovery and Pydantic dispatch
```
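For orientation, a pipeline contract consistent with this layout might look like the following. `BasePipeline` is named in `pipelines/base.py`, but the methods shown (`load`, `to_rows`, `run`) and the injected `write` callable are hypothetical.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Callable, Iterable

Writer = Callable[[str, list[dict]], int]  # (query, rows) -> rows written


class BasePipeline(ABC):
    name: str = "base"

    def __init__(self, write: Writer):
        self.write = write

    @abstractmethod
    def load(self) -> Iterable[dict]:
        """Yield validated source records."""

    @abstractmethod
    def to_rows(self, records: Iterable[dict]) -> list[tuple[str, list[dict]]]:
        """Map records to (query, rows) pairs, node queries before edge queries."""

    def run(self) -> int:
        total = 0
        for query, rows in self.to_rows(self.load()):
            total += self.write(query, rows)
        return total
```

Keeping the writer injectable lets each pipeline be exercised against a fake graph without a running FalkorDB instance.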