Compare commits
No commits in common. "eb33dc137efc2fc03fa7942bd32890123ad99cf9" and "40dd0de1b3166619c5ef712cb2025b735a52e51d" have entirely different histories.
eb33dc137e
...
40dd0de1b3
@@ -86,7 +86,7 @@ docker-compose build frontend
docker-compose build mineru-api

# Build multiple specific services
docker-compose build backend-api frontend celery-worker
docker-compose build backend-api frontend
```

### Building and restarting specific services
@@ -40,35 +40,16 @@ class DocumentProcessor(ABC):
        return chunks

    def _apply_mapping_with_alignment(self, text: str, mapping: Dict[str, str]) -> str:
        """
        Apply the mapping to replace sensitive information using character-by-character alignment.

        This method uses the new alignment-based masking to handle spacing issues
        between NER results and original document text.

        Args:
            text: Original document text
            mapping: Dictionary mapping original entity text to masked text

        Returns:
            Masked document text
        """
        logger.info(f"Applying entity mapping with alignment to text of length {len(text)}")
        logger.debug(f"Entity mapping: {mapping}")

        # Use the new alignment-based masking method
        masked_text = self.ner_processor.apply_entity_masking_with_alignment(text, mapping)

        logger.info("Successfully applied entity masking with alignment")
        return masked_text

    def _apply_mapping(self, text: str, mapping: Dict[str, str]) -> str:
        """
        Legacy method for simple string replacement.
        Now delegates to the new alignment-based method.
        """
        return self._apply_mapping_with_alignment(text, mapping)
        """Apply the mapping to replace sensitive information"""
        masked_text = text
        for original, masked in mapping.items():
            if isinstance(masked, dict):
                masked = next(iter(masked.values()), "某")
            elif not isinstance(masked, str):
                masked = str(masked) if masked is not None else "某"
            masked_text = masked_text.replace(original, masked)
        return masked_text

    def process_content(self, content: str) -> str:
        """Process document content by masking sensitive information"""

@@ -78,11 +59,9 @@ class DocumentProcessor(ABC):
        logger.info(f"Split content into {len(chunks)} chunks")

        final_mapping = self.ner_processor.process(chunks)
        logger.info(f"Generated entity mapping with {len(final_mapping)} entities")

        # Use the new alignment-based masking
        masked_content = self._apply_mapping_with_alignment(content, final_mapping)
        logger.info("Successfully masked content using character alignment")
        masked_content = self._apply_mapping(content, final_mapping)
        logger.info("Successfully masked content")

        return masked_content
@@ -1,6 +1,5 @@
import json
import logging
import re
from typing import Dict, List, Any, Optional
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
from .base_extractor import BaseExtractor
@@ -20,7 +19,6 @@ class NERExtractor(BaseExtractor):
        self.model = None
        self.ner_pipeline = None
        self._model_initialized = False
        self.confidence_threshold = 0.95

        # Map CLUENER model labels to our desired categories
        self.label_map = {
@@ -60,164 +58,6 @@ class NERExtractor(BaseExtractor):
            logger.error(f"Failed to load NER model: {str(e)}")
            raise Exception(f"NER model initialization failed: {str(e)}")

    def _split_text_by_sentences(self, text: str) -> List[str]:
        """
        Split text into sentences using Chinese sentence boundaries

        Args:
            text: The text to split

        Returns:
            List of sentences
        """
        # Chinese sentence endings: 。!?;\n
        # Also consider English sentence endings for mixed text
        sentence_pattern = r'[。!?;\n]+|[.!?;]+'
        sentences = re.split(sentence_pattern, text)

        # Clean up sentences and filter out empty ones
        cleaned_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if sentence:
                cleaned_sentences.append(sentence)

        return cleaned_sentences

    def _is_entity_boundary_safe(self, text: str, position: int) -> bool:
        """
        Check if a position is safe for splitting (won't break entities)

        Args:
            text: The text to check
            position: Position to check for safety

        Returns:
            True if safe to split at this position
        """
        if position <= 0 or position >= len(text):
            return True

        # Common entity suffixes that indicate incomplete entities
        entity_suffixes = ['公', '司', '所', '院', '厅', '局', '部', '会', '团', '社', '处', '室', '楼', '号']

        # Check if we're in the middle of a potential entity
        for suffix in entity_suffixes:
            # Look for incomplete entity patterns
            if text[position-1:position+1] in [f'公{suffix}', f'司{suffix}', f'所{suffix}']:
                return False

        # Check for incomplete company names
        if text[position-2:position+1] in ['公司', '事务所', '协会', '研究院']:
            return False

        # Check for incomplete address patterns
        address_patterns = ['省', '市', '区', '县', '路', '街', '巷', '号', '室']
        for pattern in address_patterns:
            if text[position-1:position+1] in [f'省{pattern}', f'市{pattern}', f'区{pattern}', f'县{pattern}']:
                return False

        return True

    def _create_sentence_chunks(self, sentences: List[str], max_tokens: int = 400) -> List[str]:
        """
        Create chunks from sentences while respecting token limits and entity boundaries

        Args:
            sentences: List of sentences
            max_tokens: Maximum tokens per chunk

        Returns:
            List of text chunks
        """
        chunks = []
        current_chunk = []
        current_token_count = 0

        for sentence in sentences:
            # Estimate token count for this sentence
            sentence_tokens = len(self.tokenizer.tokenize(sentence))

            # If adding this sentence would exceed the limit
            if current_token_count + sentence_tokens > max_tokens and current_chunk:
                # Check if we can split the sentence to fit better
                if sentence_tokens > max_tokens // 2:  # If sentence is too long
                    # Try to split the sentence at a safe boundary
                    split_sentence = self._split_long_sentence(sentence, max_tokens - current_token_count)
                    if split_sentence:
                        # Add the first part to current chunk
                        current_chunk.append(split_sentence[0])
                        chunks.append(''.join(current_chunk))

                        # Start new chunk with remaining parts
                        current_chunk = split_sentence[1:]
                        current_token_count = sum(len(self.tokenizer.tokenize(s)) for s in current_chunk)
                    else:
                        # Finalize current chunk and start new one
                        chunks.append(''.join(current_chunk))
                        current_chunk = [sentence]
                        current_token_count = sentence_tokens
                else:
                    # Finalize current chunk and start new one
                    chunks.append(''.join(current_chunk))
                    current_chunk = [sentence]
                    current_token_count = sentence_tokens
            else:
                # Add sentence to current chunk
                current_chunk.append(sentence)
                current_token_count += sentence_tokens

        # Add the last chunk if it has content
        if current_chunk:
            chunks.append(''.join(current_chunk))

        return chunks

    def _split_long_sentence(self, sentence: str, max_tokens: int) -> Optional[List[str]]:
        """
        Split a long sentence at safe boundaries

        Args:
            sentence: The sentence to split
            max_tokens: Maximum tokens for the first part

        Returns:
            List of sentence parts, or None if splitting is not possible
        """
        if len(self.tokenizer.tokenize(sentence)) <= max_tokens:
            return None

        # Try to find safe splitting points
        # Look for punctuation marks that are safe to split at
        safe_splitters = [',', ',', ';', ';', '、', ':', ':']

        for splitter in safe_splitters:
            if splitter in sentence:
                parts = sentence.split(splitter)
                current_part = ""

                for i, part in enumerate(parts):
                    test_part = current_part + part + (splitter if i < len(parts) - 1 else "")
                    if len(self.tokenizer.tokenize(test_part)) > max_tokens:
                        if current_part:
                            # Found a safe split point
                            remaining = splitter.join(parts[i:])
                            return [current_part, remaining]
                        break
                    current_part = test_part

        # If no safe split point found, try character-based splitting with entity boundary check
        target_chars = int(max_tokens / 1.5)  # Rough character estimate

        for i in range(target_chars, len(sentence)):
            if self._is_entity_boundary_safe(sentence, i):
                part1 = sentence[:i]
                part2 = sentence[i:]
                if len(self.tokenizer.tokenize(part1)) <= max_tokens:
                    return [part1, part2]

        return None

    def extract(self, text: str) -> Dict[str, Any]:
        """
        Extract named entities from the given text
@@ -263,9 +103,7 @@ class NERExtractor(BaseExtractor):
        """
        try:
            # Run the NER pipeline - it handles truncation automatically
            logger.info(f"Running NER pipeline with text: {text}")
            results = self.ner_pipeline(text)
            logger.info(f"NER results: {results}")

            # Filter and process entities
            filtered_entities = []
@@ -281,20 +119,13 @@ class NERExtractor(BaseExtractor):
                # Clean up the tokenized text (remove spaces between Chinese characters)
                cleaned_text = self._clean_tokenized_text(entity_text)

                # Add to our list with both original and cleaned text, only if the confidence score is above the threshold
                # if entity_group is 'address' or 'company' and the text has fewer than 3 characters, filter it out
                if confidence_score > self.confidence_threshold:
                    filtered_entities.append({
                        "text": cleaned_text,  # Clean text for display/processing
                        "tokenized_text": entity_text,  # Original tokenized text from model
                        "type": entity_type,
                        "entity_group": entity_group,
                        "confidence": confidence_score
                    })
            logger.info(f"Filtered entities: {filtered_entities}")
            # filter out entities with fewer than 3 characters when entity_group is 'address' or 'company'
            filtered_entities = [entity for entity in filtered_entities if entity['entity_group'] not in ['address', 'company'] or len(entity['text']) > 3]
            logger.info(f"Final Filtered entities: {filtered_entities}")
                # Add to our list with both original and cleaned text
                filtered_entities.append({
                    "text": cleaned_text,  # Clean text for display/processing
                    "original_text": entity_text,  # Original tokenized text from model
                    "type": entity_type,
                    "confidence": confidence_score
                })

            return {
                "entities": filtered_entities,
@@ -307,7 +138,7 @@ class NERExtractor(BaseExtractor):

    def _extract_with_chunking(self, text: str) -> Dict[str, Any]:
        """
        Extract entities from long text using sentence-based chunking approach
        Extract entities from long text using chunking approach

        Args:
            text: The text to analyze
@@ -316,37 +147,41 @@ class NERExtractor(BaseExtractor):
            Dictionary containing extracted entities
        """
        try:
            logger.info(f"Using sentence-based chunking for text of length: {len(text)}")
            # Estimate token count to determine safe chunk size
            estimated_tokens = len(text) * 1.5  # Conservative estimate for Chinese text
            logger.info(f"Estimated tokens: {estimated_tokens:.0f}")

            # Split text into sentences
            sentences = self._split_text_by_sentences(text)
            logger.info(f"Split text into {len(sentences)} sentences")
            # Calculate safe chunk size to stay under 512 tokens
            # Target ~400 tokens per chunk to leave buffer
            target_chunk_tokens = 400
            chunk_size = int(target_chunk_tokens / 1.5)  # Convert back to characters
            overlap = max(50, chunk_size // 8)  # 12.5% overlap, minimum 50 chars

            # Create chunks from sentences
            chunks = self._create_sentence_chunks(sentences, max_tokens=400)
            logger.info(f"Created {len(chunks)} chunks from sentences")
            logger.info(f"Using chunk_size: {chunk_size} chars, overlap: {overlap} chars")

            all_entities = []

            # Process each chunk
            for i, chunk in enumerate(chunks):
            # Process text in overlapping character chunks
            for i in range(0, len(text), chunk_size - overlap):
                chunk_text = text[i:i + chunk_size]

                # Verify chunk won't exceed token limit
                chunk_tokens = len(self.tokenizer.tokenize(chunk))
                logger.info(f"Processing chunk {i+1}: {len(chunk)} chars, {chunk_tokens} tokens")
                chunk_tokens = len(self.tokenizer.tokenize(chunk_text))
                logger.info(f"Processing chunk {i//(chunk_size-overlap)+1}: {len(chunk_text)} chars, {chunk_tokens} tokens")

                if chunk_tokens > 512:
                    logger.warning(f"Chunk {i+1} has {chunk_tokens} tokens, truncating")
                    logger.warning(f"Chunk {i//(chunk_size-overlap)+1} has {chunk_tokens} tokens, truncating")
                    # Truncate the chunk to fit within token limit
                    chunk = self.tokenizer.convert_tokens_to_string(
                        self.tokenizer.tokenize(chunk)[:512]
                    chunk_text = self.tokenizer.convert_tokens_to_string(
                        self.tokenizer.tokenize(chunk_text)[:512]
                    )

                # Extract entities from this chunk
                chunk_result = self._extract_single(chunk)
                chunk_result = self._extract_single(chunk_text)
                chunk_entities = chunk_result.get("entities", [])

                all_entities.extend(chunk_entities)
                logger.info(f"Chunk {i+1} extracted {len(chunk_entities)} entities")
                logger.info(f"Chunk {i//(chunk_size-overlap)+1} extracted {len(chunk_entities)} entities")

            # Remove duplicates while preserving order
            unique_entities = []
@@ -358,7 +193,7 @@ class NERExtractor(BaseExtractor):
                    seen_texts.add(text)
                    unique_entities.append(entity)

            logger.info(f"Sentence-based chunking completed: {len(all_entities)} total entities, {len(unique_entities)} unique entities")
            logger.info(f"Chunking completed: {len(all_entities)} total entities, {len(unique_entities)} unique entities")

            return {
                "entities": unique_entities,
@@ -366,8 +201,8 @@ class NERExtractor(BaseExtractor):
            }

        except Exception as e:
            logger.error(f"Error during sentence-based chunked NER processing: {str(e)}")
            raise Exception(f"Sentence-based chunked NER processing failed: {str(e)}")
            logger.error(f"Error during chunked NER processing: {str(e)}")
            raise Exception(f"Chunked NER processing failed: {str(e)}")

    def _clean_tokenized_text(self, tokenized_text: str) -> str:
        """
@@ -9,7 +9,7 @@ from .maskers.company_masker import CompanyMasker
from .maskers.address_masker import AddressMasker
from .maskers.id_masker import IDMasker
from .maskers.case_masker import CaseMasker
from ..services.ollama_client import OllamaClient
from ...services.ollama_client import OllamaClient


class MaskerFactory:
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Tuple, Optional
from typing import Any, Dict, List
from ..prompts.masking_prompts import get_ner_name_prompt, get_ner_company_prompt, get_ner_address_prompt, get_ner_project_prompt, get_ner_case_number_prompt, get_entity_linkage_prompt
import logging
import json
@@ -20,201 +20,9 @@ class NerProcessor:
        # Initialize NER extractor for ML-based entity extraction
        self.ner_extractor = NERExtractor()

    def _find_entity_alignment(self, entity_text: str, original_document_text: str) -> Optional[Tuple[int, int, str]]:
        """
        Find entity in original document using character-by-character alignment.

        This method handles the case where the original document may have spaces
        that are not from tokenization, and the entity text may have different
        spacing patterns.

        Args:
            entity_text: The entity text to find (may have spaces from tokenization)
            original_document_text: The original document text (may have spaces)

        Returns:
            Tuple of (start_pos, end_pos, found_text) or None if not found
        """
        # Remove all spaces from entity text to get clean characters
        clean_entity = entity_text.replace(" ", "")

        # Create character lists ignoring spaces from both entity and document
        entity_chars = [c for c in clean_entity]
        doc_chars = [c for c in original_document_text if c != ' ']

        # Find the sequence in document characters
        for i in range(len(doc_chars) - len(entity_chars) + 1):
            if doc_chars[i:i+len(entity_chars)] == entity_chars:
                # Found match, now map back to original positions
                return self._map_char_positions_to_original(i, len(entity_chars), original_document_text)

        return None

    def _map_char_positions_to_original(self, clean_start: int, entity_length: int, original_text: str) -> Tuple[int, int, str]:
        """
        Map positions from clean text (without spaces) back to original text positions.

        Args:
            clean_start: Start position in clean text (without spaces)
            entity_length: Length of entity in characters
            original_text: Original document text with spaces

        Returns:
            Tuple of (start_pos, end_pos, found_text) in original text
        """
        original_pos = 0
        clean_pos = 0

        # Find the start position in original text
        while clean_pos < clean_start and original_pos < len(original_text):
            if original_text[original_pos] != ' ':
                clean_pos += 1
            original_pos += 1

        start_pos = original_pos

        # Find the end position by counting non-space characters
        chars_found = 0
        while chars_found < entity_length and original_pos < len(original_text):
            if original_text[original_pos] != ' ':
                chars_found += 1
            original_pos += 1

        end_pos = original_pos

        # Extract the actual text from the original document
        found_text = original_text[start_pos:end_pos]

        return start_pos, end_pos, found_text

    def _validate_mapping_format(self, mapping: Dict[str, Any]) -> bool:
        return LLMResponseValidator.validate_entity_extraction(mapping)

    def apply_entity_masking_with_alignment(self, original_document_text: str, entity_mapping: Dict[str, str], mask_char: str = "*") -> str:
        """
        Apply entity masking to original document text using character-by-character alignment.

        This method finds each entity in the original document using alignment and
        replaces it with the corresponding masked version. It handles multiple
        occurrences of the same entity by finding all instances before moving
        to the next entity.

        Args:
            original_document_text: The original document text to mask
            entity_mapping: Dictionary mapping original entity text to masked text
            mask_char: Character to use for masking (default: "*")

        Returns:
            Masked document text
        """
        masked_document = original_document_text

        # Sort entities by length (longest first) to avoid partial matches
        sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)

        for entity_text in sorted_entities:
            masked_text = entity_mapping[entity_text]

            # Skip if masked text is the same as original text (prevents infinite loop)
            if entity_text == masked_text:
                logger.debug(f"Skipping entity '{entity_text}' as masked text is identical")
                continue

            # Find ALL occurrences of this entity in the document
            # We need to loop until no more matches are found
            # Add safety counter to prevent infinite loops
            max_iterations = 100  # Safety limit
            iteration_count = 0

            while iteration_count < max_iterations:
                iteration_count += 1

                # Find the entity in the current masked document using alignment
                alignment_result = self._find_entity_alignment(entity_text, masked_document)

                if alignment_result:
                    start_pos, end_pos, found_text = alignment_result

                    # Replace the found text with the masked version
                    masked_document = (
                        masked_document[:start_pos] +
                        masked_text +
                        masked_document[end_pos:]
                    )

                    logger.debug(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})")
                else:
                    # No more occurrences found for this entity, move to next entity
                    logger.debug(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations")
                    break

            # Log warning if we hit the safety limit
            if iteration_count >= max_iterations:
                logger.warning(f"Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop")

        return masked_document

    def test_character_alignment(self) -> None:
        """
        Test method to demonstrate character-by-character alignment functionality.
        This method can be used to validate the alignment works correctly with
        various spacing patterns.
        """
        test_cases = [
            # Test case 1: Entity with spaces, document without spaces
            {
                "entity_text": "李 淼",
                "document_text": "上诉人李淼因合同纠纷",
                "expected_found": "李淼"
            },
            # Test case 2: Entity without spaces, document with spaces
            {
                "entity_text": "邓青菁",
                "document_text": "上诉人邓 青 菁因合同纠纷",
                "expected_found": "邓 青 菁"
            },
            # Test case 3: Both entity and document have spaces
            {
                "entity_text": "王 欢 子",
                "document_text": "法定代表人王 欢 子,总经理",
                "expected_found": "王 欢 子"
            },
            # Test case 4: Entity without spaces, document without spaces
            {
                "entity_text": "郭东军",
                "document_text": "法定代表人郭东军,执行董事",
                "expected_found": "郭东军"
            },
            # Test case 5: Complex company name
            {
                "entity_text": "北京丰复久信营销科技有限公司",
                "document_text": "上诉人(原审原告):北京 丰复久信 营销科技 有限公司",
                "expected_found": "北京 丰复久信 营销科技 有限公司"
            }
        ]

        logger.info("Testing character-by-character alignment...")

        for i, test_case in enumerate(test_cases, 1):
            entity_text = test_case["entity_text"]
            document_text = test_case["document_text"]
            expected_found = test_case["expected_found"]

            result = self._find_entity_alignment(entity_text, document_text)

            if result:
                start_pos, end_pos, found_text = result
                success = found_text == expected_found
                status = "✓ PASS" if success else "✗ FAIL"
                logger.info(f"Test {i} {status}: Entity '{entity_text}' -> Found '{found_text}' (expected '{expected_found}') at positions {start_pos}-{end_pos}")

                if not success:
                    logger.error(f"  Expected: '{expected_found}', Got: '{found_text}'")
            else:
                logger.error(f"Test {i} ✗ FAIL: Entity '{entity_text}' not found in document")

        logger.info("Character alignment testing completed.")

    def extract_entities_with_ner(self, text: str) -> List[Dict[str, Any]]:
        """
        Extract entities using the NER model
@@ -1018,12 +826,11 @@ class NerProcessor:
        # Process each chunk with LLM for additional entities
        chunk_mappings = []
        # TODO: temporarily disable LLM processing
        # for i, chunk in enumerate(chunks):
        # logger.info(f"Processing chunk {i+1}/{len(chunks)} with LLM")
        # chunk_mapping = self.build_mapping_llm_only(chunk)  # LLM-only processing
        # logger.info(f"Chunk mapping: {chunk_mapping}")
        # chunk_mappings.extend(chunk_mapping)
        for i, chunk in enumerate(chunks):
            logger.info(f"Processing chunk {i+1}/{len(chunks)} with LLM")
            chunk_mapping = self.build_mapping_llm_only(chunk)  # LLM-only processing
            logger.info(f"Chunk mapping: {chunk_mapping}")
            chunk_mappings.extend(chunk_mapping)

        # Add NER entities to the mappings
        if ner_entities:
@@ -3,7 +3,7 @@ Refactored NerProcessor using the new masker architecture.
"""

import logging
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional
from ..prompts.masking_prompts import (
    get_ner_name_prompt, get_ner_company_prompt, get_ner_address_prompt,
    get_ner_project_prompt, get_ner_case_number_prompt, get_entity_linkage_prompt
@@ -28,137 +28,6 @@ class NerProcessorRefactored:
        self.maskers = self._initialize_maskers()
        self.surname_counter = {}  # Shared counter for Chinese names

    def _find_entity_alignment(self, entity_text: str, original_document_text: str) -> Optional[Tuple[int, int, str]]:
        """
        Find entity in original document using character-by-character alignment.

        This method handles the case where the original document may have spaces
        that are not from tokenization, and the entity text may have different
        spacing patterns.

        Args:
            entity_text: The entity text to find (may have spaces from tokenization)
            original_document_text: The original document text (may have spaces)

        Returns:
            Tuple of (start_pos, end_pos, found_text) or None if not found
        """
        # Remove all spaces from entity text to get clean characters
        clean_entity = entity_text.replace(" ", "")

        # Create character lists ignoring spaces from both entity and document
        entity_chars = [c for c in clean_entity]
        doc_chars = [c for c in original_document_text if c != ' ']

        # Find the sequence in document characters
        for i in range(len(doc_chars) - len(entity_chars) + 1):
            if doc_chars[i:i+len(entity_chars)] == entity_chars:
                # Found match, now map back to original positions
                return self._map_char_positions_to_original(i, len(entity_chars), original_document_text)

        return None

    def _map_char_positions_to_original(self, clean_start: int, entity_length: int, original_text: str) -> Tuple[int, int, str]:
        """
        Map positions from clean text (without spaces) back to original text positions.

        Args:
            clean_start: Start position in clean text (without spaces)
            entity_length: Length of entity in characters
            original_text: Original document text with spaces

        Returns:
            Tuple of (start_pos, end_pos, found_text) in original text
        """
        original_pos = 0
        clean_pos = 0

        # Find the start position in original text
        while clean_pos < clean_start and original_pos < len(original_text):
            if original_text[original_pos] != ' ':
                clean_pos += 1
            original_pos += 1

        start_pos = original_pos

        # Find the end position by counting non-space characters
        chars_found = 0
        while chars_found < entity_length and original_pos < len(original_text):
            if original_text[original_pos] != ' ':
                chars_found += 1
            original_pos += 1

        end_pos = original_pos

        # Extract the actual text from the original document
        found_text = original_text[start_pos:end_pos]

        return start_pos, end_pos, found_text

    def apply_entity_masking_with_alignment(self, original_document_text: str, entity_mapping: Dict[str, str], mask_char: str = "*") -> str:
        """
        Apply entity masking to original document text using character-by-character alignment.

        This method finds each entity in the original document using alignment and
        replaces it with the corresponding masked version. It handles multiple
        occurrences of the same entity by finding all instances before moving
        to the next entity.

        Args:
            original_document_text: The original document text to mask
            entity_mapping: Dictionary mapping original entity text to masked text
            mask_char: Character to use for masking (default: "*")

        Returns:
            Masked document text
        """
        masked_document = original_document_text

        # Sort entities by length (longest first) to avoid partial matches
        sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)

        for entity_text in sorted_entities:
            masked_text = entity_mapping[entity_text]

            # Skip if masked text is the same as original text (prevents infinite loop)
            if entity_text == masked_text:
                logger.debug(f"Skipping entity '{entity_text}' as masked text is identical")
                continue

            # Find ALL occurrences of this entity in the document
            # We need to loop until no more matches are found
            # Add safety counter to prevent infinite loops
            max_iterations = 100  # Safety limit
            iteration_count = 0

            while iteration_count < max_iterations:
                iteration_count += 1

                # Find the entity in the current masked document using alignment
                alignment_result = self._find_entity_alignment(entity_text, masked_document)

                if alignment_result:
                    start_pos, end_pos, found_text = alignment_result

                    # Replace the found text with the masked version
                    masked_document = (
                        masked_document[:start_pos] +
                        masked_text +
                        masked_document[end_pos:]
                    )

                    logger.debug(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})")
                else:
                    # No more occurrences found for this entity, move to next entity
                    logger.debug(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations")
                    break

            # Log warning if we hit the safety limit
            if iteration_count >= max_iterations:
                logger.warning(f"Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop")

        return masked_document

    def _initialize_maskers(self) -> Dict[str, BaseMasker]:
        """Initialize all maskers"""
        maskers = {}
@@ -1,130 +0,0 @@
# Sentence Chunking Improvement Notes

## Problem Description

During the original NER extraction process we found that some entities were being truncated, for example:
- "丰复久信公" (should be "丰复久信营销科技有限公司")
- "康达律师事" (should be "北京市康达律师事务所")

These truncations were caused by the original chunking strategy, which split purely by character count and did not take entity integrity into account.

## Solution

### 1. Sentence-Based Chunking Strategy

We implemented an intelligent sentence-based chunking strategy with the following key features (a minimal sketch of the splitting step follows the list):

- **Splitting at natural boundaries**: text is split at Chinese sentence terminators (。!?;\n) and English sentence terminators (.!?;)
- **Entity integrity protection**: splitting in the middle of an entity name is avoided
- **Smart length control**: chunks are sized by token count rather than character count
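For reference, the following is a minimal standalone sketch of that splitting step. It mirrors the `_split_text_by_sentences` method shown earlier in this diff; the module-level constant and function names here are illustrative only.

```python
import re
from typing import List

# Chinese sentence endings plus English endings for mixed text,
# the same pattern used by _split_text_by_sentences above.
SENTENCE_PATTERN = r'[。!?;\n]+|[.!?;]+'

def split_text_by_sentences(text: str) -> List[str]:
    # Split on sentence terminators, then strip and drop empty fragments.
    sentences = re.split(SENTENCE_PATTERN, text)
    return [s.strip() for s in sentences if s.strip()]

# Example:
# split_text_by_sentences("上诉人李淼因合同纠纷。法定代表人郭东军,执行董事。")
# -> ['上诉人李淼因合同纠纷', '法定代表人郭东军,执行董事']
```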
### 2. Entity Boundary Safety Check

The `_is_entity_boundary_safe()` method checks whether a split point is safe:

```python
def _is_entity_boundary_safe(self, text: str, position: int) -> bool:
    # Check common entity suffixes
    entity_suffixes = ['公', '司', '所', '院', '厅', '局', '部', '会', '团', '社', '处', '室', '楼', '号']

    # Check incomplete entity patterns
    if text[position-2:position+1] in ['公司', '事务所', '协会', '研究院']:
        return False

    # Check address patterns
    address_patterns = ['省', '市', '区', '县', '路', '街', '巷', '号', '室']
    # ...
```

### 3. Smart Splitting of Long Sentences

For long sentences that exceed the token limit, a tiered splitting strategy is used (a simplified sketch follows the list):

1. **Punctuation-based splitting**: split at commas, semicolons, and similar punctuation first
2. **Entity-boundary splitting**: if punctuation-based splitting is not possible, split at a safe entity boundary
3. **Forced splitting**: only as a last resort, force a character-level split
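Below is a simplified sketch of this fallback order. It is not the project's implementation: it estimates length by character count instead of the tokenizer used in `NERExtractor`, and it omits the `_is_entity_boundary_safe` check, so it only illustrates the ordering of the three strategies.

```python
from typing import List, Optional

# Punctuation marks treated as safe split points (same set as _split_long_sentence).
SAFE_SPLITTERS = [',', ',', ';', ';', '、', ':', ':']

def split_long_sentence(sentence: str, max_chars: int) -> Optional[List[str]]:
    if len(sentence) <= max_chars:
        return None  # Short enough; the caller keeps it as a single piece.

    # 1. Prefer the last safe punctuation mark that keeps the first part within the limit.
    for splitter in SAFE_SPLITTERS:
        pos = sentence.rfind(splitter, 0, max_chars)
        if pos > 0:
            return [sentence[:pos + 1], sentence[pos + 1:]]

    # 2./3. Fall back to a character-level cut; the real code first looks for a position
    # where _is_entity_boundary_safe returns True before forcing the split.
    return [sentence[:max_chars], sentence[max_chars:]]
```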
## Implementation Details

### Core Methods

1. **`_split_text_by_sentences()`**: splits the text into sentences
2. **`_create_sentence_chunks()`**: builds chunks from sentences
3. **`_split_long_sentence()`**: splits long sentences intelligently
4. **`_is_entity_boundary_safe()`**: checks whether a split point is safe

### Chunking Flow

```
Input text
↓
Split into sentences
↓
Estimate token count
↓
Build sentence chunks
↓
Check entity boundaries
↓
Output final chunks
```

## Test Results

### Before vs. After

| Metric | Before | After |
|------|--------|--------|
| Number of truncated entities | High | Significantly reduced |
| Entity integrity | Frequently broken | Preserved |
| Chunking quality | Character-based | Semantics-based |

### Test Cases

1. **The "丰复久信公" issue**:
   - Before: "丰复久信公" (truncated)
   - After: "北京丰复久信营销科技有限公司" (complete)

2. **Long sentence handling**:
   - Before: text could be cut in the middle of an entity
   - After: text is split at sentence boundaries or other safe positions

## Configuration Parameters

- `max_tokens`: maximum number of tokens per chunk (default: 400)
- `confidence_threshold`: entity confidence threshold (default: 0.95)
- `sentence_pattern`: regular expression used for sentence splitting

## Usage Example

```python
from app.core.document_handlers.extractors.ner_extractor import NERExtractor

extractor = NERExtractor()
result = extractor.extract(long_text)

# Entities in the result will be more complete
entities = result.get("entities", [])
for entity in entities:
    print(f"{entity['text']} ({entity['type']})")
```

## Performance Impact

- **Memory usage**: slightly higher (the sentence-split results need to be stored)
- **Processing speed**: essentially unchanged (sentence splitting is fast)
- **Accuracy**: significantly improved (fewer truncated entities)

## Future Improvements

1. **Smarter entity recognition**: use a pretrained model to identify entity boundaries
2. **Dynamic chunk size**: adjust the chunk size based on text complexity
3. **Multilingual support**: extend the chunking strategy to other languages
4. **Caching optimization**: cache sentence-split results to improve performance

## Related Files

- backend/app/core/document_handlers/extractors/ner_extractor.py - main implementation
- backend/test_improved_chunking.py - test script
- backend/test_truncation_fix.py - truncation issue tests
- backend/test_chunking_logic.py - chunking logic tests
@@ -1,130 +0,0 @@
#!/usr/bin/env python3
"""
Debug script to understand the position mapping issue after masking.
"""

def find_entity_alignment(entity_text: str, original_document_text: str):
    """Simplified version of the alignment method for testing"""
    clean_entity = entity_text.replace(" ", "")
    doc_chars = [c for c in original_document_text if c != ' ']

    for i in range(len(doc_chars) - len(clean_entity) + 1):
        if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
            return map_char_positions_to_original(i, len(clean_entity), original_document_text)
    return None

def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
    """Simplified version of position mapping for testing"""
    original_pos = 0
    clean_pos = 0

    while clean_pos < clean_start and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            clean_pos += 1
        original_pos += 1

    start_pos = original_pos

    chars_found = 0
    while chars_found < entity_length and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            chars_found += 1
        original_pos += 1

    end_pos = original_pos
    found_text = original_text[start_pos:end_pos]

    return start_pos, end_pos, found_text

def debug_position_issue():
    """Debug the position mapping issue"""

    print("Debugging Position Mapping Issue")
    print("=" * 50)

    # Test document
    original_doc = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
    entity = "李淼"
    masked_text = "李M"

    print(f"Original document: '{original_doc}'")
    print(f"Entity to mask: '{entity}'")
    print(f"Masked text: '{masked_text}'")
    print()

    # First occurrence
    print("=== First Occurrence ===")
    result1 = find_entity_alignment(entity, original_doc)
    if result1:
        start1, end1, found1 = result1
        print(f"Found at positions {start1}-{end1}: '{found1}'")

        # Apply first mask
        masked_doc = original_doc[:start1] + masked_text + original_doc[end1:]
        print(f"After first mask: '{masked_doc}'")
        print(f"Length changed from {len(original_doc)} to {len(masked_doc)}")

        # Try to find second occurrence in the masked document
        print("\n=== Second Occurrence (in masked document) ===")
        result2 = find_entity_alignment(entity, masked_doc)
        if result2:
            start2, end2, found2 = result2
            print(f"Found at positions {start2}-{end2}: '{found2}'")

            # Apply second mask
            masked_doc2 = masked_doc[:start2] + masked_text + masked_doc[end2:]
            print(f"After second mask: '{masked_doc2}'")

            # Try to find third occurrence
            print("\n=== Third Occurrence (in double-masked document) ===")
            result3 = find_entity_alignment(entity, masked_doc2)
            if result3:
                start3, end3, found3 = result3
                print(f"Found at positions {start3}-{end3}: '{found3}'")
            else:
                print("No third occurrence found")
        else:
            print("No second occurrence found")
    else:
        print("No first occurrence found")

def debug_infinite_loop():
    """Debug the infinite loop issue"""

    print("\n" + "=" * 50)
    print("Debugging Infinite Loop Issue")
    print("=" * 50)

    # Test document that causes infinite loop
    original_doc = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。"
    entity = "丰复久信公司"
    masked_text = "丰复久信公司"  # Same text (no change)

    print(f"Original document: '{original_doc}'")
    print(f"Entity to mask: '{entity}'")
    print(f"Masked text: '{masked_text}' (same as original)")
    print()

    # This will cause infinite loop because we're replacing with the same text
    print("=== This will cause infinite loop ===")
    print("Because we're replacing '丰复久信公司' with '丰复久信公司'")
    print("The document doesn't change, so we keep finding the same position")

    # Show what happens
    masked_doc = original_doc
    for i in range(3):  # Limit to 3 iterations for demo
        result = find_entity_alignment(entity, masked_doc)
        if result:
            start, end, found = result
            print(f"Iteration {i+1}: Found at positions {start}-{end}: '{found}'")

            # Apply mask (but it's the same text)
            masked_doc = masked_doc[:start] + masked_text + masked_doc[end:]
            print(f"After mask: '{masked_doc}'")
        else:
            print(f"Iteration {i+1}: No occurrence found")
            break

if __name__ == "__main__":
    debug_position_issue()
    debug_infinite_loop()
@@ -1,67 +0,0 @@
#!/usr/bin/env python3
"""
Test script for character-by-character alignment functionality.
This script demonstrates how the alignment handles different spacing patterns
between entity text and original document text.
"""

import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), 'backend'))

from app.core.document_handlers.ner_processor import NerProcessor

def main():
    """Test the character alignment functionality."""
    processor = NerProcessor()

    print("Testing Character-by-Character Alignment")
    print("=" * 50)

    # Test the alignment functionality
    processor.test_character_alignment()

    print("\n" + "=" * 50)
    print("Testing Entity Masking with Alignment")
    print("=" * 50)

    # Test entity masking with alignment
    original_document = "上诉人(原审原告):北京丰复久信营销科技有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。法定代表人:郭东军,执行董事、经理。委托诉讼代理人:周大海,北京市康达律师事务所律师。"

    # Example entity mapping (from your NER results)
    entity_mapping = {
        "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
        "郭东军": "郭DJ",
        "周大海": "周DH",
        "北京市康达律师事务所": "北京市KD律师事务所"
    }

    print(f"Original document: {original_document}")
    print(f"Entity mapping: {entity_mapping}")

    # Apply masking with alignment
    masked_document = processor.apply_entity_masking_with_alignment(
        original_document,
        entity_mapping
    )

    print(f"Masked document: {masked_document}")

    # Test with document that has spaces
    print("\n" + "=" * 50)
    print("Testing with Document Containing Spaces")
    print("=" * 50)

    spaced_document = "上诉人(原审原告):北京 丰复久信 营销科技 有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。法定代表人:郭 东 军,执行董事、经理。"

    print(f"Spaced document: {spaced_document}")

    masked_spaced_document = processor.apply_entity_masking_with_alignment(
        spaced_document,
        entity_mapping
    )

    print(f"Masked spaced document: {masked_spaced_document}")

if __name__ == "__main__":
    main()
@@ -1,186 +0,0 @@
#!/usr/bin/env python3
"""
Final test to verify the fix handles multiple occurrences and prevents infinite loops.
"""

def find_entity_alignment(entity_text: str, original_document_text: str):
    """Simplified version of the alignment method for testing"""
    clean_entity = entity_text.replace(" ", "")
    doc_chars = [c for c in original_document_text if c != ' ']

    for i in range(len(doc_chars) - len(clean_entity) + 1):
        if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
            return map_char_positions_to_original(i, len(clean_entity), original_document_text)
    return None

def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
    """Simplified version of position mapping for testing"""
    original_pos = 0
    clean_pos = 0

    while clean_pos < clean_start and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            clean_pos += 1
        original_pos += 1

    start_pos = original_pos

    chars_found = 0
    while chars_found < entity_length and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            chars_found += 1
        original_pos += 1

    end_pos = original_pos
    found_text = original_text[start_pos:end_pos]

    return start_pos, end_pos, found_text

def apply_entity_masking_with_alignment_fixed(original_document_text: str, entity_mapping: dict):
    """Fixed implementation that handles multiple occurrences and prevents infinite loops"""
    masked_document = original_document_text
    sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)

    for entity_text in sorted_entities:
        masked_text = entity_mapping[entity_text]

        # Skip if masked text is the same as original text (prevents infinite loop)
        if entity_text == masked_text:
            print(f"Skipping entity '{entity_text}' as masked text is identical")
            continue

        # Find ALL occurrences of this entity in the document
        # Add safety counter to prevent infinite loops
        max_iterations = 100  # Safety limit
        iteration_count = 0

        while iteration_count < max_iterations:
            iteration_count += 1

            # Find the entity in the current masked document using alignment
            alignment_result = find_entity_alignment(entity_text, masked_document)

            if alignment_result:
                start_pos, end_pos, found_text = alignment_result

                # Replace the found text with the masked version
                masked_document = (
                    masked_document[:start_pos] +
                    masked_text +
                    masked_document[end_pos:]
                )

                print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})")
            else:
                # No more occurrences found for this entity, move to next entity
                print(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations")
                break

        # Log warning if we hit the safety limit
        if iteration_count >= max_iterations:
            print(f"WARNING: Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop")

    return masked_document

def test_final_fix():
    """Test the final fix with various scenarios"""

    print("Testing Final Fix for Multiple Occurrences and Infinite Loop Prevention")
    print("=" * 70)

    # Test case 1: Multiple occurrences of the same entity (should work)
    print("\nTest Case 1: Multiple occurrences of same entity")
    test_document_1 = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
    entity_mapping_1 = {"李淼": "李M"}

    print(f"Original: {test_document_1}")
    result_1 = apply_entity_masking_with_alignment_fixed(test_document_1, entity_mapping_1)
    print(f"Result: {result_1}")

    remaining_1 = result_1.count("李淼")
    expected_1 = "上诉人李M因合同纠纷,法定代表人李M,委托代理人李M。"

    if result_1 == expected_1 and remaining_1 == 0:
        print("✅ PASS: All occurrences masked correctly")
    else:
        print(f"❌ FAIL: Expected '{expected_1}', got '{result_1}'")
        print(f"  Remaining '李淼' occurrences: {remaining_1}")

    # Test case 2: Entity with same masked text (should skip to prevent infinite loop)
    print("\nTest Case 2: Entity with same masked text (should skip)")
    test_document_2 = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。"
    entity_mapping_2 = {
        "李淼": "李M",
        "丰复久信公司": "丰复久信公司"  # Same text - should be skipped
    }

    print(f"Original: {test_document_2}")
    result_2 = apply_entity_masking_with_alignment_fixed(test_document_2, entity_mapping_2)
    print(f"Result: {result_2}")

    remaining_2_li = result_2.count("李淼")
    remaining_2_company = result_2.count("丰复久信公司")

    if remaining_2_li == 0 and remaining_2_company == 1:  # Company should remain unmasked
        print("✅ PASS: Infinite loop prevented, only different text masked")
    else:
        print(f"❌ FAIL: Remaining '李淼': {remaining_2_li}, '丰复久信公司': {remaining_2_company}")

    # Test case 3: Mixed spacing scenarios
    print("\nTest Case 3: Mixed spacing scenarios")
    test_document_3 = "上诉人李 淼因合同纠纷,法定代表人李淼,委托代理人李 淼。"
    entity_mapping_3 = {"李 淼": "李M", "李淼": "李M"}

    print(f"Original: {test_document_3}")
    result_3 = apply_entity_masking_with_alignment_fixed(test_document_3, entity_mapping_3)
    print(f"Result: {result_3}")

    remaining_3 = result_3.count("李淼") + result_3.count("李 淼")

    if remaining_3 == 0:
        print("✅ PASS: Mixed spacing handled correctly")
    else:
        print(f"❌ FAIL: Remaining occurrences: {remaining_3}")

    # Test case 4: Complex document with real examples
    print("\nTest Case 4: Complex document with real examples")
    test_document_4 = """上诉人(原审原告):北京丰复久信营销科技有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。
法定代表人:郭东军,执行董事、经理。
委托诉讼代理人:周大海,北京市康达律师事务所律师。
委托诉讼代理人:王乃哲,北京市康达律师事务所律师。
被上诉人(原审被告):中研智创区块链技术有限公司,住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505。
法定代表人:王欢子,总经理。
委托诉讼代理人:魏鑫,北京市昊衡律师事务所律师。"""

    entity_mapping_4 = {
        "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
        "郭东军": "郭DJ",
        "周大海": "周DH",
        "王乃哲": "王NZ",
        "中研智创区块链技术有限公司": "中研智创区块链技术有限公司",  # Same text - should be skipped
        "王欢子": "王HZ",
        "魏鑫": "魏X",
        "北京市康达律师事务所": "北京市KD律师事务所",
        "北京市昊衡律师事务所": "北京市HH律师事务所"
    }

    print(f"Original length: {len(test_document_4)} characters")
    result_4 = apply_entity_masking_with_alignment_fixed(test_document_4, entity_mapping_4)
    print(f"Result length: {len(result_4)} characters")

    # Check that entities were masked correctly
    unmasked_entities = []
    for entity in entity_mapping_4.keys():
        if entity in result_4 and entity != entity_mapping_4[entity]:  # Skip if masked text is same
            unmasked_entities.append(entity)

    if not unmasked_entities:
        print("✅ PASS: All entities masked correctly in complex document")
    else:
        print(f"❌ FAIL: Unmasked entities: {unmasked_entities}")

    print("\n" + "=" * 70)
    print("Final Fix Verification Completed!")

if __name__ == "__main__":
    test_final_fix()
@@ -1,173 +0,0 @@
#!/usr/bin/env python3
"""
Test to verify the fix for multiple occurrence issue in apply_entity_masking_with_alignment.
"""

def find_entity_alignment(entity_text: str, original_document_text: str):
    """Simplified version of the alignment method for testing"""
    clean_entity = entity_text.replace(" ", "")
    doc_chars = [c for c in original_document_text if c != ' ']

    for i in range(len(doc_chars) - len(clean_entity) + 1):
        if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
            return map_char_positions_to_original(i, len(clean_entity), original_document_text)
    return None

def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
    """Simplified version of position mapping for testing"""
    original_pos = 0
    clean_pos = 0

    while clean_pos < clean_start and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            clean_pos += 1
        original_pos += 1

    start_pos = original_pos

    chars_found = 0
    while chars_found < entity_length and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            chars_found += 1
        original_pos += 1

    end_pos = original_pos
    found_text = original_text[start_pos:end_pos]

    return start_pos, end_pos, found_text

def apply_entity_masking_with_alignment_fixed(original_document_text: str, entity_mapping: dict):
    """Fixed implementation that handles multiple occurrences"""
    masked_document = original_document_text
    sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)

    for entity_text in sorted_entities:
        masked_text = entity_mapping[entity_text]

        # Find ALL occurrences of this entity in the document
        # We need to loop until no more matches are found
        while True:
            # Find the entity in the current masked document using alignment
            alignment_result = find_entity_alignment(entity_text, masked_document)

            if alignment_result:
                start_pos, end_pos, found_text = alignment_result

                # Replace the found text with the masked version
                masked_document = (
                    masked_document[:start_pos] +
                    masked_text +
                    masked_document[end_pos:]
                )

                print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos}")
            else:
                # No more occurrences found for this entity, move to next entity
                print(f"No more occurrences of '{entity_text}' found in document")
                break

    return masked_document

def test_fix_verification():
    """Test to verify the fix works correctly"""

    print("Testing Fix for Multiple Occurrence Issue")
    print("=" * 60)

    # Test case 1: Multiple occurrences of the same entity
    print("\nTest Case 1: Multiple occurrences of same entity")
    test_document_1 = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
    entity_mapping_1 = {"李淼": "李M"}

    print(f"Original: {test_document_1}")
    result_1 = apply_entity_masking_with_alignment_fixed(test_document_1, entity_mapping_1)
    print(f"Result: {result_1}")

    remaining_1 = result_1.count("李淼")
    expected_1 = "上诉人李M因合同纠纷,法定代表人李M,委托代理人李M。"

    if result_1 == expected_1 and remaining_1 == 0:
        print("✅ PASS: All occurrences masked correctly")
    else:
        print(f"❌ FAIL: Expected '{expected_1}', got '{result_1}'")
        print(f"  Remaining '李淼' occurrences: {remaining_1}")

    # Test case 2: Multiple entities with multiple occurrences
    print("\nTest Case 2: Multiple entities with multiple occurrences")
    test_document_2 = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。"
    entity_mapping_2 = {
        "李淼": "李M",
        "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
        "丰复久信公司": "丰复久信公司"
    }

    print(f"Original: {test_document_2}")
    result_2 = apply_entity_masking_with_alignment_fixed(test_document_2, entity_mapping_2)
    print(f"Result: {result_2}")

    remaining_2_li = result_2.count("李淼")
    remaining_2_company = result_2.count("北京丰复久信营销科技有限公司")

    if remaining_2_li == 0 and remaining_2_company == 0:
        print("✅ PASS: All entities masked correctly")
    else:
        print(f"❌ FAIL: Remaining '李淼': {remaining_2_li}, '北京丰复久信营销科技有限公司': {remaining_2_company}")

    # Test case 3: Mixed spacing scenarios
    print("\nTest Case 3: Mixed spacing scenarios")
    test_document_3 = "上诉人李 淼因合同纠纷,法定代表人李淼,委托代理人李 淼。"
    entity_mapping_3 = {"李 淼": "李M", "李淼": "李M"}

    print(f"Original: {test_document_3}")
    result_3 = apply_entity_masking_with_alignment_fixed(test_document_3, entity_mapping_3)
    print(f"Result: {result_3}")

    remaining_3 = result_3.count("李淼") + result_3.count("李 淼")

    if remaining_3 == 0:
        print("✅ PASS: Mixed spacing handled correctly")
    else:
        print(f"❌ FAIL: Remaining occurrences: {remaining_3}")

    # Test case 4: Complex document with real examples
    print("\nTest Case 4: Complex document with real examples")
    test_document_4 = """上诉人(原审原告):北京丰复久信营销科技有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。
法定代表人:郭东军,执行董事、经理。
委托诉讼代理人:周大海,北京市康达律师事务所律师。
委托诉讼代理人:王乃哲,北京市康达律师事务所律师。
被上诉人(原审被告):中研智创区块链技术有限公司,住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505。
法定代表人:王欢子,总经理。
委托诉讼代理人:魏鑫,北京市昊衡律师事务所律师。"""

    entity_mapping_4 = {
        "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司",
        "郭东军": "郭DJ",
        "周大海": "周DH",
        "王乃哲": "王NZ",
        "中研智创区块链技术有限公司": "中研智创区块链技术有限公司",
        "王欢子": "王HZ",
        "魏鑫": "魏X",
        "北京市康达律师事务所": "北京市KD律师事务所",
        "北京市昊衡律师事务所": "北京市HH律师事务所"
    }

    print(f"Original length: {len(test_document_4)} characters")
    result_4 = apply_entity_masking_with_alignment_fixed(test_document_4, entity_mapping_4)
    print(f"Result length: {len(result_4)} characters")

    # Check that all entities were masked
    unmasked_entities = []
    for entity in entity_mapping_4.keys():
        if entity in result_4:
            unmasked_entities.append(entity)

    if not unmasked_entities:
        print("✅ PASS: All entities masked in complex document")
    else:
        print(f"❌ FAIL: Unmasked entities: {unmasked_entities}")

    print("\n" + "=" * 60)
    print("Fix Verification Completed!")

if __name__ == "__main__":
    test_fix_verification()
@@ -1,96 +0,0 @@
#!/usr/bin/env python3
"""
Test to verify the multiple occurrence issue in apply_entity_masking_with_alignment.
"""

def find_entity_alignment(entity_text: str, original_document_text: str):
    """Simplified version of the alignment method for testing"""
    clean_entity = entity_text.replace(" ", "")
    doc_chars = [c for c in original_document_text if c != ' ']

    for i in range(len(doc_chars) - len(clean_entity) + 1):
        if doc_chars[i:i+len(clean_entity)] == list(clean_entity):
            return map_char_positions_to_original(i, len(clean_entity), original_document_text)
    return None

def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str):
    """Simplified version of position mapping for testing"""
    original_pos = 0
    clean_pos = 0

    while clean_pos < clean_start and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            clean_pos += 1
        original_pos += 1

    start_pos = original_pos

    chars_found = 0
    while chars_found < entity_length and original_pos < len(original_text):
        if original_text[original_pos] != ' ':
            chars_found += 1
        original_pos += 1

    end_pos = original_pos
    found_text = original_text[start_pos:end_pos]

    return start_pos, end_pos, found_text

def apply_entity_masking_with_alignment_current(original_document_text: str, entity_mapping: dict):
    """Current implementation with the bug"""
    masked_document = original_document_text
    sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True)

    for entity_text in sorted_entities:
        masked_text = entity_mapping[entity_text]

        # Find the entity in the original document using alignment
        alignment_result = find_entity_alignment(entity_text, masked_document)

        if alignment_result:
            start_pos, end_pos, found_text = alignment_result

            # Replace the found text with the masked version
            masked_document = (
                masked_document[:start_pos] +
                masked_text +
                masked_document[end_pos:]
            )

            print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos}")
        else:
            print(f"Could not find entity '{entity_text}' in document for masking")

    return masked_document

def test_multiple_occurrences():
    """Test the multiple occurrence issue"""

    print("Testing Multiple Occurrence Issue")
    print("=" * 50)

    # Test document with multiple occurrences of the same entity
    test_document = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。"
    entity_mapping = {
        "李淼": "李M"
    }

    print(f"Original document: {test_document}")
    print(f"Entity mapping: {entity_mapping}")
    print(f"Expected: All 3 occurrences of '李淼' should be masked")

    # Test current implementation
    result = apply_entity_masking_with_alignment_current(test_document, entity_mapping)
    print(f"Current result: {result}")

    # Count remaining occurrences
    remaining_count = result.count("李淼")
    print(f"Remaining '李淼' occurrences: {remaining_count}")

    if remaining_count > 0:
        print("❌ ISSUE CONFIRMED: Multiple occurrences are not being masked!")
    else:
        print("✅ No issue found (unexpected)")

if __name__ == "__main__":
    test_multiple_occurrences()