feature-ner-keyword-detect #1
@@ -5,10 +5,12 @@ from sqlalchemy.orm import Session
 from ..core.database import SessionLocal
 import sys
 import os
+from core.services.document_service import DocumentService
+from pathlib import Path
 
 # Add the parent directory to Python path to import the masking system
 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
-from src.main import process_document  # Import your existing masking function
+from core.document_handlers.document_processor import DocumentProcessor
 
 celery = Celery(
     'file_service',
@@ -30,7 +32,15 @@ def process_file(file_id: str):
 
     try:
         # Process the file using your existing masking system
-        output_path = process_document(file.original_path)
+        process_service = DocumentService()
+
+        # Determine output path
+        input_path = Path(file.original_path)
+        output_filename = f"processed_{input_path.name}"
+        output_path = str(settings.PROCESSED_FOLDER / output_filename)
+
+        # Process document with both input and output paths
+        process_service.process_document(file.original_path, output_path)
 
         # Update file record with processed path
         file.processed_path = output_path
@@ -0,0 +1,39 @@
import logging.config
from config.settings import settings

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "standard": {
            "format": settings.LOG_FORMAT,
            "datefmt": settings.LOG_DATE_FORMAT
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "standard",
            "level": settings.LOG_LEVEL,
            "stream": "ext://sys.stdout"
        },
        "file": {
            "class": "logging.FileHandler",
            "formatter": "standard",
            "level": settings.LOG_LEVEL,
            "filename": settings.LOG_FILE,
            "mode": "a",
        }
    },
    "loggers": {
        "": {  # root logger
            "handlers": ["console", "file"],
            "level": settings.LOG_LEVEL,
            "propagate": True
        }
    }
}


def setup_logging():
    """Initialize logging configuration"""
    logging.config.dictConfig(LOGGING_CONFIG)
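A minimal start-up sketch for the logging setup above; the `config.logging_config` module path is an assumption inferred from the import style used elsewhere in this patch.

```python
from config.logging_config import setup_logging   # module path assumed
from config.settings import settings
import logging

setup_logging()                          # installs the console + file handlers
logger = logging.getLogger(__name__)
logger.info("Logging initialised, writing to %s", settings.LOG_FILE)
```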
@@ -0,0 +1,31 @@
# settings.py

from pydantic_settings import BaseSettings
from typing import Optional


class Settings(BaseSettings):
    # Storage paths
    OBJECT_STORAGE_PATH: str = ""
    TARGET_DIRECTORY_PATH: str = ""

    # Ollama API settings
    OLLAMA_API_URL: str = "https://api.ollama.com"
    OLLAMA_API_KEY: str = ""
    OLLAMA_MODEL: str = "llama2"

    # File monitoring settings
    MONITOR_INTERVAL: int = 5

    # Logging settings
    LOG_LEVEL: str = "INFO"
    LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    LOG_DATE_FORMAT: str = "%Y-%m-%d %H:%M:%S"
    LOG_FILE: str = "app.log"

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
        extra = "allow"


# Create settings instance
settings = Settings()
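A small sketch of how these settings behave: pydantic-settings fills fields from the environment (and from `.env`), and `extra = "allow"` tolerates unknown keys. The model name below is a placeholder, not part of the patch.

```python
import os

# Placeholder overrides, set before Settings() is constructed.
os.environ["OLLAMA_MODEL"] = "llama2:13b"
os.environ["LOG_LEVEL"] = "DEBUG"

from config.settings import Settings

s = Settings()
print(s.OLLAMA_MODEL, s.LOG_LEVEL, s.MONITOR_INTERVAL)   # llama2:13b DEBUG 5
```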
@@ -0,0 +1,12 @@
class Document:
    def __init__(self, file_path):
        self.file_path = file_path
        self.content = ""

    def load(self):
        with open(self.file_path, 'r') as file:
            self.content = file.read()

    def save(self, target_path):
        with open(target_path, 'w') as file:
            file.write(self.content)
@@ -0,0 +1,28 @@
import os
from typing import Optional
from document_handlers.document_processor import DocumentProcessor
from document_handlers.processors import (
    TxtDocumentProcessor,
    DocxDocumentProcessor,
    PdfDocumentProcessor,
    MarkdownDocumentProcessor
)


class DocumentProcessorFactory:
    @staticmethod
    def create_processor(input_path: str, output_path: str) -> Optional[DocumentProcessor]:
        file_extension = os.path.splitext(input_path)[1].lower()

        processors = {
            '.txt': TxtDocumentProcessor,
            '.docx': DocxDocumentProcessor,
            '.doc': DocxDocumentProcessor,
            '.pdf': PdfDocumentProcessor,
            '.md': MarkdownDocumentProcessor,
            '.markdown': MarkdownDocumentProcessor
        }

        processor_class = processors.get(file_extension)
        if processor_class:
            return processor_class(input_path, output_path)
        return None
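Usage sketch for the factory; the file names are illustrative. Unsupported extensions come back as `None`, so callers should check before using the processor.

```python
from document_handlers.document_factory import DocumentProcessorFactory

# Example paths, not part of the patch.
processor = DocumentProcessorFactory.create_processor("contract.docx", "output/contract.md")
if processor is None:
    raise ValueError("Unsupported file type")

content = processor.read_content()            # DOCX/PDF are converted to markdown first
masked = processor.process_content(content)   # chunk -> mapping -> replace
processor.save_content(masked)
```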
@@ -0,0 +1,190 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from prompts.masking_prompts import get_masking_mapping_prompt
import logging
import json
from services.ollama_client import OllamaClient
from config.settings import settings
from utils.json_extractor import LLMJsonExtractor

logger = logging.getLogger(__name__)


class DocumentProcessor(ABC):
    def __init__(self):
        self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
        self.max_chunk_size = 1000  # Maximum number of characters per chunk
        self.max_retries = 3  # Maximum number of retries for mapping generation

    @abstractmethod
    def read_content(self) -> str:
        """Read document content"""
        pass

    def _split_into_chunks(self, sentences: list[str]) -> list[str]:
        """Split sentences into chunks that don't exceed max_chunk_size"""
        chunks = []
        current_chunk = ""

        for sentence in sentences:
            if not sentence.strip():
                continue

            # If adding this sentence would exceed the limit, save current chunk and start new one
            if len(current_chunk) + len(sentence) > self.max_chunk_size and current_chunk:
                chunks.append(current_chunk)
                current_chunk = sentence
            else:
                if current_chunk:
                    current_chunk += "。" + sentence
                else:
                    current_chunk = sentence

        # Add the last chunk if it's not empty
        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _validate_mapping_format(self, mapping: Dict[str, Any]) -> bool:
        """
        Validate that the mapping follows the required format:
        {
            "原文1": "脱敏后1",
            "原文2": "脱敏后2",
            ...
        }
        """
        if not isinstance(mapping, dict):
            logger.warning("Mapping is not a dictionary")
            return False

        # Check if any key or value is not a string
        for key, value in mapping.items():
            if not isinstance(key, str) or not isinstance(value, str):
                logger.warning(f"Invalid mapping format - key or value is not a string: {key}: {value}")
                return False

        # Check if the mapping has any nested structures
        if any(isinstance(v, (dict, list)) for v in mapping.values()):
            logger.warning("Invalid mapping format - contains nested structures")
            return False

        return True

    def _build_mapping(self, chunk: str) -> Dict[str, str]:
        """Build mapping for a single chunk of text with retry logic"""
        for attempt in range(self.max_retries):
            try:
                formatted_prompt = get_masking_mapping_prompt(chunk)
                logger.info(f"Calling ollama to generate mapping for chunk (attempt {attempt + 1}/{self.max_retries}): {formatted_prompt}")
                response = self.ollama_client.generate(formatted_prompt)
                logger.info(f"Raw response from LLM: {response}")

                # Parse the JSON response into a dictionary
                mapping = LLMJsonExtractor.parse_raw_json_str(response)
                logger.info(f"Parsed mapping: {mapping}")

                if mapping and self._validate_mapping_format(mapping):
                    return mapping
                else:
                    logger.warning(f"Invalid mapping format received on attempt {attempt + 1}, retrying...")
            except Exception as e:
                logger.error(f"Error generating mapping on attempt {attempt + 1}: {e}")
                if attempt < self.max_retries - 1:
                    logger.info("Retrying...")
                else:
                    logger.error("Max retries reached, returning empty mapping")

        # All attempts exhausted without a valid mapping
        return {}

    def _apply_mapping(self, text: str, mapping: Dict[str, str]) -> str:
        """Apply the mapping to replace sensitive information"""
        masked_text = text
        for original, masked in mapping.items():
            # Ensure masked value is a string
            if isinstance(masked, dict):
                # If it's a dict, use the first value or a default
                masked = next(iter(masked.values()), "某")
            elif not isinstance(masked, str):
                # If it's not a string, convert to string or use default
                masked = str(masked) if masked is not None else "某"
            masked_text = masked_text.replace(original, masked)
        return masked_text

    def _get_next_suffix(self, value: str) -> str:
        """Get the next available suffix for a value that already has a suffix"""
        # Define the sequence of suffixes
        suffixes = ['甲', '乙', '丙', '丁', '戊', '己', '庚', '辛', '壬', '癸']

        # Check if the value already has a suffix
        for suffix in suffixes:
            if value.endswith(suffix):
                # Find the next suffix in the sequence
                current_index = suffixes.index(suffix)
                if current_index + 1 < len(suffixes):
                    return value[:-1] + suffixes[current_index + 1]
                else:
                    # If we've used all suffixes, start over with the first one
                    return value[:-1] + suffixes[0]

        # If no suffix found, return the value with the first suffix
        return value + '甲'

    def _merge_mappings(self, existing: Dict[str, str], new: Dict[str, str]) -> Dict[str, str]:
        """
        Merge two mappings following the rules:
        1. If key exists in existing, keep existing value
        2. If value exists in existing:
           - If value ends with a suffix (甲乙丙丁...), add next suffix
           - If no suffix, add '甲'
        """
        result = existing.copy()

        # Get all existing values
        existing_values = set(result.values())

        for key, value in new.items():
            if key in result:
                # Rule 1: Keep existing value if key exists
                continue

            if value in existing_values:
                # Rule 2: Handle duplicate values
                new_value = self._get_next_suffix(value)
                result[key] = new_value
                existing_values.add(new_value)
            else:
                # No conflict, add as is
                result[key] = value
                existing_values.add(value)

        return result

    def process_content(self, content: str) -> str:
        """Process document content by masking sensitive information"""
        # Split content into sentences
        sentences = content.split("。")

        # Split sentences into manageable chunks
        chunks = self._split_into_chunks(sentences)
        logger.info(f"Split content into {len(chunks)} chunks")

        # Build mapping for each chunk
        combined_mapping = {}
        for i, chunk in enumerate(chunks):
            logger.info(f"Processing chunk {i+1}/{len(chunks)}")
            chunk_mapping = self._build_mapping(chunk)
            if chunk_mapping:  # Only update if we got a valid mapping
                combined_mapping = self._merge_mappings(combined_mapping, chunk_mapping)
            else:
                logger.warning(f"Failed to generate mapping for chunk {i+1}")

        # Apply the combined mapping to the entire content
        masked_content = self._apply_mapping(content, combined_mapping)
        logger.info("Successfully masked content")

        return masked_content

    @abstractmethod
    def save_content(self, content: str) -> None:
        """Save processed content"""
        pass
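To make the merge rules concrete, here is a standalone sketch that mirrors the `_merge_mappings` / `_get_next_suffix` behaviour (it re-implements the rule for illustration and is not part of the patch):

```python
SUFFIXES = ['甲', '乙', '丙', '丁', '戊', '己', '庚', '辛', '壬', '癸']

def next_suffix(value: str) -> str:
    # Same rule as _get_next_suffix: advance an existing suffix, else append 甲.
    for i, s in enumerate(SUFFIXES):
        if value.endswith(s):
            return value[:-1] + SUFFIXES[(i + 1) % len(SUFFIXES)]
    return value + SUFFIXES[0]

existing = {"张三": "张某"}
new = {"张三": "张某A", "李四": "张某"}     # second key collides with an existing value
merged = dict(existing)
seen = set(merged.values())
for key, value in new.items():
    if key in merged:
        continue                            # rule 1: keep the existing mapping
    if value in seen:
        value = next_suffix(value)          # rule 2: disambiguate duplicate values
    merged[key] = value
    seen.add(value)

print(merged)                               # {'张三': '张某', '李四': '张某甲'}
```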
@@ -0,0 +1,6 @@
from document_handlers.processors.txt_processor import TxtDocumentProcessor
from document_handlers.processors.docx_processor import DocxDocumentProcessor
from document_handlers.processors.pdf_processor import PdfDocumentProcessor
from document_handlers.processors.md_processor import MarkdownDocumentProcessor

__all__ = ['TxtDocumentProcessor', 'DocxDocumentProcessor', 'PdfDocumentProcessor', 'MarkdownDocumentProcessor']
@@ -0,0 +1,77 @@
import os
import docx
from document_handlers.document_processor import DocumentProcessor
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.data.read_api import read_local_office
import logging
from services.ollama_client import OllamaClient
from config.settings import settings
from prompts.masking_prompts import get_masking_mapping_prompt

logger = logging.getLogger(__name__)


class DocxDocumentProcessor(DocumentProcessor):
    def __init__(self, input_path: str, output_path: str):
        super().__init__()  # Call parent class's __init__
        self.input_path = input_path
        self.output_path = output_path
        self.output_dir = os.path.dirname(output_path)
        self.name_without_suff = os.path.splitext(os.path.basename(input_path))[0]

        # Setup output directories
        self.local_image_dir = os.path.join(self.output_dir, "images")
        self.image_dir = os.path.basename(self.local_image_dir)
        os.makedirs(self.local_image_dir, exist_ok=True)

        self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)

    def read_content(self) -> str:
        try:
            # Initialize writers
            image_writer = FileBasedDataWriter(self.local_image_dir)
            md_writer = FileBasedDataWriter(self.output_dir)

            # Create Dataset Instance and process
            ds = read_local_office(self.input_path)[0]
            pipe_result = ds.apply(doc_analyze, ocr=True).pipe_txt_mode(image_writer)

            # Generate markdown
            md_content = pipe_result.get_markdown(self.image_dir)
            pipe_result.dump_md(md_writer, f"{self.name_without_suff}.md", self.image_dir)

            return md_content
        except Exception as e:
            logger.error(f"Error converting DOCX to MD: {e}")
            raise

    # def process_content(self, content: str) -> str:
    #     logger.info("Processing DOCX content")
    #
    #     # Split content into sentences and apply masking
    #     sentences = content.split("。")
    #     final_md = ""
    #     for sentence in sentences:
    #         if sentence.strip():  # Only process non-empty sentences
    #             formatted_prompt = get_masking_mapping_prompt(sentence)
    #             logger.info("Calling ollama to generate response, prompt: %s", formatted_prompt)
    #             response = self.ollama_client.generate(formatted_prompt)
    #             logger.info(f"Response generated: {response}")
    #             final_md += response + "。"
    #
    #     return final_md

    def save_content(self, content: str) -> None:
        # Ensure output path has .md extension
        output_dir = os.path.dirname(self.output_path)
        base_name = os.path.splitext(os.path.basename(self.output_path))[0]
        md_output_path = os.path.join(output_dir, f"{base_name}.md")

        logger.info(f"Saving masked content to: {md_output_path}")
        try:
            with open(md_output_path, 'w', encoding='utf-8') as file:
                file.write(content)
            logger.info(f"Successfully saved content to {md_output_path}")
        except Exception as e:
            logger.error(f"Error saving content: {e}")
            raise
@@ -0,0 +1,39 @@
import os
from document_handlers.document_processor import DocumentProcessor
from services.ollama_client import OllamaClient
import logging
from config.settings import settings

logger = logging.getLogger(__name__)


class MarkdownDocumentProcessor(DocumentProcessor):
    def __init__(self, input_path: str, output_path: str):
        super().__init__()  # Call parent class's __init__
        self.input_path = input_path
        self.output_path = output_path
        self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)

    def read_content(self) -> str:
        """Read markdown content from file"""
        try:
            with open(self.input_path, 'r', encoding='utf-8') as file:
                content = file.read()
            logger.info(f"Successfully read markdown content from {self.input_path}")
            return content
        except Exception as e:
            logger.error(f"Error reading markdown file {self.input_path}: {e}")
            raise

    def save_content(self, content: str) -> None:
        """Save processed markdown content"""
        try:
            # Ensure output directory exists
            output_dir = os.path.dirname(self.output_path)
            os.makedirs(output_dir, exist_ok=True)

            with open(self.output_path, 'w', encoding='utf-8') as file:
                file.write(content)
            logger.info(f"Successfully saved masked content to {self.output_path}")
        except Exception as e:
            logger.error(f"Error saving content to {self.output_path}: {e}")
            raise
@@ -0,0 +1,105 @@
import os
import PyPDF2
from document_handlers.document_processor import DocumentProcessor
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.config.enums import SupportedPdfParseMethod
from prompts.masking_prompts import get_masking_prompt, get_masking_mapping_prompt
import logging
from services.ollama_client import OllamaClient
from config.settings import settings

logger = logging.getLogger(__name__)


class PdfDocumentProcessor(DocumentProcessor):
    def __init__(self, input_path: str, output_path: str):
        super().__init__()  # Call parent class's __init__
        self.input_path = input_path
        self.output_path = output_path
        self.output_dir = os.path.dirname(output_path)
        self.name_without_suff = os.path.splitext(os.path.basename(input_path))[0]

        # Setup output directories
        self.local_image_dir = os.path.join(self.output_dir, "images")
        self.image_dir = os.path.basename(self.local_image_dir)
        os.makedirs(self.local_image_dir, exist_ok=True)

        # Setup work directory under output directory
        self.work_dir = os.path.join(
            os.path.dirname(output_path),
            ".work",
            os.path.splitext(os.path.basename(input_path))[0]
        )
        os.makedirs(self.work_dir, exist_ok=True)

        self.work_local_image_dir = os.path.join(self.work_dir, "images")
        self.work_image_dir = os.path.basename(self.work_local_image_dir)
        os.makedirs(self.work_local_image_dir, exist_ok=True)
        self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)

    def read_content(self) -> str:
        logger.info("Starting PDF content processing")

        # Read the PDF file
        with open(self.input_path, 'rb') as file:
            content = file.read()

        # Initialize writers
        image_writer = FileBasedDataWriter(self.work_local_image_dir)
        md_writer = FileBasedDataWriter(self.work_dir)

        # Create Dataset Instance
        ds = PymuDocDataset(content)

        logger.info("Classifying PDF type: %s", ds.classify())
        # Process based on PDF type
        if ds.classify() == SupportedPdfParseMethod.OCR:
            infer_result = ds.apply(doc_analyze, ocr=True)
            pipe_result = infer_result.pipe_ocr_mode(image_writer)
        else:
            infer_result = ds.apply(doc_analyze, ocr=False)
            pipe_result = infer_result.pipe_txt_mode(image_writer)

        logger.info("Generating all outputs")
        # Generate all outputs
        infer_result.draw_model(os.path.join(self.work_dir, f"{self.name_without_suff}_model.pdf"))
        model_inference_result = infer_result.get_infer_res()

        pipe_result.draw_layout(os.path.join(self.work_dir, f"{self.name_without_suff}_layout.pdf"))
        pipe_result.draw_span(os.path.join(self.work_dir, f"{self.name_without_suff}_spans.pdf"))

        md_content = pipe_result.get_markdown(self.work_image_dir)
        pipe_result.dump_md(md_writer, f"{self.name_without_suff}.md", self.work_image_dir)

        content_list = pipe_result.get_content_list(self.work_image_dir)
        pipe_result.dump_content_list(md_writer, f"{self.name_without_suff}_content_list.json", self.work_image_dir)

        middle_json = pipe_result.get_middle_json()
        pipe_result.dump_middle_json(md_writer, f'{self.name_without_suff}_middle.json')

        return md_content

    # def process_content(self, content: str) -> str:
    #     logger.info("Starting content masking process")
    #     sentences = content.split("。")
    #     final_md = ""
    #     for sentence in sentences:
    #         if not sentence.strip():  # Skip empty sentences
    #             continue
    #         formatted_prompt = get_masking_mapping_prompt(sentence)
    #         logger.info("Calling ollama to generate response, prompt: %s", formatted_prompt)
    #         response = self.ollama_client.generate(formatted_prompt)
    #         logger.info(f"Response generated: {response}")
    #         final_md += response + "。"
    #     return final_md

    def save_content(self, content: str) -> None:
        # Ensure output path has .md extension
        output_dir = os.path.dirname(self.output_path)
        base_name = os.path.splitext(os.path.basename(self.output_path))[0]
        md_output_path = os.path.join(output_dir, f"{base_name}.md")

        logger.info(f"Saving masked content to: {md_output_path}")
        with open(md_output_path, 'w', encoding='utf-8') as file:
            file.write(content)
@@ -0,0 +1,28 @@
from document_handlers.document_processor import DocumentProcessor
from services.ollama_client import OllamaClient
import logging
from prompts.masking_prompts import get_masking_prompt
from config.settings import settings

logger = logging.getLogger(__name__)


class TxtDocumentProcessor(DocumentProcessor):
    def __init__(self, input_path: str, output_path: str):
        super().__init__()
        self.input_path = input_path
        self.output_path = output_path
        self.ollama_client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)

    def read_content(self) -> str:
        with open(self.input_path, 'r', encoding='utf-8') as file:
            return file.read()

    # def process_content(self, content: str) -> str:
    #     formatted_prompt = get_masking_prompt(content)
    #     response = self.ollama_client.generate(formatted_prompt)
    #     logger.debug(f"Processed content: {response}")
    #     return response

    def save_content(self, content: str) -> None:
        with open(self.output_path, 'w', encoding='utf-8') as file:
            file.write(content)
@@ -0,0 +1,81 @@
import textwrap


def get_masking_prompt(text: str) -> str:
    """
    Returns the prompt for masking sensitive information in legal documents.

    Args:
        text (str): The input text to be masked

    Returns:
        str: The formatted prompt with the input text
    """
    prompt = textwrap.dedent("""
        您是一位专业的法律文档脱敏专家。请按照以下规则对文本进行脱敏处理:

        规则:
        1. 人名:
           - 两字名改为"姓+某"(如:张三 → 张某)
           - 三字名改为"姓+某某"(如:张三丰 → 张某某)
        2. 公司名:
           - 保留地理位置信息(如:北京、上海等)
           - 保留公司类型(如:有限公司、股份公司等)
           - 用"某"替换核心名称
        3. 保持原文其他部分不变
        4. 确保脱敏后的文本保持原有的语言流畅性和可读性

        输入文本:
        {text}

        请直接输出脱敏后的文本,无需解释或其他备注。
    """)

    return prompt.format(text=text)


def get_masking_mapping_prompt(text: str) -> str:
    """
    Returns a prompt that generates a mapping of original names/companies to their masked versions.

    Args:
        text (str): The input text to be analyzed for masking

    Returns:
        str: The formatted prompt that will generate a mapping dictionary
    """
    prompt = textwrap.dedent("""
        您是一位专业的法律文档脱敏专家。请分析文本并生成一个脱敏映射表,遵循以下规则:

        规则:
        1. 人名映射规则:
           - 对于同一姓氏的不同人名,使用字母区分:
             * 第一个出现的用"姓+某"(如:张三 → 张某)
             * 第二个出现的用"姓+某A"(如:张四 → 张某A)
             * 第三个出现的用"姓+某B"(如:张五 → 张某B)
             依此类推
           - 三字名同样遵循此规则(如:张三丰 → 张某某,张四海 → 张某某A)

        2. 公司名映射规则:
           - 保留地理位置信息(如:北京、上海等)
           - 保留公司类型(如:有限公司、股份公司等)
           - 用"某"替换核心名称,但保留首尾字(如:北京智慧科技有限公司 → 北京智某科技有限公司)
           - 对于多个相似公司名,使用字母区分(如:
             北京智慧科技有限公司 → 北京某科技有限公司
             北京智能科技有限公司 → 北京某科技有限公司A)

        3. 公权机关不做脱敏处理(如:公安局、法院、检察院、中国人民银行、银监会及其他未列明的公权机关)

        请分析以下文本,并生成一个JSON格式的映射表,包含所有需要脱敏的名称及其对应的脱敏后的形式:

        {text}

        请直接输出JSON格式的映射表,格式如下:
        {{
            "原文1": "脱敏后1",
            "原文2": "脱敏后2",
            ...
        }}
        如无需要输出的映射,请输出空json,如下:
        {{}}
    """)

    return prompt.format(text=text)
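A short sketch of how the mapping prompt is consumed; the sentence and the mapping shown in the comment are illustrative, not guaranteed model output.

```python
from prompts.masking_prompts import get_masking_mapping_prompt

sentence = "张三与北京智慧科技有限公司签订了借款合同。"   # example input, not from the patch
prompt = get_masking_mapping_prompt(sentence)

# The model is expected to reply with a flat JSON object such as:
# {"张三": "张某", "北京智慧科技有限公司": "北京某科技有限公司"}
# which _build_mapping() then parses and validates.
print(prompt)
```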
@@ -0,0 +1,30 @@
import logging
from document_handlers.document_factory import DocumentProcessorFactory
from services.ollama_client import OllamaClient

logger = logging.getLogger(__name__)


class DocumentService:
    def __init__(self, ollama_client: OllamaClient):
        self.ollama_client = ollama_client

    def process_document(self, input_path: str, output_path: str) -> bool:
        try:
            processor = DocumentProcessorFactory.create_processor(input_path, output_path)
            if not processor:
                logger.error(f"Unsupported file format: {input_path}")
                return False

            # Read content
            content = processor.read_content()

            # Process with Ollama
            masked_content = processor.process_content(content)

            # Save processed content
            processor.save_content(masked_content)
            return True

        except Exception as e:
            logger.error(f"Error processing document {input_path}: {str(e)}")
            return False
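Wiring sketch for the service, assuming the settings defined earlier in this patch; the input/output paths are examples only.

```python
from config.settings import settings
from services.ollama_client import OllamaClient
from services.document_service import DocumentService

client = OllamaClient(model_name=settings.OLLAMA_MODEL, base_url=settings.OLLAMA_API_URL)
service = DocumentService(ollama_client=client)

# Example paths, not part of the patch.
ok = service.process_document("input/case.pdf", "output/case.pdf")
print("processed" if ok else "failed")
```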
@@ -0,0 +1,54 @@
import logging
import os
from services.document_service import DocumentService
from services.ollama_client import OllamaClient
from config.settings import settings

logger = logging.getLogger(__name__)


class FileMonitor:
    def __init__(self, input_directory: str, output_directory: str):
        self.input_directory = input_directory
        self.output_directory = output_directory

        # Create OllamaClient instance using settings
        ollama_client = OllamaClient(
            model_name=settings.OLLAMA_MODEL,
            base_url=settings.OLLAMA_API_URL
        )
        # Inject OllamaClient into DocumentService
        self.document_service = DocumentService(ollama_client=ollama_client)

    def process_new_file(self, file_path: str) -> None:
        try:
            # Get the filename without directory path
            filename = os.path.basename(file_path)
            # Create output path
            output_path = os.path.join(self.output_directory, filename)

            logger.info(f"Processing file: {filename}")
            # Process the document using document service
            self.document_service.process_document(file_path, output_path)
            logger.info(f"File processed successfully: {filename}")

        except Exception as e:
            logger.error(f"Error processing file {file_path}: {str(e)}")

    def start_monitoring(self):
        import time

        # Ensure output directory exists
        os.makedirs(self.output_directory, exist_ok=True)

        already_seen = set(os.listdir(self.input_directory))
        while True:
            time.sleep(1)  # Check every second
            current_files = set(os.listdir(self.input_directory))
            new_files = current_files - already_seen

            for new_file in new_files:
                file_path = os.path.join(self.input_directory, new_file)
                logger.info(f"New file found: {new_file}")
                self.process_new_file(file_path)

            already_seen = current_files
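A possible entrypoint, assuming the monitor and logging modules live at the paths implied by the import style in this patch; the directories come from the settings above, and `start_monitoring()` blocks while polling.

```python
from config.logging_config import setup_logging     # module path assumed
from config.settings import settings
from services.file_monitor import FileMonitor       # module path assumed

if __name__ == "__main__":
    setup_logging()
    monitor = FileMonitor(settings.OBJECT_STORAGE_PATH, settings.TARGET_DIRECTORY_PATH)
    monitor.start_monitoring()                       # polls the input directory every second
```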
@@ -0,0 +1,91 @@
import requests
import logging
from typing import Dict, Any

logger = logging.getLogger(__name__)


class OllamaClient:
    def __init__(self, model_name: str, base_url: str = "http://localhost:11434"):
        """Initialize Ollama client.

        Args:
            model_name (str): Name of the Ollama model to use
            base_url (str): Base URL of the Ollama server
        """
        self.model_name = model_name
        self.base_url = base_url
        self.headers = {"Content-Type": "application/json"}

    def generate(self, prompt: str, strip_think: bool = True) -> str:
        """Generate a completion for the given prompt using the Ollama API.

        Args:
            prompt (str): The prompt to send to the model
            strip_think (bool): If True, drop a leading <think>...</think> section from the response

        Returns:
            str: Processed text response from the model

        Raises:
            RequestException: If the API call fails
        """
        try:
            url = f"{self.base_url}/api/generate"
            payload = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": False
            }

            logger.debug(f"Sending request to Ollama API: {url}")
            response = requests.post(url, json=payload, headers=self.headers)
            response.raise_for_status()

            result = response.json()
            logger.debug(f"Received response from Ollama API: {result}")
            if strip_think:
                # Remove the "thinking" part from the response;
                # the response is expected to be <think>...</think>response_text
                if "<think>" in result.get("response", ""):
                    # Split the response and take the part after </think>
                    response_parts = result["response"].split("</think>")
                    if len(response_parts) > 1:
                        # Return the part after </think>
                        return response_parts[1].strip()
                    else:
                        # If no closing tag, return the full response
                        return result.get("response", "").strip()
                else:
                    # If no <think> tag, return the full response
                    return result.get("response", "").strip()
            else:
                # If strip_think is False, return the full response
                return result.get("response", "")

        except requests.exceptions.RequestException as e:
            logger.error(f"Error calling Ollama API: {str(e)}")
            raise

    def get_model_info(self) -> Dict[str, Any]:
        """Get information about the current model.

        Returns:
            Dict[str, Any]: Model information

        Raises:
            RequestException: If the API call fails
        """
        try:
            url = f"{self.base_url}/api/show"
            payload = {"name": self.model_name}

            response = requests.post(url, json=payload, headers=self.headers)
            response.raise_for_status()

            return response.json()

        except requests.exceptions.RequestException as e:
            logger.error(f"Error getting model info: {str(e)}")
            raise
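Usage sketch for the client, assuming a local Ollama server is running and the named model has been pulled; the prompt text is an example.

```python
from services.ollama_client import OllamaClient

client = OllamaClient(model_name="llama2", base_url="http://localhost:11434")

# strip_think=True drops a leading <think>...</think> block emitted by reasoning models.
reply = client.generate('请输出一个JSON对象:{"状态": "正常"}', strip_think=True)
print(reply)
print(client.get_model_info().get("details", {}))
```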
@@ -0,0 +1,20 @@
import os


def read_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()


def write_file(file_path, content):
    with open(file_path, 'w') as file:
        file.write(content)


def file_exists(file_path):
    return os.path.isfile(file_path)


def delete_file(file_path):
    if file_exists(file_path):
        os.remove(file_path)


def list_files_in_directory(directory_path):
    return [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
@@ -0,0 +1,141 @@
import json
import re
from typing import Any, Optional, Dict, TypeVar, Type


T = TypeVar('T')


class LLMJsonExtractor:
    """Utility class for extracting and parsing JSON from LLM outputs"""

    @staticmethod
    def extract_json(text: str) -> Optional[str]:
        """
        Extracts JSON string from text using regex pattern matching.
        Handles both single and multiple JSON objects in text.

        Args:
            text (str): Raw text containing JSON

        Returns:
            Optional[str]: Extracted JSON string or None if no valid JSON found
        """
        # Pattern to match JSON objects with balanced braces
        pattern = r'{[^{}]*(?:{[^{}]*}[^{}]*)*}'
        matches = re.findall(pattern, text)

        if not matches:
            return None

        # Return the first valid JSON match
        for match in matches:
            try:
                # Verify it's valid JSON
                json.loads(match)
                return match
            except json.JSONDecodeError:
                continue

        return None

    @staticmethod
    def parse_json(text: str) -> Optional[Dict[str, Any]]:
        """
        Extracts and parses JSON from text into a Python dictionary.

        Args:
            text (str): Raw text containing JSON

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON as dictionary or None if parsing fails
        """
        try:
            json_str = LLMJsonExtractor.extract_json(text)
            if json_str:
                return json.loads(json_str)
            return None
        except json.JSONDecodeError:
            return None

    @staticmethod
    def parse_to_dataclass(text: str, dataclass_type: Type[T]) -> Optional[T]:
        """
        Extracts JSON and converts it to a specified dataclass type.

        Args:
            text (str): Raw text containing JSON
            dataclass_type (Type[T]): Target dataclass type

        Returns:
            Optional[T]: Instance of specified dataclass or None if conversion fails
        """
        try:
            data = LLMJsonExtractor.parse_json(text)
            if data:
                return dataclass_type(**data)
            return None
        except (json.JSONDecodeError, TypeError):
            return None

    @staticmethod
    def parse_raw_json_str(text: str) -> Optional[Dict[str, Any]]:
        """
        Extracts and parses JSON from text into a Python dictionary.

        Args:
            text (str): Raw text containing JSON

        Returns:
            Optional[Dict[str, Any]]: Parsed JSON as dictionary or None if parsing fails
        """
        try:
            json_str = LLMJsonExtractor.extract_json_max(text)
            if json_str:
                return json.loads(json_str)
            return None
        except json.JSONDecodeError:
            return None

    @staticmethod
    def extract_json_max(text: str) -> Optional[str]:
        """
        Extracts the maximum valid JSON object from text using stack-based brace matching.

        Args:
            text (str): Raw text containing JSON

        Returns:
            Optional[str]: Maximum valid JSON object as string or None if no valid JSON found
        """
        max_json = None
        max_length = 0

        # Iterate through each character as a potential start of JSON
        for start in range(len(text)):
            if text[start] != '{':
                continue

            stack = []
            for end in range(start, len(text)):
                if text[end] == '{':
                    stack.append(end)
                elif text[end] == '}':
                    if not stack:  # Unmatched closing brace
                        break

                    opening_pos = stack.pop()

                    # If stack is empty, we have a complete JSON object
                    if not stack:
                        json_candidate = text[opening_pos:end + 1]
                        try:
                            # Verify it's valid JSON
                            json.loads(json_candidate)
                            if len(json_candidate) > max_length:
                                max_length = len(json_candidate)
                                max_json = json_candidate
                        except json.JSONDecodeError:
                            continue

        return max_json
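Usage sketch for the extractor, using a made-up chatty LLM reply; `extract_json_max` pulls out the largest balanced JSON object and `parse_raw_json_str` returns it as a dict.

```python
from utils.json_extractor import LLMJsonExtractor

raw = '<think>需要识别人名和公司名</think>映射如下:{"张三": "张某", "李四": "李某"},请查收。'

print(LLMJsonExtractor.extract_json_max(raw))    # '{"张三": "张某", "李四": "李某"}'
print(LLMJsonExtractor.parse_raw_json_str(raw))  # {'张三': '张某', '李四': '李某'}
```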