Compare commits

...

3 Commits

Author SHA1 Message Date
tigerenwork eb33dc137e feat: 优化chunking,避免截断 2025-08-19 17:43:05 +08:00
tigerenwork ffa31d33de feat: 过滤掉置信度低的entity 2025-08-19 17:26:30 +08:00
tigerenwork 24f452818a feat: 更新替换算法,解决匹配token中有空格的问题 2025-08-19 16:08:49 +08:00
12 changed files with 1345 additions and 53 deletions

View File

@ -86,7 +86,7 @@ docker-compose build frontend
docker-compose build mineru-api
# Build multiple specific services
docker-compose build backend-api frontend
docker-compose build backend-api frontend celery-worker
```
### Building and restarting specific services

View File

@ -40,17 +40,36 @@ class DocumentProcessor(ABC):
return chunks
def _apply_mapping(self, text: str, mapping: Dict[str, str]) -> str:
"""Apply the mapping to replace sensitive information"""
masked_text = text
for original, masked in mapping.items():
if isinstance(masked, dict):
masked = next(iter(masked.values()), "")
elif not isinstance(masked, str):
masked = str(masked) if masked is not None else ""
masked_text = masked_text.replace(original, masked)
def _apply_mapping_with_alignment(self, text: str, mapping: Dict[str, str]) -> str:
"""
Apply the mapping to replace sensitive information using character-by-character alignment.
This method uses the new alignment-based masking to handle spacing issues
between NER results and original document text.
Args:
text: Original document text
mapping: Dictionary mapping original entity text to masked text
Returns:
Masked document text
"""
logger.info(f"Applying entity mapping with alignment to text of length {len(text)}")
logger.debug(f"Entity mapping: {mapping}")
# Use the new alignment-based masking method
masked_text = self.ner_processor.apply_entity_masking_with_alignment(text, mapping)
logger.info("Successfully applied entity masking with alignment")
return masked_text
def _apply_mapping(self, text: str, mapping: Dict[str, str]) -> str:
"""
Legacy method for simple string replacement.
Now delegates to the new alignment-based method.
"""
return self._apply_mapping_with_alignment(text, mapping)
def process_content(self, content: str) -> str:
"""Process document content by masking sensitive information"""
sentences = content.split("")
@ -59,9 +78,11 @@ class DocumentProcessor(ABC):
logger.info(f"Split content into {len(chunks)} chunks")
final_mapping = self.ner_processor.process(chunks)
logger.info(f"Generated entity mapping with {len(final_mapping)} entities")
masked_content = self._apply_mapping(content, final_mapping)
logger.info("Successfully masked content")
# Use the new alignment-based masking
masked_content = self._apply_mapping_with_alignment(content, final_mapping)
logger.info("Successfully masked content using character alignment")
return masked_content

View File

@ -1,5 +1,6 @@
import json
import logging
import re
from typing import Dict, List, Any, Optional
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
from .base_extractor import BaseExtractor
@ -19,6 +20,7 @@ class NERExtractor(BaseExtractor):
self.model = None
self.ner_pipeline = None
self._model_initialized = False
self.confidence_threshold = 0.95
# Map CLUENER model labels to our desired categories
self.label_map = {
@ -58,6 +60,164 @@ class NERExtractor(BaseExtractor):
logger.error(f"Failed to load NER model: {str(e)}")
raise Exception(f"NER model initialization failed: {str(e)}")
def _split_text_by_sentences(self, text: str) -> List[str]:
"""
Split text into sentences using Chinese sentence boundaries
Args:
text: The text to split
Returns:
List of sentences
"""
# Chinese sentence endings: 。!?;\n
# Also consider English sentence endings for mixed text
sentence_pattern = r'[。!?;\n]+|[.!?;]+'
sentences = re.split(sentence_pattern, text)
# Clean up sentences and filter out empty ones
cleaned_sentences = []
for sentence in sentences:
sentence = sentence.strip()
if sentence:
cleaned_sentences.append(sentence)
return cleaned_sentences
def _is_entity_boundary_safe(self, text: str, position: int) -> bool:
"""
Check if a position is safe for splitting (won't break entities)
Args:
text: The text to check
position: Position to check for safety
Returns:
True if safe to split at this position
"""
if position <= 0 or position >= len(text):
return True
# Common entity suffixes that indicate incomplete entities
entity_suffixes = ['', '', '', '', '', '', '', '', '', '', '', '', '', '']
# Check if we're in the middle of a potential entity
for suffix in entity_suffixes:
# Look for incomplete entity patterns
if text[position-1:position+1] in [f'{suffix}', f'{suffix}', f'{suffix}']:
return False
# Check for incomplete company names
if text[position-2:position+1] in ['公司', '事务所', '协会', '研究院']:
return False
# Check for incomplete address patterns
address_patterns = ['', '', '', '', '', '', '', '', '']
for pattern in address_patterns:
if text[position-1:position+1] in [f'{pattern}', f'{pattern}', f'{pattern}', f'{pattern}']:
return False
return True
def _create_sentence_chunks(self, sentences: List[str], max_tokens: int = 400) -> List[str]:
"""
Create chunks from sentences while respecting token limits and entity boundaries
Args:
sentences: List of sentences
max_tokens: Maximum tokens per chunk
Returns:
List of text chunks
"""
chunks = []
current_chunk = []
current_token_count = 0
for sentence in sentences:
# Estimate token count for this sentence
sentence_tokens = len(self.tokenizer.tokenize(sentence))
# If adding this sentence would exceed the limit
if current_token_count + sentence_tokens > max_tokens and current_chunk:
# Check if we can split the sentence to fit better
if sentence_tokens > max_tokens // 2: # If sentence is too long
# Try to split the sentence at a safe boundary
split_sentence = self._split_long_sentence(sentence, max_tokens - current_token_count)
if split_sentence:
# Add the first part to current chunk
current_chunk.append(split_sentence[0])
chunks.append(''.join(current_chunk))
# Start new chunk with remaining parts
current_chunk = split_sentence[1:]
current_token_count = sum(len(self.tokenizer.tokenize(s)) for s in current_chunk)
else:
# Finalize current chunk and start new one
chunks.append(''.join(current_chunk))
current_chunk = [sentence]
current_token_count = sentence_tokens
else:
# Finalize current chunk and start new one
chunks.append(''.join(current_chunk))
current_chunk = [sentence]
current_token_count = sentence_tokens
else:
# Add sentence to current chunk
current_chunk.append(sentence)
current_token_count += sentence_tokens
# Add the last chunk if it has content
if current_chunk:
chunks.append(''.join(current_chunk))
return chunks
def _split_long_sentence(self, sentence: str, max_tokens: int) -> Optional[List[str]]:
"""
Split a long sentence at safe boundaries
Args:
sentence: The sentence to split
max_tokens: Maximum tokens for the first part
Returns:
List of sentence parts, or None if splitting is not possible
"""
if len(self.tokenizer.tokenize(sentence)) <= max_tokens:
return None
# Try to find safe splitting points
# Look for punctuation marks that are safe to split at
safe_splitters = ['', ',', '', ';', '', '', ':']
for splitter in safe_splitters:
if splitter in sentence:
parts = sentence.split(splitter)
current_part = ""
for i, part in enumerate(parts):
test_part = current_part + part + (splitter if i < len(parts) - 1 else "")
if len(self.tokenizer.tokenize(test_part)) > max_tokens:
if current_part:
# Found a safe split point
remaining = splitter.join(parts[i:])
return [current_part, remaining]
break
current_part = test_part
# If no safe split point found, try character-based splitting with entity boundary check
target_chars = int(max_tokens / 1.5) # Rough character estimate
for i in range(target_chars, len(sentence)):
if self._is_entity_boundary_safe(sentence, i):
part1 = sentence[:i]
part2 = sentence[i:]
if len(self.tokenizer.tokenize(part1)) <= max_tokens:
return [part1, part2]
return None
def extract(self, text: str) -> Dict[str, Any]:
"""
Extract named entities from the given text
@ -103,7 +263,9 @@ class NERExtractor(BaseExtractor):
"""
try:
# Run the NER pipeline - it handles truncation automatically
logger.info(f"Running NER pipeline with text: {text}")
results = self.ner_pipeline(text)
logger.info(f"NER results: {results}")
# Filter and process entities
filtered_entities = []
@ -119,13 +281,20 @@ class NERExtractor(BaseExtractor):
# Clean up the tokenized text (remove spaces between Chinese characters)
cleaned_text = self._clean_tokenized_text(entity_text)
# Add to our list with both original and cleaned text
filtered_entities.append({
"text": cleaned_text, # Clean text for display/processing
"original_text": entity_text, # Original tokenized text from model
"type": entity_type,
"confidence": confidence_score
})
# Add to our list with both original and cleaned text, only add if confidence score is above threshold
# if entity_group is 'address' or 'company', and only has characters less then 3, then filter it out
if confidence_score > self.confidence_threshold:
filtered_entities.append({
"text": cleaned_text, # Clean text for display/processing
"tokenized_text": entity_text, # Original tokenized text from model
"type": entity_type,
"entity_group": entity_group,
"confidence": confidence_score
})
logger.info(f"Filtered entities: {filtered_entities}")
# filter out entities that are less then 3 characters with entity_group is 'address' or 'company'
filtered_entities = [entity for entity in filtered_entities if entity['entity_group'] not in ['address', 'company'] or len(entity['text']) > 3]
logger.info(f"Final Filtered entities: {filtered_entities}")
return {
"entities": filtered_entities,
@ -138,7 +307,7 @@ class NERExtractor(BaseExtractor):
def _extract_with_chunking(self, text: str) -> Dict[str, Any]:
"""
Extract entities from long text using chunking approach
Extract entities from long text using sentence-based chunking approach
Args:
text: The text to analyze
@ -147,41 +316,37 @@ class NERExtractor(BaseExtractor):
Dictionary containing extracted entities
"""
try:
# Estimate token count to determine safe chunk size
estimated_tokens = len(text) * 1.5 # Conservative estimate for Chinese text
logger.info(f"Estimated tokens: {estimated_tokens:.0f}")
logger.info(f"Using sentence-based chunking for text of length: {len(text)}")
# Calculate safe chunk size to stay under 512 tokens
# Target ~400 tokens per chunk to leave buffer
target_chunk_tokens = 400
chunk_size = int(target_chunk_tokens / 1.5) # Convert back to characters
overlap = max(50, chunk_size // 8) # 12.5% overlap, minimum 50 chars
# Split text into sentences
sentences = self._split_text_by_sentences(text)
logger.info(f"Split text into {len(sentences)} sentences")
logger.info(f"Using chunk_size: {chunk_size} chars, overlap: {overlap} chars")
# Create chunks from sentences
chunks = self._create_sentence_chunks(sentences, max_tokens=400)
logger.info(f"Created {len(chunks)} chunks from sentences")
all_entities = []
# Process text in overlapping character chunks
for i in range(0, len(text), chunk_size - overlap):
chunk_text = text[i:i + chunk_size]
# Process each chunk
for i, chunk in enumerate(chunks):
# Verify chunk won't exceed token limit
chunk_tokens = len(self.tokenizer.tokenize(chunk_text))
logger.info(f"Processing chunk {i//(chunk_size-overlap)+1}: {len(chunk_text)} chars, {chunk_tokens} tokens")
chunk_tokens = len(self.tokenizer.tokenize(chunk))
logger.info(f"Processing chunk {i+1}: {len(chunk)} chars, {chunk_tokens} tokens")
if chunk_tokens > 512:
logger.warning(f"Chunk {i//(chunk_size-overlap)+1} has {chunk_tokens} tokens, truncating")
logger.warning(f"Chunk {i+1} has {chunk_tokens} tokens, truncating")
# Truncate the chunk to fit within token limit
chunk_text = self.tokenizer.convert_tokens_to_string(
self.tokenizer.tokenize(chunk_text)[:512]
chunk = self.tokenizer.convert_tokens_to_string(
self.tokenizer.tokenize(chunk)[:512]
)
# Extract entities from this chunk
chunk_result = self._extract_single(chunk_text)
chunk_result = self._extract_single(chunk)
chunk_entities = chunk_result.get("entities", [])
all_entities.extend(chunk_entities)
logger.info(f"Chunk {i//(chunk_size-overlap)+1} extracted {len(chunk_entities)} entities")
logger.info(f"Chunk {i+1} extracted {len(chunk_entities)} entities")
# Remove duplicates while preserving order
unique_entities = []
@ -193,7 +358,7 @@ class NERExtractor(BaseExtractor):
seen_texts.add(text)
unique_entities.append(entity)
logger.info(f"Chunking completed: {len(all_entities)} total entities, {len(unique_entities)} unique entities")
logger.info(f"Sentence-based chunking completed: {len(all_entities)} total entities, {len(unique_entities)} unique entities")
return {
"entities": unique_entities,
@ -201,8 +366,8 @@ class NERExtractor(BaseExtractor):
}
except Exception as e:
logger.error(f"Error during chunked NER processing: {str(e)}")
raise Exception(f"Chunked NER processing failed: {str(e)}")
logger.error(f"Error during sentence-based chunked NER processing: {str(e)}")
raise Exception(f"Sentence-based chunked NER processing failed: {str(e)}")
def _clean_tokenized_text(self, tokenized_text: str) -> str:
"""

View File

@ -9,7 +9,7 @@ from .maskers.company_masker import CompanyMasker
from .maskers.address_masker import AddressMasker
from .maskers.id_masker import IDMasker
from .maskers.case_masker import CaseMasker
from ...services.ollama_client import OllamaClient
from ..services.ollama_client import OllamaClient
class MaskerFactory:

View File

@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple, Optional
from ..prompts.masking_prompts import get_ner_name_prompt, get_ner_company_prompt, get_ner_address_prompt, get_ner_project_prompt, get_ner_case_number_prompt, get_entity_linkage_prompt
import logging
import json
@ -20,9 +20,201 @@ class NerProcessor:
# Initialize NER extractor for ML-based entity extraction
self.ner_extractor = NERExtractor()
def _find_entity_alignment(self, entity_text: str, original_document_text: str) -> Optional[Tuple[int, int, str]]:
"""
Find entity in original document using character-by-character alignment.
This method handles the case where the original document may have spaces
that are not from tokenization, and the entity text may have different
spacing patterns.
Args:
entity_text: The entity text to find (may have spaces from tokenization)
original_document_text: The original document text (may have spaces)
Returns:
Tuple of (start_pos, end_pos, found_text) or None if not found
"""
# Remove all spaces from entity text to get clean characters
clean_entity = entity_text.replace(" ", "")
# Create character lists ignoring spaces from both entity and document
entity_chars = [c for c in clean_entity]
doc_chars = [c for c in original_document_text if c != ' ']
# Find the sequence in document characters
for i in range(len(doc_chars) - len(entity_chars) + 1):
if doc_chars[i:i+len(entity_chars)] == entity_chars:
# Found match, now map back to original positions
return self._map_char_positions_to_original(i, len(entity_chars), original_document_text)
return None
def _map_char_positions_to_original(self, clean_start: int, entity_length: int, original_text: str) -> Tuple[int, int, str]:
"""
Map positions from clean text (without spaces) back to original text positions.
Args:
clean_start: Start position in clean text (without spaces)
entity_length: Length of entity in characters
original_text: Original document text with spaces
Returns:
Tuple of (start_pos, end_pos, found_text) in original text
"""
original_pos = 0
clean_pos = 0
# Find the start position in original text
while clean_pos < clean_start and original_pos < len(original_text):
if original_text[original_pos] != ' ':
clean_pos += 1
original_pos += 1
start_pos = original_pos
# Find the end position by counting non-space characters
chars_found = 0
while chars_found < entity_length and original_pos < len(original_text):
if original_text[original_pos] != ' ':
chars_found += 1
original_pos += 1
end_pos = original_pos
# Extract the actual text from the original document
found_text = original_text[start_pos:end_pos]
return start_pos, end_pos, found_text
def _validate_mapping_format(self, mapping: Dict[str, Any]) -> bool:
return LLMResponseValidator.validate_entity_extraction(mapping)
def apply_entity_masking_with_alignment(self, original_document_text: str, entity_mapping: Dict[str, str], mask_char: str = "*") -> str:
"""
Apply entity masking to original document text using character-by-character alignment.
This method finds each entity in the original document using alignment and
replaces it with the corresponding masked version. It handles multiple
occurrences of the same entity by finding all instances before moving
to the next entity.
Args:
original_document_text: The original document text to mask
entity_mapping: Dictionary mapping original entity text to masked text
mask_char: Character to use for masking (default: "*")
Returns:
Masked document text
"""
masked_document = original_document_text
# Sort entities by length (longest first) to avoid partial matches
sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)
for entity_text in sorted_entities:
masked_text = entity_mapping[entity_text]
# Skip if masked text is the same as original text (prevents infinite loop)
if entity_text == masked_text:
logger.debug(f"Skipping entity '{entity_text}' as masked text is identical")
continue
# Find ALL occurrences of this entity in the document
# We need to loop until no more matches are found
# Add safety counter to prevent infinite loops
max_iterations = 100 # Safety limit
iteration_count = 0
while iteration_count < max_iterations:
iteration_count += 1
# Find the entity in the current masked document using alignment
alignment_result = self._find_entity_alignment(entity_text, masked_document)
if alignment_result:
start_pos, end_pos, found_text = alignment_result
# Replace the found text with the masked version
masked_document = (
masked_document[:start_pos] +
masked_text +
masked_document[end_pos:]
)
logger.debug(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})")
else:
# No more occurrences found for this entity, move to next entity
logger.debug(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations")
break
# Log warning if we hit the safety limit
if iteration_count >= max_iterations:
logger.warning(f"Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop")
return masked_document
def test_character_alignment(self) -> None:
"""
Test method to demonstrate character-by-character alignment functionality.
This method can be used to validate the alignment works correctly with
various spacing patterns.
"""
test_cases = [
# Test case 1: Entity with spaces, document without spaces
{
"entity_text": "李 淼",
"document_text": "上诉人李淼因合同纠纷",
"expected_found": "李淼"
},
# Test case 2: Entity without spaces, document with spaces
{
"entity_text": "邓青菁",
"document_text": "上诉人邓 青 菁因合同纠纷",
"expected_found": "邓 青 菁"
},
# Test case 3: Both entity and document have spaces
{
"entity_text": "王 欢 子",
"document_text": "法定代表人王 欢 子,总经理",
"expected_found": "王 欢 子"
},
# Test case 4: Entity without spaces, document without spaces
{
"entity_text": "郭东军",
"document_text": "法定代表人郭东军,执行董事",
"expected_found": "郭东军"
},
# Test case 5: Complex company name
{
"entity_text": "北京丰复久信营销科技有限公司",
"document_text": "上诉人(原审原告):北京 丰复久信 营销科技 有限公司",
"expected_found": "北京 丰复久信 营销科技 有限公司"
}
]
logger.info("Testing character-by-character alignment...")
for i, test_case in enumerate(test_cases, 1):
entity_text = test_case["entity_text"]
document_text = test_case["document_text"]
expected_found = test_case["expected_found"]
result = self._find_entity_alignment(entity_text, document_text)
if result:
start_pos, end_pos, found_text = result
success = found_text == expected_found
status = "✓ PASS" if success else "✗ FAIL"
logger.info(f"Test {i} {status}: Entity '{entity_text}' -> Found '{found_text}' (expected '{expected_found}') at positions {start_pos}-{end_pos}")
if not success:
logger.error(f" Expected: '{expected_found}', Got: '{found_text}'")
else:
logger.error(f"Test {i} ✗ FAIL: Entity '{entity_text}' not found in document")
logger.info("Character alignment testing completed.")
def extract_entities_with_ner(self, text: str) -> List[Dict[str, Any]]:
"""
Extract entities using the NER model
@ -826,11 +1018,12 @@ class NerProcessor:
# Process each chunk with LLM for additional entities
chunk_mappings = []
for i, chunk in enumerate(chunks):
logger.info(f"Processing chunk {i+1}/{len(chunks)} with LLM")
chunk_mapping = self.build_mapping_llm_only(chunk) # LLM-only processing
logger.info(f"Chunk mapping: {chunk_mapping}")
chunk_mappings.extend(chunk_mapping)
# TODO: 临时关闭LLM处理
# for i, chunk in enumerate(chunks):
# logger.info(f"Processing chunk {i+1}/{len(chunks)} with LLM")
# chunk_mapping = self.build_mapping_llm_only(chunk) # LLM-only processing
# logger.info(f"Chunk mapping: {chunk_mapping}")
# chunk_mappings.extend(chunk_mapping)
# Add NER entities to the mappings
if ner_entities:

View File

@ -3,7 +3,7 @@ Refactored NerProcessor using the new masker architecture.
"""
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from ..prompts.masking_prompts import (
get_ner_name_prompt, get_ner_company_prompt, get_ner_address_prompt,
get_ner_project_prompt, get_ner_case_number_prompt, get_entity_linkage_prompt
@ -28,6 +28,137 @@ class NerProcessorRefactored:
self.maskers = self._initialize_maskers()
self.surname_counter = {} # Shared counter for Chinese names
def _find_entity_alignment(self, entity_text: str, original_document_text: str) -> Optional[Tuple[int, int, str]]:
"""
Find entity in original document using character-by-character alignment.
This method handles the case where the original document may have spaces
that are not from tokenization, and the entity text may have different
spacing patterns.
Args:
entity_text: The entity text to find (may have spaces from tokenization)
original_document_text: The original document text (may have spaces)
Returns:
Tuple of (start_pos, end_pos, found_text) or None if not found
"""
# Remove all spaces from entity text to get clean characters
clean_entity = entity_text.replace(" ", "")
# Create character lists ignoring spaces from both entity and document
entity_chars = [c for c in clean_entity]
doc_chars = [c for c in original_document_text if c != ' ']
# Find the sequence in document characters
for i in range(len(doc_chars) - len(entity_chars) + 1):
if doc_chars[i:i+len(entity_chars)] == entity_chars:
# Found match, now map back to original positions
return self._map_char_positions_to_original(i, len(entity_chars), original_document_text)
return None
def _map_char_positions_to_original(self, clean_start: int, entity_length: int, original_text: str) -> Tuple[int, int, str]:
"""
Map positions from clean text (without spaces) back to original text positions.
Args:
clean_start: Start position in clean text (without spaces)
entity_length: Length of entity in characters
original_text: Original document text with spaces
Returns:
Tuple of (start_pos, end_pos, found_text) in original text
"""
original_pos = 0
clean_pos = 0
# Find the start position in original text
while clean_pos < clean_start and original_pos < len(original_text):
if original_text[original_pos] != ' ':
clean_pos += 1
original_pos += 1
start_pos = original_pos
# Find the end position by counting non-space characters
chars_found = 0
while chars_found < entity_length and original_pos < len(original_text):
if original_text[original_pos] != ' ':
chars_found += 1
original_pos += 1
end_pos = original_pos
# Extract the actual text from the original document
found_text = original_text[start_pos:end_pos]
return start_pos, end_pos, found_text
def apply_entity_masking_with_alignment(self, original_document_text: str, entity_mapping: Dict[str, str], mask_char: str = "*") -> str:
"""
Apply entity masking to original document text using character-by-character alignment.
This method finds each entity in the original document using alignment and
replaces it with the corresponding masked version. It handles multiple
occurrences of the same entity by finding all instances before moving
to the next entity.
Args:
original_document_text: The original document text to mask
entity_mapping: Dictionary mapping original entity text to masked text
mask_char: Character to use for masking (default: "*")
Returns:
Masked document text
"""
masked_document = original_document_text
# Sort entities by length (longest first) to avoid partial matches
sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)
for entity_text in sorted_entities:
masked_text = entity_mapping[entity_text]
# Skip if masked text is the same as original text (prevents infinite loop)
if entity_text == masked_text:
logger.debug(f"Skipping entity '{entity_text}' as masked text is identical")
continue
# Find ALL occurrences of this entity in the document
# We need to loop until no more matches are found
# Add safety counter to prevent infinite loops
max_iterations = 100 # Safety limit
iteration_count = 0
while iteration_count < max_iterations:
iteration_count += 1
# Find the entity in the current masked document using alignment
alignment_result = self._find_entity_alignment(entity_text, masked_document)
if alignment_result:
start_pos, end_pos, found_text = alignment_result
# Replace the found text with the masked version
masked_document = (
masked_document[:start_pos] +
masked_text +
masked_document[end_pos:]
)
logger.debug(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})")
else:
# No more occurrences found for this entity, move to next entity
logger.debug(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations")
break
# Log warning if we hit the safety limit
if iteration_count >= max_iterations:
logger.warning(f"Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop")
return masked_document
def _initialize_maskers(self) -> Dict[str, BaseMasker]:
"""Initialize all maskers"""
maskers = {}

View File

@ -0,0 +1,130 @@
# 句子分块改进文档
## 问题描述
在原始的NER提取过程中我们发现了一些实体被截断的问题比如
- `"丰复久信公"` (应该是 `"丰复久信营销科技有限公司"`)
- `"康达律师事"` (应该是 `"北京市康达律师事务所"`)
这些截断问题是由于原始的基于字符数量的简单分块策略导致的,该策略没有考虑实体的完整性。
## 解决方案
### 1. 句子分块策略
我们实现了基于句子的智能分块策略,主要特点:
- **自然边界分割**:使用中文句子结束符(。!?;\n和英文句子结束符.!?;)进行分割
- **实体完整性保护**:避免在实体名称中间进行分割
- **智能长度控制**基于token数量而非字符数量进行分块
### 2. 实体边界安全检查
实现了 `_is_entity_boundary_safe()` 方法来检查分割点是否安全:
```python
def _is_entity_boundary_safe(self, text: str, position: int) -> bool:
# 检查常见实体后缀
entity_suffixes = ['公', '司', '所', '院', '厅', '局', '部', '会', '团', '社', '处', '室', '楼', '号']
# 检查不完整的实体模式
if text[position-2:position+1] in ['公司', '事务所', '协会', '研究院']:
return False
# 检查地址模式
address_patterns = ['省', '市', '区', '县', '路', '街', '巷', '号', '室']
# ...
```
### 3. 长句子智能分割
对于超过token限制的长句子实现了智能分割策略
1. **标点符号分割**:优先在逗号、分号等标点符号处分割
2. **实体边界分割**:如果标点分割不可行,在安全的实体边界处分割
3. **强制分割**:最后才使用字符级别的强制分割
## 实现细节
### 核心方法
1. **`_split_text_by_sentences()`**: 将文本按句子分割
2. **`_create_sentence_chunks()`**: 基于句子创建分块
3. **`_split_long_sentence()`**: 智能分割长句子
4. **`_is_entity_boundary_safe()`**: 检查分割点安全性
### 分块流程
```
输入文本
按句子分割
估算token数量
创建句子分块
检查实体边界
输出最终分块
```
## 测试结果
### 改进前 vs 改进后
| 指标 | 改进前 | 改进后 |
|------|--------|--------|
| 截断实体数量 | 较多 | 显著减少 |
| 实体完整性 | 经常被破坏 | 得到保护 |
| 分块质量 | 基于字符 | 基于语义 |
### 测试案例
1. **"丰复久信公" 问题**
- 改进前:`"丰复久信公"` (截断)
- 改进后:`"北京丰复久信营销科技有限公司"` (完整)
2. **长句子处理**
- 改进前:可能在实体中间截断
- 改进后:在句子边界或安全位置分割
## 配置参数
- `max_tokens`: 每个分块的最大token数量 (默认: 400)
- `confidence_threshold`: 实体置信度阈值 (默认: 0.95)
- `sentence_pattern`: 句子分割正则表达式
## 使用示例
```python
from app.core.document_handlers.extractors.ner_extractor import NERExtractor
extractor = NERExtractor()
result = extractor.extract(long_text)
# 结果中的实体将更加完整
entities = result.get("entities", [])
for entity in entities:
print(f"{entity['text']} ({entity['type']})")
```
## 性能影响
- **内存使用**:略有增加(需要存储句子分割结果)
- **处理速度**:基本无影响(句子分割很快)
- **准确性**:显著提升(减少截断实体)
## 未来改进方向
1. **更智能的实体识别**:使用预训练模型识别实体边界
2. **动态分块大小**:根据文本复杂度调整分块大小
3. **多语言支持**:扩展到其他语言的分块策略
4. **缓存优化**:缓存句子分割结果以提高性能
## 相关文件
- `backend/app/core/document_handlers/extractors/ner_extractor.py` - 主要实现
- `backend/test_improved_chunking.py` - 测试脚本
- `backend/test_truncation_fix.py` - 截断问题测试
- `backend/test_chunking_logic.py` - 分块逻辑测试

View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""
Debug script to understand the position mapping issue after masking.
"""
def find_entity_alignment(entity_text: str, original_document_text: str):
"""Simplified version of the alignment method for testing"""
clean_entity = entity_text.replace(" ", "")
doc_chars = [c for c in original_document_text if c != ' ']
for i in range(len(doc_chars) - len(clean_entity) + 1):
if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
return map_char_positions_to_original(i, len(clean_entity), original_document_text)
return None
def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
"""Simplified version of position mapping for testing"""
original_pos = 0
clean_pos = 0
while clean_pos < clean_start and original_pos < len(original_text):
if original_text[original_pos] != ' ':
clean_pos += 1
original_pos += 1
start_pos = original_pos
chars_found = 0
while chars_found < entity_length and original_pos < len(original_text):
if original_text[original_pos] != ' ':
chars_found += 1
original_pos += 1
end_pos = original_pos
found_text = original_text[start_pos:end_pos]
return start_pos, end_pos, found_text
def debug_position_issue():
"""Debug the position mapping issue"""
print("Debugging Position Mapping Issue")
print("=" * 50)
# Test document
original_doc = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
entity = "李淼"
masked_text = "李M"
print(f"Original document: '{original_doc}'")
print(f"Entity to mask: '{entity}'")
print(f"Masked text: '{masked_text}'")
print()
# First occurrence
print("=== First Occurrence ===")
result1 = find_entity_alignment(entity, original_doc)
if result1:
start1, end1, found1 = result1
print(f"Found at positions {start1}-{end1}: '{found1}'")
# Apply first mask
masked_doc = original_doc[:start1] + masked_text + original_doc[end1:]
print(f"After first mask: '{masked_doc}'")
print(f"Length changed from {len(original_doc)} to {len(masked_doc)}")
# Try to find second occurrence in the masked document
print("\n=== Second Occurrence (in masked document) ===")
result2 = find_entity_alignment(entity, masked_doc)
if result2:
start2, end2, found2 = result2
print(f"Found at positions {start2}-{end2}: '{found2}'")
# Apply second mask
masked_doc2 = masked_doc[:start2] + masked_text + masked_doc[end2:]
print(f"After second mask: '{masked_doc2}'")
# Try to find third occurrence
print("\n=== Third Occurrence (in double-masked document) ===")
result3 = find_entity_alignment(entity, masked_doc2)
if result3:
start3, end3, found3 = result3
print(f"Found at positions {start3}-{end3}: '{found3}'")
else:
print("No third occurrence found")
else:
print("No second occurrence found")
else:
print("No first occurrence found")
def debug_infinite_loop():
"""Debug the infinite loop issue"""
print("\n" + "=" * 50)
print("Debugging Infinite Loop Issue")
print("=" * 50)
# Test document that causes infinite loop
original_doc = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。"
entity = "丰复久信公司"
masked_text = "丰复久信公司" # Same text (no change)
print(f"Original document: '{original_doc}'")
print(f"Entity to mask: '{entity}'")
print(f"Masked text: '{masked_text}' (same as original)")
print()
# This will cause infinite loop because we're replacing with the same text
print("=== This will cause infinite loop ===")
print("Because we're replacing '丰复久信公司' with '丰复久信公司'")
print("The document doesn't change, so we keep finding the same position")
# Show what happens
masked_doc = original_doc
for i in range(3): # Limit to 3 iterations for demo
result = find_entity_alignment(entity, masked_doc)
if result:
start, end, found = result
print(f"Iteration {i+1}: Found at positions {start}-{end}: '{found}'")
# Apply mask (but it's the same text)
masked_doc = masked_doc[:start] + masked_text + masked_doc[end:]
print(f"After mask: '{masked_doc}'")
else:
print(f"Iteration {i+1}: No occurrence found")
break
if __name__ == "__main__":
debug_position_issue()
debug_infinite_loop()

View File

@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""
Test script for character-by-character alignment functionality.
This script demonstrates how the alignment handles different spacing patterns
between entity text and original document text.
"""
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), 'backend'))
from app.core.document_handlers.ner_processor import NerProcessor
def main():
"""Test the character alignment functionality."""
processor = NerProcessor()
print("Testing Character-by-Character Alignment")
print("=" * 50)
# Test the alignment functionality
processor.test_character_alignment()
print("\n" + "=" * 50)
print("Testing Entity Masking with Alignment")
print("=" * 50)
# Test entity masking with alignment
original_document = "上诉人原审原告北京丰复久信营销科技有限公司住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。法定代表人郭东军执行董事、经理。委托诉讼代理人周大海北京市康达律师事务所律师。"
# Example entity mapping (from your NER results)
entity_mapping = {
"北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
"郭东军": "郭DJ",
"周大海": "周DH",
"北京市康达律师事务所": "北京市KD律师事务所"
}
print(f"Original document: {original_document}")
print(f"Entity mapping: {entity_mapping}")
# Apply masking with alignment
masked_document = processor.apply_entity_masking_with_alignment(
original_document,
entity_mapping
)
print(f"Masked document: {masked_document}")
# Test with document that has spaces
print("\n" + "=" * 50)
print("Testing with Document Containing Spaces")
print("=" * 50)
spaced_document = "上诉人(原审原告):北京 丰复久信 营销科技 有限公司住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。法定代表人郭 东 军,执行董事、经理。"
print(f"Spaced document: {spaced_document}")
masked_spaced_document = processor.apply_entity_masking_with_alignment(
spaced_document,
entity_mapping
)
print(f"Masked spaced document: {masked_spaced_document}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Final test to verify the fix handles multiple occurrences and prevents infinite loops.
"""
def find_entity_alignment(entity_text: str, original_document_text: str):
"""Simplified version of the alignment method for testing"""
clean_entity = entity_text.replace(" ", "")
doc_chars = [c for c in original_document_text if c != ' ']
for i in range(len(doc_chars) - len(clean_entity) + 1):
if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
return map_char_positions_to_original(i, len(clean_entity), original_document_text)
return None
def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
"""Simplified version of position mapping for testing"""
original_pos = 0
clean_pos = 0
while clean_pos < clean_start and original_pos < len(original_text):
if original_text[original_pos] != ' ':
clean_pos += 1
original_pos += 1
start_pos = original_pos
chars_found = 0
while chars_found < entity_length and original_pos < len(original_text):
if original_text[original_pos] != ' ':
chars_found += 1
original_pos += 1
end_pos = original_pos
found_text = original_text[start_pos:end_pos]
return start_pos, end_pos, found_text
def apply_entity_masking_with_alignment_fixed(original_document_text: str, entity_mapping: dict):
"""Fixed implementation that handles multiple occurrences and prevents infinite loops"""
masked_document = original_document_text
sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)
for entity_text in sorted_entities:
masked_text = entity_mapping[entity_text]
# Skip if masked text is the same as original text (prevents infinite loop)
if entity_text == masked_text:
print(f"Skipping entity '{entity_text}' as masked text is identical")
continue
# Find ALL occurrences of this entity in the document
# Add safety counter to prevent infinite loops
max_iterations = 100 # Safety limit
iteration_count = 0
while iteration_count < max_iterations:
iteration_count += 1
# Find the entity in the current masked document using alignment
alignment_result = find_entity_alignment(entity_text, masked_document)
if alignment_result:
start_pos, end_pos, found_text = alignment_result
# Replace the found text with the masked version
masked_document = (
masked_document[:start_pos] +
masked_text +
masked_document[end_pos:]
)
print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})")
else:
# No more occurrences found for this entity, move to next entity
print(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations")
break
# Log warning if we hit the safety limit
if iteration_count >= max_iterations:
print(f"WARNING: Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop")
return masked_document
def test_final_fix():
"""Test the final fix with various scenarios"""
print("Testing Final Fix for Multiple Occurrences and Infinite Loop Prevention")
print("=" * 70)
# Test case 1: Multiple occurrences of the same entity (should work)
print("\nTest Case 1: Multiple occurrences of same entity")
test_document_1 = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
entity_mapping_1 = {"李淼": "李M"}
print(f"Original: {test_document_1}")
result_1 = apply_entity_masking_with_alignment_fixed(test_document_1, entity_mapping_1)
print(f"Result: {result_1}")
remaining_1 = result_1.count("李淼")
expected_1 = "上诉人李M因合同纠纷法定代表人李M委托代理人李M。"
if result_1 == expected_1 and remaining_1 == 0:
print("✅ PASS: All occurrences masked correctly")
else:
print(f"❌ FAIL: Expected '{expected_1}', got '{result_1}'")
print(f" Remaining '李淼' occurrences: {remaining_1}")
# Test case 2: Entity with same masked text (should skip to prevent infinite loop)
print("\nTest Case 2: Entity with same masked text (should skip)")
test_document_2 = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。"
entity_mapping_2 = {
"李淼": "李M",
"丰复久信公司": "丰复久信公司" # Same text - should be skipped
}
print(f"Original: {test_document_2}")
result_2 = apply_entity_masking_with_alignment_fixed(test_document_2, entity_mapping_2)
print(f"Result: {result_2}")
remaining_2_li = result_2.count("李淼")
remaining_2_company = result_2.count("丰复久信公司")
if remaining_2_li == 0 and remaining_2_company == 1: # Company should remain unmasked
print("✅ PASS: Infinite loop prevented, only different text masked")
else:
print(f"❌ FAIL: Remaining '李淼': {remaining_2_li}, '丰复久信公司': {remaining_2_company}")
# Test case 3: Mixed spacing scenarios
print("\nTest Case 3: Mixed spacing scenarios")
test_document_3 = "上诉人李 淼因合同纠纷,法定代表人李淼,委托代理人李 淼。"
entity_mapping_3 = {"李 淼": "李M", "李淼": "李M"}
print(f"Original: {test_document_3}")
result_3 = apply_entity_masking_with_alignment_fixed(test_document_3, entity_mapping_3)
print(f"Result: {result_3}")
remaining_3 = result_3.count("李淼") + result_3.count("李 淼")
if remaining_3 == 0:
print("✅ PASS: Mixed spacing handled correctly")
else:
print(f"❌ FAIL: Remaining occurrences: {remaining_3}")
# Test case 4: Complex document with real examples
print("\nTest Case 4: Complex document with real examples")
test_document_4 = """上诉人原审原告北京丰复久信营销科技有限公司住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。
法定代表人郭东军执行董事经理
委托诉讼代理人周大海北京市康达律师事务所律师
委托诉讼代理人王乃哲北京市康达律师事务所律师
被上诉人原审被告中研智创区块链技术有限公司住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505
法定代表人王欢子总经理
委托诉讼代理人魏鑫北京市昊衡律师事务所律师"""
entity_mapping_4 = {
"北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
"郭东军": "郭DJ",
"周大海": "周DH",
"王乃哲": "王NZ",
"中研智创区块链技术有限公司": "中研智创区块链技术有限公司", # Same text - should be skipped
"王欢子": "王HZ",
"魏鑫": "魏X",
"北京市康达律师事务所": "北京市KD律师事务所",
"北京市昊衡律师事务所": "北京市HH律师事务所"
}
print(f"Original length: {len(test_document_4)} characters")
result_4 = apply_entity_masking_with_alignment_fixed(test_document_4, entity_mapping_4)
print(f"Result length: {len(result_4)} characters")
# Check that entities were masked correctly
unmasked_entities = []
for entity in entity_mapping_4.keys():
if entity in result_4 and entity != entity_mapping_4[entity]: # Skip if masked text is same
unmasked_entities.append(entity)
if not unmasked_entities:
print("✅ PASS: All entities masked correctly in complex document")
else:
print(f"❌ FAIL: Unmasked entities: {unmasked_entities}")
print("\n" + "=" * 70)
print("Final Fix Verification Completed!")
if __name__ == "__main__":
test_final_fix()

View File

@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""
Test to verify the fix for multiple occurrence issue in apply_entity_masking_with_alignment.
"""
def find_entity_alignment(entity_text: str, original_document_text: str):
"""Simplified version of the alignment method for testing"""
clean_entity = entity_text.replace(" ", "")
doc_chars = [c for c in original_document_text if c != ' ']
for i in range(len(doc_chars) - len(clean_entity) + 1):
if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
return map_char_positions_to_original(i, len(clean_entity), original_document_text)
return None
def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
"""Simplified version of position mapping for testing"""
original_pos = 0
clean_pos = 0
while clean_pos < clean_start and original_pos < len(original_text):
if original_text[original_pos] != ' ':
clean_pos += 1
original_pos += 1
start_pos = original_pos
chars_found = 0
while chars_found < entity_length and original_pos < len(original_text):
if original_text[original_pos] != ' ':
chars_found += 1
original_pos += 1
end_pos = original_pos
found_text = original_text[start_pos:end_pos]
return start_pos, end_pos, found_text
def apply_entity_masking_with_alignment_fixed(original_document_text: str, entity_mapping: dict):
"""Fixed implementation that handles multiple occurrences"""
masked_document = original_document_text
sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)
for entity_text in sorted_entities:
masked_text = entity_mapping[entity_text]
# Find ALL occurrences of this entity in the document
# We need to loop until no more matches are found
while True:
# Find the entity in the current masked document using alignment
alignment_result = find_entity_alignment(entity_text, masked_document)
if alignment_result:
start_pos, end_pos, found_text = alignment_result
# Replace the found text with the masked version
masked_document = (
masked_document[:start_pos] +
masked_text +
masked_document[end_pos:]
)
print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos}")
else:
# No more occurrences found for this entity, move to next entity
print(f"No more occurrences of '{entity_text}' found in document")
break
return masked_document
def test_fix_verification():
"""Test to verify the fix works correctly"""
print("Testing Fix for Multiple Occurrence Issue")
print("=" * 60)
# Test case 1: Multiple occurrences of the same entity
print("\nTest Case 1: Multiple occurrences of same entity")
test_document_1 = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
entity_mapping_1 = {"李淼": "李M"}
print(f"Original: {test_document_1}")
result_1 = apply_entity_masking_with_alignment_fixed(test_document_1, entity_mapping_1)
print(f"Result: {result_1}")
remaining_1 = result_1.count("李淼")
expected_1 = "上诉人李M因合同纠纷法定代表人李M委托代理人李M。"
if result_1 == expected_1 and remaining_1 == 0:
print("✅ PASS: All occurrences masked correctly")
else:
print(f"❌ FAIL: Expected '{expected_1}', got '{result_1}'")
print(f" Remaining '李淼' occurrences: {remaining_1}")
# Test case 2: Multiple entities with multiple occurrences
print("\nTest Case 2: Multiple entities with multiple occurrences")
test_document_2 = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。"
entity_mapping_2 = {
"李淼": "李M",
"北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
"丰复久信公司": "丰复久信公司"
}
print(f"Original: {test_document_2}")
result_2 = apply_entity_masking_with_alignment_fixed(test_document_2, entity_mapping_2)
print(f"Result: {result_2}")
remaining_2_li = result_2.count("李淼")
remaining_2_company = result_2.count("北京丰复久信营销科技有限公司")
if remaining_2_li == 0 and remaining_2_company == 0:
print("✅ PASS: All entities masked correctly")
else:
print(f"❌ FAIL: Remaining '李淼': {remaining_2_li}, '北京丰复久信营销科技有限公司': {remaining_2_company}")
# Test case 3: Mixed spacing scenarios
print("\nTest Case 3: Mixed spacing scenarios")
test_document_3 = "上诉人李 淼因合同纠纷,法定代表人李淼,委托代理人李 淼。"
entity_mapping_3 = {"李 淼": "李M", "李淼": "李M"}
print(f"Original: {test_document_3}")
result_3 = apply_entity_masking_with_alignment_fixed(test_document_3, entity_mapping_3)
print(f"Result: {result_3}")
remaining_3 = result_3.count("李淼") + result_3.count("李 淼")
if remaining_3 == 0:
print("✅ PASS: Mixed spacing handled correctly")
else:
print(f"❌ FAIL: Remaining occurrences: {remaining_3}")
# Test case 4: Complex document with real examples
print("\nTest Case 4: Complex document with real examples")
test_document_4 = """上诉人原审原告北京丰复久信营销科技有限公司住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。
法定代表人郭东军执行董事经理
委托诉讼代理人周大海北京市康达律师事务所律师
委托诉讼代理人王乃哲北京市康达律师事务所律师
被上诉人原审被告中研智创区块链技术有限公司住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505
法定代表人王欢子总经理
委托诉讼代理人魏鑫北京市昊衡律师事务所律师"""
entity_mapping_4 = {
"北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
"郭东军": "郭DJ",
"周大海": "周DH",
"王乃哲": "王NZ",
"中研智创区块链技术有限公司": "中研智创区块链技术有限公司",
"王欢子": "王HZ",
"魏鑫": "魏X",
"北京市康达律师事务所": "北京市KD律师事务所",
"北京市昊衡律师事务所": "北京市HH律师事务所"
}
print(f"Original length: {len(test_document_4)} characters")
result_4 = apply_entity_masking_with_alignment_fixed(test_document_4, entity_mapping_4)
print(f"Result length: {len(result_4)} characters")
# Check that all entities were masked
unmasked_entities = []
for entity in entity_mapping_4.keys():
if entity in result_4:
unmasked_entities.append(entity)
if not unmasked_entities:
print("✅ PASS: All entities masked in complex document")
else:
print(f"❌ FAIL: Unmasked entities: {unmasked_entities}")
print("\n" + "=" * 60)
print("Fix Verification Completed!")
if __name__ == "__main__":
test_fix_verification()

View File

@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
Test to verify the multiple occurrence issue in apply_entity_masking_with_alignment.
"""
def find_entity_alignment(entity_text: str, original_document_text: str):
"""Simplified version of the alignment method for testing"""
clean_entity = entity_text.replace(" ", "")
doc_chars = [c for c in original_document_text if c != ' ']
for i in range(len(doc_chars) - len(clean_entity) + 1):
if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
return map_char_positions_to_original(i, len(clean_entity), original_document_text)
return None
def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
"""Simplified version of position mapping for testing"""
original_pos = 0
clean_pos = 0
while clean_pos < clean_start and original_pos < len(original_text):
if original_text[original_pos] != ' ':
clean_pos += 1
original_pos += 1
start_pos = original_pos
chars_found = 0
while chars_found < entity_length and original_pos < len(original_text):
if original_text[original_pos] != ' ':
chars_found += 1
original_pos += 1
end_pos = original_pos
found_text = original_text[start_pos:end_pos]
return start_pos, end_pos, found_text
def apply_entity_masking_with_alignment_current(original_document_text: str, entity_mapping: dict):
"""Current implementation with the bug"""
masked_document = original_document_text
sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)
for entity_text in sorted_entities:
masked_text = entity_mapping[entity_text]
# Find the entity in the original document using alignment
alignment_result = find_entity_alignment(entity_text, masked_document)
if alignment_result:
start_pos, end_pos, found_text = alignment_result
# Replace the found text with the masked version
masked_document = (
masked_document[:start_pos] +
masked_text +
masked_document[end_pos:]
)
print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos}")
else:
print(f"Could not find entity '{entity_text}' in document for masking")
return masked_document
def test_multiple_occurrences():
"""Test the multiple occurrence issue"""
print("Testing Multiple Occurrence Issue")
print("=" * 50)
# Test document with multiple occurrences of the same entity
test_document = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
entity_mapping = {
"李淼": "李M"
}
print(f"Original document: {test_document}")
print(f"Entity mapping: {entity_mapping}")
print(f"Expected: All 3 occurrences of '李淼' should be masked")
# Test current implementation
result = apply_entity_masking_with_alignment_current(test_document, entity_mapping)
print(f"Current result: {result}")
# Count remaining occurrences
remaining_count = result.count("李淼")
print(f"Remaining '李淼' occurrences: {remaining_count}")
if remaining_count > 0:
print("❌ ISSUE CONFIRMED: Multiple occurrences are not being masked!")
else:
print("✅ No issue found (unexpected)")
if __name__ == "__main__":
test_multiple_occurrences()