feat: use the NER model for entity recognition

This commit is contained in:
tigermren 2025-08-19 01:36:08 +08:00
parent 2075218955
commit d446ac1854
10 changed files with 612 additions and 17 deletions

View File

@@ -4,9 +4,14 @@ TARGET_DIRECTORY_PATH=/Users/tigeren/Dev/digisky/legal-doc-masker/data/doc_dest
INTERMEDIATE_DIR_PATH=/Users/tigeren/Dev/digisky/legal-doc-masker/data/doc_intermediate
# Ollama API Configuration
OLLAMA_API_URL=http://192.168.2.245:11434
# 3060 GPU
# OLLAMA_API_URL=http://192.168.2.245:11434
# Mac Mini M4
OLLAMA_API_URL=http://192.168.2.224:11434
# OLLAMA_API_KEY=your_api_key_here
OLLAMA_MODEL=qwen3:8b
# OLLAMA_MODEL=qwen3:8b
OLLAMA_MODEL=phi4:14b
# Application Settings
MONITOR_INTERVAL=5

View File

@@ -7,20 +7,31 @@ RUN apt-get update && apt-get install -y \
build-essential \
libreoffice \
wget \
git \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first to leverage Docker cache
COPY requirements.txt .
# RUN pip install huggingface_hub
# RUN wget https://github.com/opendatalab/MinerU/raw/master/scripts/download_models_hf.py -O download_models_hf.py
# RUN wget https://raw.githubusercontent.com/opendatalab/MinerU/refs/heads/release-1.3.1/scripts/download_models_hf.py -O download_models_hf.py
# RUN python download_models_hf.py
# Upgrade pip and install core dependencies
RUN pip install --upgrade pip setuptools wheel
# Install the CPU-only PyTorch build first (for better caching and a smaller image)
RUN pip install --no-cache-dir torch==2.7.0 --index-url https://download.pytorch.org/whl/cpu
# Install the rest of the requirements
RUN pip install --no-cache-dir -r requirements.txt
# RUN pip install -U magic-pdf[full]
# Pre-download NER model during build (larger image but faster startup)
# RUN python -c "
# from transformers import AutoTokenizer, AutoModelForTokenClassification
# model_name = 'uer/roberta-base-finetuned-cluener2020-chinese'
# print('Downloading NER model...')
# AutoTokenizer.from_pretrained(model_name)
# AutoModelForTokenClassification.from_pretrained(model_name)
# print('NER model downloaded successfully')
# "
# Copy the rest of the application

View File

@@ -42,6 +42,10 @@ class Settings(BaseSettings):
MINERU_FORMULA_ENABLE: bool = True # Enable formula parsing
MINERU_TABLE_ENABLE: bool = True # Enable table parsing
# MagicDoc API settings
# MAGICDOC_API_URL: str = "http://magicdoc-api:8000"
# MAGICDOC_TIMEOUT: int = 300 # 5 minutes timeout
# Logging settings
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

View File

@@ -3,15 +3,13 @@ Extractors package for entity component extraction.
"""
from .base_extractor import BaseExtractor
from .llm_extractor import LLMExtractor
from .regex_extractor import RegexExtractor
from .business_name_extractor import BusinessNameExtractor
from .address_extractor import AddressExtractor
from .ner_extractor import NERExtractor
__all__ = [
'BaseExtractor',
'LLMExtractor',
'RegexExtractor',
'BusinessNameExtractor',
'AddressExtractor'
'AddressExtractor',
'NERExtractor'
]

View File

@@ -0,0 +1,278 @@
import json
import logging
from typing import Dict, List, Any, Optional
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
from .base_extractor import BaseExtractor
logger = logging.getLogger(__name__)
class NERExtractor(BaseExtractor):
"""
Named Entity Recognition extractor using Chinese NER model.
Uses the uer/roberta-base-finetuned-cluener2020-chinese model for Chinese NER.
"""
def __init__(self):
super().__init__()
self.model_checkpoint = "uer/roberta-base-finetuned-cluener2020-chinese"
self.tokenizer = None
self.model = None
self.ner_pipeline = None
self._model_initialized = False
# Map CLUENER model labels to our desired categories
self.label_map = {
'company': '公司名称',
'organization': '组织机构名',
'name': '人名',
'address': '地址'
}
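# For reference (illustrative shape, not produced by this commit): with
# aggregation_strategy="simple" the pipeline emits dicts like
# {'entity_group': 'company', 'word': '...', 'score': 0.97, 'start': 0, 'end': 12},
# and label_map translates the English entity_group into the Chinese category name.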
# Don't initialize the model here - use lazy loading
def _initialize_model(self):
"""Initialize the NER model and pipeline"""
try:
logger.info(f"Loading NER model: {self.model_checkpoint}")
# Load the tokenizer and model
self.tokenizer = AutoTokenizer.from_pretrained(self.model_checkpoint)
self.model = AutoModelForTokenClassification.from_pretrained(self.model_checkpoint)
# Create the NER pipeline with proper configuration
self.ner_pipeline = pipeline(
"ner",
model=self.model,
tokenizer=self.tokenizer,
aggregation_strategy="simple"
)
# Configure the tokenizer to handle max length
if hasattr(self.tokenizer, 'model_max_length'):
self.tokenizer.model_max_length = 512
self._model_initialized = True
logger.info("NER model loaded successfully")
except Exception as e:
logger.error(f"Failed to load NER model: {str(e)}")
raise Exception(f"NER model initialization failed: {str(e)}")
def extract(self, text: str) -> Dict[str, Any]:
"""
Extract named entities from the given text
Args:
text: The text to analyze
Returns:
Dictionary containing extracted entities in the format expected by the system
"""
try:
if not text or not text.strip():
logger.warning("Empty text provided for NER processing")
return {"entities": []}
# Initialize model if not already done
if not self._model_initialized:
self._initialize_model()
logger.info(f"Processing text with NER (length: {len(text)} characters)")
# Check if text needs chunking
if len(text) > 400: # Character-based threshold for chunking
logger.info("Text is long, using chunking approach")
return self._extract_with_chunking(text)
else:
logger.info("Text is short, processing directly")
return self._extract_single(text)
except Exception as e:
logger.error(f"Error during NER processing: {str(e)}")
raise Exception(f"NER processing failed: {str(e)}")
def _extract_single(self, text: str) -> Dict[str, Any]:
"""
Extract entities from a single text chunk
Args:
text: The text to analyze
Returns:
Dictionary containing extracted entities
"""
try:
# Run the NER pipeline - it handles truncation automatically
results = self.ner_pipeline(text)
# Filter and process entities
filtered_entities = []
for entity in results:
entity_group = entity['entity_group']
# Only process entities that we care about
if entity_group in self.label_map:
entity_type = self.label_map[entity_group]
entity_text = entity['word']
confidence_score = entity['score']
# Add to our list
filtered_entities.append({
"text": entity_text,
"type": entity_type,
"confidence": confidence_score
})
return {
"entities": filtered_entities,
"total_count": len(filtered_entities)
}
except Exception as e:
logger.error(f"Error during single NER processing: {str(e)}")
raise Exception(f"Single NER processing failed: {str(e)}")
def _extract_with_chunking(self, text: str) -> Dict[str, Any]:
"""
Extract entities from long text using chunking approach
Args:
text: The text to analyze
Returns:
Dictionary containing extracted entities
"""
try:
# Estimate token count to determine safe chunk size
estimated_tokens = len(text) * 1.5 # Conservative estimate for Chinese text
logger.info(f"Estimated tokens: {estimated_tokens:.0f}")
# Calculate safe chunk size to stay under 512 tokens
# Target ~400 tokens per chunk to leave buffer
target_chunk_tokens = 400
chunk_size = int(target_chunk_tokens / 1.5) # Convert back to characters
overlap = max(50, chunk_size // 8) # 12.5% overlap, minimum 50 chars
logger.info(f"Using chunk_size: {chunk_size} chars, overlap: {overlap} chars")
all_entities = []
# Process text in overlapping character chunks
for i in range(0, len(text), chunk_size - overlap):
chunk_text = text[i:i + chunk_size]
# Verify chunk won't exceed token limit
chunk_tokens = len(self.tokenizer.tokenize(chunk_text))
logger.info(f"Processing chunk {i//(chunk_size-overlap)+1}: {len(chunk_text)} chars, {chunk_tokens} tokens")
if chunk_tokens > 510:
logger.warning(f"Chunk {i//(chunk_size-overlap)+1} has {chunk_tokens} tokens, truncating")
# Truncate to 510 tokens so the [CLS]/[SEP] special tokens still fit
# within the model's 512-token limit
chunk_text = self.tokenizer.convert_tokens_to_string(
self.tokenizer.tokenize(chunk_text)[:510]
)
# Extract entities from this chunk
chunk_result = self._extract_single(chunk_text)
chunk_entities = chunk_result.get("entities", [])
all_entities.extend(chunk_entities)
logger.info(f"Chunk {i//(chunk_size-overlap)+1} extracted {len(chunk_entities)} entities")
# Remove duplicates while preserving order
unique_entities = []
seen_texts = set()
for entity in all_entities:
entity_text = entity['text'].strip()  # avoid shadowing the `text` parameter
if entity_text and entity_text not in seen_texts:
seen_texts.add(entity_text)
unique_entities.append(entity)
logger.info(f"Chunking completed: {len(all_entities)} total entities, {len(unique_entities)} unique entities")
return {
"entities": unique_entities,
"total_count": len(unique_entities)
}
except Exception as e:
logger.error(f"Error during chunked NER processing: {str(e)}")
raise Exception(f"Chunked NER processing failed: {str(e)}")
def get_entity_summary(self, entities: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Generate a summary of extracted entities by type
Args:
entities: List of extracted entities
Returns:
Summary dictionary with counts by entity type
"""
summary = {}
for entity in entities:
entity_type = entity['type']
if entity_type not in summary:
summary[entity_type] = []
summary[entity_type].append(entity['text'])
# Convert to count format
summary_counts = {entity_type: len(texts) for entity_type, texts in summary.items()}
return {
"summary": summary,
"counts": summary_counts,
"total_entities": len(entities)
}
def extract_and_summarize(self, text: str) -> Dict[str, Any]:
"""
Extract entities and provide a summary in one call
Args:
text: The text to analyze
Returns:
Dictionary containing entities and summary
"""
entities_result = self.extract(text)
entities = entities_result.get("entities", [])
summary_result = self.get_entity_summary(entities)
return {
"entities": entities,
"summary": summary_result,
"total_count": len(entities)
}
def get_confidence(self) -> float:
"""
Return confidence level of extraction
Returns:
Confidence level as a float between 0.0 and 1.0
"""
# NER models typically have high confidence for well-trained entities
# This is a reasonable default confidence level for NER extraction
return 0.85
def get_model_info(self) -> Dict[str, Any]:
"""
Get information about the NER model
Returns:
Dictionary containing model information
"""
return {
"model_name": self.model_checkpoint,
"model_type": "Chinese NER",
"supported_entities": [
"人名 (Person Names)",
"公司名称 (Company Names)",
"组织机构名 (Organization Names)",
"地址 (Addresses)"
],
"description": "Fine-tuned RoBERTa model for Chinese Named Entity Recognition on CLUENER2020 dataset"
}

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any, Dict, List
from ..prompts.masking_prompts import get_ner_name_prompt, get_ner_company_prompt, get_ner_address_prompt, get_ner_project_prompt, get_ner_case_number_prompt, get_entity_linkage_prompt
import logging
import json
@@ -8,6 +8,7 @@ from ..utils.json_extractor import LLMJsonExtractor
from ..utils.llm_validator import LLMResponseValidator
import re
from .regs.entity_regex import extract_id_number_entities, extract_social_credit_code_entities
from .extractors.ner_extractor import NERExtractor
from pypinyin import pinyin, Style
logger = logging.getLogger(__name__)
@@ -16,9 +17,31 @@ class NerProcessor:
def __init__(self):
self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
self.max_retries = 3
# Initialize NER extractor for ML-based entity extraction
self.ner_extractor = NERExtractor()
def _validate_mapping_format(self, mapping: Dict[str, Any]) -> bool:
return LLMResponseValidator.validate_entity_extraction(mapping)
def extract_entities_with_ner(self, text: str) -> List[Dict[str, Any]]:
"""
Extract entities using the NER model
Args:
text: The text to analyze
Returns:
List of extracted entities
"""
try:
logger.info("Extracting entities using NER model")
result = self.ner_extractor.extract(text)
entities = result.get("entities", [])
logger.info(f"NER model extracted {len(entities)} entities")
return entities
except Exception as e:
logger.error(f"Error extracting entities with NER: {str(e)}")
return []
def _mask_chinese_name(self, name: str, surname_counter: Dict[str, Dict[str, int]]) -> str:
"""
@@ -484,6 +507,15 @@ class NerProcessor:
def build_mapping(self, chunk: str) -> list[Dict[str, str]]:
mapping_pipeline = []
# First, try NER-based extraction
ner_entities = self.extract_entities_with_ner(chunk)
if ner_entities:
# Convert NER entities to the expected format
ner_mapping = {"entities": ner_entities}
mapping_pipeline.append(ner_mapping)
logger.info(f"Added {len(ner_entities)} entities from NER model")
# Then, use LLM-based extraction for additional entities
entity_configs = [
(get_ner_name_prompt, "people names"),
(get_ner_company_prompt, "company names"),
@@ -508,6 +540,79 @@
logger.warning(f"Invalid regex entity mapping format: {mapping}")
return mapping_pipeline
def build_mapping_llm_only(self, chunk: str) -> list[Dict[str, str]]:
"""
Build mapping using only LLM (no NER)
Args:
chunk: Text chunk to process
Returns:
List of entity mappings
"""
mapping_pipeline = []
# Use LLM-based extraction for entities
entity_configs = [
(get_ner_name_prompt, "people names"),
(get_ner_company_prompt, "company names"),
(get_ner_address_prompt, "addresses"),
(get_ner_project_prompt, "project names"),
(get_ner_case_number_prompt, "case numbers")
]
for prompt_func, entity_type in entity_configs:
mapping = self._process_entity_type(chunk, prompt_func, entity_type)
if mapping:
mapping_pipeline.append(mapping)
# Include regex-based extraction for IDs and codes
regex_entity_extractors = [
extract_id_number_entities,
extract_social_credit_code_entities
]
for extractor in regex_entity_extractors:
mapping = extractor(chunk)
if mapping and LLMResponseValidator.validate_regex_entity(mapping):
mapping_pipeline.append(mapping)
elif mapping:
logger.warning(f"Invalid regex entity mapping format: {mapping}")
return mapping_pipeline
def build_mapping_ner_only(self, chunk: str) -> list[Dict[str, str]]:
"""
Build mapping using only NER model (no LLM)
Args:
chunk: Text chunk to process
Returns:
List of entity mappings
"""
mapping_pipeline = []
# Extract entities using NER model only
ner_entities = self.extract_entities_with_ner(chunk)
if ner_entities:
# Convert NER entities to the expected format
ner_mapping = {"entities": ner_entities}
mapping_pipeline.append(ner_mapping)
logger.info(f"NER-only extraction: Added {len(ner_entities)} entities")
# Still include regex-based extraction for IDs and codes
regex_entity_extractors = [
extract_id_number_entities,
extract_social_credit_code_entities
]
for extractor in regex_entity_extractors:
mapping = extractor(chunk)
if mapping and LLMResponseValidator.validate_regex_entity(mapping):
mapping_pipeline.append(mapping)
elif mapping:
logger.warning(f"Invalid regex entity mapping format: {mapping}")
return mapping_pipeline
def _merge_entity_mappings(self, chunk_mappings: list[Dict[str, Any]]) -> list[Dict[str, str]]:
all_entities = []
@@ -709,13 +814,29 @@ class NerProcessor:
return entity_mapping
def process(self, chunks: list[str]) -> Dict[str, str]:
# Merge all chunks into a single text for NER processing
merged_text = " ".join(chunks)
logger.info(f"Merged {len(chunks)} chunks into single text (length: {len(merged_text)} characters)")
# Extract entities using NER on the merged text (NER handles chunking internally)
ner_entities = self.extract_entities_with_ner(merged_text)
logger.info(f"NER extracted {len(ner_entities)} entities from merged text")
logger.info(f"NER entities: {ner_entities}")
# Process each chunk with LLM for additional entities
chunk_mappings = []
for i, chunk in enumerate(chunks):
logger.info(f"Processing chunk {i+1}/{len(chunks)}")
chunk_mapping = self.build_mapping(chunk)
logger.info(f"Processing chunk {i+1}/{len(chunks)} with LLM")
chunk_mapping = self.build_mapping_llm_only(chunk) # LLM-only processing
logger.info(f"Chunk mapping: {chunk_mapping}")
chunk_mappings.extend(chunk_mapping)
# Add NER entities to the mappings
if ner_entities:
ner_mapping = {"entities": ner_entities}
chunk_mappings.append(ner_mapping)
logger.info(f"Added {len(ner_entities)} NER entities to mappings")
logger.info(f"Final chunk mappings: {chunk_mappings}")
unique_entities = self._merge_entity_mappings(chunk_mappings)
@@ -734,3 +855,37 @@
logger.info(f"Final mapping: {final_mapping}")
return final_mapping
def process_ner_only(self, chunks: list[str]) -> Dict[str, str]:
"""
Process documents using only NER model (no LLM)
Args:
chunks: List of text chunks to process
Returns:
Mapping dictionary from original text to masked text
"""
chunk_mappings = []
for i, chunk in enumerate(chunks):
logger.info(f"Processing chunk {i+1}/{len(chunks)} with NER only")
chunk_mapping = self.build_mapping_ner_only(chunk)
logger.info(f"Chunk mapping: {chunk_mapping}")
chunk_mappings.extend(chunk_mapping)
logger.info(f"Final chunk mappings: {chunk_mappings}")
unique_entities = self._merge_entity_mappings(chunk_mappings)
logger.info(f"Unique entities: {unique_entities}")
# For NER-only processing, we can skip entity linkage since NER provides direct entity types
entity_linkage = {"entity_groups": []} # Empty linkage for NER-only mode
logger.info(f"Entity linkage: {entity_linkage}")
combined_mapping = self._generate_masked_mapping(unique_entities, entity_linkage)
logger.info(f"Combined mapping: {combined_mapping}")
final_mapping = self._apply_entity_linkage_to_mapping(combined_mapping, entity_linkage)
logger.info(f"Final mapping: {final_mapping}")
return final_mapping

View File

@@ -189,6 +189,8 @@ class DocxDocumentProcessor(DocumentProcessor):
# Extract markdown content from the response
markdown_content = self._extract_markdown_from_response(magicdoc_response)
logger.info(f"MagicDoc API response: {markdown_content}")
if not markdown_content:
raise Exception("No markdown content found in MagicDoc API response for DOCX")

View File

@@ -32,4 +32,9 @@ pandas>=2.0.0
jsonschema>=4.20.0
# Chinese text processing
pypinyin>=0.50.0
pypinyin>=0.50.0
# NER and ML dependencies
# torch is installed separately in Dockerfile for CPU optimization
transformers>=4.30.0
tokenizers>=0.13.0

View File

@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
Test script for NER extractor integration
"""
import sys
import os
import logging
# Add the backend directory to the Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
from app.core.document_handlers.extractors.ner_extractor import NERExtractor
from app.core.document_handlers.ner_processor import NerProcessor
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def test_ner_extractor():
"""Test the NER extractor directly"""
print("🧪 Testing NER Extractor")
print("=" * 50)
# Sample legal text
text_to_analyze = """
上诉人原审原告北京丰复久信营销科技有限公司住所地北京市海淀区北小马厂6号1号楼华天大厦1306室
法定代表人郭东军执行董事经理
委托诉讼代理人周大海北京市康达律师事务所律师
被上诉人原审被告中研智创区块链技术有限公司住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505
法定代表人王欢子总经理
"""
try:
# Test NER extractor
print("1. Testing NER Extractor...")
ner_extractor = NERExtractor()
# Get model info
model_info = ner_extractor.get_model_info()
print(f" Model: {model_info['model_name']}")
print(f" Supported entities: {model_info['supported_entities']}")
# Extract entities
result = ner_extractor.extract_and_summarize(text_to_analyze)
print(f"\n2. Extraction Results:")
print(f" Total entities found: {result['total_count']}")
for entity in result['entities']:
print(f" - '{entity['text']}' ({entity['type']}) - Confidence: {entity['confidence']:.4f}")
print(f"\n3. Summary:")
for entity_type, texts in result['summary']['summary'].items():
print(f" {entity_type}: {len(texts)} entities")
for text in texts:
print(f" - {text}")
return True
except Exception as e:
print(f"❌ NER Extractor test failed: {str(e)}")
return False
def test_ner_processor():
"""Test the NER processor integration"""
print("\n🧪 Testing NER Processor Integration")
print("=" * 50)
# Sample legal text
text_to_analyze = """
上诉人原审原告北京丰复久信营销科技有限公司住所地北京市海淀区北小马厂6号1号楼华天大厦1306室
法定代表人郭东军执行董事经理
委托诉讼代理人周大海北京市康达律师事务所律师
被上诉人原审被告中研智创区块链技术有限公司住所地天津市津南区双港镇工业园区优谷产业园5号楼-1505
法定代表人王欢子总经理
"""
try:
# Test NER processor
print("1. Testing NER Processor...")
ner_processor = NerProcessor()
# Test NER-only extraction
print("2. Testing NER-only entity extraction...")
ner_entities = ner_processor.extract_entities_with_ner(text_to_analyze)
print(f" Extracted {len(ner_entities)} entities with NER model")
for entity in ner_entities:
print(f" - '{entity['text']}' ({entity['type']}) - Confidence: {entity['confidence']:.4f}")
# Test NER-only processing
print("\n3. Testing NER-only document processing...")
chunks = [text_to_analyze] # Single chunk for testing
mapping = ner_processor.process_ner_only(chunks)
print(f" Generated {len(mapping)} masking mappings")
for original, masked in mapping.items():
print(f" '{original}' -> '{masked}'")
return True
except Exception as e:
print(f"❌ NER Processor test failed: {str(e)}")
return False
def main():
"""Main test function"""
print("🧪 NER Integration Test Suite")
print("=" * 60)
# Test 1: NER Extractor
extractor_success = test_ner_extractor()
# Test 2: NER Processor Integration
processor_success = test_ner_processor()
# Summary
print("\n" + "=" * 60)
print("📊 Test Summary:")
print(f" NER Extractor: {'' if extractor_success else ''}")
print(f" NER Processor: {'' if processor_success else ''}")
if extractor_success and processor_success:
print("\n🎉 All tests passed! NER integration is working correctly.")
print("\nNext steps:")
print("1. The NER extractor is ready to use in the document processing pipeline")
print("2. You can use process_ner_only() for ML-based entity extraction")
print("3. The existing process() method now includes NER extraction")
else:
print("\n⚠️ Some tests failed. Please check the error messages above.")
if __name__ == "__main__":
main()

View File

@@ -57,6 +57,7 @@ services:
- "8000:8000"
volumes:
- ./backend/storage:/app/storage
- huggingface_cache:/root/.cache/huggingface
env_file:
- ./backend/.env
environment:
@@ -79,6 +80,7 @@ services:
command: celery -A app.services.file_service worker --loglevel=info
volumes:
- ./backend/storage:/app/storage
- huggingface_cache:/root/.cache/huggingface
env_file:
- ./backend/.env
environment:
@@ -126,4 +128,5 @@ networks:
volumes:
uploads:
processed:
processed:
huggingface_cache:
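As a hedged sanity check (an assumed workflow, not part of this commit), you can verify from inside the backend or celery container that transformers resolves its cache to a path under the mounted volume, so the model downloads once and is shared across services:

# Hypothetical one-off check; with the huggingface_cache volume above this
# should print a directory under /root/.cache/huggingface.
from transformers.utils import TRANSFORMERS_CACHE
print(TRANSFORMERS_CACHE)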