Compare commits

...

2 Commits

2 changed files with 222 additions and 25 deletions


@@ -7,7 +7,7 @@ from ...core.config import settings
 from ..utils.json_extractor import LLMJsonExtractor
 from ..utils.llm_validator import LLMResponseValidator
 import re
-from .regs.entity_regex import extract_id_number_entities, extract_social_credit_code_entities
+from .regs.entity_regex import extract_id_number_entities, extract_social_credit_code_entities, extract_case_number_entities
 from .extractors.ner_extractor import NERExtractor
 from pypinyin import pinyin, Style
@@ -722,7 +722,8 @@ class NerProcessor:
         regex_entity_extractors = [
             extract_id_number_entities,
-            extract_social_credit_code_entities
+            extract_social_credit_code_entities,
+            extract_case_number_entities
         ]
         for extractor in regex_entity_extractors:
             mapping = extractor(chunk)
@@ -733,6 +734,38 @@ class NerProcessor:
         return mapping_pipeline
 
+    def build_mapping_regex_only(self, chunk: str) -> list[Dict[str, str]]:
+        """
+        Build mapping using only regex-based extraction (no NER, no LLM).
+
+        Args:
+            chunk: Text chunk to process
+
+        Returns:
+            List of entity mappings
+        """
+        mapping_pipeline = []
+
+        # Use regex-based extraction for IDs, codes, and case numbers
+        regex_entity_extractors = [
+            extract_id_number_entities,
+            extract_social_credit_code_entities,
+            extract_case_number_entities
+        ]
+        for extractor in regex_entity_extractors:
+            mapping = extractor(chunk)
+            if mapping and LLMResponseValidator.validate_regex_entity(mapping):
+                mapping_pipeline.append(mapping)
+                logger.info(f"Regex extraction: Added mapping from {extractor.__name__}")
+            elif mapping:
+                logger.warning(f"Invalid regex entity mapping format: {mapping}")
+            else:
+                logger.debug(f"No entities found by {extractor.__name__}")
+
+        logger.info(f"Regex-only extraction: Found {len(mapping_pipeline)} mappings")
+        return mapping_pipeline
+
     def build_mapping_llm_only(self, chunk: str) -> list[Dict[str, str]]:
         """
         Build mapping using only LLM (no NER)
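For context, the new regex-only path returns one mapping dict per extractor that found something. A minimal usage sketch, assuming a constructed NerProcessor instance (construction and configuration are elided, and the sample text is made up):

    # Hypothetical call into the method added above.
    mappings = processor.build_mapping_regex_only("本案案号为(2022)京03民终3852号。")
    # Expected shape, per the loop above: one dict per matching extractor, e.g.
    # [{"entities": [{"text": "(2022)京03民终3852号", "type": "案号"}]}]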
@@ -758,10 +791,11 @@ class NerProcessor:
             if mapping:
                 mapping_pipeline.append(mapping)
 
-        # Include regex-based extraction for IDs and codes
+        # Include regex-based extraction for IDs, codes, and case numbers
        regex_entity_extractors = [
             extract_id_number_entities,
-            extract_social_credit_code_entities
+            extract_social_credit_code_entities,
+            extract_case_number_entities
         ]
         for extractor in regex_entity_extractors:
             mapping = extractor(chunk)
@@ -792,10 +826,11 @@ class NerProcessor:
             mapping_pipeline.append(ner_mapping)
             logger.info(f"NER-only extraction: Added {len(ner_entities)} entities")
 
-        # Still include regex-based extraction for IDs and codes
+        # Still include regex-based extraction for IDs, codes, and case numbers
         regex_entity_extractors = [
             extract_id_number_entities,
-            extract_social_credit_code_entities
+            extract_social_credit_code_entities,
+            extract_case_number_entities
         ]
         for extractor in regex_entity_extractors:
             mapping = extractor(chunk)
@@ -863,26 +898,66 @@ class NerProcessor:
         for group in linkage.get('entity_groups', []):
             group_type = group.get('group_type', '')
             entities = group.get('entities', [])
             if '公司' in group_type or 'Company' in group_type:
-                for entity in entities:
-                    # Use the new company-name masking method
-                    masked = self._mask_company_name(entity['text'])
-                    group_mask_map[entity['text']] = masked
+                # 🚀 OPTIMIZATION: Find primary entity and mask once
+                primary_entity = self._find_primary_company_entity(entities)
+                if primary_entity:
+                    # Call _mask_company_name only once for the primary entity
+                    primary_masked = self._mask_company_name(primary_entity['text'])
+                    logger.info(f"Masked primary company '{primary_entity['text']}' -> '{primary_masked}'")
+                    # Use the same masked name for all entities in the group
+                    for entity in entities:
+                        group_mask_map[entity['text']] = primary_masked
+                        logger.debug(f"Applied same mask '{primary_masked}' to '{entity['text']}'")
+                else:
+                    # Fallback: mask each entity individually if no primary found
+                    for entity in entities:
+                        masked = self._mask_company_name(entity['text'])
+                        group_mask_map[entity['text']] = masked
             elif '人名' in group_type:
-                for entity in entities:
-                    name = entity['text']
-                    if not name:
-                        continue
-                    # Use the new Chinese-name masking method
-                    masked = self._mask_chinese_name(name, surname_counter)
-                    group_mask_map[name] = masked
+                # 🚀 OPTIMIZATION: Find primary entity and mask once
+                primary_entity = self._find_primary_person_entity(entities)
+                if primary_entity:
+                    # Call _mask_chinese_name only once for the primary entity
+                    primary_masked = self._mask_chinese_name(primary_entity['text'], surname_counter)
+                    logger.info(f"Masked primary person '{primary_entity['text']}' -> '{primary_masked}'")
+                    # Use the same masked name for all entities in the group
+                    for entity in entities:
+                        group_mask_map[entity['text']] = primary_masked
+                        logger.debug(f"Applied same mask '{primary_masked}' to '{entity['text']}'")
+                else:
+                    # Fallback: mask each entity individually if no primary found
+                    for entity in entities:
+                        name = entity['text']
+                        if not name:
+                            continue
+                        masked = self._mask_chinese_name(name, surname_counter)
+                        group_mask_map[name] = masked
             elif '英文人名' in group_type:
-                for entity in entities:
-                    name = entity['text']
-                    if not name:
-                        continue
-                    masked = ' '.join([n[0] + '***' if n else '' for n in name.split()])
-                    group_mask_map[name] = masked
+                # 🚀 OPTIMIZATION: Find primary entity and mask once
+                primary_entity = self._find_primary_person_entity(entities)
+                if primary_entity:
+                    # Call masking only once for the primary entity
+                    primary_masked = ' '.join([n[0] + '***' if n else '' for n in primary_entity['text'].split()])
+                    logger.info(f"Masked primary English person '{primary_entity['text']}' -> '{primary_masked}'")
+                    # Use the same masked name for all entities in the group
+                    for entity in entities:
+                        group_mask_map[entity['text']] = primary_masked
+                        logger.debug(f"Applied same mask '{primary_masked}' to '{entity['text']}'")
+                else:
+                    # Fallback: mask each entity individually if no primary found
+                    for entity in entities:
+                        name = entity['text']
+                        if not name:
+                            continue
+                        masked = ' '.join([n[0] + '***' if n else '' for n in name.split()])
+                        group_mask_map[name] = masked
         for entity in unique_entities:
             text = entity['text']  # Use cleaned text for mapping
             entity_type = entity.get('type', '')
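The effect of this change is that every alias in a linkage group shares one masked form, instead of each alias being masked independently and possibly inconsistently. A self-contained sketch of the intent, using a stand-in masker rather than the project's _mask_company_name and an invented company group:

    def mask_company_name(name: str) -> str:
        # Stand-in masker: keep the first character, star the rest.
        return name[0] + '*' * (len(name) - 1)

    group = [
        {"text": "北京示例科技有限公司", "type": "公司名称"},
        {"text": "示例科技", "type": "公司名称简称"},
    ]
    # Mask the primary (fullest) name once, then reuse it for every alias.
    primary = max(group, key=lambda e: len(e["text"]))
    masked = mask_company_name(primary["text"])
    group_mask_map = {e["text"]: masked for e in group}
    # Both '北京示例科技有限公司' and '示例科技' now map to '北*********'.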
@@ -958,6 +1033,114 @@ class NerProcessor:
             used_masked_names.add(masked)
         return entity_mapping
 
+    def _find_primary_company_entity(self, entities: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+        """
+        Find the primary company entity from a group of related company entities.
+
+        Strategy:
+        1. Look for an entity marked with 'is_primary': True
+        2. If no primary is marked, find the longest/fullest company name
+        3. Prefer entities of type '公司名称' over '公司名称简称'
+
+        Args:
+            entities: List of company entities in a group
+
+        Returns:
+            Primary entity, or None if the group is empty
+        """
+        if not entities:
+            return None
+
+        # First, look for an explicitly marked primary entity
+        for entity in entities:
+            if entity.get('is_primary', False):
+                logger.debug(f"Found explicitly marked primary company: {entity['text']}")
+                return entity
+
+        # If no primary is marked, find the most complete company name,
+        # preferring '公司名称' entities over '公司名称简称'
+        primary_candidates = []
+        secondary_candidates = []
+        for entity in entities:
+            entity_type = entity.get('type', '')
+            if '公司名称' in entity_type and '简称' not in entity_type:
+                primary_candidates.append(entity)
+            else:
+                secondary_candidates.append(entity)
+
+        # If we have primary candidates, choose the longest one
+        if primary_candidates:
+            primary_entity = max(primary_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary company from primary candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # Otherwise, choose the longest secondary candidate
+        if secondary_candidates:
+            primary_entity = max(secondary_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary company from secondary candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # Fallback: return the longest entity overall
+        primary_entity = max(entities, key=lambda x: len(x['text']))
+        logger.debug(f"Selected primary company by length: {primary_entity['text']}")
+        return primary_entity
+
+    def _find_primary_person_entity(self, entities: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+        """
+        Find the primary person entity from a group of related person entities.
+
+        Strategy:
+        1. Look for an entity marked with 'is_primary': True
+        2. If no primary is marked, find the longest/fullest person name
+        3. Prefer entities of type '人名' over '英文人名'
+
+        Args:
+            entities: List of person entities in a group
+
+        Returns:
+            Primary entity, or None if the group is empty
+        """
+        if not entities:
+            return None
+
+        # First, look for an explicitly marked primary entity
+        for entity in entities:
+            if entity.get('is_primary', False):
+                logger.debug(f"Found explicitly marked primary person: {entity['text']}")
+                return entity
+
+        # If no primary is marked, find the most complete person name,
+        # preferring '人名' entities over '英文人名'
+        chinese_candidates = []
+        english_candidates = []
+        for entity in entities:
+            entity_type = entity.get('type', '')
+            if '人名' in entity_type and '英文' not in entity_type:
+                chinese_candidates.append(entity)
+            elif '英文人名' in entity_type:
+                english_candidates.append(entity)
+            else:
+                chinese_candidates.append(entity)  # Default to Chinese
+
+        # If we have Chinese candidates, choose the longest one
+        if chinese_candidates:
+            primary_entity = max(chinese_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary person from Chinese candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # Otherwise, choose the longest English candidate
+        if english_candidates:
+            primary_entity = max(english_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary person from English candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # Fallback: return the longest entity overall
+        primary_entity = max(entities, key=lambda x: len(x['text']))
+        logger.debug(f"Selected primary person by length: {primary_entity['text']}")
+        return primary_entity
+
     def _validate_linkage_format(self, linkage: Dict[str, Any]) -> bool:
         return LLMResponseValidator.validate_entity_linkage(linkage)
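The selection order in these helpers matters: an explicit is_primary flag wins outright, full-name types beat abbreviation types, and length breaks the remaining ties. A self-contained illustration of that ordering with made-up entities:

    entities = [
        {"text": "示例科技", "type": "公司名称简称"},
        {"text": "北京示例科技有限公司", "type": "公司名称"},
        {"text": "示例", "type": "公司名称简称", "is_primary": True},
    ]
    # 1. An explicit flag wins, even over longer names.
    flagged = [e for e in entities if e.get("is_primary")]
    # 2. Otherwise full names ('公司名称' without '简称') beat abbreviations.
    full = [e for e in entities if '公司名称' in e["type"] and '简称' not in e["type"]]
    # 3. Length breaks any remaining tie.
    primary = flagged[0] if flagged else max(full or entities, key=lambda e: len(e["text"]))
    print(primary["text"])  # 示例 (the flagged entity)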
@@ -965,7 +1148,7 @@ class NerProcessor:
         linkable_entities = []
         for entity in unique_entities:
             entity_type = entity.get('type', '')
-            if any(keyword in entity_type for keyword in ['公司', 'Company', '人名', '英文人名']):
+            if any(keyword in entity_type for keyword in ['公司', '公司名称', 'Company', '人名', '英文人名']):
                 linkable_entities.append(entity)
 
         if not linkable_entities:
@@ -1031,7 +1214,12 @@ class NerProcessor:
             chunk_mappings.append(ner_mapping)
             logger.info(f"Added {len(ner_entities)} NER entities to mappings")
 
-        logger.info(f"Final chunk mappings: {chunk_mappings}")
+        logger.info(f"NER-only mappings: {chunk_mappings}")
+
+        regex_mapping = self.build_mapping_regex_only(merged_text)
+        logger.info(f"Regex mapping: {regex_mapping}")
+        chunk_mappings.extend(regex_mapping)
+
         unique_entities = self._merge_entity_mappings(chunk_mappings)
         logger.info(f"Unique entities: {unique_entities}")


@@ -15,4 +15,13 @@ def extract_social_credit_code_entities(chunk: str) -> dict:
     entities = []
     for match in re.findall(credit_pattern, chunk):
         entities.append({"text": match, "type": "统一社会信用代码"})
     return {"entities": entities} if entities else {}
+
+def extract_case_number_entities(chunk: str) -> dict:
+    """Extract case numbers and return them in entity-mapping format."""
+    # Chinese case numbers, e.g. (2022)京03民终3852号 or 2020京0105民初69754号; parentheses may be ASCII, full-width, or absent.
+    case_pattern = r'[(（]?\d{4}[)）]?[^\d\n]*\d+[^\d\n]*\d+[^\d\n]*号'
+    entities = []
+    for match in re.findall(case_pattern, chunk):
+        entities.append({"text": match, "type": "案号"})
+    return {"entities": entities} if entities else {}