From 24f452818a00a7d9447959fb6b8f1b9189aa3337 Mon Sep 17 00:00:00 2001 From: tigerenwork Date: Tue, 19 Aug 2025 16:08:49 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=20=E6=9B=B4=E6=96=B0=E6=9B=BF?= =?UTF-8?q?=E6=8D=A2=E7=AE=97=E6=B3=95=EF=BC=8C=E8=A7=A3=E5=86=B3=E5=8C=B9?= =?UTF-8?q?=E9=85=8Dtoken=E4=B8=AD=E6=9C=89=E7=A9=BA=E6=A0=BC=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DOCKER_COMPOSE_README.md | 2 +- .../document_handlers/document_processor.py | 43 +++- .../extractors/ner_extractor.py | 2 +- .../core/document_handlers/masker_factory.py | 2 +- .../core/document_handlers/ner_processor.py | 196 +++++++++++++++++- .../ner_processor_refactored.py | 133 +++++++++++- backend/tests/debug_position_issue.py | 130 ++++++++++++ backend/tests/test_character_alignment.py | 67 ++++++ backend/tests/test_final_fix.py | 186 +++++++++++++++++ backend/tests/test_fix_verification.py | 173 ++++++++++++++++ backend/tests/test_multiple_occurrences.py | 96 +++++++++ 11 files changed, 1013 insertions(+), 17 deletions(-) create mode 100644 backend/tests/debug_position_issue.py create mode 100644 backend/tests/test_character_alignment.py create mode 100644 backend/tests/test_final_fix.py create mode 100644 backend/tests/test_fix_verification.py create mode 100644 backend/tests/test_multiple_occurrences.py diff --git a/DOCKER_COMPOSE_README.md b/DOCKER_COMPOSE_README.md index 710b762..75996df 100644 --- a/DOCKER_COMPOSE_README.md +++ b/DOCKER_COMPOSE_README.md @@ -86,7 +86,7 @@ docker-compose build frontend docker-compose build mineru-api # Build multiple specific services -docker-compose build backend-api frontend +docker-compose build backend-api frontend celery-worker ``` ### Building and restarting specific services diff --git a/backend/app/core/document_handlers/document_processor.py b/backend/app/core/document_handlers/document_processor.py index 4c61ba5..567e892 100644 --- a/backend/app/core/document_handlers/document_processor.py +++ b/backend/app/core/document_handlers/document_processor.py @@ -40,17 +40,36 @@ class DocumentProcessor(ABC): return chunks - def _apply_mapping(self, text: str, mapping: Dict[str, str]) -> str: - """Apply the mapping to replace sensitive information""" - masked_text = text - for original, masked in mapping.items(): - if isinstance(masked, dict): - masked = next(iter(masked.values()), "某") - elif not isinstance(masked, str): - masked = str(masked) if masked is not None else "某" - masked_text = masked_text.replace(original, masked) + def _apply_mapping_with_alignment(self, text: str, mapping: Dict[str, str]) -> str: + """ + Apply the mapping to replace sensitive information using character-by-character alignment. + + This method uses the new alignment-based masking to handle spacing issues + between NER results and original document text. + + Args: + text: Original document text + mapping: Dictionary mapping original entity text to masked text + + Returns: + Masked document text + """ + logger.info(f"Applying entity mapping with alignment to text of length {len(text)}") + logger.debug(f"Entity mapping: {mapping}") + + # Use the new alignment-based masking method + masked_text = self.ner_processor.apply_entity_masking_with_alignment(text, mapping) + + logger.info("Successfully applied entity masking with alignment") return masked_text + def _apply_mapping(self, text: str, mapping: Dict[str, str]) -> str: + """ + Legacy method for simple string replacement. 
+ Now delegates to the new alignment-based method. + """ + return self._apply_mapping_with_alignment(text, mapping) + def process_content(self, content: str) -> str: """Process document content by masking sensitive information""" sentences = content.split("。") @@ -59,9 +78,11 @@ class DocumentProcessor(ABC): logger.info(f"Split content into {len(chunks)} chunks") final_mapping = self.ner_processor.process(chunks) + logger.info(f"Generated entity mapping with {len(final_mapping)} entities") - masked_content = self._apply_mapping(content, final_mapping) - logger.info("Successfully masked content") + # Use the new alignment-based masking + masked_content = self._apply_mapping_with_alignment(content, final_mapping) + logger.info("Successfully masked content using character alignment") return masked_content diff --git a/backend/app/core/document_handlers/extractors/ner_extractor.py b/backend/app/core/document_handlers/extractors/ner_extractor.py index ed73b02..770525c 100644 --- a/backend/app/core/document_handlers/extractors/ner_extractor.py +++ b/backend/app/core/document_handlers/extractors/ner_extractor.py @@ -122,7 +122,7 @@ class NERExtractor(BaseExtractor): # Add to our list with both original and cleaned text filtered_entities.append({ "text": cleaned_text, # Clean text for display/processing - "original_text": entity_text, # Original tokenized text from model + "tokenized_text": entity_text, # Original tokenized text from model "type": entity_type, "confidence": confidence_score }) diff --git a/backend/app/core/document_handlers/masker_factory.py b/backend/app/core/document_handlers/masker_factory.py index d9207e9..f2a47ba 100644 --- a/backend/app/core/document_handlers/masker_factory.py +++ b/backend/app/core/document_handlers/masker_factory.py @@ -9,7 +9,7 @@ from .maskers.company_masker import CompanyMasker from .maskers.address_masker import AddressMasker from .maskers.id_masker import IDMasker from .maskers.case_masker import CaseMasker -from ...services.ollama_client import OllamaClient +from ..services.ollama_client import OllamaClient class MaskerFactory: diff --git a/backend/app/core/document_handlers/ner_processor.py b/backend/app/core/document_handlers/ner_processor.py index eb76eb5..401aeaf 100644 --- a/backend/app/core/document_handlers/ner_processor.py +++ b/backend/app/core/document_handlers/ner_processor.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple, Optional from ..prompts.masking_prompts import get_ner_name_prompt, get_ner_company_prompt, get_ner_address_prompt, get_ner_project_prompt, get_ner_case_number_prompt, get_entity_linkage_prompt import logging import json @@ -20,9 +20,201 @@ class NerProcessor: # Initialize NER extractor for ML-based entity extraction self.ner_extractor = NERExtractor() + def _find_entity_alignment(self, entity_text: str, original_document_text: str) -> Optional[Tuple[int, int, str]]: + """ + Find entity in original document using character-by-character alignment. + + This method handles the case where the original document may have spaces + that are not from tokenization, and the entity text may have different + spacing patterns. 
+ + Args: + entity_text: The entity text to find (may have spaces from tokenization) + original_document_text: The original document text (may have spaces) + + Returns: + Tuple of (start_pos, end_pos, found_text) or None if not found + """ + # Remove all spaces from entity text to get clean characters + clean_entity = entity_text.replace(" ", "") + + # Create character lists ignoring spaces from both entity and document + entity_chars = [c for c in clean_entity] + doc_chars = [c for c in original_document_text if c != ' '] + + # Find the sequence in document characters + for i in range(len(doc_chars) - len(entity_chars) + 1): + if doc_chars[i:i+len(entity_chars)] == entity_chars: + # Found match, now map back to original positions + return self._map_char_positions_to_original(i, len(entity_chars), original_document_text) + + return None + + def _map_char_positions_to_original(self, clean_start: int, entity_length: int, original_text: str) -> Tuple[int, int, str]: + """ + Map positions from clean text (without spaces) back to original text positions. + + Args: + clean_start: Start position in clean text (without spaces) + entity_length: Length of entity in characters + original_text: Original document text with spaces + + Returns: + Tuple of (start_pos, end_pos, found_text) in original text + """ + original_pos = 0 + clean_pos = 0 + + # Find the start position in original text + while clean_pos < clean_start and original_pos < len(original_text): + if original_text[original_pos] != ' ': + clean_pos += 1 + original_pos += 1 + + start_pos = original_pos + + # Find the end position by counting non-space characters + chars_found = 0 + while chars_found < entity_length and original_pos < len(original_text): + if original_text[original_pos] != ' ': + chars_found += 1 + original_pos += 1 + + end_pos = original_pos + + # Extract the actual text from the original document + found_text = original_text[start_pos:end_pos] + + return start_pos, end_pos, found_text + def _validate_mapping_format(self, mapping: Dict[str, Any]) -> bool: return LLMResponseValidator.validate_entity_extraction(mapping) - + + def apply_entity_masking_with_alignment(self, original_document_text: str, entity_mapping: Dict[str, str], mask_char: str = "*") -> str: + """ + Apply entity masking to original document text using character-by-character alignment. + + This method finds each entity in the original document using alignment and + replaces it with the corresponding masked version. It handles multiple + occurrences of the same entity by finding all instances before moving + to the next entity. 
+ + Args: + original_document_text: The original document text to mask + entity_mapping: Dictionary mapping original entity text to masked text + mask_char: Character to use for masking (default: "*") + + Returns: + Masked document text + """ + masked_document = original_document_text + + # Sort entities by length (longest first) to avoid partial matches + sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True) + + for entity_text in sorted_entities: + masked_text = entity_mapping[entity_text] + + # Skip if masked text is the same as original text (prevents infinite loop) + if entity_text == masked_text: + logger.debug(f"Skipping entity '{entity_text}' as masked text is identical") + continue + + # Find ALL occurrences of this entity in the document + # We need to loop until no more matches are found + # Add safety counter to prevent infinite loops + max_iterations = 100 # Safety limit + iteration_count = 0 + + while iteration_count < max_iterations: + iteration_count += 1 + + # Find the entity in the current masked document using alignment + alignment_result = self._find_entity_alignment(entity_text, masked_document) + + if alignment_result: + start_pos, end_pos, found_text = alignment_result + + # Replace the found text with the masked version + masked_document = ( + masked_document[:start_pos] + + masked_text + + masked_document[end_pos:] + ) + + logger.debug(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})") + else: + # No more occurrences found for this entity, move to next entity + logger.debug(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations") + break + + # Log warning if we hit the safety limit + if iteration_count >= max_iterations: + logger.warning(f"Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop") + + return masked_document + + def test_character_alignment(self) -> None: + """ + Test method to demonstrate character-by-character alignment functionality. + This method can be used to validate the alignment works correctly with + various spacing patterns. 
+ """ + test_cases = [ + # Test case 1: Entity with spaces, document without spaces + { + "entity_text": "李 淼", + "document_text": "上诉人李淼因合同纠纷", + "expected_found": "李淼" + }, + # Test case 2: Entity without spaces, document with spaces + { + "entity_text": "邓青菁", + "document_text": "上诉人邓 青 菁因合同纠纷", + "expected_found": "邓 青 菁" + }, + # Test case 3: Both entity and document have spaces + { + "entity_text": "王 欢 子", + "document_text": "法定代表人王 欢 子,总经理", + "expected_found": "王 欢 子" + }, + # Test case 4: Entity without spaces, document without spaces + { + "entity_text": "郭东军", + "document_text": "法定代表人郭东军,执行董事", + "expected_found": "郭东军" + }, + # Test case 5: Complex company name + { + "entity_text": "北京丰复久信营销科技有限公司", + "document_text": "上诉人(原审原告):北京 丰复久信 营销科技 有限公司", + "expected_found": "北京 丰复久信 营销科技 有限公司" + } + ] + + logger.info("Testing character-by-character alignment...") + + for i, test_case in enumerate(test_cases, 1): + entity_text = test_case["entity_text"] + document_text = test_case["document_text"] + expected_found = test_case["expected_found"] + + result = self._find_entity_alignment(entity_text, document_text) + + if result: + start_pos, end_pos, found_text = result + success = found_text == expected_found + status = "✓ PASS" if success else "✗ FAIL" + logger.info(f"Test {i} {status}: Entity '{entity_text}' -> Found '{found_text}' (expected '{expected_found}') at positions {start_pos}-{end_pos}") + + if not success: + logger.error(f" Expected: '{expected_found}', Got: '{found_text}'") + else: + logger.error(f"Test {i} ✗ FAIL: Entity '{entity_text}' not found in document") + + logger.info("Character alignment testing completed.") + def extract_entities_with_ner(self, text: str) -> List[Dict[str, Any]]: """ Extract entities using the NER model diff --git a/backend/app/core/document_handlers/ner_processor_refactored.py b/backend/app/core/document_handlers/ner_processor_refactored.py index 20cc1c6..bed4e6c 100644 --- a/backend/app/core/document_handlers/ner_processor_refactored.py +++ b/backend/app/core/document_handlers/ner_processor_refactored.py @@ -3,7 +3,7 @@ Refactored NerProcessor using the new masker architecture. """ import logging -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from ..prompts.masking_prompts import ( get_ner_name_prompt, get_ner_company_prompt, get_ner_address_prompt, get_ner_project_prompt, get_ner_case_number_prompt, get_entity_linkage_prompt @@ -28,6 +28,137 @@ class NerProcessorRefactored: self.maskers = self._initialize_maskers() self.surname_counter = {} # Shared counter for Chinese names + def _find_entity_alignment(self, entity_text: str, original_document_text: str) -> Optional[Tuple[int, int, str]]: + """ + Find entity in original document using character-by-character alignment. + + This method handles the case where the original document may have spaces + that are not from tokenization, and the entity text may have different + spacing patterns. 
+ + Args: + entity_text: The entity text to find (may have spaces from tokenization) + original_document_text: The original document text (may have spaces) + + Returns: + Tuple of (start_pos, end_pos, found_text) or None if not found + """ + # Remove all spaces from entity text to get clean characters + clean_entity = entity_text.replace(" ", "") + + # Create character lists ignoring spaces from both entity and document + entity_chars = [c for c in clean_entity] + doc_chars = [c for c in original_document_text if c != ' '] + + # Find the sequence in document characters + for i in range(len(doc_chars) - len(entity_chars) + 1): + if doc_chars[i:i+len(entity_chars)] == entity_chars: + # Found match, now map back to original positions + return self._map_char_positions_to_original(i, len(entity_chars), original_document_text) + + return None + + def _map_char_positions_to_original(self, clean_start: int, entity_length: int, original_text: str) -> Tuple[int, int, str]: + """ + Map positions from clean text (without spaces) back to original text positions. + + Args: + clean_start: Start position in clean text (without spaces) + entity_length: Length of entity in characters + original_text: Original document text with spaces + + Returns: + Tuple of (start_pos, end_pos, found_text) in original text + """ + original_pos = 0 + clean_pos = 0 + + # Find the start position in original text + while clean_pos < clean_start and original_pos < len(original_text): + if original_text[original_pos] != ' ': + clean_pos += 1 + original_pos += 1 + + start_pos = original_pos + + # Find the end position by counting non-space characters + chars_found = 0 + while chars_found < entity_length and original_pos < len(original_text): + if original_text[original_pos] != ' ': + chars_found += 1 + original_pos += 1 + + end_pos = original_pos + + # Extract the actual text from the original document + found_text = original_text[start_pos:end_pos] + + return start_pos, end_pos, found_text + + def apply_entity_masking_with_alignment(self, original_document_text: str, entity_mapping: Dict[str, str], mask_char: str = "*") -> str: + """ + Apply entity masking to original document text using character-by-character alignment. + + This method finds each entity in the original document using alignment and + replaces it with the corresponding masked version. It handles multiple + occurrences of the same entity by finding all instances before moving + to the next entity. 
+ + Args: + original_document_text: The original document text to mask + entity_mapping: Dictionary mapping original entity text to masked text + mask_char: Character to use for masking (default: "*") + + Returns: + Masked document text + """ + masked_document = original_document_text + + # Sort entities by length (longest first) to avoid partial matches + sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True) + + for entity_text in sorted_entities: + masked_text = entity_mapping[entity_text] + + # Skip if masked text is the same as original text (prevents infinite loop) + if entity_text == masked_text: + logger.debug(f"Skipping entity '{entity_text}' as masked text is identical") + continue + + # Find ALL occurrences of this entity in the document + # We need to loop until no more matches are found + # Add safety counter to prevent infinite loops + max_iterations = 100 # Safety limit + iteration_count = 0 + + while iteration_count < max_iterations: + iteration_count += 1 + + # Find the entity in the current masked document using alignment + alignment_result = self._find_entity_alignment(entity_text, masked_document) + + if alignment_result: + start_pos, end_pos, found_text = alignment_result + + # Replace the found text with the masked version + masked_document = ( + masked_document[:start_pos] + + masked_text + + masked_document[end_pos:] + ) + + logger.debug(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})") + else: + # No more occurrences found for this entity, move to next entity + logger.debug(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations") + break + + # Log warning if we hit the safety limit + if iteration_count >= max_iterations: + logger.warning(f"Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop") + + return masked_document + def _initialize_maskers(self) -> Dict[str, BaseMasker]: """Initialize all maskers""" maskers = {} diff --git a/backend/tests/debug_position_issue.py b/backend/tests/debug_position_issue.py new file mode 100644 index 0000000..36615be --- /dev/null +++ b/backend/tests/debug_position_issue.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +""" +Debug script to understand the position mapping issue after masking. 
+""" + +def find_entity_alignment(entity_text: str, original_document_text: str): + """Simplified version of the alignment method for testing""" + clean_entity = entity_text.replace(" ", "") + doc_chars = [c for c in original_document_text if c != ' '] + + for i in range(len(doc_chars) - len(clean_entity) + 1): + if doc_chars[i:i+len(clean_entity)] == list(clean_entity): + return map_char_positions_to_original(i, len(clean_entity), original_document_text) + return None + +def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str): + """Simplified version of position mapping for testing""" + original_pos = 0 + clean_pos = 0 + + while clean_pos < clean_start and original_pos < len(original_text): + if original_text[original_pos] != ' ': + clean_pos += 1 + original_pos += 1 + + start_pos = original_pos + + chars_found = 0 + while chars_found < entity_length and original_pos < len(original_text): + if original_text[original_pos] != ' ': + chars_found += 1 + original_pos += 1 + + end_pos = original_pos + found_text = original_text[start_pos:end_pos] + + return start_pos, end_pos, found_text + +def debug_position_issue(): + """Debug the position mapping issue""" + + print("Debugging Position Mapping Issue") + print("=" * 50) + + # Test document + original_doc = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。" + entity = "李淼" + masked_text = "李M" + + print(f"Original document: '{original_doc}'") + print(f"Entity to mask: '{entity}'") + print(f"Masked text: '{masked_text}'") + print() + + # First occurrence + print("=== First Occurrence ===") + result1 = find_entity_alignment(entity, original_doc) + if result1: + start1, end1, found1 = result1 + print(f"Found at positions {start1}-{end1}: '{found1}'") + + # Apply first mask + masked_doc = original_doc[:start1] + masked_text + original_doc[end1:] + print(f"After first mask: '{masked_doc}'") + print(f"Length changed from {len(original_doc)} to {len(masked_doc)}") + + # Try to find second occurrence in the masked document + print("\n=== Second Occurrence (in masked document) ===") + result2 = find_entity_alignment(entity, masked_doc) + if result2: + start2, end2, found2 = result2 + print(f"Found at positions {start2}-{end2}: '{found2}'") + + # Apply second mask + masked_doc2 = masked_doc[:start2] + masked_text + masked_doc[end2:] + print(f"After second mask: '{masked_doc2}'") + + # Try to find third occurrence + print("\n=== Third Occurrence (in double-masked document) ===") + result3 = find_entity_alignment(entity, masked_doc2) + if result3: + start3, end3, found3 = result3 + print(f"Found at positions {start3}-{end3}: '{found3}'") + else: + print("No third occurrence found") + else: + print("No second occurrence found") + else: + print("No first occurrence found") + +def debug_infinite_loop(): + """Debug the infinite loop issue""" + + print("\n" + "=" * 50) + print("Debugging Infinite Loop Issue") + print("=" * 50) + + # Test document that causes infinite loop + original_doc = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。" + entity = "丰复久信公司" + masked_text = "丰复久信公司" # Same text (no change) + + print(f"Original document: '{original_doc}'") + print(f"Entity to mask: '{entity}'") + print(f"Masked text: '{masked_text}' (same as original)") + print() + + # This will cause infinite loop because we're replacing with the same text + print("=== This will cause infinite loop ===") + print("Because we're replacing '丰复久信公司' with '丰复久信公司'") + print("The document doesn't change, so we keep finding the same position") + + # Show what happens + 
masked_doc = original_doc + for i in range(3): # Limit to 3 iterations for demo + result = find_entity_alignment(entity, masked_doc) + if result: + start, end, found = result + print(f"Iteration {i+1}: Found at positions {start}-{end}: '{found}'") + + # Apply mask (but it's the same text) + masked_doc = masked_doc[:start] + masked_text + masked_doc[end:] + print(f"After mask: '{masked_doc}'") + else: + print(f"Iteration {i+1}: No occurrence found") + break + +if __name__ == "__main__": + debug_position_issue() + debug_infinite_loop() diff --git a/backend/tests/test_character_alignment.py b/backend/tests/test_character_alignment.py new file mode 100644 index 0000000..9dd1986 --- /dev/null +++ b/backend/tests/test_character_alignment.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +""" +Test script for character-by-character alignment functionality. +This script demonstrates how the alignment handles different spacing patterns +between entity text and original document text. +""" + +import sys +import os +sys.path.append(os.path.join(os.path.dirname(__file__), 'backend')) + +from app.core.document_handlers.ner_processor import NerProcessor + +def main(): + """Test the character alignment functionality.""" + processor = NerProcessor() + + print("Testing Character-by-Character Alignment") + print("=" * 50) + + # Test the alignment functionality + processor.test_character_alignment() + + print("\n" + "=" * 50) + print("Testing Entity Masking with Alignment") + print("=" * 50) + + # Test entity masking with alignment + original_document = "上诉人(原审原告):北京丰复久信营销科技有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。法定代表人:郭东军,执行董事、经理。委托诉讼代理人:周大海,北京市康达律师事务所律师。" + + # Example entity mapping (from your NER results) + entity_mapping = { + "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司", + "郭东军": "郭DJ", + "周大海": "周DH", + "北京市康达律师事务所": "北京市KD律师事务所" + } + + print(f"Original document: {original_document}") + print(f"Entity mapping: {entity_mapping}") + + # Apply masking with alignment + masked_document = processor.apply_entity_masking_with_alignment( + original_document, + entity_mapping + ) + + print(f"Masked document: {masked_document}") + + # Test with document that has spaces + print("\n" + "=" * 50) + print("Testing with Document Containing Spaces") + print("=" * 50) + + spaced_document = "上诉人(原审原告):北京 丰复久信 营销科技 有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。法定代表人:郭 东 军,执行董事、经理。" + + print(f"Spaced document: {spaced_document}") + + masked_spaced_document = processor.apply_entity_masking_with_alignment( + spaced_document, + entity_mapping + ) + + print(f"Masked spaced document: {masked_spaced_document}") + +if __name__ == "__main__": + main() diff --git a/backend/tests/test_final_fix.py b/backend/tests/test_final_fix.py new file mode 100644 index 0000000..5177546 --- /dev/null +++ b/backend/tests/test_final_fix.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +""" +Final test to verify the fix handles multiple occurrences and prevents infinite loops. 
+""" + +def find_entity_alignment(entity_text: str, original_document_text: str): + """Simplified version of the alignment method for testing""" + clean_entity = entity_text.replace(" ", "") + doc_chars = [c for c in original_document_text if c != ' '] + + for i in range(len(doc_chars) - len(clean_entity) + 1): + if doc_chars[i:i+len(clean_entity)] == list(clean_entity): + return map_char_positions_to_original(i, len(clean_entity), original_document_text) + return None + +def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str): + """Simplified version of position mapping for testing""" + original_pos = 0 + clean_pos = 0 + + while clean_pos < clean_start and original_pos < len(original_text): + if original_text[original_pos] != ' ': + clean_pos += 1 + original_pos += 1 + + start_pos = original_pos + + chars_found = 0 + while chars_found < entity_length and original_pos < len(original_text): + if original_text[original_pos] != ' ': + chars_found += 1 + original_pos += 1 + + end_pos = original_pos + found_text = original_text[start_pos:end_pos] + + return start_pos, end_pos, found_text + +def apply_entity_masking_with_alignment_fixed(original_document_text: str, entity_mapping: dict): + """Fixed implementation that handles multiple occurrences and prevents infinite loops""" + masked_document = original_document_text + sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True) + + for entity_text in sorted_entities: + masked_text = entity_mapping[entity_text] + + # Skip if masked text is the same as original text (prevents infinite loop) + if entity_text == masked_text: + print(f"Skipping entity '{entity_text}' as masked text is identical") + continue + + # Find ALL occurrences of this entity in the document + # Add safety counter to prevent infinite loops + max_iterations = 100 # Safety limit + iteration_count = 0 + + while iteration_count < max_iterations: + iteration_count += 1 + + # Find the entity in the current masked document using alignment + alignment_result = find_entity_alignment(entity_text, masked_document) + + if alignment_result: + start_pos, end_pos, found_text = alignment_result + + # Replace the found text with the masked version + masked_document = ( + masked_document[:start_pos] + + masked_text + + masked_document[end_pos:] + ) + + print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos} (iteration {iteration_count})") + else: + # No more occurrences found for this entity, move to next entity + print(f"No more occurrences of '{entity_text}' found in document after {iteration_count} iterations") + break + + # Log warning if we hit the safety limit + if iteration_count >= max_iterations: + print(f"WARNING: Reached maximum iterations ({max_iterations}) for entity '{entity_text}', stopping to prevent infinite loop") + + return masked_document + +def test_final_fix(): + """Test the final fix with various scenarios""" + + print("Testing Final Fix for Multiple Occurrences and Infinite Loop Prevention") + print("=" * 70) + + # Test case 1: Multiple occurrences of the same entity (should work) + print("\nTest Case 1: Multiple occurrences of same entity") + test_document_1 = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。" + entity_mapping_1 = {"李淼": "李M"} + + print(f"Original: {test_document_1}") + result_1 = apply_entity_masking_with_alignment_fixed(test_document_1, entity_mapping_1) + print(f"Result: {result_1}") + + remaining_1 = result_1.count("李淼") + expected_1 = "上诉人李M因合同纠纷,法定代表人李M,委托代理人李M。" + + if result_1 == 
expected_1 and remaining_1 == 0: + print("✅ PASS: All occurrences masked correctly") + else: + print(f"❌ FAIL: Expected '{expected_1}', got '{result_1}'") + print(f" Remaining '李淼' occurrences: {remaining_1}") + + # Test case 2: Entity with same masked text (should skip to prevent infinite loop) + print("\nTest Case 2: Entity with same masked text (should skip)") + test_document_2 = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。" + entity_mapping_2 = { + "李淼": "李M", + "丰复久信公司": "丰复久信公司" # Same text - should be skipped + } + + print(f"Original: {test_document_2}") + result_2 = apply_entity_masking_with_alignment_fixed(test_document_2, entity_mapping_2) + print(f"Result: {result_2}") + + remaining_2_li = result_2.count("李淼") + remaining_2_company = result_2.count("丰复久信公司") + + if remaining_2_li == 0 and remaining_2_company == 1: # Company should remain unmasked + print("✅ PASS: Infinite loop prevented, only different text masked") + else: + print(f"❌ FAIL: Remaining '李淼': {remaining_2_li}, '丰复久信公司': {remaining_2_company}") + + # Test case 3: Mixed spacing scenarios + print("\nTest Case 3: Mixed spacing scenarios") + test_document_3 = "上诉人李 淼因合同纠纷,法定代表人李淼,委托代理人李 淼。" + entity_mapping_3 = {"李 淼": "李M", "李淼": "李M"} + + print(f"Original: {test_document_3}") + result_3 = apply_entity_masking_with_alignment_fixed(test_document_3, entity_mapping_3) + print(f"Result: {result_3}") + + remaining_3 = result_3.count("李淼") + result_3.count("李 淼") + + if remaining_3 == 0: + print("✅ PASS: Mixed spacing handled correctly") + else: + print(f"❌ FAIL: Remaining occurrences: {remaining_3}") + + # Test case 4: Complex document with real examples + print("\nTest Case 4: Complex document with real examples") + test_document_4 = """上诉人(原审原告):北京丰复久信营销科技有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。 +法定代表人:郭东军,执行董事、经理。 +委托诉讼代理人:周大海,北京市康达律师事务所律师。 +委托诉讼代理人:王乃哲,北京市康达律师事务所律师。 +被上诉人(原审被告):中研智创区块链技术有限公司,住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505。 +法定代表人:王欢子,总经理。 +委托诉讼代理人:魏鑫,北京市昊衡律师事务所律师。""" + + entity_mapping_4 = { + "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司", + "郭东军": "郭DJ", + "周大海": "周DH", + "王乃哲": "王NZ", + "中研智创区块链技术有限公司": "中研智创区块链技术有限公司", # Same text - should be skipped + "王欢子": "王HZ", + "魏鑫": "魏X", + "北京市康达律师事务所": "北京市KD律师事务所", + "北京市昊衡律师事务所": "北京市HH律师事务所" + } + + print(f"Original length: {len(test_document_4)} characters") + result_4 = apply_entity_masking_with_alignment_fixed(test_document_4, entity_mapping_4) + print(f"Result length: {len(result_4)} characters") + + # Check that entities were masked correctly + unmasked_entities = [] + for entity in entity_mapping_4.keys(): + if entity in result_4 and entity != entity_mapping_4[entity]: # Skip if masked text is same + unmasked_entities.append(entity) + + if not unmasked_entities: + print("✅ PASS: All entities masked correctly in complex document") + else: + print(f"❌ FAIL: Unmasked entities: {unmasked_entities}") + + print("\n" + "=" * 70) + print("Final Fix Verification Completed!") + +if __name__ == "__main__": + test_final_fix() diff --git a/backend/tests/test_fix_verification.py b/backend/tests/test_fix_verification.py new file mode 100644 index 0000000..7f59bf7 --- /dev/null +++ b/backend/tests/test_fix_verification.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +""" +Test to verify the fix for multiple occurrence issue in apply_entity_masking_with_alignment. 
+""" + +def find_entity_alignment(entity_text: str, original_document_text: str): + """Simplified version of the alignment method for testing""" + clean_entity = entity_text.replace(" ", "") + doc_chars = [c for c in original_document_text if c != ' '] + + for i in range(len(doc_chars) - len(clean_entity) + 1): + if doc_chars[i:i+len(clean_entity)] == list(clean_entity): + return map_char_positions_to_original(i, len(clean_entity), original_document_text) + return None + +def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str): + """Simplified version of position mapping for testing""" + original_pos = 0 + clean_pos = 0 + + while clean_pos < clean_start and original_pos < len(original_text): + if original_text[original_pos] != ' ': + clean_pos += 1 + original_pos += 1 + + start_pos = original_pos + + chars_found = 0 + while chars_found < entity_length and original_pos < len(original_text): + if original_text[original_pos] != ' ': + chars_found += 1 + original_pos += 1 + + end_pos = original_pos + found_text = original_text[start_pos:end_pos] + + return start_pos, end_pos, found_text + +def apply_entity_masking_with_alignment_fixed(original_document_text: str, entity_mapping: dict): + """Fixed implementation that handles multiple occurrences""" + masked_document = original_document_text + sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True) + + for entity_text in sorted_entities: + masked_text = entity_mapping[entity_text] + + # Find ALL occurrences of this entity in the document + # We need to loop until no more matches are found + while True: + # Find the entity in the current masked document using alignment + alignment_result = find_entity_alignment(entity_text, masked_document) + + if alignment_result: + start_pos, end_pos, found_text = alignment_result + + # Replace the found text with the masked version + masked_document = ( + masked_document[:start_pos] + + masked_text + + masked_document[end_pos:] + ) + + print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos}") + else: + # No more occurrences found for this entity, move to next entity + print(f"No more occurrences of '{entity_text}' found in document") + break + + return masked_document + +def test_fix_verification(): + """Test to verify the fix works correctly""" + + print("Testing Fix for Multiple Occurrence Issue") + print("=" * 60) + + # Test case 1: Multiple occurrences of the same entity + print("\nTest Case 1: Multiple occurrences of same entity") + test_document_1 = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。" + entity_mapping_1 = {"李淼": "李M"} + + print(f"Original: {test_document_1}") + result_1 = apply_entity_masking_with_alignment_fixed(test_document_1, entity_mapping_1) + print(f"Result: {result_1}") + + remaining_1 = result_1.count("李淼") + expected_1 = "上诉人李M因合同纠纷,法定代表人李M,委托代理人李M。" + + if result_1 == expected_1 and remaining_1 == 0: + print("✅ PASS: All occurrences masked correctly") + else: + print(f"❌ FAIL: Expected '{expected_1}', got '{result_1}'") + print(f" Remaining '李淼' occurrences: {remaining_1}") + + # Test case 2: Multiple entities with multiple occurrences + print("\nTest Case 2: Multiple entities with multiple occurrences") + test_document_2 = "上诉人李淼因合同纠纷,法定代表人李淼。北京丰复久信营销科技有限公司,丰复久信公司。" + entity_mapping_2 = { + "李淼": "李M", + "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司", + "丰复久信公司": "丰复久信公司" + } + + print(f"Original: {test_document_2}") + result_2 = apply_entity_masking_with_alignment_fixed(test_document_2, entity_mapping_2) + print(f"Result: 
{result_2}") + + remaining_2_li = result_2.count("李淼") + remaining_2_company = result_2.count("北京丰复久信营销科技有限公司") + + if remaining_2_li == 0 and remaining_2_company == 0: + print("✅ PASS: All entities masked correctly") + else: + print(f"❌ FAIL: Remaining '李淼': {remaining_2_li}, '北京丰复久信营销科技有限公司': {remaining_2_company}") + + # Test case 3: Mixed spacing scenarios + print("\nTest Case 3: Mixed spacing scenarios") + test_document_3 = "上诉人李 淼因合同纠纷,法定代表人李淼,委托代理人李 淼。" + entity_mapping_3 = {"李 淼": "李M", "李淼": "李M"} + + print(f"Original: {test_document_3}") + result_3 = apply_entity_masking_with_alignment_fixed(test_document_3, entity_mapping_3) + print(f"Result: {result_3}") + + remaining_3 = result_3.count("李淼") + result_3.count("李 淼") + + if remaining_3 == 0: + print("✅ PASS: Mixed spacing handled correctly") + else: + print(f"❌ FAIL: Remaining occurrences: {remaining_3}") + + # Test case 4: Complex document with real examples + print("\nTest Case 4: Complex document with real examples") + test_document_4 = """上诉人(原审原告):北京丰复久信营销科技有限公司,住所地北京市海淀区北小马厂6号1号楼华天大厦1306室。 +法定代表人:郭东军,执行董事、经理。 +委托诉讼代理人:周大海,北京市康达律师事务所律师。 +委托诉讼代理人:王乃哲,北京市康达律师事务所律师。 +被上诉人(原审被告):中研智创区块链技术有限公司,住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505。 +法定代表人:王欢子,总经理。 +委托诉讼代理人:魏鑫,北京市昊衡律师事务所律师。""" + + entity_mapping_4 = { + "北京丰复久信营销科技有限公司": "北京JO营销科技有限公司", + "郭东军": "郭DJ", + "周大海": "周DH", + "王乃哲": "王NZ", + "中研智创区块链技术有限公司": "中研智创区块链技术有限公司", + "王欢子": "王HZ", + "魏鑫": "魏X", + "北京市康达律师事务所": "北京市KD律师事务所", + "北京市昊衡律师事务所": "北京市HH律师事务所" + } + + print(f"Original length: {len(test_document_4)} characters") + result_4 = apply_entity_masking_with_alignment_fixed(test_document_4, entity_mapping_4) + print(f"Result length: {len(result_4)} characters") + + # Check that all entities were masked + unmasked_entities = [] + for entity in entity_mapping_4.keys(): + if entity in result_4: + unmasked_entities.append(entity) + + if not unmasked_entities: + print("✅ PASS: All entities masked in complex document") + else: + print(f"❌ FAIL: Unmasked entities: {unmasked_entities}") + + print("\n" + "=" * 60) + print("Fix Verification Completed!") + +if __name__ == "__main__": + test_fix_verification() diff --git a/backend/tests/test_multiple_occurrences.py b/backend/tests/test_multiple_occurrences.py new file mode 100644 index 0000000..0aa4e8e --- /dev/null +++ b/backend/tests/test_multiple_occurrences.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Test to verify the multiple occurrence issue in apply_entity_masking_with_alignment. 
+""" + +def find_entity_alignment(entity_text: str, original_document_text: str): + """Simplified version of the alignment method for testing""" + clean_entity = entity_text.replace(" ", "") + doc_chars = [c for c in original_document_text if c != ' '] + + for i in range(len(doc_chars) - len(clean_entity) + 1): + if doc_chars[i:i+len(clean_entity)] == list(clean_entity): + return map_char_positions_to_original(i, len(clean_entity), original_document_text) + return None + +def map_char_positions_to_original(clean_start: int, entity_length: int, original_text: str): + """Simplified version of position mapping for testing""" + original_pos = 0 + clean_pos = 0 + + while clean_pos < clean_start and original_pos < len(original_text): + if original_text[original_pos] != ' ': + clean_pos += 1 + original_pos += 1 + + start_pos = original_pos + + chars_found = 0 + while chars_found < entity_length and original_pos < len(original_text): + if original_text[original_pos] != ' ': + chars_found += 1 + original_pos += 1 + + end_pos = original_pos + found_text = original_text[start_pos:end_pos] + + return start_pos, end_pos, found_text + +def apply_entity_masking_with_alignment_current(original_document_text: str, entity_mapping: dict): + """Current implementation with the bug""" + masked_document = original_document_text + sorted_entities = sorted(entity_mapping.keys(), key=len, reverse=True) + + for entity_text in sorted_entities: + masked_text = entity_mapping[entity_text] + + # Find the entity in the original document using alignment + alignment_result = find_entity_alignment(entity_text, masked_document) + + if alignment_result: + start_pos, end_pos, found_text = alignment_result + + # Replace the found text with the masked version + masked_document = ( + masked_document[:start_pos] + + masked_text + + masked_document[end_pos:] + ) + + print(f"Masked entity '{entity_text}' -> '{masked_text}' at positions {start_pos}-{end_pos}") + else: + print(f"Could not find entity '{entity_text}' in document for masking") + + return masked_document + +def test_multiple_occurrences(): + """Test the multiple occurrence issue""" + + print("Testing Multiple Occurrence Issue") + print("=" * 50) + + # Test document with multiple occurrences of the same entity + test_document = "上诉人李淼因合同纠纷,法定代表人李淼,委托代理人李淼。" + entity_mapping = { + "李淼": "李M" + } + + print(f"Original document: {test_document}") + print(f"Entity mapping: {entity_mapping}") + print(f"Expected: All 3 occurrences of '李淼' should be masked") + + # Test current implementation + result = apply_entity_masking_with_alignment_current(test_document, entity_mapping) + print(f"Current result: {result}") + + # Count remaining occurrences + remaining_count = result.count("李淼") + print(f"Remaining '李淼' occurrences: {remaining_count}") + + if remaining_count > 0: + print("❌ ISSUE CONFIRMED: Multiple occurrences are not being masked!") + else: + print("✅ No issue found (unexpected)") + +if __name__ == "__main__": + test_multiple_occurrences()