#!/usr/bin/env python3
|
|
"""
|
|
Generate a formatted data analysis report from database statistics.
|
|
|
|
Usage:
    python generate_report.py <table_name> [--stats stats.json] [--output report.md]
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
|
|
def format_number(n: int | float) -> str:
|
|
"""Format large numbers with commas."""
|
|
if isinstance(n, float):
|
|
return f"{n:,.2f}"
|
|
return f"{n:,}"
|
|
|
|
|
|
def generate_report(table_name: str, stats: dict[str, Any]) -> str:
    """Generate a markdown report from table statistics.

    Args:
        table_name: Name of the analyzed table (appears in the report title).
        stats: Statistics mapping. Recognized keys: ``row_count`` (int) and
            ``columns`` — a list of per-column dicts with required ``name``
            and ``type`` keys and optional ``null_pct``, ``unique``,
            ``unique_ratio``, ``is_date``, ``min_date``, ``max_date``.

    Returns:
        The complete report as a single markdown string.
    """
    columns = stats.get('columns', [])

    report = f"""# Data Analysis Report: {table_name}

Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}

## Overview

- **Total Rows**: {format_number(stats.get('row_count', 0))}
- **Columns**: {len(columns)}
- **Analysis Type**: Automated Data Profiling

## Column Summary

| Column | Type | Nulls | Unique | Notes |
|--------|------|-------|--------|-------|
"""
    for col in columns:
        report += _column_row(col)

    report += _quality_assessment(columns)
    report += _RECOMMENDATIONS
    return report


def _column_row(col: dict[str, Any]) -> str:
    """Render one markdown table row for a single column's statistics."""
    null_pct = col.get('null_pct', 0)
    # Badges flag problem columns directly in the summary table.
    null_badge = "⚠️ " if null_pct > 50 else ""
    unique_badge = "🎲 " if col.get('unique_ratio', 0) > 0.9 else ""

    notes = []
    if null_pct > 50:
        notes.append("High nulls")
    if col.get('unique_ratio', 0) > 0.9:
        notes.append("Near-unique")
    if col.get('is_date'):
        notes.append("Date range: {} to {}".format(
            col.get('min_date', '?'), col.get('max_date', '?')))
    note_str = ", ".join(notes) if notes else "-"

    # 'name' and 'type' are required keys; a KeyError here means malformed stats.
    return f"| {col['name']} | {col['type']} | {null_pct:.1f}% {null_badge}| {format_number(col.get('unique', 0))} {unique_badge}| {note_str} |\n"


def _quality_assessment(columns: list[dict[str, Any]]) -> str:
    """Render the data-quality section.

    Columns with more than 50% nulls are issues; more than 20% are warnings.
    """
    issues = []
    warnings = []
    for col in columns:
        null_pct = col.get('null_pct', 0)
        if null_pct > 50:
            issues.append(f"- **{col['name']}**: {col['null_pct']:.1f}% null values")
        elif null_pct > 20:
            warnings.append(f"- **{col['name']}**: {col['null_pct']:.1f}% null values")

    section = "\n## Data Quality Assessment\n\n"
    if issues:
        section += "### ⚠️ Issues Found\n\n"
        section += "\n".join(issues) + "\n\n"
    if warnings:
        section += "### 📋 Warnings\n\n"
        section += "\n".join(warnings) + "\n\n"
    if not issues and not warnings:
        section += "✅ No major data quality issues detected.\n\n"
    return section


# Static closing section appended to every report.
_RECOMMENDATIONS = """## Recommendations

1. **Review high-null columns** for data collection issues
2. **Check date ranges** are within expected bounds
3. **Validate unique constraints** on ID columns
4. **Consider indexing** frequently queried columns

---

*Report generated by PostgreSQL Analyzer Skill*
"""
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: read stats JSON, render the report, write it out."""
    parser = argparse.ArgumentParser(description='Generate data analysis report')
    parser.add_argument('table_name', help='Table name analyzed')
    parser.add_argument('--stats', help='JSON file with statistics', default='-')
    parser.add_argument('--output', '-o', help='Output file', default='-')

    args = parser.parse_args()

    # Read stats from stdin ('-') or from a file. JSON is UTF-8 by spec
    # (RFC 8259), so pin the encoding instead of relying on the platform
    # default (e.g. cp1252 on Windows), which would misdecode non-ASCII data.
    if args.stats == '-':
        stats = json.load(sys.stdin)
    else:
        with open(args.stats, encoding='utf-8') as f:
            stats = json.load(f)

    # Generate report
    report = generate_report(args.table_name, stats)

    # Write to stdout ('-') or a file. The report contains emoji badges, so
    # an explicit UTF-8 encoding is required: the platform default encoding
    # can raise UnicodeEncodeError when writing them.
    if args.output == '-':
        print(report)
    else:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report written to {args.output}")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
|