Source code for grilly.scripts.ingest_svc

#!/usr/bin/env python3
"""SVC ingestion (fast, streaming).

The original ingestion flow was doing two full passes:
  1) InstantLanguage.ingest_svc(...)
  2) CognitiveController.ingest_svc(...) (which calls language.ingest_svc again)

That doubles the encoding work and makes large JSONL files feel like they
"hang". This script ingests **once** through CognitiveController, streaming
the JSONL in configurable chunks.

Usage:
  python scripts/ingest_svc.py -f datasets/_data/svc_training_merged.jsonl
  python scripts/ingest_svc.py -f ... --max 50000 --chunk 4096 --no-templates
  python scripts/ingest_svc.py -f ... --no-ngrams   # much faster vocab build
"""

import argparse
import sys
import time
from pathlib import Path


[docs]def _fmt_rate(n: int, dt: float) -> str: """Run fmt rate.""" if dt <= 0: return "∞/s" rate = n / dt if rate >= 1e6: return f"{rate / 1e6:.2f}M/s" if rate >= 1e3: return f"{rate / 1e3:.2f}k/s" return f"{rate:.2f}/s"
def _iter_chunks(entries, size: int):
    """Yield successive lists of up to ``size`` items taken from ``entries``.

    A list is emitted as soon as it holds at least ``size`` items; a trailing
    partial list is emitted at the end if it is non-empty. Each yielded list
    is a fresh object, so consumers may keep a reference to it.
    """
    batch = []
    for entry in entries:
        batch.append(entry)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch


def main() -> None:
    """Stream an SVC JSONL file into a ``CognitiveController`` in chunks.

    Parses the command line, streams entries from ``--file`` through
    ``load_svc_entries`` (applying the ``--max``, realm, complexity and source
    filters), and passes them to ``CognitiveController.ingest_svc`` in batches
    of ``--chunk`` entries. A progress line is printed once at least
    ``--progress`` entries have been ingested since the previous progress line
    (``--progress 0`` disables progress output).

    After ingestion a summary is printed and a checkpoint is written via
    ``save_ingest_checkpoint`` to the input path with its suffix replaced by
    ``.ingest_checkpoint``. If the controller's language component exposes
    realm vectors (and ``--no-realm-vectors`` was not given), a
    ``ResonatorMoE`` is built over the realm names and a routing sanity count
    is printed.
    """
    ap = argparse.ArgumentParser(description="Stream-ingest SVC JSONL into Grilly.")
    ap.add_argument("--file", "-f", required=True, help="Path to the SVC JSONL file")
    ap.add_argument("--max", "-n", type=int, default=None, help="Max entries to ingest")
    ap.add_argument("--realms", "-r", nargs="*", default=None, help="Only these realms")
    ap.add_argument("--min-complexity", type=float, default=None)
    ap.add_argument("--max-complexity", type=float, default=None)
    ap.add_argument("--sources", "-s", nargs="*", default=None, help="Only these sources")
    ap.add_argument("--dim", "-d", type=int, default=4096)
    ap.add_argument("--chunk", type=int, default=4096, help="Entries per ingestion chunk")
    ap.add_argument("--progress", type=int, default=50, help="Print progress every N entries")
    ap.add_argument("--no-templates", action="store_true", help="Skip template learning")
    ap.add_argument("--no-realm-vectors", action="store_true", help="Skip realm vector building")
    ap.add_argument(
        "--no-ngrams",
        action="store_true",
        help="Disable n-gram HRR word encoding (much faster, less lexical similarity)",
    )
    ap.add_argument("--verbose", "-v", action="store_true", default=False, help="Verbose output")
    args = ap.parse_args()

    # Imports here so --help is instant
    # Repo-layout compatibility: some setups package everything under "grilly",
    # others run directly from the repo root.
    try:
        from grilly.experimental.cognitive.controller import CognitiveController
        from grilly.experimental.language.svc_loader import SVCIngestionEngine, load_svc_entries
        from grilly.experimental.moe.routing import ResonatorMoE
        from grilly.experimental.vsa.ops import BinaryOps
        from grilly.utils.ingest_checkpoint import save_ingest_checkpoint
    except ModuleNotFoundError:
        from experimental.cognitive.controller import CognitiveController
        from experimental.language.svc_loader import SVCIngestionEngine, load_svc_entries
        from experimental.moe.routing import ResonatorMoE
        from experimental.vsa.ops import BinaryOps
        from utils.ingest_checkpoint import save_ingest_checkpoint

    print("=" * 60)
    print("Grilly SVC Ingestion (streaming)")
    print("=" * 60)

    engine = SVCIngestionEngine(dim=args.dim)
    print(f"\nEngine: {engine.status()}")

    controller = CognitiveController(
        dim=args.dim,
        word_use_ngrams=not args.no_ngrams,
    )

    # Stream entries and ingest in chunks
    t0 = time.time()
    last_print = t0
    total = 0
    reported = 0  # value of `total` when the last progress line was printed
    total_templates = 0
    total_sentences = 0
    total_new_words = 0

    entries = load_svc_entries(
        path=args.file,
        max_entries=args.max,
        realms=args.realms,
        min_complexity=args.min_complexity,
        max_complexity=args.max_complexity,
        sources=args.sources,
    )

    for batch in _iter_chunks(entries, args.chunk):
        res = controller.ingest_svc(
            batch,
            learn_templates=not args.no_templates,
            build_realm_vectors=not args.no_realm_vectors,
            verbose=args.verbose,
            engine=engine,
        )
        total += len(batch)
        total_templates += res.templates_learned
        total_sentences += res.sentences_learned
        total_new_words += res.words_encoded

        # `total` advances a whole chunk at a time, so test whether the
        # reporting interval has been crossed rather than hit exactly, and
        # compute the rate from the entries actually ingested since the
        # previous progress line.
        since_report = total - reported
        if args.progress > 0 and since_report >= args.progress:
            now = time.time()
            print(
                f" ... {total} ingested ({_fmt_rate(since_report, now - last_print)}), "
                f"facts={len(controller.world.facts)}"
            )
            last_print = now
            reported = total

    dt = time.time() - t0
    print("\n" + "=" * 60)
    print(f"Done in {dt:.2f}s ({_fmt_rate(total, dt)})")
    print(f" Entries: {total}")
    print(f" Sentences: {total_sentences}")
    print(f" Templates: {total_templates}")
    print(f" New words: {total_new_words}")
    print(f" Facts: {len(controller.world.facts)}")

    out = Path(args.file).with_suffix(".ingest_checkpoint")
    print(f" Checkpoint: {out}")
    save_ingest_checkpoint(
        str(out),
        controller,
        include_fact_vectors=True,
        include_sentence_memory=True,
        sentence_compress="auto",
        fp16=True,
    )

    realms = getattr(controller.language, "realm_vectors", {}) or {}
    if realms and not args.no_realm_vectors:
        realm_names = sorted(realms)
        print(
            f" Realms: {len(realm_names)} ({', '.join(realm_names[:12])}{'...' if len(realm_names) > 12 else ''})"
        )

        # Optional: build MoE router over realm indicators
        # Each expert is an identity function; `_r=r` binds the realm name at
        # definition time to avoid the late-binding closure pitfall.
        realm_fns = {r: (lambda x, _r=r: x) for r in realm_names}
        moe = ResonatorMoE.from_realm_vectors(
            dim=args.dim,
            realm_expert_fns=realm_fns,
            realm_vectors=None,  # hash-based routing stability
        )

        # Quick sanity routing
        ok = 0
        for r in realm_names:
            indicator = BinaryOps.hash_to_bipolar(r, args.dim)
            routed = moe.route(indicator, top_k=1)
            ok += int(routed[0] == r)
        print(f" MoE route: {ok}/{len(realm_names)} realm indicators matched")
    else:
        print(" Realms: none")

    print("=" * 60)
if __name__ == "__main__":
    # Script entry point: run the ingestion, and turn Ctrl-C into a short
    # message on stderr instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        # 130 = 128 + SIGINT (2), the conventional shell exit status for Ctrl-C.
        raise SystemExit(130)