"""JSON → Neo4j import script.

Graph schema:
    Nodes: Character, Location, Faction, Event
    Relationships: VISITED, CONTROLS, HAS_MEMBER, LEADS, OCCURRED_AT
"""

import json
from pathlib import Path

from neo4j import Driver

# Base directory for novel datasets: <repo-root>/fiction/<novel>/data
DATA_DIR_BASE = Path(__file__).parent.parent / "fiction"
# Novel identifiers this importer knows how to load.
SUPPORTED_NOVELS = ("dtslz", "ldj", "tlbb")


# ── Utility functions ─────────────────────────────────────

def _split_characters(name: str) -> list[str]:
|
||
"""'寇仲 & 徐子陵' → ['寇仲', '徐子陵']"""
|
||
return [c.strip() for c in name.split("&") if c.strip()]
|
||
|
||
|
||
def _split_leaders(leader: str) -> list[str]:
|
||
"""'翟让/李密' → ['翟让', '李密'];过滤'未提及'"""
|
||
parts = [p.strip() for p in leader.split("/") if p.strip()]
|
||
return [p for p in parts if p not in ("未提及", "")]
|
||
|
||
|
||
def _node_id(novel: str, raw_id: str) -> str:
|
||
return f"{novel}:{raw_id}"
|
||
|
||
|
||
def _get_data_dir(novel: str) -> Path:
    """Return the data directory for *novel*, raising ValueError if it is missing."""
    data_dir = DATA_DIR_BASE / novel / "data"
    if data_dir.exists():
        return data_dir
    raise ValueError(f"小说数据目录不存在: {data_dir}")

def _iter_volume_files(data_dir: Path):
|
||
for filepath in sorted(data_dir.glob("vol*.json")):
|
||
stem = filepath.stem # vol01
|
||
if len(stem) >= 5 and stem[:3] == "vol" and stem[3:].isdigit():
|
||
yield int(stem[3:]), filepath
|
||
|
||
|
||
# ── Schema 初始化 ─────────────────────────────────────────
|
||
|
||
def _drop_legacy_constraints(session):
|
||
"""兼容旧版本:移除 Character(name) 唯一约束,改为 (novel, name) 复合唯一约束。"""
|
||
rows = session.run(
|
||
"""
|
||
SHOW CONSTRAINTS
|
||
YIELD name, labelsOrTypes, properties
|
||
RETURN name, labelsOrTypes, properties
|
||
"""
|
||
)
|
||
for row in rows:
|
||
labels = row.get("labelsOrTypes") or []
|
||
properties = row.get("properties") or []
|
||
if labels == ["Character"] and properties == ["name"]:
|
||
constraint_name = row["name"].replace("`", "")
|
||
session.run(f"DROP CONSTRAINT `{constraint_name}` IF EXISTS")
|
||
|
||
|
||
def setup_schema(driver: Driver):
    """Create the uniqueness constraints and indexes the importer relies on.

    Also migrates away from the legacy single-property Character constraint
    first, so the composite (novel, name) constraint can be created.
    """
    statements = (
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Character) REQUIRE (n.novel, n.name) IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Location) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Faction) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Event) REQUIRE n.id IS UNIQUE",
        "CREATE INDEX IF NOT EXISTS FOR (e:Event) ON (e.vol)",
        "CREATE INDEX IF NOT EXISTS FOR (e:Event) ON (e.novel)",
        "CREATE INDEX IF NOT EXISTS FOR (c:Character) ON (c.novel)",
        "CREATE INDEX IF NOT EXISTS FOR (l:Location) ON (l.novel)",
        "CREATE INDEX IF NOT EXISTS FOR (f:Faction) ON (f.novel)",
        "CREATE INDEX IF NOT EXISTS FOR ()-[r:VISITED]-() ON (r.vol)",
        "CREATE INDEX IF NOT EXISTS FOR ()-[r:CONTROLS]-() ON (r.vol)",
    )
    with driver.session() as s:
        _drop_legacy_constraints(s)
        for statement in statements:
            s.run(statement)


# ── Per-type importers ────────────────────────────────────

def _import_locations(session, novel: str, locations: list[dict]):
    """Upsert Location nodes from a volume's `locations` array.

    Ids are namespaced per novel; the raw per-volume id is kept in source_id.
    lat/lng may be absent and are stored as null in that case.
    """
    query = """
        MERGE (l:Location {id: $id})
        SET l.name = $name,
            l.source_id = $source_id,
            l.novel = $novel,
            l.type = $type,
            l.lat = $lat,
            l.lng = $lng
        """
    for loc in locations:
        session.run(
            query,
            id=_node_id(novel, loc["id"]),
            source_id=loc["id"],
            novel=novel,
            name=loc["name"],
            type=loc.get("type", ""),
            lat=loc.get("lat"),
            lng=loc.get("lng"),
        )

def _import_factions(session, novel: str, factions: list[dict], vol: int):
    """Upsert Faction nodes plus their CONTROLS / HAS_MEMBER / LEADS edges.

    Relationship edges are stamped with (novel, vol) so each volume's state
    of the world is kept separately.
    """
    faction_query = """
        MERGE (n:Faction {id: $id})
        SET n.name = $name,
            n.source_id = $source_id,
            n.novel = $novel,
            n.type = $type,
            n.color = $color
        """
    controls_query = """
        MATCH (fac:Faction {id: $fid})
        MATCH (loc:Location {id: $lid})
        MERGE (fac)-[:CONTROLS {novel: $novel, vol: $vol}]->(loc)
        """
    member_query = """
        MERGE (c:Character {novel: $novel, name: $name})
        WITH c
        MATCH (fac:Faction {id: $fid})
        MERGE (fac)-[:HAS_MEMBER {novel: $novel, vol: $vol}]->(c)
        """
    leads_query = """
        MERGE (c:Character {novel: $novel, name: $name})
        WITH c
        MATCH (fac:Faction {id: $fid})
        MERGE (c)-[:LEADS {novel: $novel, vol: $vol}]->(fac)
        """

    for faction in factions:
        fid = _node_id(novel, faction["id"])
        session.run(
            faction_query,
            id=fid,
            source_id=faction["id"],
            novel=novel,
            name=faction["name"],
            type=faction.get("type", ""),
            color=faction.get("color", ""),
        )

        # Faction → CONTROLS → Location
        for territory_id in faction.get("territory", []):
            session.run(
                controls_query,
                fid=fid,
                lid=_node_id(novel, territory_id),
                novel=novel,
                vol=vol,
            )

        # Faction → HAS_MEMBER → Character (MERGE creates the character if new)
        for member in faction.get("key_figures", []):
            if member:
                session.run(member_query, novel=novel, name=member, fid=fid, vol=vol)

        # Character → LEADS → Faction; _split_leaders drops the '未提及' placeholder.
        for leader in _split_leaders(faction.get("leader", "")):
            session.run(leads_query, novel=novel, name=leader, fid=fid, vol=vol)

def _import_routes(session, novel: str, routes: list[dict], vol: int):
    """Upsert Character nodes and their VISITED edges to Locations.

    A route's "character" field may name several characters joined with '&'
    (e.g. '寇仲 & 徐子陵'); every listed character shares the same waypoints.

    Bug fix: the waypoint loop previously ran AFTER the character loop and
    reused its leaked loop variable, so only the last character of a joint
    route received VISITED relationships — and an empty character list
    raised NameError. Waypoints are now linked to every character.
    """
    for route in routes:
        char_color = route.get("color", "")
        char_names = _split_characters(route["character"])

        # Upsert each character once, tagging it with the route's color.
        for char_name in char_names:
            session.run(
                "MERGE (c:Character {novel: $novel, name: $name}) SET c.color = $color",
                novel=novel,
                name=char_name,
                color=char_color,
            )

        for wp in route.get("route", []):
            loc_id = wp.get("location")
            if not loc_id:
                continue  # lat/lng only → 跳过(无命名地点节点)
            chapter = wp.get("chapter", 0)
            event = wp.get("event", "")

            # One VISITED edge per character sharing this route.
            for char_name in char_names:
                session.run(
                    """
                    MATCH (c:Character {novel: $novel, name: $char})
                    MATCH (l:Location {id: $lid})
                    MERGE (c)-[v:VISITED {novel: $novel, vol: $vol, chapter: $chapter}]->(l)
                    SET v.event = $event
                    """,
                    novel=novel,
                    char=char_name,
                    lid=_node_id(novel, loc_id),
                    vol=vol,
                    chapter=chapter,
                    event=event,
                )

def _import_events(session, novel: str, events: list[dict], vol: int):
    """Upsert Event nodes and link them to named Locations via OCCURRED_AT."""
    for index, evt in enumerate(events):
        # Deterministic id per (volume, position) so re-imports MERGE cleanly.
        event_id = _node_id(novel, f"v{vol:02d}_e{index:03d}")
        session.run(
            """
            MERGE (e:Event {id: $id})
            SET e.novel = $novel,
                e.vol = $vol,
                e.chapter = $chapter,
                e.description = $description
            """,
            id=event_id,
            novel=novel,
            vol=vol,
            chapter=evt.get("chapter", 0),
            description=evt.get("event", ""),
        )

        # Only string location ids reference a named Location node; raw
        # lat/lng entries carry no id and are skipped.
        loc_ref = evt.get("location")
        if isinstance(loc_ref, str) and loc_ref:
            session.run(
                """
                MATCH (e:Event {id: $eid})
                MATCH (l:Location {id: $lid})
                MERGE (e)-[:OCCURRED_AT]->(l)
                """,
                eid=event_id,
                lid=_node_id(novel, loc_ref),
            )


# ── Main entry ────────────────────────────────────────────

def build_graph(driver: Driver, novel: str, clear: bool = False):
    """Import every vol*.json of *novel* into Neo4j.

    Raises ValueError for an unsupported novel slug or a missing data dir.
    NOTE: when *clear* is true the WHOLE graph is wiped (all novels), not
    just this novel's nodes.
    """
    if novel not in SUPPORTED_NOVELS:
        raise ValueError(f"不支持的小说标识: {novel},可选: {', '.join(SUPPORTED_NOVELS)}")

    if clear:
        print("Clearing existing graph data...")
        with driver.session() as s:
            s.run("MATCH (n) DETACH DELETE n")

    print("Setting up schema constraints and indexes...")
    setup_schema(driver)

    data_dir = _get_data_dir(novel)
    print(f"Importing novel: {novel} ({data_dir})")

    volume_count = 0
    for vol_num, filepath in _iter_volume_files(data_dir):
        data = json.loads(filepath.read_text(encoding="utf-8"))

        # One session per volume keeps transactions reasonably sized.
        with driver.session() as session:
            _import_locations(session, novel, data.get("locations", []))
            _import_factions(session, novel, data.get("factions", []), vol_num)
            _import_routes(session, novel, data.get("character_routes", []), vol_num)
            _import_events(session, novel, data.get("key_events", []), vol_num)

        volume_count += 1
        print(f"  [✓] {novel}/vol{vol_num:02d} imported")

    print(f"Done. Imported {volume_count} volumes for {novel}.\n")

def build_all_graphs(driver: Driver, novels: list[str] | None = None, clear: bool = False):
    """Import several novels; *novels* = None means every supported novel.

    Bug fix: the old `novels or list(SUPPORTED_NOVELS)` treated an explicit
    empty list the same as None, silently importing ALL novels and leaving
    the empty-list ValueError below unreachable. None now means "all",
    while an explicitly empty list is rejected as the message intends.

    *clear* wipes the whole graph exactly once — before the first novel —
    so imports earlier in the same run are not deleted by later ones.
    """
    selected = list(SUPPORTED_NOVELS) if novels is None else list(novels)
    if not selected:
        raise ValueError("novels 不能为空")

    for i, novel in enumerate(selected):
        build_graph(driver, novel=novel, clear=(clear and i == 0))