refine: fix import path issues

oliviamn 2025-05-25 00:04:19 +08:00
parent 3e9c44e8c4
commit 900a614b09
31 changed files with 54 additions and 1115 deletions

View File

@@ -5,11 +5,14 @@ WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
libreoffice \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first to leverage Docker cache
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install -U magic-pdf[full]
# Copy the rest of the application
COPY . .
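Context for this hunk (an inference; the diff itself doesn't say): libreoffice is most likely added because magic-pdf's read_local_office, used by the DOCX processor later in this commit, relies on LibreOffice to convert Office files before analysis, and magic-pdf[full] pulls in the OCR and layout models. With that in place the image builds as usual:

docker build -t legal-doc-masker .  # image name is illustrative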

View File

@@ -13,27 +13,42 @@ class Settings(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 8 days
# Database
DATABASE_URL: str = "sqlite:///./legal_doc_masker.db"
BASE_DIR: Path = Path(__file__).parent.parent.parent
DATABASE_URL: str = f"sqlite:///{BASE_DIR}/storage/legal_doc_masker.db"
# File Storage
BASE_DIR: Path = Path(__file__).parent.parent.parent
UPLOAD_FOLDER: Path = BASE_DIR / "storage" / "uploads"
PROCESSED_FOLDER: Path = BASE_DIR / "storage" / "processed"
MAX_FILE_SIZE: int = 50 * 1024 * 1024 # 50MB
ALLOWED_EXTENSIONS: set = {"pdf", "docx", "doc"}
# Celery
CELERY_BROKER_URL: str = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"
CELERY_BROKER_URL: str = "redis://redis:6379/0"
CELERY_RESULT_BACKEND: str = "redis://redis:6379/0"
# Ollama API settings
OLLAMA_API_URL: str = "https://api.ollama.com"
OLLAMA_API_KEY: str = ""
OLLAMA_MODEL: str = "llama2"
# Logging settings
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT: str = "%Y-%m-%d %H:%M:%S"
LOG_FILE: str = "app.log"
class Config:
case_sensitive = True
env_file = ".env"
env_file_encoding = "utf-8"
extra = "allow"
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Create storage directories if they don't exist
self.UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)
self.PROCESSED_FOLDER.mkdir(parents=True, exist_ok=True)
# Create storage directory for database
(self.BASE_DIR / "storage").mkdir(parents=True, exist_ok=True)
settings = Settings()
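Two details in this hunk are easy to miss: BASE_DIR must now be assigned before the f-string that builds DATABASE_URL (class attributes evaluate top to bottom), and the redis://redis:6379/0 URLs assume a Docker Compose service named redis on the app's network. A minimal sanity-check sketch, assuming this file lives at app/core/config.py so that BASE_DIR resolves to the project root:

from app.core.config import settings  # assumed import path

print(settings.DATABASE_URL)       # sqlite:///<project root>/storage/legal_doc_masker.db
print(settings.UPLOAD_FOLDER)      # <project root>/storage/uploads, created on instantiation
print(settings.CELERY_BROKER_URL)  # redis://redis:6379/0, resolvable only inside Docker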

View File

@@ -1,5 +1,6 @@
import logging.config
from config.settings import settings
# from config.settings import settings
from .settings import settings
LOGGING_CONFIG = {
"version": 1,

View File

@@ -1,31 +0,0 @@
# settings.py
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
# Storage paths
OBJECT_STORAGE_PATH: str = ""
TARGET_DIRECTORY_PATH: str = ""
# Ollama API settings
OLLAMA_API_URL: str = "https://api.ollama.com"
OLLAMA_API_KEY: str = ""
OLLAMA_MODEL: str = "llama2"
# File monitoring settings
MONITOR_INTERVAL: int = 5
# Logging settings
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT: str = "%Y-%m-%d %H:%M:%S"
LOG_FILE: str = "app.log"
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
extra = "allow"
# Create settings instance
settings = Settings()

View File

@@ -1,7 +1,7 @@
import os
from typing import Optional
from document_handlers.document_processor import DocumentProcessor
from document_handlers.processors import (
from .document_processor import DocumentProcessor
from .processors import (
TxtDocumentProcessor,
DocxDocumentProcessor,
PdfDocumentProcessor,

View File

@@ -1,11 +1,13 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from prompts.masking_prompts import get_masking_mapping_prompt
from ..prompts.masking_prompts import get_masking_mapping_prompt
import logging
import json
from services.ollama_client import OllamaClient
from config.settings import settings
from utils.json_extractor import LLMJsonExtractor
from ..services.ollama_client import OllamaClient
from ...core.config import settings
from ..utils.json_extractor import LLMJsonExtractor
logger = logging.getLogger(__name__)
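For these relative imports to resolve, the modules have to sit inside one package tree, roughly like the sketch below (inferred from the dotted paths; the top-level name app is an assumption):

app/
    core/
        config.py                  # settings, referenced as ...core.config
        document_handlers/
            document_processor.py  # this file; ..prompts, ..services, ..utils
            processors/            # one level deeper, hence the ... in later hunks
        prompts/
        services/
        utils/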

View File

@@ -1,6 +1,6 @@
from document_handlers.processors.txt_processor import TxtDocumentProcessor
from document_handlers.processors.docx_processor import DocxDocumentProcessor
from document_handlers.processors.pdf_processor import PdfDocumentProcessor
from document_handlers.processors.md_processor import MarkdownDocumentProcessor
from .txt_processor import TxtDocumentProcessor
from .docx_processor import DocxDocumentProcessor
from .pdf_processor import PdfDocumentProcessor
from .md_processor import MarkdownDocumentProcessor
__all__ = ['TxtDocumentProcessor', 'DocxDocumentProcessor', 'PdfDocumentProcessor', 'MarkdownDocumentProcessor']

View File

@@ -1,13 +1,13 @@
import os
import docx
from document_handlers.document_processor import DocumentProcessor
from ...document_handlers.document_processor import DocumentProcessor
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.data.read_api import read_local_office
import logging
from services.ollama_client import OllamaClient
from config.settings import settings
from prompts.masking_prompts import get_masking_mapping_prompt
from ...services.ollama_client import OllamaClient
from ...config import settings
from ...prompts.masking_prompts import get_masking_mapping_prompt
logger = logging.getLogger(__name__)

View File

@@ -1,8 +1,8 @@
import os
from document_handlers.document_processor import DocumentProcessor
from services.ollama_client import OllamaClient
from ...document_handlers.document_processor import DocumentProcessor
from ...services.ollama_client import OllamaClient
import logging
from config.settings import settings
from ...config import settings
logger = logging.getLogger(__name__)

View File

@@ -1,14 +1,14 @@
import os
import PyPDF2
from document_handlers.document_processor import DocumentProcessor
from ...document_handlers.document_processor import DocumentProcessor
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from prompts.masking_prompts import get_masking_prompt, get_masking_mapping_prompt
from ...prompts.masking_prompts import get_masking_prompt, get_masking_mapping_prompt
import logging
from services.ollama_client import OllamaClient
from config.settings import settings
from ...services.ollama_client import OllamaClient
from ...config import settings
logger = logging.getLogger(__name__)

View File

@@ -1,8 +1,8 @@
from document_handlers.document_processor import DocumentProcessor
from services.ollama_client import OllamaClient
from ...document_handlers.document_processor import DocumentProcessor
from ...services.ollama_client import OllamaClient
import logging
from prompts.masking_prompts import get_masking_prompt
from config.settings import settings
from ...prompts.masking_prompts import get_masking_prompt
from ...config import settings
logger = logging.getLogger(__name__)
class TxtDocumentProcessor(DocumentProcessor):

View File

@@ -1,6 +1,6 @@
import logging
from document_handlers.document_factory import DocumentProcessorFactory
from services.ollama_client import OllamaClient
from ..document_handlers.document_factory import DocumentProcessorFactory
from ..services.ollama_client import OllamaClient
logger = logging.getLogger(__name__)

View File

@@ -1,54 +0,0 @@
import logging
import os
from services.document_service import DocumentService
from services.ollama_client import OllamaClient
from config.settings import settings
logger = logging.getLogger(__name__)
class FileMonitor:
def __init__(self, input_directory: str, output_directory: str):
self.input_directory = input_directory
self.output_directory = output_directory
# Create OllamaClient instance using settings
ollama_client = OllamaClient(
model_name=settings.OLLAMA_MODEL,
base_url=settings.OLLAMA_API_URL
)
# Inject OllamaClient into DocumentService
self.document_service = DocumentService(ollama_client=ollama_client)
def process_new_file(self, file_path: str) -> None:
try:
# Get the filename without directory path
filename = os.path.basename(file_path)
# Create output path
output_path = os.path.join(self.output_directory, filename)
logger.info(f"Processing file: {filename}")
# Process the document using document service
self.document_service.process_document(file_path, output_path)
logger.info(f"File processed successfully: {filename}")
except Exception as e:
logger.error(f"Error processing file {file_path}: {str(e)}")
def start_monitoring(self):
import time
# Ensure output directory exists
os.makedirs(self.output_directory, exist_ok=True)
already_seen = set(os.listdir(self.input_directory))
while True:
time.sleep(1) # Check every second
current_files = set(os.listdir(self.input_directory))
new_files = current_files - already_seen
for new_file in new_files:
file_path = os.path.join(self.input_directory, new_file)
logger.info(f"New file found: {new_file}")
self.process_new_file(file_path)
already_seen = current_files

View File

@@ -5,12 +5,9 @@ from sqlalchemy.orm import Session
from ..core.database import SessionLocal
import sys
import os
from core.services.document_service import DocumentService
from ..core.services.document_service import DocumentService
from pathlib import Path
# Add the parent directory to Python path to import the masking system
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
from core.document_handlers.document_processor import DocumentProcessor
celery = Celery(
'file_service',

View File

@@ -1,39 +0,0 @@
import logging.config
from config.settings import settings
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"standard": {
"format": settings.LOG_FORMAT,
"datefmt": settings.LOG_DATE_FORMAT
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "standard",
"level": settings.LOG_LEVEL,
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.FileHandler",
"formatter": "standard",
"level": settings.LOG_LEVEL,
"filename": settings.LOG_FILE,
"mode": "a",
}
},
"loggers": {
"": { # root logger
"handlers": ["console", "file"],
"level": settings.LOG_LEVEL,
"propagate": True
}
}
}
def setup_logging():
"""Initialize logging configuration"""
logging.config.dictConfig(LOGGING_CONFIG)

View File

@@ -1,31 +0,0 @@
# settings.py
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
# Storage paths
OBJECT_STORAGE_PATH: str = ""
TARGET_DIRECTORY_PATH: str = ""
# Ollama API settings
OLLAMA_API_URL: str = "https://api.ollama.com"
OLLAMA_API_KEY: str = ""
OLLAMA_MODEL: str = "llama2"
# File monitoring settings
MONITOR_INTERVAL: int = 5
# Logging settings
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DATE_FORMAT: str = "%Y-%m-%d %H:%M:%S"
LOG_FILE: str = "app.log"
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
extra = "allow"
# Create settings instance
settings = Settings()

View File

@@ -1,12 +0,0 @@
class Document:
def __init__(self, file_path):
self.file_path = file_path
self.content = ""
def load(self):
with open(self.file_path, 'r') as file:
self.content = file.read()
def save(self, target_path):
with open(target_path, 'w') as file:
file.write(self.content)

View File

@@ -1,28 +0,0 @@
import os
from typing import Optional
from document_handlers.document_processor import DocumentProcessor
from document_handlers.processors import (
TxtDocumentProcessor,
DocxDocumentProcessor,
PdfDocumentProcessor,
MarkdownDocumentProcessor
)
class DocumentProcessorFactory:
@staticmethod
def create_processor(input_path: str, output_path: str) -> Optional[DocumentProcessor]:
file_extension = os.path.splitext(input_path)[1].lower()
processors = {
'.txt': TxtDocumentProcessor,
'.docx': DocxDocumentProcessor,
'.doc': DocxDocumentProcessor,
'.pdf': PdfDocumentProcessor,
'.md': MarkdownDocumentProcessor,
'.markdown': MarkdownDocumentProcessor
}
processor_class = processors.get(file_extension)
if processor_class:
return processor_class(input_path, output_path)
return None
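Although this copy of the factory is being deleted, the same class survives under the new package path (see the relative-import hunk above), so a usage sketch still applies (file paths hypothetical):

processor = DocumentProcessorFactory.create_processor("storage/uploads/contract.docx", "storage/processed/contract.docx")
if processor is None:
    raise ValueError("unsupported file format")  # only .txt/.docx/.doc/.pdf/.md/.markdown are mapped
masked = processor.process_content(processor.read_content())
processor.save_content(masked)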

View File

@@ -1,190 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from prompts.masking_prompts import get_masking_mapping_prompt
import logging
import json
from services.ollama_client import OllamaClient
from config.settings import settings
from utils.json_extractor import LLMJsonExtractor
logger = logging.getLogger(__name__)
class DocumentProcessor(ABC):
def __init__(self):
self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
self.max_chunk_size = 1000 # Maximum number of characters per chunk
self.max_retries = 3 # Maximum number of retries for mapping generation
@abstractmethod
def read_content(self) -> str:
"""Read document content"""
pass
def _split_into_chunks(self, sentences: list[str]) -> list[str]:
"""Split sentences into chunks that don't exceed max_chunk_size"""
chunks = []
current_chunk = ""
for sentence in sentences:
if not sentence.strip():
continue
# If adding this sentence would exceed the limit, save current chunk and start new one
if len(current_chunk) + len(sentence) > self.max_chunk_size and current_chunk:
chunks.append(current_chunk)
current_chunk = sentence
else:
if current_chunk:
current_chunk += "" + sentence
else:
current_chunk = sentence
# Add the last chunk if it's not empty
if current_chunk:
chunks.append(current_chunk)
return chunks
def _validate_mapping_format(self, mapping: Dict[str, Any]) -> bool:
"""
Validate that the mapping follows the required format:
{
"原文1": "脱敏后1",
"原文2": "脱敏后2",
...
}
"""
if not isinstance(mapping, dict):
logger.warning("Mapping is not a dictionary")
return False
# Check if any key or value is not a string
for key, value in mapping.items():
if not isinstance(key, str) or not isinstance(value, str):
logger.warning(f"Invalid mapping format - key or value is not a string: {key}: {value}")
return False
# Check if the mapping has any nested structures
if any(isinstance(v, (dict, list)) for v in mapping.values()):
logger.warning("Invalid mapping format - contains nested structures")
return False
return True
def _build_mapping(self, chunk: str) -> Dict[str, str]:
"""Build mapping for a single chunk of text with retry logic"""
for attempt in range(self.max_retries):
try:
formatted_prompt = get_masking_mapping_prompt(chunk)
logger.info(f"Calling ollama to generate mapping for chunk (attempt {attempt + 1}/{self.max_retries}): {formatted_prompt}")
response = self.ollama_client.generate(formatted_prompt)
logger.info(f"Raw response from LLM: {response}")
# Parse the JSON response into a dictionary
mapping = LLMJsonExtractor.parse_raw_json_str(response)
logger.info(f"Parsed mapping: {mapping}")
if mapping and self._validate_mapping_format(mapping):
return mapping
else:
logger.warning(f"Invalid mapping format received on attempt {attempt + 1}, retrying...")
except Exception as e:
logger.error(f"Error generating mapping on attempt {attempt + 1}: {e}")
if attempt < self.max_retries - 1:
logger.info("Retrying...")
else:
logger.error("Max retries reached, returning empty mapping")
return {}
def _apply_mapping(self, text: str, mapping: Dict[str, str]) -> str:
"""Apply the mapping to replace sensitive information"""
masked_text = text
for original, masked in mapping.items():
# Ensure masked value is a string
if isinstance(masked, dict):
# If it's a dict, use the first value or a default
masked = next(iter(masked.values()), "")
elif not isinstance(masked, str):
# If it's not a string, convert to string or use default
masked = str(masked) if masked is not None else ""
masked_text = masked_text.replace(original, masked)
return masked_text
def _get_next_suffix(self, value: str) -> str:
"""Get the next available suffix for a value that already has a suffix"""
# Define the sequence of suffixes
suffixes = ['甲', '乙', '丙', '丁', '戊', '己', '庚', '辛', '壬', '癸']
# Check if the value already has a suffix
for suffix in suffixes:
if value.endswith(suffix):
# Find the next suffix in the sequence
current_index = suffixes.index(suffix)
if current_index + 1 < len(suffixes):
return value[:-1] + suffixes[current_index + 1]
else:
# If we've used all suffixes, start over with the first one
return value[:-1] + suffixes[0]
# If no suffix found, return the value with the first suffix
return value + '甲'
def _merge_mappings(self, existing: Dict[str, str], new: Dict[str, str]) -> Dict[str, str]:
"""
Merge two mappings following the rules:
1. If key exists in existing, keep existing value
2. If value exists in existing:
- If value ends with a suffix (甲乙丙丁...), add next suffix
- If no suffix, add '甲'
"""
result = existing.copy()
# Get all existing values
existing_values = set(result.values())
for key, value in new.items():
if key in result:
# Rule 1: Keep existing value if key exists
continue
if value in existing_values:
# Rule 2: Handle duplicate values
new_value = self._get_next_suffix(value)
result[key] = new_value
existing_values.add(new_value)
else:
# No conflict, add as is
result[key] = value
existing_values.add(value)
return result
def process_content(self, content: str) -> str:
"""Process document content by masking sensitive information"""
# Split content into sentences
sentences = content.split("")
# Split sentences into manageable chunks
chunks = self._split_into_chunks(sentences)
logger.info(f"Split content into {len(chunks)} chunks")
# Build mapping for each chunk
combined_mapping = {}
for i, chunk in enumerate(chunks):
logger.info(f"Processing chunk {i+1}/{len(chunks)}")
chunk_mapping = self._build_mapping(chunk)
if chunk_mapping: # Only update if we got a valid mapping
combined_mapping = self._merge_mappings(combined_mapping, chunk_mapping)
else:
logger.warning(f"Failed to generate mapping for chunk {i+1}")
# Apply the combined mapping to the entire content
masked_content = self._apply_mapping(content, combined_mapping)
logger.info("Successfully masked content")
return masked_content
@abstractmethod
def save_content(self, content: str) -> None:
"""Save processed content"""
pass
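A worked example of the merge rules, assuming the 甲乙丙…癸 suffix sequence reconstructed above (names hypothetical):

existing = {"张三": "张某"}
new = {"张三": "张先生", "李四": "张某"}  # repeated key plus a colliding masked value
merged = processor._merge_mappings(existing, new)
# Rule 1 keeps the existing "张三" -> "张某"; rule 2 resolves the value
# collision for "李四" by appending the first free suffix:
# merged == {"张三": "张某", "李四": "张某甲"}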

View File

@@ -1,6 +0,0 @@
from document_handlers.processors.txt_processor import TxtDocumentProcessor
from document_handlers.processors.docx_processor import DocxDocumentProcessor
from document_handlers.processors.pdf_processor import PdfDocumentProcessor
from document_handlers.processors.md_processor import MarkdownDocumentProcessor
__all__ = ['TxtDocumentProcessor', 'DocxDocumentProcessor', 'PdfDocumentProcessor', 'MarkdownDocumentProcessor']

View File

@@ -1,77 +0,0 @@
import os
import docx
from document_handlers.document_processor import DocumentProcessor
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.data.read_api import read_local_office
import logging
from services.ollama_client import OllamaClient
from config.settings import settings
from prompts.masking_prompts import get_masking_mapping_prompt
logger = logging.getLogger(__name__)
class DocxDocumentProcessor(DocumentProcessor):
def __init__(self, input_path: str, output_path: str):
super().__init__() # Call parent class's __init__
self.input_path = input_path
self.output_path = output_path
self.output_dir = os.path.dirname(output_path)
self.name_without_suff = os.path.splitext(os.path.basename(input_path))[0]
# Setup output directories
self.local_image_dir = os.path.join(self.output_dir, "images")
self.image_dir = os.path.basename(self.local_image_dir)
os.makedirs(self.local_image_dir, exist_ok=True)
self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
def read_content(self) -> str:
try:
# Initialize writers
image_writer = FileBasedDataWriter(self.local_image_dir)
md_writer = FileBasedDataWriter(self.output_dir)
# Create Dataset Instance and process
ds = read_local_office(self.input_path)[0]
pipe_result = ds.apply(doc_analyze, ocr=True).pipe_txt_mode(image_writer)
# Generate markdown
md_content = pipe_result.get_markdown(self.image_dir)
pipe_result.dump_md(md_writer, f"{self.name_without_suff}.md", self.image_dir)
return md_content
except Exception as e:
logger.error(f"Error converting DOCX to MD: {e}")
raise
# def process_content(self, content: str) -> str:
# logger.info("Processing DOCX content")
# # Split content into sentences and apply masking
# sentences = content.split("。")
# final_md = ""
# for sentence in sentences:
# if sentence.strip(): # Only process non-empty sentences
# formatted_prompt = get_masking_mapping_prompt(sentence)
# logger.info("Calling ollama to generate response, prompt: %s", formatted_prompt)
# response = self.ollama_client.generate(formatted_prompt)
# logger.info(f"Response generated: {response}")
# final_md += response + "。"
# return final_md
def save_content(self, content: str) -> None:
# Ensure output path has .md extension
output_dir = os.path.dirname(self.output_path)
base_name = os.path.splitext(os.path.basename(self.output_path))[0]
md_output_path = os.path.join(output_dir, f"{base_name}.md")
logger.info(f"Saving masked content to: {md_output_path}")
try:
with open(md_output_path, 'w', encoding='utf-8') as file:
file.write(content)
logger.info(f"Successfully saved content to {md_output_path}")
except Exception as e:
logger.error(f"Error saving content: {e}")
raise

View File

@@ -1,39 +0,0 @@
import os
from document_handlers.document_processor import DocumentProcessor
from services.ollama_client import OllamaClient
import logging
from config.settings import settings
logger = logging.getLogger(__name__)
class MarkdownDocumentProcessor(DocumentProcessor):
def __init__(self, input_path: str, output_path: str):
super().__init__() # Call parent class's __init__
self.input_path = input_path
self.output_path = output_path
self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
def read_content(self) -> str:
"""Read markdown content from file"""
try:
with open(self.input_path, 'r', encoding='utf-8') as file:
content = file.read()
logger.info(f"Successfully read markdown content from {self.input_path}")
return content
except Exception as e:
logger.error(f"Error reading markdown file {self.input_path}: {e}")
raise
def save_content(self, content: str) -> None:
"""Save processed markdown content"""
try:
# Ensure output directory exists
output_dir = os.path.dirname(self.output_path)
os.makedirs(output_dir, exist_ok=True)
with open(self.output_path, 'w', encoding='utf-8') as file:
file.write(content)
logger.info(f"Successfully saved masked content to {self.output_path}")
except Exception as e:
logger.error(f"Error saving content to {self.output_path}: {e}")
raise

View File

@@ -1,105 +0,0 @@
import os
import PyPDF2
from document_handlers.document_processor import DocumentProcessor
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from prompts.masking_prompts import get_masking_prompt, get_masking_mapping_prompt
import logging
from services.ollama_client import OllamaClient
from config.settings import settings
logger = logging.getLogger(__name__)
class PdfDocumentProcessor(DocumentProcessor):
def __init__(self, input_path: str, output_path: str):
super().__init__() # Call parent class's __init__
self.input_path = input_path
self.output_path = output_path
self.output_dir = os.path.dirname(output_path)
self.name_without_suff = os.path.splitext(os.path.basename(input_path))[0]
# Setup output directories
self.local_image_dir = os.path.join(self.output_dir, "images")
self.image_dir = os.path.basename(self.local_image_dir)
os.makedirs(self.local_image_dir, exist_ok=True)
# Setup work directory under output directory
self.work_dir = os.path.join(
os.path.dirname(output_path),
".work",
os.path.splitext(os.path.basename(input_path))[0]
)
os.makedirs(self.work_dir, exist_ok=True)
self.work_local_image_dir = os.path.join(self.work_dir, "images")
self.work_image_dir = os.path.basename(self.work_local_image_dir)
os.makedirs(self.work_local_image_dir, exist_ok=True)
self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
def read_content(self) -> str:
logger.info("Starting PDF content processing")
# Read the PDF file
with open(self.input_path, 'rb') as file:
content = file.read()
# Initialize writers
image_writer = FileBasedDataWriter(self.work_local_image_dir)
md_writer = FileBasedDataWriter(self.work_dir)
# Create Dataset Instance
ds = PymuDocDataset(content)
logger.info("Classifying PDF type: %s", ds.classify())
# Process based on PDF type
if ds.classify() == SupportedPdfParseMethod.OCR:
infer_result = ds.apply(doc_analyze, ocr=True)
pipe_result = infer_result.pipe_ocr_mode(image_writer)
else:
infer_result = ds.apply(doc_analyze, ocr=False)
pipe_result = infer_result.pipe_txt_mode(image_writer)
logger.info("Generating all outputs")
# Generate all outputs
infer_result.draw_model(os.path.join(self.work_dir, f"{self.name_without_suff}_model.pdf"))
model_inference_result = infer_result.get_infer_res()
pipe_result.draw_layout(os.path.join(self.work_dir, f"{self.name_without_suff}_layout.pdf"))
pipe_result.draw_span(os.path.join(self.work_dir, f"{self.name_without_suff}_spans.pdf"))
md_content = pipe_result.get_markdown(self.work_image_dir)
pipe_result.dump_md(md_writer, f"{self.name_without_suff}.md", self.work_image_dir)
content_list = pipe_result.get_content_list(self.work_image_dir)
pipe_result.dump_content_list(md_writer, f"{self.name_without_suff}_content_list.json", self.work_image_dir)
middle_json = pipe_result.get_middle_json()
pipe_result.dump_middle_json(md_writer, f'{self.name_without_suff}_middle.json')
return md_content
# def process_content(self, content: str) -> str:
# logger.info("Starting content masking process")
# sentences = content.split("。")
# final_md = ""
# for sentence in sentences:
# if not sentence.strip(): # Skip empty sentences
# continue
# formatted_prompt = get_masking_mapping_prompt(sentence)
# logger.info("Calling ollama to generate response, prompt: %s", formatted_prompt)
# response = self.ollama_client.generate(formatted_prompt)
# logger.info(f"Response generated: {response}")
# final_md += response + "。"
# return final_md
def save_content(self, content: str) -> None:
# Ensure output path has .md extension
output_dir = os.path.dirname(self.output_path)
base_name = os.path.splitext(os.path.basename(self.output_path))[0]
md_output_path = os.path.join(output_dir, f"{base_name}.md")
logger.info(f"Saving masked content to: {md_output_path}")
with open(md_output_path, 'w', encoding='utf-8') as file:
file.write(content)

View File

@@ -1,28 +0,0 @@
from document_handlers.document_processor import DocumentProcessor
from services.ollama_client import OllamaClient
import logging
from prompts.masking_prompts import get_masking_prompt
from config.settings import settings
logger = logging.getLogger(__name__)
class TxtDocumentProcessor(DocumentProcessor):
def __init__(self, input_path: str, output_path: str):
super().__init__()
self.input_path = input_path
self.output_path = output_path
self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
def read_content(self) -> str:
with open(self.input_path, 'r', encoding='utf-8') as file:
return file.read()
# def process_content(self, content: str) -> str:
# formatted_prompt = get_masking_prompt(content)
# response = self.ollama_client.generate(formatted_prompt)
# logger.debug(f"Processed content: {response}")
# return response
def save_content(self, content: str) -> None:
with open(self.output_path, 'w', encoding='utf-8') as file:
file.write(content)

View File

@@ -1,22 +0,0 @@
from config.logging_config import setup_logging
def main():
# Setup logging first
setup_logging()
from services.file_monitor import FileMonitor
from config.settings import settings
import logging
logger = logging.getLogger(__name__)
logger.info("Starting the application")
logger.info(f"Monitoring directory: {settings.OBJECT_STORAGE_PATH}")
logger.info(f"Target directory: {settings.TARGET_DIRECTORY_PATH}")
# Initialize the file monitor
file_monitor = FileMonitor(settings.OBJECT_STORAGE_PATH, settings.TARGET_DIRECTORY_PATH)
# Start monitoring the directory for new files
file_monitor.start_monitoring()
if __name__ == "__main__":
main()

View File

@@ -1,81 +0,0 @@
import textwrap
def get_masking_prompt(text: str) -> str:
"""
Returns the prompt for masking sensitive information in legal documents.
Args:
text (str): The input text to be masked
Returns:
str: The formatted prompt with the input text
"""
prompt = textwrap.dedent("""
您是一位专业的法律文档脱敏专家。请按照以下规则对文本进行脱敏处理：
规则：
1. 人名：
- 两字名改为"姓+某"：张三 → 张某
- 三字名改为"姓+某某"：张三丰 → 张某某
2. 公司名：
- 保留地理位置信息（北京、上海等）
- 保留公司类型（有限公司、股份公司等）
- 用"某"替换核心名称
3. 保持原文其他部分不变
4. 确保脱敏后的文本保持原有的语言流畅性和可读性
输入文本：
{text}
请直接输出脱敏后的文本，无需解释或其他备注。
""")
return prompt.format(text=text)
def get_masking_mapping_prompt(text: str) -> str:
"""
Returns a prompt that generates a mapping of original names/companies to their masked versions.
Args:
text (str): The input text to be analyzed for masking
Returns:
str: The formatted prompt that will generate a mapping dictionary
"""
prompt = textwrap.dedent("""
您是一位专业的法律文档脱敏专家。请分析文本并生成一个脱敏映射表，遵循以下规则：
规则：
1. 人名映射规则：
- 对于同一姓氏的不同人名，使用字母区分：
* 第一个出现的用"姓+某"：张三 → 张某
* 第二个出现的用"姓+某A"：张四 → 张某A
* 第三个出现的用"姓+某B"：张五 → 张某B
依此类推
- 三字名同样遵循此规则：张三丰 → 张某某，张四海 → 张某某A
2. 公司名映射规则：
- 保留地理位置信息（北京、上海等）
- 保留公司类型（有限公司、股份公司等）
- 用"某"替换核心名称，但保留首尾字（北京智慧科技有限公司 → 北京智某科技有限公司）
- 对于多个相似公司名，使用字母区分：
北京智慧科技有限公司 → 北京某科技有限公司
北京智能科技有限公司 → 北京某科技有限公司A
3. 公权机关不做脱敏处理：公安局、法院、检察院、中国人民银行、银监会及其他未列明的公权机关
请分析以下文本并生成一个JSON格式的映射表，包含所有需要脱敏的名称及其对应的脱敏后的形式：
{text}
请直接输出JSON格式的映射表，格式如下：
{{
"原文1": "脱敏后1",
"原文2": "脱敏后2",
...
}}
如无需要输出的映射，请输出空JSON，如下：
{{}}
""")
return prompt.format(text=text)

View File

@@ -1,30 +0,0 @@
import logging
from document_handlers.document_factory import DocumentProcessorFactory
from services.ollama_client import OllamaClient
logger = logging.getLogger(__name__)
class DocumentService:
def __init__(self, ollama_client: OllamaClient):
self.ollama_client = ollama_client
def process_document(self, input_path: str, output_path: str) -> bool:
try:
processor = DocumentProcessorFactory.create_processor(input_path, output_path)
if not processor:
logger.error(f"Unsupported file format: {input_path}")
return False
# Read content
content = processor.read_content()
# Process with Ollama
masked_content = processor.process_content(content)
# Save processed content
processor.save_content(masked_content)
return True
except Exception as e:
logger.error(f"Error processing document {input_path}: {str(e)}")
return False

View File

@@ -1,54 +0,0 @@
import logging
import os
from services.document_service import DocumentService
from services.ollama_client import OllamaClient
from config.settings import settings
logger = logging.getLogger(__name__)
class FileMonitor:
def __init__(self, input_directory: str, output_directory: str):
self.input_directory = input_directory
self.output_directory = output_directory
# Create OllamaClient instance using settings
ollama_client = OllamaClient(
model_name=settings.OLLAMA_MODEL,
base_url=settings.OLLAMA_API_URL
)
# Inject OllamaClient into DocumentService
self.document_service = DocumentService(ollama_client=ollama_client)
def process_new_file(self, file_path: str) -> None:
try:
# Get the filename without directory path
filename = os.path.basename(file_path)
# Create output path
output_path = os.path.join(self.output_directory, filename)
logger.info(f"Processing file: {filename}")
# Process the document using document service
self.document_service.process_document(file_path, output_path)
logger.info(f"File processed successfully: {filename}")
except Exception as e:
logger.error(f"Error processing file {file_path}: {str(e)}")
def start_monitoring(self):
import time
# Ensure output directory exists
os.makedirs(self.output_directory, exist_ok=True)
already_seen = set(os.listdir(self.input_directory))
while True:
time.sleep(1) # Check every second
current_files = set(os.listdir(self.input_directory))
new_files = current_files - already_seen
for new_file in new_files:
file_path = os.path.join(self.input_directory, new_file)
logger.info(f"New file found: {new_file}")
self.process_new_file(file_path)
already_seen = current_files

View File

@@ -1,91 +0,0 @@
import requests
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)
class OllamaClient:
def __init__(self, model_name: str, base_url: str = "http://localhost:11434"):
"""Initialize Ollama client.
Args:
model_name (str): Name of the Ollama model to use
base_url (str): Base URL of the Ollama server (default "http://localhost:11434")
"""
self.model_name = model_name
self.base_url = base_url
self.headers = {"Content-Type": "application/json"}
def generate(self, prompt: str, strip_think: bool = True) -> str:
"""Process a document using the Ollama API.
Args:
document_text (str): The text content to process
Returns:
str: Processed text response from the model
Raises:
RequestException: If the API call fails
"""
try:
url = f"{self.base_url}/api/generate"
payload = {
"model": self.model_name,
"prompt": prompt,
"stream": False
}
logger.debug(f"Sending request to Ollama API: {url}")
response = requests.post(url, json=payload, headers=self.headers)
response.raise_for_status()
result = response.json()
logger.debug(f"Received response from Ollama API: {result}")
if strip_think:
# Remove the "thinking" part from the response
# the response is expected to be <think>...</think>response_text
# Check if the response contains <think> tag
if "<think>" in result.get("response", ""):
# Split the response and take the part after </think>
response_parts = result["response"].split("</think>")
if len(response_parts) > 1:
# Return the part after </think>
return response_parts[1].strip()
else:
# If no closing tag, return the full response
return result.get("response", "").strip()
else:
# If no <think> tag, return the full response
return result.get("response", "").strip()
else:
# If strip_think is False, return the full response
return result.get("response", "")
except requests.exceptions.RequestException as e:
logger.error(f"Error calling Ollama API: {str(e)}")
raise
def get_model_info(self) -> Dict[str, Any]:
"""Get information about the current model.
Returns:
Dict[str, Any]: Model information
Raises:
RequestException: If the API call fails
"""
try:
url = f"{self.base_url}/api/show"
payload = {"name": self.model_name}
response = requests.post(url, json=payload, headers=self.headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Error getting model info: {str(e)}")
raise
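A minimal usage sketch, assuming an Ollama server on the default local address and the llama2 default from the settings above:

client = OllamaClient(model_name="llama2", base_url="http://localhost:11434")
raw = client.generate(get_masking_mapping_prompt("张三起诉北京智慧科技有限公司。"))
# strip_think=True (the default) drops any leading <think>...</think> block
# that reasoning models emit before the JSON mapping.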

View File

@@ -1,20 +0,0 @@
def read_file(file_path):
with open(file_path, 'r') as file:
return file.read()
def write_file(file_path, content):
with open(file_path, 'w') as file:
file.write(content)
def file_exists(file_path):
import os
return os.path.isfile(file_path)
def delete_file(file_path):
import os
if file_exists(file_path):
os.remove(file_path)
def list_files_in_directory(directory_path):
import os
return [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]

View File

@@ -1,141 +0,0 @@
import json
import re
from typing import Any, Optional, Dict, TypeVar, Type
T = TypeVar('T')
class LLMJsonExtractor:
"""Utility class for extracting and parsing JSON from LLM outputs"""
@staticmethod
def extract_json(text: str) -> Optional[str]:
"""
Extracts JSON string from text using regex pattern matching.
Handles both single and multiple JSON objects in text.
Args:
text (str): Raw text containing JSON
Returns:
Optional[str]: Extracted JSON string or None if no valid JSON found
"""
# Pattern to match JSON objects with balanced braces
pattern = r'{[^{}]*(?:{[^{}]*}[^{}]*)*}'
matches = re.findall(pattern, text)
if not matches:
return None
# Return the first valid JSON match
for match in matches:
try:
# Verify it's valid JSON
json.loads(match)
return match
except json.JSONDecodeError:
continue
return None
@staticmethod
def parse_json(text: str) -> Optional[Dict[str, Any]]:
"""
Extracts and parses JSON from text into a Python dictionary.
Args:
text (str): Raw text containing JSON
Returns:
Optional[Dict[str, Any]]: Parsed JSON as dictionary or None if parsing fails
"""
try:
json_str = LLMJsonExtractor.extract_json(text)
if json_str:
return json.loads(json_str)
return None
except json.JSONDecodeError:
return None
@staticmethod
def parse_to_dataclass(text: str, dataclass_type: Type[T]) -> Optional[T]:
"""
Extracts JSON and converts it to a specified dataclass type.
Args:
text (str): Raw text containing JSON
dataclass_type (Type[T]): Target dataclass type
Returns:
Optional[T]: Instance of specified dataclass or None if conversion fails
"""
try:
data = LLMJsonExtractor.parse_json(text)
if data:
return dataclass_type(**data)
return None
except (json.JSONDecodeError, TypeError):
return None
@staticmethod
def parse_raw_json_str(text: str) -> Optional[Dict[str, Any]]:
"""
Extracts and parses JSON from text into a Python dictionary.
Args:
text (str): Raw text containing JSON
Returns:
Optional[Dict[str, Any]]: Parsed JSON as dictionary or None if parsing fails
"""
try:
json_str = LLMJsonExtractor.extract_json_max(text)
if json_str:
return json.loads(json_str)
return None
except json.JSONDecodeError:
return None
@staticmethod
def extract_json_max(text: str) -> Optional[str]:
"""
Extracts the maximum valid JSON object from text using stack-based brace matching.
Args:
text (str): Raw text containing JSON
Returns:
Optional[str]: Maximum valid JSON object as string or None if no valid JSON found
"""
max_json = None
max_length = 0
# Iterate through each character as a potential start of JSON
for start in range(len(text)):
if text[start] != '{':
continue
stack = []
for end in range(start, len(text)):
if text[end] == '{':
stack.append(end)
elif text[end] == '}':
if not stack: # Unmatched closing brace
break
opening_pos = stack.pop()
# If stack is empty, we have a complete JSON object
if not stack:
json_candidate = text[opening_pos:end + 1]
try:
# Verify it's valid JSON
json.loads(json_candidate)
if len(json_candidate) > max_length:
max_length = len(json_candidate)
max_json = json_candidate
except json.JSONDecodeError:
continue
return max_json
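A short usage sketch of the extractor on typical LLM output (the sample string is hypothetical):

raw = '映射如下：{"张三": "张某", "北京智慧科技有限公司": "北京智某科技有限公司"}，仅供参考。'
mapping = LLMJsonExtractor.parse_raw_json_str(raw)
# extract_json_max scans each '{', keeps the longest balanced candidate that
# json.loads accepts, so the surrounding prose is ignored:
# mapping == {"张三": "张某", "北京智慧科技有限公司": "北京智某科技有限公司"}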