本仓库为远程超声诊断平台的统一代码仓库,包含前端、后端、AI质控模块以及专网实时通信中间件。 采用 Monorepo 结构管理,方便统一版本控制,支持针对不同医院/客户进行定制化开发。
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

745 lines
36 KiB

import os
import asyncio
import re
import json
from typing import List, Dict, Any, Optional
import pymysql
# LangChain Imports
try:
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.embeddings import Embeddings
from zhipuai import ZhipuAI
except ImportError as e:
print(f"Warning: Dependencies missing ({e}). Core rules will still work.")
# API Configuration
ZHIPU_API_KEY = "dc8bfe33db15c49026cedbf5ffa461e0.1grbcRvEZyADTWJi"
QWEN_API_KEY = "sk-c7d5687a4d044489974b65bde467e93e"
DEEPSEEK_API_KEY = "sk-ee2871ac206c4cadbfa60d06dba0a8fe"
ZHIPU_BASE_URL = "https://open.bigmodel.cn/api/paas/v4/"
QWEN_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
DEEPSEEK_BASE_URL = "https://api.deepseek.com"
PDF_PATH = r"d:\Projects\US_RPT_QA\超声医学质量控制管理规范.pdf"
INDEX_PATH = os.path.join(os.path.dirname(__file__), "faiss_index")
# 数据库配置(与 imurs 保持一致)
DB_CONFIG = {
"host": "39.108.252.248",
"port": 3306,
"user": "root",
"password": "Zp.123456",
"database": "yd_gzlps_test",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor
}
# Config Management for Engineering Approach
class ConfigManager:
def __init__(self):
self.base_dir = os.path.dirname(os.path.abspath(__file__))
self.vocab_dir = os.path.join(self.base_dir, "data", "vocab")
self.config_dir = os.path.join(self.base_dir, "data", "config")
self.load_all()
def load_all(self):
# 实时加载以确保用户修改 JSON 后立即生效,无需重启
self.l1_standard = self._load_json(os.path.join(self.vocab_dir, "l1_standard.json"), [])
self.l2_hospital = self._load_json(os.path.join(self.vocab_dir, "l2_hospital.json"), [])
self.l3_mapping = self._load_json(os.path.join(self.vocab_dir, "l3_mapping.json"), {})
self.pinyin_map = self._load_json(os.path.join(self.vocab_dir, "pinyin_map.json"), {})
self.scoring_standard = self._load_json(os.path.join(self.config_dir, "scoring_standard.json"), {})
# self._load_from_db() # 优先使用本地 JSON 词库,如需同步数据库请手动开启
def _load_from_db(self):
try:
# 增加连接超时,防止数据库不可用时阻塞整个请求
conn = pymysql.connect(**DB_CONFIG, connect_timeout=3)
with conn.cursor() as cursor:
sql = "SELECT raw_text, correct_text, vocab_type FROM ai_qc_vocabulary WHERE status = 'approved'"
cursor.execute(sql)
results = cursor.fetchall()
if results:
# 重置字典(避免累积旧数据,实现真正的热覆盖)
db_l3 = {}
db_pinyin = {}
db_l1 = []
db_l2 = []
for row in results:
v_type = row['vocab_type']
raw = row['raw_text']
correct = row['correct_text']
if v_type == 'L3': db_l3[raw] = correct
elif v_type == 'Pinyin': db_pinyin[raw] = correct
elif v_type == 'L1': db_l1.append(raw)
elif v_type == 'L2': db_l2.append(raw)
# 只有数据库有数据时才覆盖
if db_l3: self.l3_mapping.update(db_l3)
if db_pinyin: self.pinyin_map.update(db_pinyin)
if db_l1: self.l1_standard = db_l1
if db_l2: self.l2_hospital = db_l2
conn.close()
except Exception as e:
print(f"⚠️ 数据库热加载失败,使用本地缓存: {e}")
def _load_json(self, path, default):
try:
if os.path.exists(path):
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Error loading {path}: {e}")
return default
def get_scoring_text(self):
text = "【核心评分规则(必须严格遵守)】:\n"
text += "1. 初始满分 100 分,根据发现的问题倒扣,最低 0 分。\n"
text += "2. **扣分阶梯(禁止针对小错误过度扣分,禁止重复扣分)**:\n"
text += " - 术语瑕疵/错别字/标点规范(如‘冥想’应为‘明显’):同类型错误无论出现多少次,**累计扣分上限为 5 分**。\n"
text += " - 描述不全(如未注大小)、基础信息漏项:每次扣 **5 分**。\n"
text += " - 严重逻辑矛盾(如性别不符、结论与描述完全相反):单项直接扣 **40 分以上**,使得分低于 60 分。\n"
text += "3. **打分维度(仅作参考方向)**:信息完整性、表达清晰度、术语专业性、临床相关性、格式规范性。\n"
return text
# Global Config Instance
config_manager = ConfigManager()
# Medical Rules Data (Mostly static mapping)
GENDER_MAP = {
"": ["前列腺", "精囊", "睾丸", "阴囊"],
"女性": ["前列腺", "精囊", "睾丸", "阴囊"],
"": ["子宫", "卵巢", "输卵管", "阴道"],
"男性": ["子宫", "卵巢", "输卵管", "阴道"]
}
class RuleEngine:
def run_checks(self, report_text: str, patient_info: Dict[str, Any], examine_part: str, clinical_diagnosis: str = "") -> List[str]:
# Always reload removed to prevent DB lag per request
# config_manager.load_all()
gender = patient_info.get("sex", "未知")
try:
age = int(re.sub(r"\D", "", str(patient_info.get("age", 0))))
except:
age = 0
findings = []
# 1. Gender/Organ Mismatch
if gender in GENDER_MAP:
conflicting_organs = [organ for organ in GENDER_MAP[gender] if organ in report_text]
if conflicting_organs:
organs_str = "".join([f"'{o}'" for o in conflicting_organs])
findings.append(f"【严重】性别与部位冲突:患者性别为{gender},但在报告描述中出现了{organs_str}")
# 1b. Age/Logic Conflict
if age > 60:
age_sensitive_keywords = ["胎儿", "早孕", "妊娠", "卵泡", "月经"]
found_age_conflicts = [kw for kw in age_sensitive_keywords if kw in report_text]
if found_age_conflicts:
kws_str = "".join([f"'{k}'" for k in found_age_conflicts])
findings.append(f"【生理逻辑冲突】高龄风险:患者年龄为{age}岁,报告中出现{kws_str},不符合生理常规或伦理审核。")
# 2. Typos & Mapping (Layer 1 - dynamic - Weighted Pattern Matching)
# 获取所有映射并按词长度倒序排列(长词优先,避免子串重复匹配,如“冥想异常”优先于“冥想”)
sorted_mappings = sorted(config_manager.l3_mapping.items(), key=lambda x: len(x[0]), reverse=True)
covered_ranges = [] # 记录已覆盖的文本区间 [(start, end), ...]
matched_results = {} # correction -> {typo1, typo2, ...}
for typo, correction in sorted_mappings:
# 使用正则查找所有出现位置,确保转义特殊字符
for m in re.finditer(re.escape(typo), report_text):
start, end = m.span()
# 检查此位置是否已被之前的长词匹配覆盖
if any(start >= s and end <= e for s, e in covered_ranges):
continue
# 记录新匹配区间
covered_ranges.append((start, end))
if correction not in matched_results:
matched_results[correction] = set()
matched_results[correction].add(typo)
# 归并输出结果,解决用户反馈的“多次重复报错”问题
for correction, typos in matched_results.items():
typos_list = sorted(list(typos), key=len, reverse=True)
typos_str = "".join([f"'{t}'" for t in typos_list])
if len(typos_list) > 1:
findings.append(f"【建议修正】检测到多处相关术语错误({typos_str}),均应统一修正为'{correction}'")
else:
findings.append(f"【建议修正】术语错误:检测到'{typos_list[0]}',应修正为'{correction}'")
# 3. Pinyin Mapping (Input Assist - dynamic)
for py, term in config_manager.pinyin_map.items():
if py in report_text.lower():
findings.append(f"【建议修正】拼音残留:检测到输入法残留'{py}',建议修正为'{term}'")
# 3. Units check (cm vs mm)
keywords = ["结节", "病灶", "占位", "团块"]
for kw in keywords:
if kw in report_text:
if re.search(rf"{kw}.*?(\d+\.?\d*)\s*cm", report_text):
findings.append(f"【规范建议】单位使用:针对'{kw}',行业规范建议使用'mm',当前报告中使用了'cm'")
# 4. Global Site/System Mismatch
system_keywords = {
"血管": ["颈动脉", "股动脉", "静脉", "血流", "斑块"],
"腹部": ["", "", "", "", "", "腹水"],
"妇科": ["子宫", "卵巢", "附件", "内膜"],
"泌尿": ["膀胱", "前列腺", "输尿管"]
}
if examine_part:
match_found = False
for sys_name, sys_kws in system_keywords.items():
if sys_name in examine_part:
match_found = True
# Check if report mentions something completely outside this system
other_systems = {k: v for k, v in system_keywords.items() if k != sys_name}
for other_name, other_kws in other_systems.items():
# If a report for 'Vessels' only mentions 'Liver/Gallbladder', flag it
found_other = [okw for okw in other_kws if okw in report_text]
if found_other:
# Count how many words match the current system
current_sys_matches = [skw for skw in sys_kws if skw in report_text]
if len(current_sys_matches) == 0:
findings.append(f"【系统性偏离】检查部位为'{examine_part}',但报告描述中却出现了{found_other[0]}'{other_name}'系统内容。")
break
# 5. Enhanced Left/Right Consistency(分字段精准侧位核查)
part_norm = (examine_part or "").replace(" ", "")
l_in_part = "" in part_norm
r_in_part = "" in part_norm
h_side = "" if l_in_part and not r_in_part else "" if r_in_part and not l_in_part else None
if h_side:
o_side = "" if h_side == "" else ""
# 分字段提取:兼容「超声所见」和「超声提示」两种格式
# 格式1:「超声所见:...」(标准格式)
# 格式2:「【字段:超声所见】:...」(前端传入格式)
see_match = re.search(
r"(?:【字段:)?超声所见(?:】)?[\s::]+(.*?)(?=(?:【字段:)?超声提示|结论|提示|\Z)",
report_text, re.DOTALL
)
see_text = see_match.group(1).strip() if see_match else ""
hint_match = re.search(
r"(?:【字段:)?超声提示(?:】)?[\s::\n\r]+(.*)",
report_text, re.DOTALL | re.IGNORECASE
)
hint_text = hint_match.group(1).strip() if hint_match else ""
# 如果字段提取均失败,降级为后半段
if not hint_text and len(report_text) > 100:
hint_text = report_text[len(report_text)//2:]
print(f"DEBUG RuleEngine: TargetSide={h_side}, SeeLen={len(see_text)}, HintLen={len(hint_text)}")
# 逐字段核查侧位
field_errors = []
# 核查「超声所见」
if see_text:
if o_side in see_text and h_side not in see_text:
field_errors.append(f"超声所见(仅含'{o_side}'侧,应为'{h_side}'侧)")
# 核查「超声提示」
if hint_text:
if o_side in hint_text and h_side not in hint_text:
field_errors.append(f"超声提示(仅含'{o_side}'侧,应为'{h_side}'侧)")
if field_errors:
fields_str = "".join(field_errors)
findings.append(
f"【严重】侧位冲突:申请部位为'{h_side}'侧,但以下字段描述有误——{fields_str}"
f"请逐一核实并统一修正为'{h_side}'侧。"
)
else:
# 全篇频次兴局安全层(字段提取均失败时的兴局降级方案)
if h_side not in report_text and o_side in report_text:
findings.append(f"【严重】侧位矛盾:申请部位要求'{h_side}'侧,但报告全篇描述均为'{o_side}'侧。")
elif report_text.count(o_side) > report_text.count(h_side) * 2 and h_side in report_text:
findings.append(f"【逻辑疑虑】侧位倾向:描述中'{o_side}'侧占比远超申请的'{h_side}'侧,请核实。")
return findings
# Embedding and RAG Classes
class ZhipuEmbeddings(Embeddings):
"""自定义智谱AI向量类,确保完全兼容官方API"""
def __init__(self, api_key: str):
self.client = ZhipuAI(api_key=api_key, timeout=15)
self.model = "embedding-2"
def embed_documents(self, texts: List[str]) -> List[List[float]]:
# SDK 内部处理了单条或列表
embeddings = []
for text in texts:
# 过滤空字符串或过短内容
if not text.strip():
embeddings.append([0.0] * 1024) # 假设长度,失败时填充
continue
try:
response = self.client.embeddings.create(
model=self.model,
input=text.strip()
)
embeddings.append(response.data[0].embedding)
except Exception as e:
print(f"Embedding error for text snippet: {e}")
embeddings.append([0.0] * 1024)
return embeddings
def embed_query(self, text: str) -> List[float]:
try:
response = self.client.embeddings.create(
model=self.model,
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Query embedding error: {e}")
return [0.0] * 1024
class RAGManager:
def __init__(self):
self.embeddings = ZhipuEmbeddings(api_key=ZHIPU_API_KEY)
self.vectorstore = None
def get_vectorstore(self):
if self.vectorstore:
return self.vectorstore
if os.path.exists(INDEX_PATH):
try:
self.vectorstore = FAISS.load_local(
INDEX_PATH,
self.embeddings,
allow_dangerous_deserialization=True
)
return self.vectorstore
except Exception as e:
print(f"Error loading index: {e}")
# Build if not exists
self.build_index()
return self.vectorstore
def build_index(self):
print(f"Loading PDF from {PDF_PATH}...")
if not os.path.exists(PDF_PATH):
print("PDF file not found!")
return
try:
loader = PyPDFLoader(PDF_PATH)
docs = loader.load()
# 调小块尺寸以适应 embedding-2 的 512 token 限制
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
splits = text_splitter.split_documents(docs)
print(f"Created {len(splits)} chunks. Generating embeddings...")
# 使用自定义的向量类构建索引
self.vectorstore = FAISS.from_documents(splits, self.embeddings)
self.vectorstore.save_local(INDEX_PATH)
print("Index build and saved.")
except Exception as e:
print(f"Failed to build index: {e}")
class QCManager:
def __init__(self):
self.rag = RAGManager()
self.rule_engine = RuleEngine()
async def get_llm_analysis(self, model_name: str, report_text: str, context: str, extra_instruction: str = "") -> str:
try:
if "qwen" in model_name.lower():
llm = ChatOpenAI(
model="qwen-max",
api_key=QWEN_API_KEY,
base_url=QWEN_BASE_URL,
temperature=0.1
)
elif "deepseek" in model_name.lower():
llm = ChatOpenAI(
model="deepseek-chat",
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL,
temperature=0.1
)
else: # Default to Zhipu GLM-4 Flash for speed
llm = ChatOpenAI(
model="glm-4-flash",
api_key=ZHIPU_API_KEY,
base_url=ZHIPU_BASE_URL,
temperature=0.1,
timeout=60 # 增加到60秒,确保在复杂网络环境下不超时
)
prompt = ChatPromptTemplate.from_template("""
你是一名严谨的【医疗影像报告审核专家】。请审核以下包含背景信息和正文的超声报告。
【自动检测发现(必须处理并计入扣分)】:
{rule_findings}
【关键须知】:
- **背景信息(表头)已包含患者姓名、性别、年龄、检查部位**。
- **只要背景信息中已有对应数值,即视为信息完整**。严禁以此为由扣分。
【硬性约束】:
1. **合并同类项**:上方【自动检测发现】中若已指出多处术语错误或模式错误,请在最终结果中将其视为一个“模式错误”进行归并,**严禁对同一错别字的不同出现位置重复扣分**。
2. **采纳规则建议**:优先使用规则检测提供的修正建议(例如将‘冥想’修正为‘明显’),除非发现其存在严重的常识错误。
3. **深度逻辑审核**:AI 应重点关注规则引擎无法覆盖的深度逻辑(如解剖部位矛盾、结论不支持所见等)。
4. **侧位一致性**:必须严格核对【背景信息】中的检查部位与【报告正文】描述。若检查部位为‘右膝’,正文描述却为‘左膝’,必须判定为【严重】错误并至少扣除 40 分。
5. **严禁排版建议**:绝对禁止就“超声所见”与“超声提示”的加粗标题、分段、冒号等排版细节提出改进建议。
6. **无错处理**:若未发现真实医学或逻辑错误,且自动检测也无明显问题,请仅输出:【该报告未发现质量问题】。
【输出格式要求(若发现错误)】:
- 初始满分为 100,根据下方标准进行扣分。
- 先逐条列出真实的扣分理由(注明具体扣分值,如 -2 分)。
- 最后输出一个 JSON 块,内容如下(注意:issues 列表必须包含所有【自动检测发现】中的原文词):
{{
"score": 评分(int),
"issues": [
{{ "original": "原文词", "reason": "原因", "suggestion": "建议", "type": "类型" }}
],
"corrected_fields": {{
"bSee": "修正后的【超声所见】全文本",
"bHint": "修正后的【超声提示】全文本"
}}
}}
【评分标准】:
{scoring_text}
【待审内容】:
{report}
{extra_instruction}
""")
chain = (
{
"context": lambda x: context,
"rule_findings": lambda x: extra_instruction.get("rule_findings", ""),
"scoring_text": lambda x: config_manager.get_scoring_text(),
"report": RunnablePassthrough(),
"extra_instruction": lambda x: extra_instruction.get("text", "")
}
| prompt
| llm
| StrOutputParser()
)
import time
start_time = time.time()
print(f"DEBUG: Invoking LLM ({model_name})...")
res = await chain.ainvoke(report_text)
elapsed = time.time() - start_time
print(f"DEBUG: LLM ({model_name}) response received in {elapsed:.2f}s.")
return res
except Exception as e:
print(f"DEBUG: LLM Analysis Error: {e}")
return "✅ 已完成报告基础合规性校验。建议结合自动规则检测结果进行微调。"
async def run_qc(self, report_data: Dict[str, Any]):
import time
overall_start = time.time()
# 每次运行前重新加载词库,确保用户修改 l3_mapping.json 后立即生效
config_manager.load_all()
raw_report = report_data.get("report", "")
patient_info = report_data.get("patient_info", {})
examine_part = report_data.get("examinePart", "")
# 构造包含完整背景信息的报告文本,防止 AI 误报缺失表头信息
full_report_context = f"""
【报告背景信息】:
患者姓名:{patient_info.get('patientName', '测试')}
患者性别:{patient_info.get('sex', '未提供')}
患者年龄:{patient_info.get('age', '未提供')}
检查部位:{examine_part or '全腹'}
【报告正文】:
{raw_report}
""".strip()
# 切换为单模型模式以提升响应速度,不再使用耗时的共识模式
selected_model = "glm-4-flash"
# 1. Rules (Instant)
print("DEBUG: Running Rule Engine...")
try:
rule_findings = self.rule_engine.run_checks(raw_report, patient_info, examine_part)
print(f"DEBUG: Rule findings count: {len(rule_findings)}")
except Exception as rule_err:
print(f"DEBUG: Rule Engine Error: {rule_err}")
rule_findings = []
rule_str = "\n".join(rule_findings) if rule_findings else "未发现自动规则错误。"
# 2. RAG Context (安全加载)
print("DEBUG: Fetching RAG Context...")
context = "请根据医学常识进行判断。"
try:
vs = self.rag.get_vectorstore()
if vs:
relevant_docs = vs.similarity_search(raw_report, k=1)
context = "\n".join([doc.page_content for doc in relevant_docs])
except Exception as rag_err:
print(f"DEBUG: RAG Step missed: {rag_err}")
# 3. LLM Analysis
print("DEBUG: Starting LLM Analysis...")
instruction_payload = {
"text": "只输出发现的错误,不输出无错误项。",
"rule_findings": rule_str
}
if selected_model == "consensus":
print(f"DEBUG: Selected model: consensus. Calling sub-models...")
# ... (保持原有的多模型并发逻辑,但合并提示词会要求它们都闭嘴不谈正确项)
tasks = [
self.get_llm_analysis("glm-4-flash", full_report_context, context, instruction_payload),
self.get_llm_analysis("qwen-max", full_report_context, context, instruction_payload),
]
results = await asyncio.gather(*tasks)
print("DEBUG: Sub-models finished. Starting synthesis...")
synthesizer_llm = ChatOpenAI(
model="glm-4-flash",
api_key=ZHIPU_API_KEY,
base_url=ZHIPU_BASE_URL,
temperature=0.1,
timeout=30
)
synthesis_prompt = ChatPromptTemplate.from_template("""
你现在是【终审质控组长】。请汇总多方质控意见(下方 A/B 记录),生成最终结论。
【质控原始记录】:
{results}
【绝对禁令】:
1. 严禁输出任何“未见明显错误”、“符合规范”或“基本正确”的描述。
2. 严禁输出“范例报告”、“参考报告”、“修改后报告”或任何类似的全篇重写内容。
3. **重点:绝对禁止就“超声所见”与“超声提示”的分段、分行、加粗标题、冒号区分等格式排版问题提出任何建议。即使 A/B 记录中有此类建议,也请将其判定为误报并剔除。**
4. 严禁输出“评分报告”、“评分与范例生成”等标题。
【输出格式要求】:
1. 仅列出发现的每一个真实错误及其扣分理由(按点排列,直接说事,不废话)。
2. 最后必须输出一个合法的 JSON 块。
3. 如果合法的扣分项为零,则正文仅显示:该报告未发现明显质量问题。
```json
{{
"score": XX,
"issues": [
{{ "original": "原文词", "reason": "扣分原因", "suggestion": "修正建议" }}
],
"corrected_fields": {{ "bSee": "修正后的完整所见", "bHint": "修正后的完整提示" }}
}}
```
""")
synth_chain = synthesis_prompt | synthesizer_llm | StrOutputParser()
combined_results = f"A: {results[0]}\nB: {results[1]}"
ai_findings = await synth_chain.ainvoke({
"raw_report": raw_report,
"rule_findings": rule_str,
"results": combined_results,
"scoring_text": config_manager.get_scoring_text()
})
else:
ai_findings = await self.get_llm_analysis(selected_model, full_report_context, context, instruction_payload)
# Parse AI results to extract score and structured issues
ai_description = ai_findings
ai_score = 100
ai_structured_issues = []
try:
# 改进 JSON 提取:查找所有疑似 JSON 块并选取“信息量最大”的一个
json_blocks = re.findall(r"```json\s*(.*?)\s*```", ai_findings, re.DOTALL)
if not json_blocks:
# 尝试直接找大括号
json_blocks = re.findall(r"(\{.*?\})", ai_findings, re.DOTALL)
target_json = ""
if json_blocks:
# 优先级排序:先找包含 issues 的,再找位置靠后的
candidates = []
for jb in json_blocks:
try:
parsed = json.loads(jb.strip())
if isinstance(parsed, dict):
# 计算权重:有 issues 权重更高
weight = len(parsed.get('issues', [])) * 10 - parsed.get('score', 100)
candidates.append((weight, parsed, jb))
except: continue
if candidates:
# 选取权重最高的一个
candidates.sort(key=lambda x: x[0], reverse=True)
best_match = candidates[0][1]
ai_score = best_match.get("score", 100)
ai_structured_issues = best_match.get("issues", [])
target_json = candidates[0][2]
# 提取描述(第一个 JSON 块之前的内容)
first_pos = ai_findings.find(target_json)
if first_pos > 0:
ai_description = ai_findings[:first_pos].strip()
except Exception as e:
print(f"DEBUG: JSON Parser Error: {e}")
# 评分校准逻辑:确保分值真实反映报告质量
# 1. 本地规则校准(最高优先级)
if rule_findings:
has_severe = any("【严重】" in f for f in rule_findings)
has_fix = any("【建议修正】" in f or "【规范建议】" in f for f in rule_findings)
if has_severe: ai_score = min(ai_score, 60)
elif has_fix: ai_score = min(ai_score, 95)
# 2. AI 自身识别出的严重错误校准(防止 AI 说一套做一套)
if ai_structured_issues:
has_ai_severe = any("严重" in str(i.get('type', '')) or "严重" in str(i.get('reason', '')) for i in ai_structured_issues if isinstance(i, dict))
if has_ai_severe and ai_score > 60:
print("DEBUG: AI identified severe issues but gave high score. Calibrating to 60.")
ai_score = 60
# 最后屏护:只有真的没分扣了且没规则发现,才维持 100 分。
# 只要有 issues,分值绝对不允许是 100
if ai_structured_issues and ai_score >= 100:
ai_score = 98
# 闭环逻辑:仅仅在真的没任何问题时才隐藏清单
if ai_score == 100 and not rule_findings and not ai_structured_issues:
ai_structured_issues = []
# 归一化处理:确保 rule_findings 中的原始词也被加入高亮列表,支撑前端展示
existing_originals = {str(issue.get('original', '')) for issue in ai_structured_issues if isinstance(issue, dict)}
SIDE_CONFLICT_KEYWORDS = ("侧位冲突", "侧位矛盾", "侧位倾向")
for find in rule_findings:
if any(kw in find for kw in SIDE_CONFLICT_KEYWORDS):
# 侧位冲突:同时注入两类高亮
# - "仅含'X'侧" → 错误侧位字,type=侧位错误(前端标黄)
# - "应为'Y'侧" → 正确侧位字,type=侧位参照(前端标蓝)
# 规则文本格式: ...(仅含'左'侧,应为'右'侧)...
quoted = re.findall(r"'([^']+)'", find)
for q in quoted:
if q not in ("", ""):
continue
if f"仅含'{q}'" in find and q not in existing_originals:
ai_structured_issues.append({
"original": q,
"reason": "侧位冲突(规则引擎检测)",
"suggestion": f"报告中出现了'{q}'侧,与申请部位不符,请核实",
"type": "侧位错误" # 前端 → 黄色高亮
})
existing_originals.add(q)
elif f"应为'{q}'" in find and q not in existing_originals:
ai_structured_issues.append({
"original": q,
"reason": "正确侧位参照",
"suggestion": f"申请部位要求'{q}'侧,请确认报告中该侧描述是否准确",
"type": "侧位参照" # 前端 → 蓝色高亮
})
existing_originals.add(q)
continue
# 其他规则:提取引号中的敏感词(如检测到'冥想')
extracted_words = re.findall(r"'(.*?)'", find)
if extracted_words:
orig_word = extracted_words[0]
if orig_word not in existing_originals:
ai_structured_issues.append({
"original": orig_word,
"reason": "规则引擎自动发现",
"suggestion": "请参考规则修正建议",
"type": "自动规则"
})
existing_originals.add(orig_word)
# Final Report Construction
final_report = "### 超声报告质控分析报告\n\n"
final_report += f"#### 🎯 综合评分:**{ai_score} 分**\n\n"
if rule_findings:
final_report += "#### ⚠️ 自动规则检测(高可靠):\n"
for find in rule_findings:
final_report += f"- {find}\n"
final_report += "\n"
final_report += "#### 🤖 AI 智能分析:\n"
if ai_description and len(ai_description) > 5 and "未发现" not in ai_description:
final_report += f"{ai_description}\n"
elif rule_findings:
final_report += "✅ AI 深度审核未发现其它语义逻辑错误,请优先参考上方‘自动检测’给出的修正建议。\n"
else:
final_report += "✅ 该报告未发现明显质量问题。\n"
if ai_structured_issues:
final_report += "\n#### 📝 详细建议清单:\n"
# 强化去重:根据 (原文词, 修正建议) 进行去重,防止 AI 重复输出或规则重复合并
seen_issue_keys = set()
unique_issues = []
for issue in ai_structured_issues:
if not isinstance(issue, dict): continue
key = (str(issue.get('original', '')).strip(), str(issue.get('suggestion', '')).strip())
if key not in seen_issue_keys:
unique_issues.append(issue)
seen_issue_keys.add(key)
for issue in unique_issues:
orig = issue.get('original', '原文')
sugg = issue.get('suggestion', '建议内容')
reason = issue.get('reason', '原因描述')
itype = issue.get('type', '核心错误')
final_report += f"- **[{itype}]** {orig} -> {sugg} ({reason})\n"
# 兜底:如果完全没提到规则发现,则 AI 可能漏掉了
if rule_findings and not ai_structured_issues:
final_report += "\n*(提示:已根据自动规则检测结果更新最终评分)*\n"
# 如果解析后的描述太短且有原始输出,则作为参考附上(先剥离其中的 json 代码块避免干扰前端解析)
if len(final_report) < 150 and len(ai_findings) > 100:
final_report += "\n---\n#### 🔍 原始分析记录:\n"
clean_ai_findings = re.sub(r"```json[\s\S]*?```", "", ai_findings).strip()
final_report += clean_ai_findings
# 追加最终的结构化 JSON 块,供前端精准解析高亮和一键修正
# 提取 AI 的纠正字段(保留其深度修正后的全文本)
final_corrected_fields = {}
try:
# 再次从原始输出提取 corrected_fields
json_blocks = re.findall(r"```json\s*(.*?)\s*```", ai_findings, re.DOTALL)
if json_blocks:
last_json = json.loads(json_blocks[-1].strip())
final_corrected_fields = last_json.get("corrected_fields", {})
except:
pass
final_json = {
"score": ai_score,
"issues": ai_structured_issues,
"corrected_fields": final_corrected_fields
}
# 使用唯一标记包裹最终 JSON,前端按此标记精确定位,避免误匹配多个代码块
final_report += f"\n\n===FINAL_QC_JSON===\n{json.dumps(final_json, ensure_ascii=False, indent=2)}\n===END_QC_JSON===\n"
overall_elapsed = time.time() - overall_start
print(f"DEBUG: Total QC processing time: {overall_elapsed:.2f}s. Final Score: {ai_score}")
return final_report
# Global instance
qc_manager = QCManager()