@@ -7,7 +7,7 @@ from ...core.config import settings
 from ..utils.json_extractor import LLMJsonExtractor
 from ..utils.llm_validator import LLMResponseValidator
 import re
-from .regs.entity_regex import extract_id_number_entities, extract_social_credit_code_entities
+from .regs.entity_regex import extract_id_number_entities, extract_social_credit_code_entities, extract_case_number_entities
 from .extractors.ner_extractor import NERExtractor
 from pypinyin import pinyin, Style
@@ -722,7 +722,8 @@ class NerProcessor:
         regex_entity_extractors = [
             extract_id_number_entities,
-            extract_social_credit_code_entities
+            extract_social_credit_code_entities,
+            extract_case_number_entities
         ]
         for extractor in regex_entity_extractors:
             mapping = extractor(chunk)
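
For context on what the newly wired-in extractor matches: Chinese court case numbers follow the pattern （year）court code + docket type + serial + 号. Below is a minimal sketch of such an extractor; the pattern is illustrative, and the `{'text': ..., 'type': ...}` mapping shape is an assumption inferred from how the merged mappings are consumed later in this diff — the real implementation lives in `.regs.entity_regex` and may differ.

```python
import re

# Illustrative pattern only; the production regex in .regs.entity_regex may differ.
# Matches case numbers such as "（2023）京01民初1234号".
CASE_NUMBER_RE = re.compile(
    r'[（(]\d{4}[）)][\u4e00-\u9fa5]{1,5}\d{0,4}[\u4e00-\u9fa5]{1,4}\d+号'
)

def extract_case_number_entities_sketch(chunk: str) -> dict:
    """Return an entity mapping for the first case number found, else {}."""
    match = CASE_NUMBER_RE.search(chunk)
    return {'text': match.group(0), 'type': '案号'} if match else {}

print(extract_case_number_entities_sketch("本案（2023）京01民初1234号已由法院审结。"))
# {'text': '（2023）京01民初1234号', 'type': '案号'}
```
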
@@ -733,6 +734,38 @@ class NerProcessor:
         return mapping_pipeline

+    def build_mapping_regex_only(self, chunk: str) -> list[Dict[str, str]]:
+        """
+        Build mapping using only regex-based extraction (no NER, no LLM)
+
+        Args:
+            chunk: Text chunk to process
+
+        Returns:
+            List of entity mappings
+        """
+        mapping_pipeline = []
+
+        # Use regex-based extraction for IDs, codes, and case numbers
+        regex_entity_extractors = [
+            extract_id_number_entities,
+            extract_social_credit_code_entities,
+            extract_case_number_entities
+        ]
+
+        for extractor in regex_entity_extractors:
+            mapping = extractor(chunk)
+            if mapping and LLMResponseValidator.validate_regex_entity(mapping):
+                mapping_pipeline.append(mapping)
+                logger.info(f"Regex extraction: Added mapping from {extractor.__name__}")
+            elif mapping:
+                logger.warning(f"Invalid regex entity mapping format: {mapping}")
+            else:
+                logger.debug(f"No entities found by {extractor.__name__}")
+
+        logger.info(f"Regex-only extraction: Found {len(mapping_pipeline)} mappings")
+        return mapping_pipeline
+
     def build_mapping_llm_only(self, chunk: str) -> list[Dict[str, str]]:
         """
         Build mapping using only LLM (no NER)
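
A minimal usage sketch for the new regex-only path. The import path, constructor arguments, and the exact element shape are assumptions (none are shown in this diff); the annotation only guarantees a list of `Dict[str, str]`.

```python
# Hypothetical import path and constructor; adjust to the actual package layout.
from app.services.ner.ner_processor import NerProcessor

processor = NerProcessor()
mappings = processor.build_mapping_regex_only(
    "原告张三，身份证号110101199003077777，案号（2023）京01民初1234号。"
)
# One validated mapping per extractor that matched, e.g. (shape assumed):
# [{'text': '110101199003077777', 'type': '身份证号'},
#  {'text': '（2023）京01民初1234号', 'type': '案号'}]
print(mappings)
```
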
@@ -758,10 +791,11 @@ class NerProcessor:
             if mapping:
                 mapping_pipeline.append(mapping)

-        # Include regex-based extraction for IDs and codes
+        # Include regex-based extraction for IDs, codes, and case numbers
         regex_entity_extractors = [
             extract_id_number_entities,
-            extract_social_credit_code_entities
+            extract_social_credit_code_entities,
+            extract_case_number_entities
         ]
         for extractor in regex_entity_extractors:
             mapping = extractor(chunk)
@@ -792,10 +826,11 @@ class NerProcessor:
             mapping_pipeline.append(ner_mapping)
             logger.info(f"NER-only extraction: Added {len(ner_entities)} entities")

-        # Still include regex-based extraction for IDs and codes
+        # Still include regex-based extraction for IDs, codes, and case numbers
         regex_entity_extractors = [
             extract_id_number_entities,
-            extract_social_credit_code_entities
+            extract_social_credit_code_entities,
+            extract_case_number_entities
         ]
         for extractor in regex_entity_extractors:
             mapping = extractor(chunk)
@@ -863,26 +898,66 @@ class NerProcessor:
         for group in linkage.get('entity_groups', []):
             group_type = group.get('group_type', '')
             entities = group.get('entities', [])

             if '公司' in group_type or 'Company' in group_type:
-                for entity in entities:
-                    # Use the new company-name masking method
-                    masked = self._mask_company_name(entity['text'])
-                    group_mask_map[entity['text']] = masked
+                # 🚀 OPTIMIZATION: Find primary entity and mask once
+                primary_entity = self._find_primary_company_entity(entities)
+                if primary_entity:
+                    # Call _mask_company_name only once for the primary entity
+                    primary_masked = self._mask_company_name(primary_entity['text'])
+                    logger.info(f"Masked primary company '{primary_entity['text']}' -> '{primary_masked}'")
+
+                    # Use the same masked name for all entities in the group
+                    for entity in entities:
+                        group_mask_map[entity['text']] = primary_masked
+                        logger.debug(f"Applied same mask '{primary_masked}' to '{entity['text']}'")
+                else:
+                    # Fallback: mask each entity individually if no primary found
+                    for entity in entities:
+                        masked = self._mask_company_name(entity['text'])
+                        group_mask_map[entity['text']] = masked

             elif '人名' in group_type:
-                for entity in entities:
-                    name = entity['text']
-                    if not name:
-                        continue
-                    # Use the new Chinese-name masking method
-                    masked = self._mask_chinese_name(name, surname_counter)
-                    group_mask_map[name] = masked
+                # 🚀 OPTIMIZATION: Find primary entity and mask once
+                primary_entity = self._find_primary_person_entity(entities)
+                if primary_entity:
+                    # Call _mask_chinese_name only once for the primary entity
+                    primary_masked = self._mask_chinese_name(primary_entity['text'], surname_counter)
+                    logger.info(f"Masked primary person '{primary_entity['text']}' -> '{primary_masked}'")
+
+                    # Use the same masked name for all entities in the group
+                    for entity in entities:
+                        group_mask_map[entity['text']] = primary_masked
+                        logger.debug(f"Applied same mask '{primary_masked}' to '{entity['text']}'")
+                else:
+                    # Fallback: mask each entity individually if no primary found
+                    for entity in entities:
+                        name = entity['text']
+                        if not name:
+                            continue
+                        masked = self._mask_chinese_name(name, surname_counter)
+                        group_mask_map[name] = masked

             elif '英文人名' in group_type:
-                for entity in entities:
-                    name = entity['text']
-                    if not name:
-                        continue
-                    masked = ' '.join([n[0] + '***' if n else '' for n in name.split()])
-                    group_mask_map[name] = masked
+                # 🚀 OPTIMIZATION: Find primary entity and mask once
+                primary_entity = self._find_primary_person_entity(entities)
+                if primary_entity:
+                    # Call masking only once for the primary entity
+                    primary_masked = ' '.join([n[0] + '***' if n else '' for n in primary_entity['text'].split()])
+                    logger.info(f"Masked primary English person '{primary_entity['text']}' -> '{primary_masked}'")
+
+                    # Use the same masked name for all entities in the group
+                    for entity in entities:
+                        group_mask_map[entity['text']] = primary_masked
+                        logger.debug(f"Applied same mask '{primary_masked}' to '{entity['text']}'")
+                else:
+                    # Fallback: mask each entity individually if no primary found
+                    for entity in entities:
+                        name = entity['text']
+                        if not name:
+                            continue
+                        masked = ' '.join([n[0] + '***' if n else '' for n in name.split()])
+                        group_mask_map[name] = masked

         for entity in unique_entities:
             text = entity['text']  # Use cleaned text for mapping
             entity_type = entity.get('type', '')
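
The effect of the group-level change, in isolation: every alias in a linked group now receives the mask derived from one primary name, so cross-references stay consistent after de-identification. A standalone sketch with a stand-in masking function (`_mask_company_name` itself is defined elsewhere in the class and not shown in this diff):

```python
# Stand-in masker; the real _mask_company_name scheme is not part of this diff.
def mask_company_name_stub(name: str) -> str:
    return name[0] + '*' * (len(name) - 1)

group = ['某某科技（北京）有限公司', '某某科技', '某某公司']

# Before this change: each alias was masked independently.
independent = {name: mask_company_name_stub(name) for name in group}

# After: the primary (fullest) name is masked once and reused for the group.
primary_masked = mask_company_name_stub(max(group, key=len))
grouped = {name: primary_masked for name in group}

print(independent)  # three unrelated masks, one per alias
print(grouped)      # one shared mask for every alias in the group
```
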
@@ -958,6 +1033,114 @@ class NerProcessor:
             used_masked_names.add(masked)
         return entity_mapping

+    def _find_primary_company_entity(self, entities: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+        """
+        Find the primary company entity from a group of related company entities.
+
+        Strategy:
+        1. Look for an entity explicitly marked 'is_primary': True
+        2. Otherwise, prefer entities typed '公司名称' over '公司名称简称'
+        3. Within the preferred pool, pick the longest (fullest) company name
+
+        Args:
+            entities: List of company entities in a group
+
+        Returns:
+            Primary entity, or None if the group is empty
+        """
+        if not entities:
+            return None
+
+        # First, look for an explicitly marked primary entity
+        for entity in entities:
+            if entity.get('is_primary', False):
+                logger.debug(f"Found explicitly marked primary company: {entity['text']}")
+                return entity
+
+        # If no primary is marked, find the most complete company name,
+        # preferring entities typed '公司名称' over '公司名称简称'
+        primary_candidates = []
+        secondary_candidates = []
+
+        for entity in entities:
+            entity_type = entity.get('type', '')
+            if '公司名称' in entity_type and '简称' not in entity_type:
+                primary_candidates.append(entity)
+            else:
+                secondary_candidates.append(entity)
+
+        # If we have primary candidates, choose the longest one
+        if primary_candidates:
+            primary_entity = max(primary_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary company from primary candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # If no primary candidates, choose the longest secondary candidate
+        if secondary_candidates:
+            primary_entity = max(secondary_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary company from secondary candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # Fallback: return the longest entity overall
+        primary_entity = max(entities, key=lambda x: len(x['text']))
+        logger.debug(f"Selected primary company by length: {primary_entity['text']}")
+        return primary_entity
+
+    def _find_primary_person_entity(self, entities: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+        """
+        Find the primary person entity from a group of related person entities.
+
+        Strategy:
+        1. Look for an entity explicitly marked 'is_primary': True
+        2. Otherwise, prefer entities typed '人名' over '英文人名'
+        3. Within the preferred pool, pick the longest (fullest) person name
+
+        Args:
+            entities: List of person entities in a group
+
+        Returns:
+            Primary entity, or None if the group is empty
+        """
+        if not entities:
+            return None
+
+        # First, look for an explicitly marked primary entity
+        for entity in entities:
+            if entity.get('is_primary', False):
+                logger.debug(f"Found explicitly marked primary person: {entity['text']}")
+                return entity
+
+        # If no primary is marked, find the most complete person name,
+        # preferring entities typed '人名' over '英文人名'
+        chinese_candidates = []
+        english_candidates = []
+
+        for entity in entities:
+            entity_type = entity.get('type', '')
+            if '人名' in entity_type and '英文' not in entity_type:
+                chinese_candidates.append(entity)
+            elif '英文人名' in entity_type:
+                english_candidates.append(entity)
+            else:
+                chinese_candidates.append(entity)  # Default to Chinese
+
+        # If we have Chinese candidates, choose the longest one
+        if chinese_candidates:
+            primary_entity = max(chinese_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary person from Chinese candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # If no Chinese candidates, choose the longest English candidate
+        if english_candidates:
+            primary_entity = max(english_candidates, key=lambda x: len(x['text']))
+            logger.debug(f"Selected primary person from English candidates: {primary_entity['text']}")
+            return primary_entity
+
+        # Fallback: return the longest entity overall
+        primary_entity = max(entities, key=lambda x: len(x['text']))
+        logger.debug(f"Selected primary person by length: {primary_entity['text']}")
+        return primary_entity
+
     def _validate_linkage_format(self, linkage: Dict[str, Any]) -> bool:
         return LLMResponseValidator.validate_entity_linkage(linkage)
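
A self-contained sketch of the company selection rule (explicit flag, then full-name type, then length), condensed from the method above; the candidate split mirrors `_find_primary_company_entity`:

```python
from typing import Any, Dict, List, Optional

def find_primary_company(entities: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Condensed selection rule: is_primary flag > full-name type > longest text."""
    if not entities:
        return None
    for entity in entities:
        if entity.get('is_primary', False):
            return entity
    full_names = [e for e in entities
                  if '公司名称' in e.get('type', '') and '简称' not in e.get('type', '')]
    return max(full_names or entities, key=lambda e: len(e['text']))

entities = [
    {'text': '某某科技', 'type': '公司名称简称'},
    {'text': '某某科技（北京）有限公司', 'type': '公司名称'},
]
print(find_primary_company(entities)['text'])  # 某某科技（北京）有限公司
```
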
@@ -965,7 +1148,7 @@ class NerProcessor:
         linkable_entities = []
         for entity in unique_entities:
             entity_type = entity.get('type', '')
-            if any(keyword in entity_type for keyword in ['公司', 'Company', '人名', '英文人名']):
+            if any(keyword in entity_type for keyword in ['公司', '公司名称', 'Company', '人名', '英文人名']):
                 linkable_entities.append(entity)

         if not linkable_entities:
@@ -1031,7 +1214,12 @@ class NerProcessor:
             chunk_mappings.append(ner_mapping)
             logger.info(f"Added {len(ner_entities)} NER entities to mappings")

-        logger.info(f"Final chunk mappings: {chunk_mappings}")
+        logger.info(f"NER-only mappings: {chunk_mappings}")
+
+        regex_mapping = self.build_mapping_regex_only(merged_text)
+        logger.info(f"Regex mapping: {regex_mapping}")
+        chunk_mappings.extend(regex_mapping)

         unique_entities = self._merge_entity_mappings(chunk_mappings)
         logger.info(f"Unique entities: {unique_entities}")
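
With this change the downstream merge sees both NER and regex mappings. `_merge_entity_mappings` is outside this diff, so its exact behavior is an assumption; a sketch of the deduplication such a merge plausibly performs:

```python
from typing import Any, Dict, List

def merge_entity_mappings_sketch(mappings: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep the first mapping seen for each entity text, dropping duplicates."""
    unique: Dict[str, Dict[str, Any]] = {}
    for mapping in mappings:
        unique.setdefault(mapping['text'], mapping)
    return list(unique.values())

ner_mappings = [{'text': '张三', 'type': '人名'}]
regex_mappings = [
    {'text': '110101199003077777', 'type': '身份证号'},
    {'text': '张三', 'type': '人名'},  # found by both paths; merged away
]
print(merge_entity_mappings_sketch(ner_mappings + regex_mappings))
# [{'text': '张三', 'type': '人名'}, {'text': '110101199003077777', 'type': '身份证号'}]
```
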