You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
745 lines
36 KiB
745 lines
36 KiB
import os
|
|
import asyncio
|
|
import re
|
|
import json
|
|
from typing import List, Dict, Any, Optional
|
|
import pymysql
|
|
|
|
# LangChain Imports
|
|
try:
|
|
from langchain_community.document_loaders import PyPDFLoader
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
|
from langchain_community.vectorstores import FAISS
|
|
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
from langchain_core.runnables import RunnablePassthrough
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain_core.embeddings import Embeddings
|
|
from zhipuai import ZhipuAI
|
|
except ImportError as e:
|
|
print(f"Warning: Dependencies missing ({e}). Core rules will still work.")
|
|
|
|
# API Configuration
# SECURITY NOTE(review): live API keys and database credentials are hardcoded
# below. They should be moved to environment variables / a secrets store, and
# the committed values rotated, before this file circulates further.
ZHIPU_API_KEY = "dc8bfe33db15c49026cedbf5ffa461e0.1grbcRvEZyADTWJi"
QWEN_API_KEY = "sk-c7d5687a4d044489974b65bde467e93e"
DEEPSEEK_API_KEY = "sk-ee2871ac206c4cadbfa60d06dba0a8fe"

# OpenAI-compatible endpoints for each LLM provider.
ZHIPU_BASE_URL = "https://open.bigmodel.cn/api/paas/v4/"
QWEN_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
DEEPSEEK_BASE_URL = "https://api.deepseek.com"

# Source PDF for the RAG knowledge base, and where its FAISS index is cached.
# NOTE(review): PDF_PATH is an absolute Windows path — will not resolve on
# other machines; consider making it configurable.
PDF_PATH = r"d:\Projects\US_RPT_QA\超声医学质量控制管理规范.pdf"
INDEX_PATH = os.path.join(os.path.dirname(__file__), "faiss_index")

# Database config (kept consistent with imurs)
DB_CONFIG = {
    "host": "39.108.252.248",
    "port": 3306,
    "user": "root",
    "password": "Zp.123456",
    "database": "yd_gzlps_test",
    "charset": "utf8mb4",
    "cursorclass": pymysql.cursors.DictCursor  # rows returned as dicts
}
|
|
|
|
# Config Management for Engineering Approach
|
|
class ConfigManager:
    """Loads and hot-reloads vocabulary and scoring configuration.

    Vocabulary JSON files live under ``data/vocab`` and the scoring standard
    under ``data/config``.  Everything is re-read on each :meth:`load_all`
    call so manual edits to the JSON files take effect without a restart.
    An optional DB sync (:meth:`_load_from_db`) can overlay approved
    vocabulary rows on top of the local files.
    """

    def __init__(self):
        self.base_dir = os.path.dirname(os.path.abspath(__file__))
        self.vocab_dir = os.path.join(self.base_dir, "data", "vocab")
        self.config_dir = os.path.join(self.base_dir, "data", "config")
        self.load_all()

    def load_all(self):
        """(Re)load every vocabulary/config file from disk.

        Loaded live so that user edits to the JSON files take effect
        immediately, without restarting the service.
        """
        self.l1_standard = self._load_json(os.path.join(self.vocab_dir, "l1_standard.json"), [])
        self.l2_hospital = self._load_json(os.path.join(self.vocab_dir, "l2_hospital.json"), [])
        self.l3_mapping = self._load_json(os.path.join(self.vocab_dir, "l3_mapping.json"), {})
        self.pinyin_map = self._load_json(os.path.join(self.vocab_dir, "pinyin_map.json"), {})
        self.scoring_standard = self._load_json(os.path.join(self.config_dir, "scoring_standard.json"), {})
        # self._load_from_db()  # Local JSON vocab is preferred; enable manually to sync from the DB.

    def _load_from_db(self):
        """Overlay approved vocabulary rows from MySQL onto the local vocab.

        Best-effort: any failure (unreachable DB, schema drift) is logged and
        the locally loaded JSON vocabulary remains in effect.
        """
        try:
            # Short connect timeout so an unreachable DB cannot block requests.
            conn = pymysql.connect(**DB_CONFIG, connect_timeout=3)
            try:
                with conn.cursor() as cursor:
                    sql = "SELECT raw_text, correct_text, vocab_type FROM ai_qc_vocabulary WHERE status = 'approved'"
                    cursor.execute(sql)
                    results = cursor.fetchall()

                if results:
                    # Rebuild into fresh containers (true hot-overlay; no
                    # accumulation of stale rows from earlier syncs).
                    db_l3 = {}
                    db_pinyin = {}
                    db_l1 = []
                    db_l2 = []

                    for row in results:
                        v_type = row['vocab_type']
                        raw = row['raw_text']
                        correct = row['correct_text']

                        if v_type == 'L3': db_l3[raw] = correct
                        elif v_type == 'Pinyin': db_pinyin[raw] = correct
                        elif v_type == 'L1': db_l1.append(raw)
                        elif v_type == 'L2': db_l2.append(raw)

                    # Only overwrite when the DB actually returned data.
                    if db_l3: self.l3_mapping.update(db_l3)
                    if db_pinyin: self.pinyin_map.update(db_pinyin)
                    if db_l1: self.l1_standard = db_l1
                    if db_l2: self.l2_hospital = db_l2
            finally:
                # BUGFIX: previously conn.close() sat at the end of the try
                # body, so the connection leaked whenever the query raised.
                conn.close()
        except Exception as e:
            print(f"⚠️ 数据库热加载失败,使用本地缓存: {e}")

    def _load_json(self, path, default):
        """Read JSON from *path*; return *default* if the file is missing or unreadable."""
        try:
            if os.path.exists(path):
                with open(path, 'r', encoding='utf-8') as f:
                    return json.load(f)
        except Exception as e:
            print(f"Error loading {path}: {e}")
        return default

    def get_scoring_text(self):
        """Return the hard scoring rules block injected into every LLM prompt."""
        text = "【核心评分规则(必须严格遵守)】:\n"
        text += "1. 初始满分 100 分,根据发现的问题倒扣,最低 0 分。\n"
        text += "2. **扣分阶梯(禁止针对小错误过度扣分,禁止重复扣分)**:\n"
        text += "   - 术语瑕疵/错别字/标点规范(如‘冥想’应为‘明显’):同类型错误无论出现多少次,**累计扣分上限为 5 分**。\n"
        text += "   - 描述不全(如未注大小)、基础信息漏项:每次扣 **5 分**。\n"
        text += "   - 严重逻辑矛盾(如性别不符、结论与描述完全相反):单项直接扣 **40 分以上**,使得分低于 60 分。\n"
        text += "3. **打分维度(仅作参考方向)**:信息完整性、表达清晰度、术语专业性、临床相关性、格式规范性。\n"
        return text
|
|
|
|
# Global Config Instance
# Module-level singleton; constructing it performs the initial vocab/config load.
config_manager = ConfigManager()
|
|
|
|
# Medical Rules Data (Mostly static mapping)
# Maps a recorded patient sex to organ terms that must NOT appear in that
# patient's report; RuleEngine flags any occurrence as a sex/site conflict.
GENDER_MAP = {
    "女": ["前列腺", "精囊", "睾丸", "阴囊"],
    "女性": ["前列腺", "精囊", "睾丸", "阴囊"],
    "男": ["子宫", "卵巢", "输卵管", "阴道"],
    "男性": ["子宫", "卵巢", "输卵管", "阴道"]
}
|
|
|
|
class RuleEngine:
    """Deterministic QC checks that run before (and independently of) the LLM.

    Each check appends a human-readable finding string to the result list.
    Severity is encoded in the leading tag (【严重】 / 【建议修正】 / 【规范建议】
    / 【逻辑疑虑】 ...), which downstream score calibration keys on.
    """

    def run_checks(self, report_text: str, patient_info: Dict[str, Any], examine_part: str, clinical_diagnosis: str = "") -> List[str]:
        """Run all rule checks and return the list of finding strings.

        Args:
            report_text: Full report body (may embed 超声所见/超声提示 sections).
            patient_info: Dict expected to carry "sex" and "age" keys.
            examine_part: Requested examination site, e.g. "左膝".
            clinical_diagnosis: Currently unused; kept for interface stability.
        """
        # NOTE: per-request config reload was removed here to avoid DB lag;
        # run_qc() reloads the vocabulary once per request instead.
        gender = patient_info.get("sex", "未知")
        try:
            age = int(re.sub(r"\D", "", str(patient_info.get("age", 0))))
        except (TypeError, ValueError):
            # e.g. the age field contains no digits at all -> int("") raises
            age = 0
        findings = []

        # 1. Gender/organ mismatch
        if gender in GENDER_MAP:
            conflicting_organs = [organ for organ in GENDER_MAP[gender] if organ in report_text]
            if conflicting_organs:
                organs_str = "、".join([f"'{o}'" for o in conflicting_organs])
                findings.append(f"【严重】性别与部位冲突:患者性别为{gender},但在报告描述中出现了{organs_str}。")

        # 1b. Age/physiology conflict
        if age > 60:
            age_sensitive_keywords = ["胎儿", "早孕", "妊娠", "卵泡", "月经"]
            found_age_conflicts = [kw for kw in age_sensitive_keywords if kw in report_text]
            if found_age_conflicts:
                kws_str = "、".join([f"'{k}'" for k in found_age_conflicts])
                findings.append(f"【生理逻辑冲突】高龄风险:患者年龄为{age}岁,报告中出现{kws_str},不符合生理常规或伦理审核。")

        # 2. Typos & term mapping (dynamic vocab, weighted pattern matching)
        # Sort mappings by descending key length so long phrases win over their
        # substrings (e.g. "冥想异常" takes precedence over "冥想").
        sorted_mappings = sorted(config_manager.l3_mapping.items(), key=lambda x: len(x[0]), reverse=True)

        covered_ranges = []   # text spans already claimed by a longer match [(start, end), ...]
        matched_results = {}  # correction -> {typo1, typo2, ...}

        for typo, correction in sorted_mappings:
            # Find every occurrence; escape so vocab keys match literally.
            for m in re.finditer(re.escape(typo), report_text):
                start, end = m.span()
                # Skip positions fully covered by an earlier (longer) match.
                if any(start >= s and end <= e for s, e in covered_ranges):
                    continue

                covered_ranges.append((start, end))
                if correction not in matched_results:
                    matched_results[correction] = set()
                matched_results[correction].add(typo)

        # Merge per-correction so one pattern error yields one finding
        # (fixes the user-reported "same typo flagged many times" issue).
        for correction, typos in matched_results.items():
            typos_list = sorted(list(typos), key=len, reverse=True)
            typos_str = "、".join([f"'{t}'" for t in typos_list])
            if len(typos_list) > 1:
                findings.append(f"【建议修正】检测到多处相关术语错误({typos_str}),均应统一修正为'{correction}'。")
            else:
                findings.append(f"【建议修正】术语错误:检测到'{typos_list[0]}',应修正为'{correction}'。")

        # 3. Pinyin residue left by input methods (dynamic vocab)
        for py, term in config_manager.pinyin_map.items():
            if py in report_text.lower():
                findings.append(f"【建议修正】拼音残留:检测到输入法残留'{py}',建议修正为'{term}'。")

        # 4. Unit convention check (cm vs mm) near lesion keywords
        keywords = ["结节", "病灶", "占位", "团块"]
        for kw in keywords:
            if kw in report_text:
                if re.search(rf"{kw}.*?(\d+\.?\d*)\s*cm", report_text):
                    findings.append(f"【规范建议】单位使用:针对'{kw}',行业规范建议使用'mm',当前报告中使用了'cm'。")

        # 5. Requested site vs described organ-system mismatch
        system_keywords = {
            "血管": ["颈动脉", "股动脉", "静脉", "血流", "斑块"],
            "腹部": ["肝", "胆", "胰", "脾", "肾", "腹水"],
            "妇科": ["子宫", "卵巢", "附件", "内膜"],
            "泌尿": ["膀胱", "前列腺", "输尿管"]
        }

        if examine_part:
            for sys_name, sys_kws in system_keywords.items():
                if sys_name in examine_part:
                    # Flag a report that mentions another system's organs while
                    # containing nothing at all from the requested system
                    # (e.g. a 'Vessels' request describing only liver/gallbladder).
                    other_systems = {k: v for k, v in system_keywords.items() if k != sys_name}
                    for other_name, other_kws in other_systems.items():
                        found_other = [okw for okw in other_kws if okw in report_text]
                        if found_other:
                            current_sys_matches = [skw for skw in sys_kws if skw in report_text]
                            if len(current_sys_matches) == 0:
                                findings.append(f"【系统性偏离】检查部位为'{examine_part}',但报告描述中却出现了{found_other[0]}等'{other_name}'系统内容。")
                    break

        # 6. Enhanced left/right laterality consistency (checked per section)
        part_norm = (examine_part or "").replace(" ", "")
        l_in_part = "左" in part_norm
        r_in_part = "右" in part_norm
        h_side = "左" if l_in_part and not r_in_part else "右" if r_in_part and not l_in_part else None

        if h_side:
            o_side = "右" if h_side == "左" else "左"

            # Section extraction, tolerating both layouts:
            #   format 1: "超声所见: ..."            (standard)
            #   format 2: "【字段:超声所见】: ..."   (frontend payload)
            see_match = re.search(
                r"(?:【字段:)?超声所见(?:】)?[\s::]+(.*?)(?=(?:【字段:)?超声提示|结论|提示|\Z)",
                report_text, re.DOTALL
            )
            see_text = see_match.group(1).strip() if see_match else ""

            hint_match = re.search(
                r"(?:【字段:)?超声提示(?:】)?[\s::\n\r]+(.*)",
                report_text, re.DOTALL | re.IGNORECASE
            )
            hint_text = hint_match.group(1).strip() if hint_match else ""

            # If section extraction failed entirely, degrade to the second half
            # of the whole report.
            if not hint_text and len(report_text) > 100:
                hint_text = report_text[len(report_text)//2:]

            print(f"DEBUG RuleEngine: TargetSide={h_side}, SeeLen={len(see_text)}, HintLen={len(hint_text)}")

            # Check laterality section by section.
            field_errors = []

            # Check the 「超声所见」 section
            if see_text:
                if o_side in see_text and h_side not in see_text:
                    field_errors.append(f"超声所见(仅含'{o_side}'侧,应为'{h_side}'侧)")

            # Check the 「超声提示」 section
            if hint_text:
                if o_side in hint_text and h_side not in hint_text:
                    field_errors.append(f"超声提示(仅含'{o_side}'侧,应为'{h_side}'侧)")

            if field_errors:
                fields_str = "、".join(field_errors)
                findings.append(
                    f"【严重】侧位冲突:申请部位为'{h_side}'侧,但以下字段描述有误——{fields_str}。"
                    f"请逐一核实并统一修正为'{h_side}'侧。"
                )
            else:
                # Whole-text frequency safety net (fallback when the per-field
                # checks found nothing).
                if h_side not in report_text and o_side in report_text:
                    findings.append(f"【严重】侧位矛盾:申请部位要求'{h_side}'侧,但报告全篇描述均为'{o_side}'侧。")
                elif report_text.count(o_side) > report_text.count(h_side) * 2 and h_side in report_text:
                    findings.append(f"【逻辑疑虑】侧位倾向:描述中'{o_side}'侧占比远超申请的'{h_side}'侧,请核实。")

        return findings
|
|
|
|
# Embedding and RAG Classes
|
|
|
|
class ZhipuEmbeddings(Embeddings):
    """自定义智谱AI向量类,确保完全兼容官方API

    Custom ZhipuAI embedding adapter. Texts are embedded one at a time;
    blank or failed items fall back to a zero vector so the caller always
    receives vectors of a consistent dimension.
    """

    # Dimension used for the zero-vector fallback (was a magic 1024 repeated
    # three times). NOTE(review): assumed to match the embedding-2 model's
    # output width — confirm against the ZhipuAI API docs if the model changes.
    EMBEDDING_DIM = 1024

    def __init__(self, api_key: str):
        self.client = ZhipuAI(api_key=api_key, timeout=15)
        self.model = "embedding-2"

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed each document text; blank/failed entries become zero vectors."""
        embeddings = []
        for text in texts:
            # Skip empty or whitespace-only content without calling the API.
            if not text.strip():
                embeddings.append([0.0] * self.EMBEDDING_DIM)
                continue
            try:
                response = self.client.embeddings.create(
                    model=self.model,
                    input=text.strip()
                )
                embeddings.append(response.data[0].embedding)
            except Exception as e:
                print(f"Embedding error for text snippet: {e}")
                embeddings.append([0.0] * self.EMBEDDING_DIM)
        return embeddings

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string; returns a zero vector on any failure."""
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Query embedding error: {e}")
            return [0.0] * self.EMBEDDING_DIM
|
|
|
|
class RAGManager:
    """Owns the FAISS vector store: loads the cached index from disk, or
    builds a fresh one from the regulation PDF on first use."""

    def __init__(self):
        self.embeddings = ZhipuEmbeddings(api_key=ZHIPU_API_KEY)
        self.vectorstore = None

    def get_vectorstore(self):
        """Return the vector store, loading or building it lazily."""
        # Fast path: already resident in memory.
        if self.vectorstore:
            return self.vectorstore

        # Try the on-disk cache next.
        if os.path.exists(INDEX_PATH):
            try:
                loaded = FAISS.load_local(
                    INDEX_PATH,
                    self.embeddings,
                    allow_dangerous_deserialization=True
                )
            except Exception as load_err:
                print(f"Error loading index: {load_err}")
            else:
                self.vectorstore = loaded
                return loaded

        # No usable cache — build from scratch (may leave vectorstore None on failure).
        self.build_index()
        return self.vectorstore

    def build_index(self):
        """Parse the QC-regulation PDF, chunk it, embed it, and persist a FAISS index."""
        print(f"Loading PDF from {PDF_PATH}...")
        if not os.path.exists(PDF_PATH):
            print("PDF file not found!")
            return

        try:
            pages = PyPDFLoader(PDF_PATH).load()
            # Small chunks keep each piece within embedding-2's 512-token limit.
            splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
            chunks = splitter.split_documents(pages)
            print(f"Created {len(chunks)} chunks. Generating embeddings...")

            # Build the index with our custom embedding class, then persist it.
            self.vectorstore = FAISS.from_documents(chunks, self.embeddings)
            self.vectorstore.save_local(INDEX_PATH)
            print("Index build and saved.")
        except Exception as build_err:
            print(f"Failed to build index: {build_err}")
|
|
|
|
class QCManager:
    """End-to-end QC orchestrator: rule engine -> RAG context -> LLM review,
    then score calibration and assembly of the final markdown + JSON report."""

    def __init__(self):
        self.rag = RAGManager()
        self.rule_engine = RuleEngine()

    async def get_llm_analysis(self, model_name: str, report_text: str, context: str, extra_instruction: Dict[str, str] = "") -> str:
        """Run one LLM review pass and return its raw text output.

        Args:
            model_name: Selects the provider ("qwen…", "deepseek…", else GLM-4 Flash).
            report_text: Report (with header context) fed through the chain.
            context: RAG snippet. NOTE(review): the prompt template below has no
                {context} placeholder, so this input is currently ignored.
            extra_instruction: Callers pass a dict with keys "text" and
                "rule_findings" (the chain calls .get on it). NOTE(review): the
                "" default would crash on .get if ever used — callers always
                supply the dict; confirm before relying on the default.

        Returns:
            The model's raw text, or a fixed fallback string on any error.
        """
        try:
            if "qwen" in model_name.lower():
                llm = ChatOpenAI(
                    model="qwen-max",
                    api_key=QWEN_API_KEY,
                    base_url=QWEN_BASE_URL,
                    temperature=0.1
                )
            elif "deepseek" in model_name.lower():
                llm = ChatOpenAI(
                    model="deepseek-chat",
                    api_key=DEEPSEEK_API_KEY,
                    base_url=DEEPSEEK_BASE_URL,
                    temperature=0.1
                )
            else:  # Default to Zhipu GLM-4 Flash for speed
                llm = ChatOpenAI(
                    model="glm-4-flash",
                    api_key=ZHIPU_API_KEY,
                    base_url=ZHIPU_BASE_URL,
                    temperature=0.1,
                    timeout=60  # raised to 60s so complex network environments don't time out
                )

            prompt = ChatPromptTemplate.from_template("""
你是一名严谨的【医疗影像报告审核专家】。请审核以下包含背景信息和正文的超声报告。

【自动检测发现(必须处理并计入扣分)】:
{rule_findings}

【关键须知】:
- **背景信息(表头)已包含患者姓名、性别、年龄、检查部位**。
- **只要背景信息中已有对应数值,即视为信息完整**。严禁以此为由扣分。

【硬性约束】:
1. **合并同类项**:上方【自动检测发现】中若已指出多处术语错误或模式错误,请在最终结果中将其视为一个“模式错误”进行归并,**严禁对同一错别字的不同出现位置重复扣分**。
2. **采纳规则建议**:优先使用规则检测提供的修正建议(例如将‘冥想’修正为‘明显’),除非发现其存在严重的常识错误。
3. **深度逻辑审核**:AI 应重点关注规则引擎无法覆盖的深度逻辑(如解剖部位矛盾、结论不支持所见等)。
4. **侧位一致性**:必须严格核对【背景信息】中的检查部位与【报告正文】描述。若检查部位为‘右膝’,正文描述却为‘左膝’,必须判定为【严重】错误并至少扣除 40 分。
5. **严禁排版建议**:绝对禁止就“超声所见”与“超声提示”的加粗标题、分段、冒号等排版细节提出改进建议。
6. **无错处理**:若未发现真实医学或逻辑错误,且自动检测也无明显问题,请仅输出:【该报告未发现质量问题】。

【输出格式要求(若发现错误)】:
- 初始满分为 100,根据下方标准进行扣分。
- 先逐条列出真实的扣分理由(注明具体扣分值,如 -2 分)。
- 最后输出一个 JSON 块,内容如下(注意:issues 列表必须包含所有【自动检测发现】中的原文词):
{{
"score": 评分(int),
"issues": [
{{ "original": "原文词", "reason": "原因", "suggestion": "建议", "type": "类型" }}
],
"corrected_fields": {{
"bSee": "修正后的【超声所见】全文本",
"bHint": "修正后的【超声提示】全文本"
}}
}}

【评分标准】:
{scoring_text}

【待审内容】:
{report}

{extra_instruction}
""")

            # LCEL chain: map inputs -> prompt -> model -> plain string.
            chain = (
                {
                    "context": lambda x: context,
                    "rule_findings": lambda x: extra_instruction.get("rule_findings", "无"),
                    "scoring_text": lambda x: config_manager.get_scoring_text(),
                    "report": RunnablePassthrough(),
                    "extra_instruction": lambda x: extra_instruction.get("text", "")
                }
                | prompt
                | llm
                | StrOutputParser()
            )

            import time
            start_time = time.time()
            print(f"DEBUG: Invoking LLM ({model_name})...")
            res = await chain.ainvoke(report_text)
            elapsed = time.time() - start_time
            print(f"DEBUG: LLM ({model_name}) response received in {elapsed:.2f}s.")
            return res
        except Exception as e:
            # Best-effort fallback so the QC pipeline still returns something usable.
            print(f"DEBUG: LLM Analysis Error: {e}")
            return "✅ 已完成报告基础合规性校验。建议结合自动规则检测结果进行微调。"

    async def run_qc(self, report_data: Dict[str, Any]) -> str:
        """Full QC pipeline for one report.

        Steps: reload vocab -> rule checks -> RAG lookup -> LLM analysis ->
        JSON extraction -> score calibration -> issue normalization -> final
        markdown report with an embedded ===FINAL_QC_JSON=== block.
        """
        import time
        overall_start = time.time()
        # Reload the vocabulary before every run so user edits to
        # l3_mapping.json take effect immediately.
        config_manager.load_all()

        raw_report = report_data.get("report", "")
        patient_info = report_data.get("patient_info", {})
        examine_part = report_data.get("examinePart", "")

        # Build a report text that includes the full header information, so the
        # AI does not falsely report missing header fields.
        full_report_context = f"""
【报告背景信息】:
患者姓名:{patient_info.get('patientName', '测试')}
患者性别:{patient_info.get('sex', '未提供')}
患者年龄:{patient_info.get('age', '未提供')}
检查部位:{examine_part or '全腹'}

【报告正文】:
{raw_report}
""".strip()

        # Single-model mode for faster responses; the slower multi-model
        # consensus mode is no longer selected here.
        selected_model = "glm-4-flash"

        # 1. Rules (Instant)
        print("DEBUG: Running Rule Engine...")
        try:
            rule_findings = self.rule_engine.run_checks(raw_report, patient_info, examine_part)
            print(f"DEBUG: Rule findings count: {len(rule_findings)}")
        except Exception as rule_err:
            print(f"DEBUG: Rule Engine Error: {rule_err}")
            rule_findings = []
        rule_str = "\n".join(rule_findings) if rule_findings else "未发现自动规则错误。"

        # 2. RAG context (loaded defensively; failure falls back to common sense)
        print("DEBUG: Fetching RAG Context...")
        context = "请根据医学常识进行判断。"
        try:
            vs = self.rag.get_vectorstore()
            if vs:
                relevant_docs = vs.similarity_search(raw_report, k=1)
                context = "\n".join([doc.page_content for doc in relevant_docs])
        except Exception as rag_err:
            print(f"DEBUG: RAG Step missed: {rag_err}")

        # 3. LLM Analysis
        print("DEBUG: Starting LLM Analysis...")
        instruction_payload = {
            "text": "只输出发现的错误,不输出无错误项。",
            "rule_findings": rule_str
        }

        # NOTE(review): selected_model is hardcoded above, so this consensus
        # branch is currently unreachable dead code — kept for easy re-enabling.
        if selected_model == "consensus":
            print(f"DEBUG: Selected model: consensus. Calling sub-models...")
            # (keeps the original concurrent multi-model fan-out; the merged
            # prompt instructs every sub-model to stay silent about correct items)
            tasks = [
                self.get_llm_analysis("glm-4-flash", full_report_context, context, instruction_payload),
                self.get_llm_analysis("qwen-max", full_report_context, context, instruction_payload),
            ]
            results = await asyncio.gather(*tasks)
            print("DEBUG: Sub-models finished. Starting synthesis...")

            synthesizer_llm = ChatOpenAI(
                model="glm-4-flash",
                api_key=ZHIPU_API_KEY,
                base_url=ZHIPU_BASE_URL,
                temperature=0.1,
                timeout=30
            )

            synthesis_prompt = ChatPromptTemplate.from_template("""
你现在是【终审质控组长】。请汇总多方质控意见(下方 A/B 记录),生成最终结论。

【质控原始记录】:
{results}

【绝对禁令】:
1. 严禁输出任何“未见明显错误”、“符合规范”或“基本正确”的描述。
2. 严禁输出“范例报告”、“参考报告”、“修改后报告”或任何类似的全篇重写内容。
3. **重点:绝对禁止就“超声所见”与“超声提示”的分段、分行、加粗标题、冒号区分等格式排版问题提出任何建议。即使 A/B 记录中有此类建议,也请将其判定为误报并剔除。**
4. 严禁输出“评分报告”、“评分与范例生成”等标题。

【输出格式要求】:
1. 仅列出发现的每一个真实错误及其扣分理由(按点排列,直接说事,不废话)。
2. 最后必须输出一个合法的 JSON 块。
3. 如果合法的扣分项为零,则正文仅显示:该报告未发现明显质量问题。

```json
{{
"score": XX,
"issues": [
{{ "original": "原文词", "reason": "扣分原因", "suggestion": "修正建议" }}
],
"corrected_fields": {{ "bSee": "修正后的完整所见", "bHint": "修正后的完整提示" }}
}}
```
""")
            synth_chain = synthesis_prompt | synthesizer_llm | StrOutputParser()
            combined_results = f"A: {results[0]}\nB: {results[1]}"
            ai_findings = await synth_chain.ainvoke({
                "raw_report": raw_report,
                "rule_findings": rule_str,
                "results": combined_results,
                "scoring_text": config_manager.get_scoring_text()
            })
        else:
            ai_findings = await self.get_llm_analysis(selected_model, full_report_context, context, instruction_payload)

        # Parse AI results to extract score and structured issues
        ai_description = ai_findings
        ai_score = 100
        ai_structured_issues = []

        try:
            # Improved JSON extraction: collect every candidate JSON block and
            # pick the most informative one.
            json_blocks = re.findall(r"```json\s*(.*?)\s*```", ai_findings, re.DOTALL)
            if not json_blocks:
                # Fall back to scanning for raw brace-delimited blocks.
                json_blocks = re.findall(r"(\{.*?\})", ai_findings, re.DOTALL)

            target_json = ""
            if json_blocks:
                # Rank candidates: blocks carrying issues are preferred.
                candidates = []
                for jb in json_blocks:
                    try:
                        parsed = json.loads(jb.strip())
                        if isinstance(parsed, dict):
                            # Weight: more issues and lower score rank higher.
                            weight = len(parsed.get('issues', [])) * 10 - parsed.get('score', 100)
                            candidates.append((weight, parsed, jb))
                    except: continue

                if candidates:
                    # Take the highest-weighted candidate.
                    candidates.sort(key=lambda x: x[0], reverse=True)
                    best_match = candidates[0][1]
                    ai_score = best_match.get("score", 100)
                    ai_structured_issues = best_match.get("issues", [])
                    target_json = candidates[0][2]

                    # The free-text description is everything before the chosen JSON block.
                    first_pos = ai_findings.find(target_json)
                    if first_pos > 0:
                        ai_description = ai_findings[:first_pos].strip()
        except Exception as e:
            print(f"DEBUG: JSON Parser Error: {e}")

        # Score calibration: make sure the number honestly reflects report quality.
        # 1. Local rule calibration (highest priority).
        if rule_findings:
            has_severe = any("【严重】" in f for f in rule_findings)
            has_fix = any("【建议修正】" in f or "【规范建议】" in f for f in rule_findings)
            if has_severe: ai_score = min(ai_score, 60)
            elif has_fix: ai_score = min(ai_score, 95)

        # 2. Calibrate against the AI's own severe findings (guards against the
        #    model describing severe issues yet still awarding a high score).
        if ai_structured_issues:
            has_ai_severe = any("严重" in str(i.get('type', '')) or "严重" in str(i.get('reason', '')) for i in ai_structured_issues if isinstance(i, dict))
            if has_ai_severe and ai_score > 60:
                print("DEBUG: AI identified severe issues but gave high score. Calibrating to 60.")
                ai_score = 60

        # Final guard: a perfect 100 is only kept when nothing was deducted and
        # no rules fired. Any issue present forbids a score of 100.
        if ai_structured_issues and ai_score >= 100:
            ai_score = 98

        # Closed-loop: hide the issue list only when there is truly nothing wrong.
        if ai_score == 100 and not rule_findings and not ai_structured_issues:
            ai_structured_issues = []

        # Normalization: inject the rule findings' original words into the
        # structured issue list so the frontend can highlight them.
        existing_originals = {str(issue.get('original', '')) for issue in ai_structured_issues if isinstance(issue, dict)}
        SIDE_CONFLICT_KEYWORDS = ("侧位冲突", "侧位矛盾", "侧位倾向")
        for find in rule_findings:
            if any(kw in find for kw in SIDE_CONFLICT_KEYWORDS):
                # Laterality conflicts inject two highlight kinds:
                #   - "仅含'X'侧" -> the wrong side char, type=侧位错误 (frontend: yellow)
                #   - "应为'Y'侧" -> the correct side char, type=侧位参照 (frontend: blue)
                # Rule text format: ...(仅含'左'侧,应为'右'侧)...
                quoted = re.findall(r"'([^']+)'", find)
                for q in quoted:
                    if q not in ("左", "右"):
                        continue
                    if f"仅含'{q}'侧" in find and q not in existing_originals:
                        ai_structured_issues.append({
                            "original": q,
                            "reason": "侧位冲突(规则引擎检测)",
                            "suggestion": f"报告中出现了'{q}'侧,与申请部位不符,请核实",
                            "type": "侧位错误"  # frontend -> yellow highlight
                        })
                        existing_originals.add(q)
                    elif f"应为'{q}'侧" in find and q not in existing_originals:
                        ai_structured_issues.append({
                            "original": q,
                            "reason": "正确侧位参照",
                            "suggestion": f"申请部位要求'{q}'侧,请确认报告中该侧描述是否准确",
                            "type": "侧位参照"  # frontend -> blue highlight
                        })
                        existing_originals.add(q)
                continue
            # Other rules: pull the quoted sensitive word (e.g. 检测到'冥想').
            extracted_words = re.findall(r"'(.*?)'", find)
            if extracted_words:
                orig_word = extracted_words[0]
                if orig_word not in existing_originals:
                    ai_structured_issues.append({
                        "original": orig_word,
                        "reason": "规则引擎自动发现",
                        "suggestion": "请参考规则修正建议",
                        "type": "自动规则"
                    })
                    existing_originals.add(orig_word)

        # Final Report Construction
        final_report = "### 超声报告质控分析报告\n\n"
        final_report += f"#### 🎯 综合评分:**{ai_score} 分**\n\n"

        if rule_findings:
            final_report += "#### ⚠️ 自动规则检测(高可靠):\n"
            for find in rule_findings:
                final_report += f"- {find}\n"
            final_report += "\n"

        final_report += "#### 🤖 AI 智能分析:\n"
        if ai_description and len(ai_description) > 5 and "未发现" not in ai_description:
            final_report += f"{ai_description}\n"
        elif rule_findings:
            final_report += "✅ AI 深度审核未发现其它语义逻辑错误,请优先参考上方‘自动检测’给出的修正建议。\n"
        else:
            final_report += "✅ 该报告未发现明显质量问题。\n"

        if ai_structured_issues:
            final_report += "\n#### 📝 详细建议清单:\n"
            # Stronger dedup: key on (original word, suggestion) so repeated AI
            # output or merged rule findings are listed only once.
            seen_issue_keys = set()
            unique_issues = []
            for issue in ai_structured_issues:
                if not isinstance(issue, dict): continue
                key = (str(issue.get('original', '')).strip(), str(issue.get('suggestion', '')).strip())
                if key not in seen_issue_keys:
                    unique_issues.append(issue)
                    seen_issue_keys.add(key)

            for issue in unique_issues:
                orig = issue.get('original', '原文')
                sugg = issue.get('suggestion', '建议内容')
                reason = issue.get('reason', '原因描述')
                itype = issue.get('type', '核心错误')
                final_report += f"- **[{itype}]** {orig} -> {sugg} ({reason})\n"

        # Safety net: rules fired but no structured issues — the AI may have missed them.
        if rule_findings and not ai_structured_issues:
            final_report += "\n*(提示:已根据自动规则检测结果更新最终评分)*\n"

        # If the parsed description is too short but raw output exists, append
        # the raw analysis (with json code fences stripped so the frontend
        # parser is not confused).
        if len(final_report) < 150 and len(ai_findings) > 100:
            final_report += "\n---\n#### 🔍 原始分析记录:\n"
            clean_ai_findings = re.sub(r"```json[\s\S]*?```", "", ai_findings).strip()
            final_report += clean_ai_findings

        # Append the final structured JSON block for precise frontend parsing
        # (highlighting and one-click correction).
        # Re-extract the AI's corrected_fields (keeps its deep-corrected full text).
        final_corrected_fields = {}
        try:
            # Pull corrected_fields from the raw output once more.
            json_blocks = re.findall(r"```json\s*(.*?)\s*```", ai_findings, re.DOTALL)
            if json_blocks:
                last_json = json.loads(json_blocks[-1].strip())
                final_corrected_fields = last_json.get("corrected_fields", {})
        except:
            pass

        final_json = {
            "score": ai_score,
            "issues": ai_structured_issues,
            "corrected_fields": final_corrected_fields
        }
        # Wrap the final JSON in unique markers so the frontend can locate it
        # exactly, without mis-matching other code blocks.
        final_report += f"\n\n===FINAL_QC_JSON===\n{json.dumps(final_json, ensure_ascii=False, indent=2)}\n===END_QC_JSON===\n"

        overall_elapsed = time.time() - overall_start
        print(f"DEBUG: Total QC processing time: {overall_elapsed:.2f}s. Final Score: {ai_score}")
        return final_report
|
|
|
|
# Global instance
# Module-level singleton used by the web layer; construction wires up the
# RAG manager and rule engine once per process.
qc_manager = QCManager()
|
|
|