""" JSON → Neo4j 导入脚本。 图谱 Schema: 节点: Character, Location, Faction, Event 关系: VISITED, CONTROLS, HAS_MEMBER, LEADS, OCCURRED_AT """ import json from pathlib import Path from neo4j import Driver DATA_DIR_BASE = Path(__file__).parent.parent / "fiction" SUPPORTED_NOVELS = ("dtslz", "ldj", "tlbb") # ── 工具函数 ────────────────────────────────────────────── def _split_characters(name: str) -> list[str]: """'寇仲 & 徐子陵' → ['寇仲', '徐子陵']""" return [c.strip() for c in name.split("&") if c.strip()] def _split_leaders(leader: str) -> list[str]: """'翟让/李密' → ['翟让', '李密'];过滤'未提及'""" parts = [p.strip() for p in leader.split("/") if p.strip()] return [p for p in parts if p not in ("未提及", "")] def _node_id(novel: str, raw_id: str) -> str: return f"{novel}:{raw_id}" def _get_data_dir(novel: str) -> Path: data_dir = DATA_DIR_BASE / novel / "data" if not data_dir.exists(): raise ValueError(f"小说数据目录不存在: {data_dir}") return data_dir def _iter_volume_files(data_dir: Path): for filepath in sorted(data_dir.glob("vol*.json")): stem = filepath.stem # vol01 if len(stem) >= 5 and stem[:3] == "vol" and stem[3:].isdigit(): yield int(stem[3:]), filepath # ── Schema 初始化 ───────────────────────────────────────── def _drop_legacy_constraints(session): """兼容旧版本:移除 Character(name) 唯一约束,改为 (novel, name) 复合唯一约束。""" rows = session.run( """ SHOW CONSTRAINTS YIELD name, labelsOrTypes, properties RETURN name, labelsOrTypes, properties """ ) for row in rows: labels = row.get("labelsOrTypes") or [] properties = row.get("properties") or [] if labels == ["Character"] and properties == ["name"]: constraint_name = row["name"].replace("`", "") session.run(f"DROP CONSTRAINT `{constraint_name}` IF EXISTS") def setup_schema(driver: Driver): with driver.session() as s: _drop_legacy_constraints(s) s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Character) REQUIRE (n.novel, n.name) IS UNIQUE") s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Location) REQUIRE n.id IS UNIQUE") s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Faction) REQUIRE n.id IS UNIQUE") s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Event) REQUIRE n.id IS UNIQUE") s.run("CREATE INDEX IF NOT EXISTS FOR (e:Event) ON (e.vol)") s.run("CREATE INDEX IF NOT EXISTS FOR (e:Event) ON (e.novel)") s.run("CREATE INDEX IF NOT EXISTS FOR (c:Character) ON (c.novel)") s.run("CREATE INDEX IF NOT EXISTS FOR (l:Location) ON (l.novel)") s.run("CREATE INDEX IF NOT EXISTS FOR (f:Faction) ON (f.novel)") s.run("CREATE INDEX IF NOT EXISTS FOR ()-[r:VISITED]-() ON (r.vol)") s.run("CREATE INDEX IF NOT EXISTS FOR ()-[r:CONTROLS]-() ON (r.vol)") # ── 各类型导入 ──────────────────────────────────────────── def _import_locations(session, novel: str, locations: list[dict]): for loc in locations: raw_id = loc["id"] session.run( """ MERGE (l:Location {id: $id}) SET l.name = $name, l.source_id = $source_id, l.novel = $novel, l.type = $type, l.lat = $lat, l.lng = $lng """, id=_node_id(novel, raw_id), source_id=raw_id, novel=novel, name=loc["name"], type=loc.get("type", ""), lat=loc.get("lat"), lng=loc.get("lng"), ) def _import_factions(session, novel: str, factions: list[dict], vol: int): for f in factions: raw_id = f["id"] session.run( """ MERGE (n:Faction {id: $id}) SET n.name = $name, n.source_id = $source_id, n.novel = $novel, n.type = $type, n.color = $color """, id=_node_id(novel, raw_id), source_id=raw_id, novel=novel, name=f["name"], type=f.get("type", ""), color=f.get("color", ""), ) # Faction → CONTROLS → Location for loc_id in f.get("territory", []): session.run( """ MATCH (fac:Faction {id: $fid}) MATCH (loc:Location {id: $lid}) MERGE (fac)-[:CONTROLS {novel: $novel, vol: $vol}]->(loc) """, fid=_node_id(novel, raw_id), lid=_node_id(novel, loc_id), novel=novel, vol=vol, ) # Faction → HAS_MEMBER → Character for figure in f.get("key_figures", []): if not figure: continue session.run( """ MERGE (c:Character {novel: $novel, name: $name}) WITH c MATCH (fac:Faction {id: $fid}) MERGE (fac)-[:HAS_MEMBER {novel: $novel, vol: $vol}]->(c) """, novel=novel, name=figure, fid=_node_id(novel, raw_id), vol=vol, ) # Character → LEADS → Faction for leader_name in _split_leaders(f.get("leader", "")): session.run( """ MERGE (c:Character {novel: $novel, name: $name}) WITH c MATCH (fac:Faction {id: $fid}) MERGE (c)-[:LEADS {novel: $novel, vol: $vol}]->(fac) """, novel=novel, name=leader_name, fid=_node_id(novel, raw_id), vol=vol, ) def _import_routes(session, novel: str, routes: list[dict], vol: int): for route in routes: char_color = route.get("color", "") char_names = _split_characters(route["character"]) for char_name in char_names: session.run( "MERGE (c:Character {novel: $novel, name: $name}) SET c.color = $color", novel=novel, name=char_name, color=char_color, ) for wp in route.get("route", []): loc_id = wp.get("location") if not loc_id: continue # lat/lng only → 跳过(无命名地点节点) chapter = wp.get("chapter", 0) event = wp.get("event", "") session.run( """ MATCH (c:Character {novel: $novel, name: $char}) MATCH (l:Location {id: $lid}) MERGE (c)-[v:VISITED {novel: $novel, vol: $vol, chapter: $chapter}]->(l) SET v.event = $event """, novel=novel, char=char_name, lid=_node_id(novel, loc_id), vol=vol, chapter=chapter, event=event, ) def _import_events(session, novel: str, events: list[dict], vol: int): for i, evt in enumerate(events): event_id = _node_id(novel, f"v{vol:02d}_e{i:03d}") chapter = evt.get("chapter", 0) description = evt.get("event", "") session.run( """ MERGE (e:Event {id: $id}) SET e.novel = $novel, e.vol = $vol, e.chapter = $chapter, e.description = $description """, id=event_id, novel=novel, vol=vol, chapter=chapter, description=description, ) # 只在有命名地点 id 时建立关系(lat/lng 条目跳过) loc_ref = evt.get("location") if isinstance(loc_ref, str) and loc_ref: session.run( """ MATCH (e:Event {id: $eid}) MATCH (l:Location {id: $lid}) MERGE (e)-[:OCCURRED_AT]->(l) """, eid=event_id, lid=_node_id(novel, loc_ref), ) # ── 主入口 ──────────────────────────────────────────────── def build_graph(driver: Driver, novel: str, clear: bool = False): if novel not in SUPPORTED_NOVELS: raise ValueError(f"不支持的小说标识: {novel},可选: {', '.join(SUPPORTED_NOVELS)}") if clear: print("Clearing existing graph data...") with driver.session() as s: s.run("MATCH (n) DETACH DELETE n") print("Setting up schema constraints and indexes...") setup_schema(driver) data_dir = _get_data_dir(novel) imported = 0 print(f"Importing novel: {novel} ({data_dir})") for vol_num, filepath in _iter_volume_files(data_dir): with open(filepath, encoding="utf-8") as f: data = json.load(f) with driver.session() as session: _import_locations(session, novel, data.get("locations", [])) _import_factions(session, novel, data.get("factions", []), vol_num) _import_routes(session, novel, data.get("character_routes", []), vol_num) _import_events(session, novel, data.get("key_events", []), vol_num) imported += 1 print(f" [✓] {novel}/vol{vol_num:02d} imported") print(f"Done. Imported {imported} volumes for {novel}.\n") def build_all_graphs(driver: Driver, novels: list[str] | None = None, clear: bool = False): selected = novels or list(SUPPORTED_NOVELS) if not selected: raise ValueError("novels 不能为空") for i, novel in enumerate(selected): build_graph(driver, novel=novel, clear=(clear and i == 0))