migrate_db
One-time schema migration: old (blue) DB -> new (green) DB.
Usage: python migrate_db.py old_papers.db [--new-db papers.db] [--force], or equivalently: linxiv-migrate old_papers.db [--new-db papers.db] [--force]
1"""One-time schema migration: old (blue) DB -> new (green) DB. 2 3Usage: 4 python migrate_db.py old_papers.db [--new-db papers.db] [--force] 5 linxiv-migrate old_papers.db [--new-db papers.db] [--force] 6""" 7from __future__ import annotations 8 9import argparse 10import sqlite3 11import sys 12from pathlib import Path 13 14from storage.config.core import apply_sql_schema 15from storage.paths import db_path as _default_new_db_path 16from config import init_data_dir 17 18_MIGRATE_SQL = Path(__file__).resolve().parent / "storage" / "migrations" / "migrate_data.sql" 19 20 21def _connect(path: str) -> sqlite3.Connection: 22 conn = sqlite3.connect(path, detect_types=sqlite3.PARSE_DECLTYPES) 23 conn.row_factory = sqlite3.Row 24 return conn 25 26 27def _split_name(full: str) -> tuple[str | None, str | None]: 28 """Split 'First Last' into (first, last) on the rightmost whitespace. 29 30 Single-token names (no space) return (None, token). Empty string → (None, None). 31 """ 32 parts = full.rsplit(None, 1) 33 if len(parts) == 0: 34 return None, None 35 if len(parts) == 1: 36 return None, parts[0] 37 return parts[0], parts[1] 38 39 40def _verify(conn: sqlite3.Connection) -> bool: 41 """Log row-count comparisons between old and new DB. 
Returns True if all match.""" 42 checks = [ 43 ("papers (all rows)", "SELECT COUNT(*) FROM old.papers", "SELECT COUNT(*) FROM PAPER"), 44 ("paper roots", "SELECT COUNT(DISTINCT paper_id) FROM old.papers", "SELECT COUNT(*) FROM PAPER_ROOTS"), 45 ("projects", "SELECT COUNT(*) FROM old.projects", "SELECT COUNT(*) FROM PROJECT"), 46 ("notes (with paper_id)", "SELECT COUNT(*) FROM old.notes WHERE paper_id IS NOT NULL", "SELECT COUNT(*) FROM NOTE"), 47 ] 48 all_ok = True 49 for label, old_sql, new_sql in checks: 50 old_n = conn.execute(old_sql).fetchone()[0] 51 new_n = conn.execute(new_sql).fetchone()[0] 52 status = "OK" if old_n == new_n else "MISMATCH" 53 if status == "MISMATCH": 54 all_ok = False 55 print(f"[migrate] {status}: {label} old={old_n} new={new_n}") 56 return all_ok 57 58 59def run_migration(old_db: str, new_db: str, *, force: bool = False) -> None: 60 new_path = Path(new_db) 61 62 if new_path.exists() and not force: 63 print(f"[migrate] {new_db!r} already exists. Pass --force to overwrite.", file=sys.stderr) 64 sys.exit(1) 65 66 if new_path.exists() and force: 67 new_path.unlink() 68 print(f"[migrate] Removed existing {new_db!r}") 69 70 if not Path(old_db).exists(): 71 print(f"[migrate] Old DB not found: {old_db!r}", file=sys.stderr) 72 sys.exit(1) 73 74 if not _MIGRATE_SQL.exists(): 75 print(f"[migrate] migrate_data.sql not found at {_MIGRATE_SQL}", file=sys.stderr) 76 sys.exit(1) 77 78 conn = None 79 try: 80 # 1. Apply green schema DDL (includes DB_VERSION with v0.1.1 seed row) 81 print(f"[migrate] Applying schema to {new_db!r}...") 82 conn = _connect(new_db) 83 apply_sql_schema(conn) 84 85 # 2. Attach old DB and run migrate_data.sql 86 print(f"[migrate] Attaching old DB {old_db!r}...") 87 conn.execute("ATTACH DATABASE ? AS old", (old_db,)) 88 89 print("[migrate] Running migration SQL...") 90 sql = _MIGRATE_SQL.read_text(encoding="utf-8") 91 conn.executescript(sql) 92 93 # 3. Post-process AUTHOR: fill AUTHOR_FIRST / AUTHOR_LAST via Python split. 
94 # The SQL leaves these NULL because SQLite has no reverse()/rinstr(). 95 print("[migrate] Splitting author names...") 96 with conn: 97 rows = conn.execute( 98 "SELECT AUTHOR_FK, AUTHOR_FULL_NAME FROM AUTHOR " 99 "WHERE AUTHOR_FIRST IS NULL AND AUTHOR_LAST IS NULL" 100 ).fetchall() 101 for row in rows: 102 first, last = _split_name(row["AUTHOR_FULL_NAME"] or "") 103 conn.execute( 104 "UPDATE AUTHOR SET AUTHOR_FIRST = ?, AUTHOR_LAST = ? WHERE AUTHOR_FK = ?", 105 (first, last, row["AUTHOR_FK"]), 106 ) 107 108 # 4. Surface orphan notes (NULL paper_id — cannot be migrated) 109 try: 110 orphan_row = conn.execute("SELECT n FROM _orphan_notes_count").fetchone() 111 orphan_count = int(orphan_row[0]) if orphan_row else 0 112 except Exception: 113 orphan_count = 0 114 if orphan_count: 115 print( 116 f"[migrate] WARNING: {orphan_count} note(s) had NULL paper_id and could not be " 117 "migrated (SOURCE_FK is NOT NULL in new schema).", 118 file=sys.stderr, 119 ) 120 121 # 5. Verify row counts 122 print("[migrate] Verifying row counts...") 123 _verify(conn) 124 125 # 6. 
Confirm schema version 126 version_row = conn.execute( 127 "SELECT VERSION FROM DB_VERSION ORDER BY VERSION_FK DESC LIMIT 1" 128 ).fetchone() 129 print(f"[migrate] Schema version: {version_row[0] if version_row else 'unknown'}") 130 131 except SystemExit: 132 raise 133 except Exception as e: 134 print(f"[migrate] Fatal error: {e}", file=sys.stderr) 135 sys.exit(1) 136 finally: 137 if conn is not None: 138 conn.close() 139 140 print("[migrate] Done.") 141 142 143def build_parser() -> argparse.ArgumentParser: 144 p = argparse.ArgumentParser( 145 prog="linxiv-migrate", 146 description="Migrate linXiv DB from old (blue) to new (green) schema.", 147 ) 148 p.add_argument("old_db", help="Path to old SQLite DB") 149 p.add_argument( 150 "--new-db", default=None, dest="new_db", 151 help="Destination path for new DB (default: $LINXIV_DATA_DIR/papers.db)", 152 ) 153 p.add_argument( 154 "--force", action="store_true", default=False, 155 help="Overwrite an existing new DB", 156 ) 157 return p 158 159 160def main(argv: list[str] | None = None) -> None: 161 init_data_dir() 162 parser = build_parser() 163 args = parser.parse_args(argv) 164 new_db = args.new_db or str(_default_new_db_path()) 165 run_migration(args.old_db, new_db, force=args.force) 166 167 168if __name__ == "__main__": 169 main()
def
run_migration(old_db: str, new_db: str, *, force: bool = False) -> None:
def run_migration(old_db: str, new_db: str, *, force: bool = False) -> None:
    """Migrate the old (blue) database at *old_db* into a fresh (green) DB.

    Applies the green schema to *new_db*, attaches *old_db* as ``old``, runs
    ``migrate_data.sql``, fills AUTHOR_FIRST / AUTHOR_LAST in Python, reports
    orphan notes, compares row counts and prints the schema version.

    Args:
        old_db: Path of the existing SQLite database to read from.
        new_db: Path of the database to create.
        force: When true, an existing file at *new_db* is deleted first.

    Raises:
        SystemExit: With code 1 when *new_db* exists and *force* is false,
            when *old_db* or the migration SQL file is missing, when both
            paths name the same file, or when any migration step raises.

    Note:
        A failed run does not remove a partially written *new_db*.
    """
    new_path = Path(new_db)

    if new_path.exists() and not force:
        print(f"[migrate] {new_db!r} already exists. Pass --force to overwrite.", file=sys.stderr)
        sys.exit(1)

    # Validate every input BEFORE deleting anything: a typo in old_db must
    # not cost the user their existing destination database.
    if not Path(old_db).exists():
        print(f"[migrate] Old DB not found: {old_db!r}", file=sys.stderr)
        sys.exit(1)

    # With --force, pointing both arguments at one file would unlink the
    # source itself; samefile() also catches symlinks and hard links.
    if new_path.exists() and new_path.samefile(old_db):
        print(f"[migrate] Old and new DB are the same file: {old_db!r}", file=sys.stderr)
        sys.exit(1)

    if not _MIGRATE_SQL.exists():
        print(f"[migrate] migrate_data.sql not found at {_MIGRATE_SQL}", file=sys.stderr)
        sys.exit(1)

    if new_path.exists():
        # Only reachable with force=True (the non-force case exited above).
        new_path.unlink()
        print(f"[migrate] Removed existing {new_db!r}")

    conn = None
    try:
        # 1. Apply green schema DDL (includes DB_VERSION with v0.1.1 seed row)
        print(f"[migrate] Applying schema to {new_db!r}...")
        conn = _connect(new_db)
        apply_sql_schema(conn)

        # 2. Attach old DB and run migrate_data.sql
        print(f"[migrate] Attaching old DB {old_db!r}...")
        conn.execute("ATTACH DATABASE ? AS old", (old_db,))

        print("[migrate] Running migration SQL...")
        sql = _MIGRATE_SQL.read_text(encoding="utf-8")
        conn.executescript(sql)

        # 3. Post-process AUTHOR: fill AUTHOR_FIRST / AUTHOR_LAST via Python split.
        #    The SQL leaves these NULL because SQLite has no reverse()/rinstr().
        print("[migrate] Splitting author names...")
        with conn:  # one transaction: commit on success, roll back on error
            rows = conn.execute(
                "SELECT AUTHOR_FK, AUTHOR_FULL_NAME FROM AUTHOR "
                "WHERE AUTHOR_FIRST IS NULL AND AUTHOR_LAST IS NULL"
            ).fetchall()
            conn.executemany(
                "UPDATE AUTHOR SET AUTHOR_FIRST = ?, AUTHOR_LAST = ? WHERE AUTHOR_FK = ?",
                (
                    (*_split_name(row["AUTHOR_FULL_NAME"] or ""), row["AUTHOR_FK"])
                    for row in rows
                ),
            )

        # 4. Surface orphan notes (NULL paper_id — cannot be migrated)
        try:
            orphan_row = conn.execute("SELECT n FROM _orphan_notes_count").fetchone()
            orphan_count = int(orphan_row[0] or 0) if orphan_row else 0
        except (sqlite3.Error, TypeError, ValueError):
            # _orphan_notes_count is not created in this function and may be
            # absent or malformed; the count is best-effort, so report none.
            orphan_count = 0
        if orphan_count:
            print(
                f"[migrate] WARNING: {orphan_count} note(s) had NULL paper_id and could not be "
                "migrated (SOURCE_FK is NOT NULL in new schema).",
                file=sys.stderr,
            )

        # 5. Verify row counts
        print("[migrate] Verifying row counts...")
        if not _verify(conn):
            # Stays non-fatal (exit status unchanged) but is no longer silent.
            print(
                "[migrate] WARNING: row-count verification reported mismatches; "
                "review the lines above.",
                file=sys.stderr,
            )

        # 6. Confirm schema version
        version_row = conn.execute(
            "SELECT VERSION FROM DB_VERSION ORDER BY VERSION_FK DESC LIMIT 1"
        ).fetchone()
        print(f"[migrate] Schema version: {version_row[0] if version_row else 'unknown'}")

    except Exception as e:
        # Top-level boundary: report and convert to a non-zero exit status.
        # SystemExit is not an Exception subclass, so it passes through.
        print(f"[migrate] Fatal error: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        if conn is not None:
            conn.close()

    print("[migrate] Done.")
def
build_parser() -> argparse.ArgumentParser:
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for ``linxiv-migrate``.

    The parser takes one positional argument (``old_db``) and two options:
    ``--new-db`` (stored as ``new_db``, default None) and the boolean flag
    ``--force`` (default False).

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    # (flags, keyword options) for each argument, in registration order.
    argument_specs = (
        (("old_db",), {"help": "Path to old SQLite DB"}),
        (
            ("--new-db",),
            {
                "dest": "new_db",
                "default": None,
                "help": "Destination path for new DB (default: $LINXIV_DATA_DIR/papers.db)",
            },
        ),
        (
            ("--force",),
            {
                "action": "store_true",
                "default": False,
                "help": "Overwrite an existing new DB",
            },
        ),
    )

    parser = argparse.ArgumentParser(
        prog="linxiv-migrate",
        description="Migrate linXiv DB from old (blue) to new (green) schema.",
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    return parser
def
main(argv: list[str] | None = None) -> None: