修改GraphRAG后端,另外两本小说入库

This commit is contained in:
2026-04-01 15:55:53 +08:00
parent 403b7dfacf
commit bfcf840013
4 changed files with 170 additions and 70 deletions

View File

@@ -1,5 +1,5 @@
"""
大唐双龙传 GraphRAG — FastAPI 后端
武侠三部曲 GraphRAG — FastAPI 后端
端点:
GET /api/health — 健康检查(含 Neo4j 连通性)
@@ -13,11 +13,11 @@ from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from graph_query import get_driver, get_graph_stats
from graph_builder import build_graph
from graph_builder import build_all_graphs
from llm_router import answer_question
import uvicorn
app = FastAPI(title="大唐双龙传 GraphRAG API", version="1.0.0")
app = FastAPI(title="武侠三部曲 GraphRAG API", version="1.1.0")
app.add_middleware(
CORSMiddleware,
@@ -37,6 +37,7 @@ class ChatRequest(BaseModel):
class ImportRequest(BaseModel):
clear: bool = False # True = 先清空图谱再重新导入
novels: list[str] | None = None # 默认导入 dtslz/ldj/tlbb
# ── Endpoints ─────────────────────────────────────────────
@@ -62,10 +63,10 @@ def stats():
@app.post("/api/import")
def import_data(req: ImportRequest = ImportRequest()):
"""导入所有卷数据到 Neo4j耗时约 1-3 分钟,请勿重复调用)"""
"""导入小说数据到 Neo4j(耗时约 1-3 分钟,请勿重复调用)"""
driver = get_driver()
try:
build_graph(driver, clear=req.clear)
build_all_graphs(driver, novels=req.novels, clear=req.clear)
stats = get_graph_stats()
return {"status": "ok", "stats": stats}
except Exception as e:

View File

@@ -10,7 +10,8 @@ import json
from pathlib import Path
from neo4j import Driver
DATA_DIR = Path(__file__).parent.parent / "data"
DATA_DIR_BASE = Path(__file__).parent.parent / "fiction"
SUPPORTED_NOVELS = ("dtslz", "ldj", "tlbb")
# ── 工具函数 ──────────────────────────────────────────────
@@ -26,32 +27,77 @@ def _split_leaders(leader: str) -> list[str]:
return [p for p in parts if p not in ("未提及", "")]
def _node_id(novel: str, raw_id: str) -> str:
return f"{novel}:{raw_id}"
def _get_data_dir(novel: str) -> Path:
    """Return the per-novel data directory.

    Raises:
        ValueError: If the directory does not exist on disk.
    """
    candidate = DATA_DIR_BASE / novel / "data"
    if candidate.exists():
        return candidate
    raise ValueError(f"小说数据目录不存在: {candidate}")
def _iter_volume_files(data_dir: Path):
for filepath in sorted(data_dir.glob("vol*.json")):
stem = filepath.stem # vol01
if len(stem) >= 5 and stem[:3] == "vol" and stem[3:].isdigit():
yield int(stem[3:]), filepath
# ── Schema 初始化 ─────────────────────────────────────────
def _drop_legacy_constraints(session):
"""兼容旧版本:移除 Character(name) 唯一约束,改为 (novel, name) 复合唯一约束。"""
rows = session.run(
"""
SHOW CONSTRAINTS
YIELD name, labelsOrTypes, properties
RETURN name, labelsOrTypes, properties
"""
)
for row in rows:
labels = row.get("labelsOrTypes") or []
properties = row.get("properties") or []
if labels == ["Character"] and properties == ["name"]:
constraint_name = row["name"].replace("`", "")
session.run(f"DROP CONSTRAINT `{constraint_name}` IF EXISTS")
def setup_schema(driver: Driver):
    """Create the uniqueness constraints and indexes the importer relies on."""
    with driver.session() as s:
        # NOTE(review): this single-property constraint looks like the pre-migration
        # (removed) side of the diff — it conflicts with the composite (novel, name)
        # constraint created two lines below; confirm it is absent in the committed file.
        s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Character) REQUIRE n.name IS UNIQUE")
        # Remove the legacy Character(name) constraint before creating the composite one.
        _drop_legacy_constraints(s)
        s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Character) REQUIRE (n.novel, n.name) IS UNIQUE")
        # Location/Faction/Event ids are globally unique ("{novel}:{source_id}" format).
        s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Location) REQUIRE n.id IS UNIQUE")
        s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Faction) REQUIRE n.id IS UNIQUE")
        s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Event) REQUIRE n.id IS UNIQUE")
        # Secondary indexes for the common per-volume and per-novel filters.
        s.run("CREATE INDEX IF NOT EXISTS FOR (e:Event) ON (e.vol)")
        s.run("CREATE INDEX IF NOT EXISTS FOR (e:Event) ON (e.novel)")
        s.run("CREATE INDEX IF NOT EXISTS FOR (c:Character) ON (c.novel)")
        s.run("CREATE INDEX IF NOT EXISTS FOR (l:Location) ON (l.novel)")
        s.run("CREATE INDEX IF NOT EXISTS FOR (f:Faction) ON (f.novel)")
        # Relationship indexes: VISITED/CONTROLS are queried by volume number.
        s.run("CREATE INDEX IF NOT EXISTS FOR ()-[r:VISITED]-() ON (r.vol)")
        s.run("CREATE INDEX IF NOT EXISTS FOR ()-[r:CONTROLS]-() ON (r.vol)")
# ── 各类型导入 ────────────────────────────────────────────
def _import_locations(session, locations: list[dict]):
def _import_locations(session, novel: str, locations: list[dict]):
for loc in locations:
raw_id = loc["id"]
session.run(
"""
MERGE (l:Location {id: $id})
SET l.name = $name,
l.source_id = $source_id,
l.novel = $novel,
l.type = $type,
l.lat = $lat,
l.lng = $lng
""",
id=loc["id"],
id=_node_id(novel, raw_id),
source_id=raw_id,
novel=novel,
name=loc["name"],
type=loc.get("type", ""),
lat=loc.get("lat"),
@@ -59,14 +105,22 @@ def _import_locations(session, locations: list[dict]):
)
def _import_factions(session, factions: list[dict], vol: int):
def _import_factions(session, novel: str, factions: list[dict], vol: int):
for f in factions:
raw_id = f["id"]
session.run(
"""
MERGE (n:Faction {id: $id})
SET n.name = $name, n.type = $type, n.color = $color
SET n.name = $name,
n.source_id = $source_id,
n.novel = $novel,
n.type = $type,
n.color = $color
""",
id=f["id"], name=f["name"],
id=_node_id(novel, raw_id),
source_id=raw_id,
novel=novel,
name=f["name"],
type=f.get("type", ""), color=f.get("color", ""),
)
@@ -76,9 +130,12 @@ def _import_factions(session, factions: list[dict], vol: int):
"""
MATCH (fac:Faction {id: $fid})
MATCH (loc:Location {id: $lid})
MERGE (fac)-[:CONTROLS {vol: $vol}]->(loc)
MERGE (fac)-[:CONTROLS {novel: $novel, vol: $vol}]->(loc)
""",
fid=f["id"], lid=loc_id, vol=vol,
fid=_node_id(novel, raw_id),
lid=_node_id(novel, loc_id),
novel=novel,
vol=vol,
)
# Faction → HAS_MEMBER → Character
@@ -87,36 +144,44 @@ def _import_factions(session, factions: list[dict], vol: int):
continue
session.run(
"""
MERGE (c:Character {name: $name})
MERGE (c:Character {novel: $novel, name: $name})
WITH c
MATCH (fac:Faction {id: $fid})
MERGE (fac)-[:HAS_MEMBER {vol: $vol}]->(c)
MERGE (fac)-[:HAS_MEMBER {novel: $novel, vol: $vol}]->(c)
""",
name=figure, fid=f["id"], vol=vol,
novel=novel,
name=figure,
fid=_node_id(novel, raw_id),
vol=vol,
)
# Character → LEADS → Faction
for leader_name in _split_leaders(f.get("leader", "")):
session.run(
"""
MERGE (c:Character {name: $name})
MERGE (c:Character {novel: $novel, name: $name})
WITH c
MATCH (fac:Faction {id: $fid})
MERGE (c)-[:LEADS {vol: $vol}]->(fac)
MERGE (c)-[:LEADS {novel: $novel, vol: $vol}]->(fac)
""",
name=leader_name, fid=f["id"], vol=vol,
novel=novel,
name=leader_name,
fid=_node_id(novel, raw_id),
vol=vol,
)
def _import_routes(session, routes: list[dict], vol: int):
def _import_routes(session, novel: str, routes: list[dict], vol: int):
for route in routes:
char_color = route.get("color", "")
char_names = _split_characters(route["character"])
for char_name in char_names:
session.run(
"MERGE (c:Character {name: $name}) SET c.color = $color",
name=char_name, color=char_color,
"MERGE (c:Character {novel: $novel, name: $name}) SET c.color = $color",
novel=novel,
name=char_name,
color=char_color,
)
for wp in route.get("route", []):
@@ -128,28 +193,37 @@ def _import_routes(session, routes: list[dict], vol: int):
session.run(
"""
MATCH (c:Character {name: $char})
MATCH (c:Character {novel: $novel, name: $char})
MATCH (l:Location {id: $lid})
MERGE (c)-[v:VISITED {vol: $vol, chapter: $chapter}]->(l)
MERGE (c)-[v:VISITED {novel: $novel, vol: $vol, chapter: $chapter}]->(l)
SET v.event = $event
""",
char=char_name, lid=loc_id,
novel=novel,
char=char_name,
lid=_node_id(novel, loc_id),
vol=vol, chapter=chapter, event=event,
)
def _import_events(session, events: list[dict], vol: int):
def _import_events(session, novel: str, events: list[dict], vol: int):
for i, evt in enumerate(events):
event_id = f"v{vol:02d}_e{i:03d}"
event_id = _node_id(novel, f"v{vol:02d}_e{i:03d}")
chapter = evt.get("chapter", 0)
description = evt.get("event", "")
session.run(
"""
MERGE (e:Event {id: $id})
SET e.vol = $vol, e.chapter = $chapter, e.description = $description
SET e.novel = $novel,
e.vol = $vol,
e.chapter = $chapter,
e.description = $description
""",
id=event_id, vol=vol, chapter=chapter, description=description,
id=event_id,
novel=novel,
vol=vol,
chapter=chapter,
description=description,
)
# 只在有命名地点 id 时建立关系lat/lng 条目跳过)
@@ -161,13 +235,17 @@ def _import_events(session, events: list[dict], vol: int):
MATCH (l:Location {id: $lid})
MERGE (e)-[:OCCURRED_AT]->(l)
""",
eid=event_id, lid=loc_ref,
eid=event_id,
lid=_node_id(novel, loc_ref),
)
# ── 主入口 ────────────────────────────────────────────────
def build_graph(driver: Driver, clear: bool = False):
def build_graph(driver: Driver, novel: str, clear: bool = False):
if novel not in SUPPORTED_NOVELS:
raise ValueError(f"不支持的小说标识: {novel},可选: {', '.join(SUPPORTED_NOVELS)}")
if clear:
print("Clearing existing graph data...")
with driver.session() as s:
@@ -176,22 +254,30 @@ def build_graph(driver: Driver, clear: bool = False):
print("Setting up schema constraints and indexes...")
setup_schema(driver)
data_dir = _get_data_dir(novel)
imported = 0
for vol_num in range(1, 64):
filepath = DATA_DIR / f"vol{vol_num:02d}.json"
if not filepath.exists():
continue
print(f"Importing novel: {novel} ({data_dir})")
for vol_num, filepath in _iter_volume_files(data_dir):
with open(filepath, encoding="utf-8") as f:
data = json.load(f)
with driver.session() as session:
_import_locations(session, data.get("locations", []))
_import_factions(session, data.get("factions", []), vol_num)
_import_routes(session, data.get("character_routes", []), vol_num)
_import_events(session, data.get("key_events", []), vol_num)
_import_locations(session, novel, data.get("locations", []))
_import_factions(session, novel, data.get("factions", []), vol_num)
_import_routes(session, novel, data.get("character_routes", []), vol_num)
_import_events(session, novel, data.get("key_events", []), vol_num)
imported += 1
print(f" [✓] vol{vol_num:02d} imported")
print(f" [✓] {novel}/vol{vol_num:02d} imported")
print(f"\nDone. Imported {imported} volumes.")
print(f"Done. Imported {imported} volumes for {novel}.\n")
def build_all_graphs(driver: Driver, novels: list[str] | None = None, clear: bool = False):
    """Import one or more novels into the graph.

    Args:
        driver: Neo4j driver used for all import sessions.
        novels: Novel slugs to import; falsy/None selects every SUPPORTED_NOVELS entry.
        clear: When True, wipe the existing graph once, before the first import.

    Raises:
        ValueError: If the selection is empty or contains an unsupported slug.
    """
    selected = list(novels) if novels else list(SUPPORTED_NOVELS)
    if not selected:
        raise ValueError("novels 不能为空")
    # Validate the whole selection before touching the database: previously a bad
    # slug later in the list only failed inside build_graph, i.e. after `clear`
    # had already wiped the graph and earlier novels were re-imported.
    unsupported = [n for n in selected if n not in SUPPORTED_NOVELS]
    if unsupported:
        raise ValueError(
            f"不支持的小说标识: {', '.join(unsupported)},可选: {', '.join(SUPPORTED_NOVELS)}"
        )
    for index, novel in enumerate(selected):
        # Only the first import may clear; subsequent ones must be additive.
        build_graph(driver, novel=novel, clear=(clear and index == 0))

View File

@@ -1,44 +1,49 @@
SCHEMA_DESCRIPTION = """
大唐双龙传知识图谱 SchemaNeo4j
武侠三部曲知识图谱 SchemaNeo4j
小说标识novel
- dtslz = 大唐双龙传
- ldj = 鹿鼎记
- tlbb = 天龙八部
节点类型:
- Character {name, color}
主要人物:寇仲、徐子陵、宇文化及、傅君婥、宋师道、李靖、石青璇、李密、李子通、
杜伏威、跋锋寒、李世民、李渊、宋缺、寇仲、毕玄、阴后
- Character {novel, name, color}
同名人物在不同小说中会按 novel 隔离
- Location {id, name, type, lat, lng}
- Location {id, novel, source_id, name, type, lat, lng}
type 取值city / town / waterway / landmark / grassland / forest / region
主要城市:扬州(yangzhou)、洛阳(luoyang)、长安/大兴(daxing)、丹阳(danyang)、
梁都、历阳(liyang)、江陵
id 为全局唯一键,格式:{novel}:{source_id}
- Faction {id, name, type, color}
- Faction {id, novel, source_id, name, type, color}
type 取值:朝廷 / 门阀 / 义军 / 游牧政权 / 江湖势力 / 地方军阀 / 帮会 / 外族
主要势力:隋朝(sui)、李阀(li_clan)、宋阀(song_clan)、宇文阀(yuwen)、
瓦岗军(wagang_army)、突厥(turks)、慈航静斋、阴癸派
id 为全局唯一键,格式:{novel}:{source_id}
- Event {id, vol, chapter, description}
vol 是卷号(整数 1-63chapter 是章节号
- Event {id, novel, vol, chapter, description}
id 为全局唯一键,格式:{novel}:vXX_eYYY
vol 是该小说内部卷号(整数),chapter 是章节号
关系类型:
- (Character)-[:VISITED {vol, chapter, event}]->(Location)
- (Character)-[:VISITED {novel, vol, chapter, event}]->(Location)
人物在某卷某章到访某地
- (Faction)-[:CONTROLS {vol}]->(Location)
- (Faction)-[:CONTROLS {novel, vol}]->(Location)
势力在某卷控制某地
- (Faction)-[:HAS_MEMBER {vol}]->(Character)
- (Faction)-[:HAS_MEMBER {novel, vol}]->(Character)
势力在某卷拥有某成员
- (Character)-[:LEADS {vol}]->(Faction)
- (Character)-[:LEADS {novel, vol}]->(Faction)
人物在某卷领导某势力
- (Event)-[:OCCURRED_AT]->(Location)
事件发生于某地
注意:vol 属性用整数表示(如 vol=1 代表第一卷,vol=20 代表第二十卷)
查询建议:
- 用户明确提到小说名时,务必加 novel 过滤
- 未指定小说时,可跨小说查询
"""
CYPHER_SYSTEM_PROMPT = f"""你是大唐双龙传知识图谱的 Cypher 查询专家。
CYPHER_SYSTEM_PROMPT = f"""你是武侠三部曲知识图谱的 Cypher 查询专家。
{SCHEMA_DESCRIPTION}
@@ -49,26 +54,30 @@ CYPHER_SYSTEM_PROMPT = f"""你是大唐双龙传知识图谱的 Cypher 查询专
4. 默认加 LIMIT 30除非用户指定数量
5. 使用 DISTINCT 去重
6. 属性名用 n.name、r.vol 格式,不要用整个节点
7. 如果问题完全无法用图谱回答只输出单词UNSUPPORTED
7. 若问题指定小说,优先使用 novel 过滤:
- 大唐双龙传 => novel = "dtslz"
- 鹿鼎记 => novel = "ldj"
- 天龙八部 => novel = "tlbb"
8. 如果问题完全无法用图谱回答只输出单词UNSUPPORTED
示例:
Q: 寇仲去过哪些地方?
A: MATCH (c:Character {{name: "寇仲"}})-[v:VISITED]->(l:Location) RETURN DISTINCT l.name, l.type, min(v.vol) AS first_vol ORDER BY first_vol LIMIT 30
A: MATCH (c:Character {{novel: "dtslz", name: "寇仲"}})-[v:VISITED]->(l:Location) RETURN DISTINCT l.name, l.type, min(v.vol) AS first_vol ORDER BY first_vol LIMIT 30
Q: 第30卷时宇文阀控制哪些城市
A: MATCH (f:Faction {{name: "宇文阀"}})-[r:CONTROLS]->(l:Location) WHERE r.vol <= 30 AND l.type = "city" RETURN DISTINCT l.name, r.vol ORDER BY r.vol LIMIT 30
A: MATCH (f:Faction {{novel: "dtslz", name: "宇文阀"}})-[r:CONTROLS]->(l:Location) WHERE r.vol <= 30 AND l.type = "city" RETURN DISTINCT l.name, r.vol ORDER BY r.vol LIMIT 30
Q: 扬州发生过哪些重要事件?
A: MATCH (e:Event)-[:OCCURRED_AT]->(l:Location {{name: "扬州"}}) RETURN e.description, e.vol, e.chapter ORDER BY e.vol, e.chapter LIMIT 30
A: MATCH (e:Event {{novel: "dtslz"}})-[:OCCURRED_AT]->(l:Location {{novel: "dtslz", name: "扬州"}}) RETURN e.description, e.vol, e.chapter ORDER BY e.vol, e.chapter LIMIT 30
Q: 谁领导过瓦岗军?
A: MATCH (c:Character)-[r:LEADS]->(f:Faction {{name: "瓦岗军"}}) RETURN DISTINCT c.name, r.vol ORDER BY r.vol LIMIT 30
A: MATCH (c:Character)-[r:LEADS]->(f:Faction {{novel: "dtslz", name: "瓦岗军"}}) RETURN DISTINCT c.name, r.vol ORDER BY r.vol LIMIT 30
Q: 寇仲和哪些势力有过关联
A: MATCH (c:Character {{name: "寇仲"}})-[:VISITED]->(l:Location)<-[:CONTROLS]-(f:Faction) RETURN DISTINCT f.name, f.type LIMIT 30
Q: 韦小宝加入过哪些势力
A: MATCH (c:Character {{novel: "ldj", name: "韦小宝"}})<-[:HAS_MEMBER]-(f:Faction) RETURN DISTINCT f.name, f.type LIMIT 30
"""
ANSWER_SYSTEM_PROMPT = """你是大唐双龙传的知识问答助手,熟悉小说中的人物、势力、地点和事件。
ANSWER_SYSTEM_PROMPT = """你是武侠三部曲(大唐双龙传、鹿鼎记、天龙八部)的知识问答助手,熟悉小说中的人物、势力、地点和事件。
请根据知识图谱的查询结果,用中文给出准确、自然的回答:
- 直接回答问题,语言简洁流畅

View File

@@ -5,25 +5,29 @@
用法:
python run_import.py # 增量导入MERGE不删除现有数据
python run_import.py --clear # 清空图谱后全量重新导入
python run_import.py ldj # 仅导入鹿鼎记
python run_import.py dtslz tlbb --clear # 清空后导入指定小说
"""
import sys
from dotenv import load_dotenv
from graph_query import get_driver
from graph_builder import build_graph
from graph_builder import build_all_graphs, SUPPORTED_NOVELS
load_dotenv()
def main():
clear = "--clear" in sys.argv
novels = [arg for arg in sys.argv[1:] if not arg.startswith("--")]
selected = novels or list(SUPPORTED_NOVELS)
print("Connecting to Neo4j...")
driver = get_driver()
driver.verify_connectivity()
print("Connected.\n")
build_graph(driver, clear=clear)
build_all_graphs(driver, novels=selected, clear=clear)
print("\nGraph stats:")
from graph_query import get_graph_stats