Initial commit: PostgreSQL Analyzer with MCP + Skills demo

This commit is contained in:
tigerenwork 2026-03-17 23:34:07 +08:00
commit df3c5b3296
13 changed files with 2662 additions and 0 deletions

21
.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.so
*.egg
*.egg-info/
dist/
build/
.venv/
venv/
ENV/
.env
.idea/
.vscode/
*.swp
*.swo
*~
.DS_Store
*.log

285
HOW_IT_WORKS.md Normal file
View File

@ -0,0 +1,285 @@
# How Kimi Decides to Use Skills + MCP
## The Decision Flow
```
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ USER INPUT │
│ "What's the most expensive book?" │
└────────────────────────────────────────┬────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ STEP 1: SKILL TRIGGER EVALUATION │
│ ───────────────────────────────── │
│ │
│ Kimi checks ALL skill frontmatters (always loaded): │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ postgres-analyzer skill │ │
│ │ │ │
│ │ description: "PostgreSQL database analysis and querying. Use when the │ │
│ │ user needs to explore database schema, query data, analyze table stats..." │ │
│ │ │ │
│ │ ✅ MATCH! Keywords detected: │ │
│ │ • "book" → relates to database content │ │
│ │ • "expensive" → implies analysis/comparison │ │
│ │ • "most" → implies aggregation query (MAX) │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │
│ Other skills checked (no match): │
│ • docx-skill → "book" doesn't mean document │
│ • python-skill → not a coding question │
│ • git-skill → not related to version control │
└────────────────────────────────────────┬────────────────────────────────────────────┘
│ ✅ TRIGGERED: postgres-analyzer
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ STEP 2: SKILL BODY LOADED │
│ ───────────────────────── │
│ │
│ Now Kimi reads pg_analyzer_skill/SKILL.md for guidance: │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ From SKILL.md: │ │
│ │ │ │
│ │ ## When to Use │ │
│ │ "- Querying data with SQL" ← ✅ THIS APPLIES │ │
│ │ "- Analyzing table statistics" ← ✅ THIS APPLIES │ │
│ │ │ │
│ │ ## Available Tools │ │
│ │ "| execute_query | Run SELECT queries | Getting specific data |" │ │
│ │ ← ✅ USE THIS TOOL │ │
│ │ │ │
│ │ ## Query Patterns │ │
│ │ "Custom analysis: Use execute_query() with appropriate SQL" │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────┬────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ STEP 3: MCP TOOL SELECTION │
│ ──────────────────────────── │
│ │
│ Kimi sees available MCP tools from postgres-analyzer server: │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ Available Tools: │ │
│ │ │ │
│ │ 1. get_schema → "List tables and columns" │ │
│ │ ❌ Not needed - we know we need books table │ │
│ │ │ │
│ │ 2. execute_query → "Run SELECT queries" │ │
│ │ ✅ PERFECT! Can query products WHERE category='Books'│ │
│ │ │ │
│ │ 3. get_table_stats → "Table statistics + sample" │ │
│ │ ❌ Not needed - we need specific aggregation │ │
│ │ │ │
│ │ 4. analyze_column → "Deep column analysis" │ │
│ │ ❌ Not needed - we need comparison across rows │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
│ │
│ ✅ SELECTED: execute_query │
└────────────────────────────────────────┬────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ STEP 4: QUERY GENERATION │
│ ───────────────────────── │
│ │
│ Kimi constructs SQL based on user intent + skill guidance: │
│ │
│ User wants: "most expensive book" │
│ Skill says: "Use execute_query with SELECT...WHERE category='Books'...ORDER BY" │
│ │
│ Generated SQL: │
│ ┌─────────────────────────────────────────────────────────────────────────────┐ │
│ │ SELECT │ │
│ │ p.name as book_name, │ │
│ │ p.price, │ │
│ │ COALESCE(SUM(oi.quantity), 0) as total_sold │ │
│ │ FROM products p │ │
│ │ LEFT JOIN order_items oi ON p.id = oi.product_id │ │
│ │ WHERE p.category = 'Books' │ │
│ │ GROUP BY p.id, p.name, p.price │ │
│ │ ORDER BY p.price DESC │ │
│ └─────────────────────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────┬────────────────────────────────────────────┘
│ MCP Protocol Call
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ STEP 5: EXECUTION │
│ ───────────────── │
│ │
│ MCP Server receives: │
│ { │
│ "tool": "execute_query", │
│ "arguments": { │
│ "query": "SELECT p.name, p.price...", │
│ "limit": 100 │
│ } │
│ } │
│ │
│ → PostgreSQL executes query │
│ → Returns results │
│ → MCP formats as markdown table │
│ → Returns to Kimi │
└────────────────────────────────────────┬────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ STEP 6: RESPONSE SYNTHESIS │
│ ────────────────────────── │
│ │
│ Raw data from MCP: │
│ | book_name | price | total_sold | │
│ | Novel Collection| 65.00 | 212 | │
│ | Science Fiction | 55.00 | 176 | │
│ | Python Book | 45.00 | 213 | │
│ │
│ Kimi synthesizes natural language response: │
│ │
│ "The most expensive book is the Novel Collection at $65.00. Interestingly, │
│ despite being the priciest, it sold 212 copies - nearly matching the cheaper │
│ Python Programming Book (213 copies) which costs $20 less." │
│ │
└─────────────────────────────────────────────────────────────────────────────────────┘
```
---
## Key Decision Points
### 1. Why Not a General Answer?
| Approach | Would Kimi Use It? | Why? |
|----------|-------------------|------|
| General knowledge ("Books usually cost $10-30") | ❌ No | User asked about THEIR database, not general market |
| Web search | ❌ No | Question is specific to user's private data |
| File search | ❌ No | No files mentioned, data is in database |
| **MCP + Skill** | ✅ Yes | Only way to access user's PostgreSQL database |
### 2. What If The Question Was Different?
| User Question | Skill Triggered | Tool Used | Reason |
|---------------|-----------------|-----------|--------|
| "What's in my database?" | postgres-analyzer | `get_schema` | Discovery |
| "Analyze the users table" | postgres-analyzer | `get_table_stats` | Overview |
| "Tell me about the email column" | postgres-analyzer | `analyze_column` | Deep dive |
| "Show top customers" | postgres-analyzer | `execute_query` | Custom analysis |
| "Write a Python script" | python-skill | N/A (no MCP) | Different domain |
| "Fix this git issue" | git-skill | N/A (no MCP) | Different domain |
---
## How Triggers Work
### Frontmatter Matching (Always Active)
```yaml
---
name: postgres-analyzer
description: PostgreSQL database analysis and querying.
Use when the user needs to:
1. Explore database schema
2. Query data
3. Analyze table statistics
4. Get insights from PostgreSQL
Requires PG_CONNECTION_STRING environment variable.
---
```
Kimi evaluates:
- **Keywords**: "database", "table", "query", "SQL", "analyze", "expensive" (implies comparison)
- **Context**: "book" in context of data implies database content, not a document
- **Intent**: "what's the most" implies aggregation query (MAX/ORDER BY)
### Skill Body (Loaded After Trigger)
The SKILL.md provides:
1. **Workflow guidance** → "Use execute_query for specific data"
2. **Tool selection** → "execute_query: Run SELECT queries"
3. **SQL patterns** → "Use WHERE, GROUP BY, ORDER BY for analysis"
---
## Why This Is Powerful
### Without MCP + Skills
```
User: "What's the most expensive book?"
Kimi: "I don't have access to your database.
Please provide the data or export it to a file."
```
### With MCP + Skills
```
User: "What's the most expensive book?"
Kimi: [Connects via MCP] → [Queries database] → [Analyzes results]
"The most expensive book is Novel Collection at $65.
It sold 212 copies, nearly matching the cheaper Python
book at 213 copies - showing strong demand despite
the premium price."
```
---
## Debug: How to See What's Happening
### 1. Check MCP Connection
```bash
kimi
# In Kimi shell:
/mcp
# Shows:
# Connected MCP servers:
# postgres
# Tools: get_schema, execute_query, get_table_stats, analyze_column
```
### 2. Check Skill Trigger
```bash
# In Kimi shell, ask with verbose:
# (Kimi will show thinking process)
User: What's the most expensive book?
[Thinking: User asking about "book" - checking skills...]
[Thinking: postgres-analyzer skill matches (database content)]
[Thinking: Loading postgres-analyzer skill...]
[Thinking: User wants MAX(price) WHERE category='Books']
[Thinking: execute_query tool is appropriate]
[Thinking: Generating SQL: SELECT name, MAX(price)...]
```
### 3. Test Tool Directly
You can force a specific tool:
```
User: Use the execute_query tool to find the most expensive book
```
---
## Summary
| Component | Role | When Loaded |
|-----------|------|-------------|
| **Skill Frontmatter** | Trigger detection | Always (metadata only) |
| **Skill Body** | Usage guidance | Only when triggered |
| **MCP Tools** | Execution capability | When MCP server connected |
The magic happens when:
1. **Frontmatter** matches user intent → Triggers skill
2. **Skill body** guides tool selection → Chooses MCP tool
3. **MCP server** executes safely → Returns structured data
4. **Kimi synthesizes** → Natural language response with insights

284
NANOBOT_SETUP.md Normal file
View File

@ -0,0 +1,284 @@
# Using PostgreSQL Analyzer with Nanobot
## Compatibility
| Component | Works with Nanobot? | Notes |
|-----------|---------------------|-------|
| **MCP Server** (`pg_mcp_server/server.py`) | ✅ **YES** | Nanobot fully supports MCP servers (added in v0.1.4) |
| **Skill** (`pg_analyzer_skill/SKILL.md`) | ❌ **NO** | Nanobot has its own skill system (different from Kimi CLI) |
## What is Nanobot?
[Nanobot](https://nanobot.ai/) is an ultra-lightweight (~4,000 lines of Python) AI agent framework and OpenClaw alternative. It's built entirely on the **Model Context Protocol (MCP)**.
**Key differences from Kimi Code CLI:**
- Kimi CLI: Uses skills (SKILL.md) + MCP
- Nanobot: Uses MCP natively + its own Python-based skill system
## Installation
```bash
# Via Homebrew (macOS/Linux)
brew install nanobot-ai/tap/nanobot
# Or via pip
pip install nanobot-ai
# Or via uv
uv tool install nanobot-ai
```
## Configuration
### 1. Create `nanobot.yaml` Config File
```yaml
# nanobot.yaml
agents:
postgres-analyst:
name: PostgreSQL Data Analyst
description: Analyzes PostgreSQL databases and answers data questions
model: openrouter/gpt-4o # or any model you prefer
# MCP servers this agent can use
mcpServers:
- postgres
# System prompt (replaces SKILL.md functionality)
systemPrompt: |
You are a PostgreSQL data analyst. You help users explore their database
and extract insights using SQL queries.
When the user asks about data:
1. Use the postgres MCP tools to query the database
2. Available tools: get_schema, execute_query, get_table_stats, analyze_column
3. Always start with get_schema if user asks about "database" or "tables"
4. For specific questions, use execute_query with appropriate SQL
5. Present results clearly with insights
Safety: Only SELECT queries are allowed. The MCP server enforces read-only.
# MCP server definitions
mcpServers:
postgres:
# stdio transport (local process)
transport: stdio
command: python3
args:
- /absolute/path/to/pg_mcp_server/server.py
env:
PG_CONNECTION_STRING: "postgresql://user:pass@localhost:5432/db"
# Alternative: If you wrap it as an HTTP server
# transport: http
# url: http://localhost:3000/mcp
```
### 2. Project Structure for Nanobot
```
pg_analyzer_demo/
├── pg_mcp_server/ # MCP Server (✅ USE WITH NANOBOT)
│ ├── server.py
│ └── requirements.txt
├── pg_analyzer_skill/ # Skill (❌ NOT COMPATIBLE - Kimi CLI only)
│ └── SKILL.md
├── nanobot.yaml # ✅ NEW: Nanobot configuration
└── nanobot_skill.py # ✅ NEW: Nanobot Python skill (optional)
```
### 3. Run Nanobot
```bash
# Start the agent
nanobot run ./nanobot.yaml
# Or use the agent CLI
nanobot agent postgres-analyst
```
## Alternative: Python Skill for Nanobot
Instead of relying on the system prompt, you can create a proper Nanobot skill:
```python
# postgres_skill.py
from nanobot import skill, Context
@skill(name="postgres-analyzer")
class PostgresAnalyzerSkill:
"""PostgreSQL database analysis skill for Nanobot."""
@skill.intent("analyze database")
async def analyze_database(self, ctx: Context):
"""When user wants to analyze their database."""
# This skill can call MCP tools via ctx.mcp
schema = await ctx.mcp.postgres.get_schema()
return f"Database has these tables:\n{schema}"
@skill.intent("expensive book")
async def expensive_book(self, ctx: Context):
"""When user asks about expensive books."""
result = await ctx.mcp.postgres.execute_query(
query="""
SELECT name, price
FROM products
WHERE category = 'Books'
ORDER BY price DESC
LIMIT 1
"""
)
return f"The most expensive book is: {result}"
```
Then register in `nanobot.yaml`:
```yaml
agents:
postgres-analyst:
# ... other config ...
skills:
- postgres_skill.py
```
## Complete Working Example
### nanobot.yaml
```yaml
agents:
data-analyst:
name: Data Analyst
model: anthropic/claude-3-5-sonnet
mcpServers:
- postgres
systemPrompt: |
You are a helpful data analyst with access to a PostgreSQL database.
GUIDELINES:
- Use get_schema() to explore database structure
- Use execute_query() for custom SQL
- Use get_table_stats() for table overviews
- Use analyze_column() for column details
ANALYSIS WORKFLOW:
1. Discovery: get_schema() to see tables
2. Deep dive: get_table_stats() for specific tables
3. Investigation: analyze_column() or execute_query()
4. Insights: Synthesize findings with context
Always explain your reasoning and show the SQL used.
mcpServers:
postgres:
transport: stdio
command: python3
args:
- /Users/tigeren/Dev/agent_demo/pg_analyzer_demo/pg_mcp_server/server.py
env:
PG_CONNECTION_STRING: "postgresql://postgres:demo@localhost:5432/shop"
```
### Usage
```bash
# Start nanobot with this config
nanobot run ./nanobot.yaml
# Then in the chat:
User: What's the most expensive book?
Nanobot: [Uses MCP tool execute_query]
Result: The most expensive book is "Novel Collection" at $65.00.
It has sold 212 copies, generating $13,780 in revenue.
```
## Comparison: Kimi CLI vs Nanobot
| Feature | Kimi Code CLI | Nanobot |
|---------|--------------|---------|
| **MCP Support** | ✅ Yes | ✅ Yes (native) |
| **Skill System** | SKILL.md (markdown) | Python decorators |
| **Codebase** | ~medium-sized | ~4,000 lines |
| **Memory** | ~200MB | ~45MB |
| **Startup** | ~2-3s | ~0.8s |
| **Transport** | stdio, http | stdio, http, sse |
| **Platform** | CLI | CLI + Web UI |
## Migration Guide: Kimi Skill → Nanobot
### Kimi Skill (SKILL.md)
```markdown
---
name: postgres-analyzer
description: PostgreSQL analysis...
---
## Available Tools
| Tool | Purpose |
|------|---------|
| execute_query | Run SQL |
## Workflow
1. get_schema()
2. execute_query()
```
### Nanobot Equivalent
```python
# postgres_skill.py
from nanobot import skill, Context
@skill(name="postgres-analyzer",
description="PostgreSQL analysis and querying")
class PostgresSkill:
@skill.tool_usage("execute_query")
async def query_data(self, ctx: Context, query: str):
"""Run SQL queries."""
return await ctx.mcp.postgres.execute_query(query=query)
@skill.workflow("analyze_database")
async def analyze(self, ctx: Context):
"""Analysis workflow."""
# Step 1: Schema
schema = await ctx.mcp.postgres.get_schema()
# Step 2: Stats
# ... etc
```
## Troubleshooting
### MCP Server Not Found
```bash
# Use absolute path in nanobot.yaml
args:
- /absolute/path/to/pg_mcp_server/server.py
```
### Environment Variables Not Passed
```yaml
mcpServers:
postgres:
transport: stdio
command: python3
args: [server.py]
env:
PG_CONNECTION_STRING: "..." # Must be explicit
```
### Connection Issues
```bash
# Test MCP server manually first
export PG_CONNECTION_STRING="..."
python3 pg_mcp_server/server.py
# In another terminal, test with mcp CLI
mcp test postgres
```
## References
- [Nanobot Documentation](https://nanobot.ai/docs)
- [Nanobot GitHub](https://github.com/hkuds/nanobot)
- [MCP Specification](https://modelcontextprotocol.io/)

386
NANOBOT_SKILL_SETUP.md Normal file
View File

@ -0,0 +1,386 @@
# PostgreSQL Analyzer Skill for Nanobot
## Correction: Nanobot DOES Support Skills!
From the [Nanobot GitHub repo](https://github.com/HKUDS/nanobot):
```
nanobot/
├── agent/
│ ├── skills.py # 🎯 Skills loader
│ └── ...
├── skills/ # 🎯 Bundled skills (github, weather, tmux...)
│ └── ...
```
Nanobot has its own skill system that's **different from Kimi CLI's SKILL.md format**:
- **Kimi CLI**: Markdown-based (`SKILL.md`)
- **Nanobot**: Python-based skills + ClawHub integration
---
## Nanobot Skill System Overview
### 1. Built-in Skills
Nanobot comes with bundled skills in the `skills/` directory:
- `github` - GitHub operations
- `weather` - Weather queries
- `tmux` - Terminal multiplexer
- And more...
### 2. ClawHub Skills
Nanobot can search and install skills from **ClawHub** (similar to OpenClaw's skill ecosystem):
```bash
# Search for skills
nanobot skill search postgres
# Install a skill
nanobot skill install postgres-analyzer
```
### 3. Custom Python Skills
You can create custom skills using Python (more powerful than markdown).
---
## Option 1: MCP Server + System Prompt (Simplest)
This is what I showed earlier - use the MCP server with a system prompt:
```json
// ~/.nanobot/config.json
{
"agents": {
"defaults": {
"model": "anthropic/claude-sonnet-4",
"systemPrompt": "You are a PostgreSQL analyst. Use MCP tools..."
}
},
"tools": {
"mcpServers": {
"postgres": {
"command": "python3",
"args": ["/path/to/pg_mcp_server/server.py"],
"env": {
"PG_CONNECTION_STRING": "postgresql://..."
}
}
}
}
}
```
---
## Option 2: Create a Proper Nanobot Skill (Recommended)
Create a Python skill file that Nanobot can load:
### File: `~/.nanobot/skills/postgres_analyzer/skill.py`
```python
"""
PostgreSQL Analyzer Skill for Nanobot
Provides intelligent database analysis capabilities
"""
from nanobot.skills import Skill, intent, tool
from nanobot.agent.context import Context
class PostgresAnalyzerSkill(Skill):
"""
Analyze PostgreSQL databases and generate insights.
"""
name = "postgres-analyzer"
description = "PostgreSQL database analysis and querying"
@intent("explore database")
@intent("show tables")
@intent("what's in my database")
async def explore_database(self, ctx: Context):
"""
When user wants to explore database structure.
Triggered by: 'explore database', 'show tables', etc.
"""
# Call MCP tool via ctx.tools.mcp.postgres
schema = await ctx.tools.mcp.postgres.get_schema()
return {
"type": "text",
"content": f"📊 Database Schema:\n\n{schema}"
}
@intent("analyze table")
@intent("tell me about table")
async def analyze_table(self, ctx: Context, table_name: str = None):
"""
When user wants to analyze a specific table.
Triggered by: 'analyze the orders table'
"""
if not table_name:
# Try to extract from context or ask
return "Which table would you like me to analyze?"
stats = await ctx.tools.mcp.postgres.get_table_stats(
table_name=table_name,
sample_size=5
)
return {
"type": "text",
"content": f"📈 Analysis of '{table_name}':\n\n{stats}"
}
@intent("expensive book")
@intent("cheapest book")
@intent("best seller")
async def book_analysis(self, ctx: Context):
"""
When user asks about book prices or sales.
"""
result = await ctx.tools.mcp.postgres.execute_query(
query="""
SELECT
p.name,
p.price,
SUM(oi.quantity) as sold,
SUM(oi.quantity * oi.unit_price) as revenue
FROM products p
LEFT JOIN order_items oi ON p.id = oi.product_id
WHERE p.category = 'Books'
GROUP BY p.id, p.name, p.price
ORDER BY p.price DESC
""",
limit=10
)
return {
"type": "text",
"content": f"📚 Book Analysis:\n\n{result}"
}
@intent("run query")
@intent("execute sql")
async def custom_query(self, ctx: Context, query: str = None):
"""
When user wants to run a custom SQL query.
"""
if not query:
return "What SQL query would you like to run?"
# Safety check - ensure it's read-only
query_lower = query.lower()
forbidden = ['insert', 'update', 'delete', 'drop', 'create', 'alter']
if any(word in query_lower for word in forbidden):
return "⚠️ For safety, only SELECT queries are allowed."
result = await ctx.tools.mcp.postgres.execute_query(
query=query,
limit=100
)
return {
"type": "text",
"content": f"📝 Query Results:\n\n{result}"
}
@intent("price analysis")
@intent("pricing strategy")
async def pricing_analysis(self, ctx: Context):
"""
When user wants pricing insights.
"""
analysis = await ctx.tools.mcp.postgres.execute_query(
query="""
SELECT
p.category,
COUNT(*) as products,
MIN(p.price) as min_price,
MAX(p.price) as max_price,
AVG(p.price)::numeric(10,2) as avg_price,
SUM(oi.quantity) as units_sold
FROM products p
LEFT JOIN order_items oi ON p.id = oi.product_id
GROUP BY p.category
ORDER BY units_sold DESC
""",
limit=20
)
return {
"type": "text",
"content": f"💰 Pricing Analysis by Category:\n\n{analysis}"
}
# Export the skill
skill = PostgresAnalyzerSkill()
```
### File: `~/.nanobot/skills/postgres_analyzer/skill.json`
```json
{
"name": "postgres-analyzer",
"version": "1.0.0",
"description": "PostgreSQL database analysis and querying",
"author": "Your Name",
"entry": "skill.py",
"intents": [
"explore database",
"analyze table",
"expensive book",
"run query",
"price analysis"
],
"mcpServers": ["postgres"]
}
```
### Register in Config
```json
// ~/.nanobot/config.json
{
"agents": {
"defaults": {
"skills": ["postgres_analyzer"]
}
},
"tools": {
"mcpServers": {
"postgres": {
"command": "python3",
"args": ["/path/to/pg_mcp_server/server.py"],
"env": {
"PG_CONNECTION_STRING": "postgresql://..."
}
}
}
}
}
```
---
## Option 3: ClawHub-Compatible Skill (For Sharing)
To make your skill installable via `nanobot skill install`, create this structure:
```
postgres-analyzer-skill/
├── skill.md # Skill documentation
├── skill.py # Main skill code
├── config.json # Default config
└── README.md
```
### skill.md
```markdown
---
name: postgres-analyzer
version: 1.0.0
description: Analyze PostgreSQL databases and extract insights
author: your-github-username
tags: [database, postgres, sql, analytics]
---
# PostgreSQL Analyzer
This skill helps you analyze PostgreSQL databases and generate insights.
## Requirements
- PostgreSQL database
- PG_CONNECTION_STRING environment variable
## Features
- Explore database schema
- Run SQL queries
- Analyze table statistics
- Generate pricing insights
## Usage
Simply ask:
- "Show me all tables"
- "What's the most expensive book?"
- "Analyze the orders table"
- "Run: SELECT * FROM users LIMIT 10"
```
---
## How Nanobot Decides to Use Skills
```
User: "What's the most expensive book?"
┌─────────────────────────────┐
│ 1. Intent Recognition │
│ ───────────────── │
│ Matches against skill │
│   @intent decorators:      │
│ • "expensive book" ✅ │
│ • "analyze table" │
│ • "run query" │
└─────────────┬───────────────┘
┌─────────────────────────────┐
│ 2. Skill Method Called │
│ ───────────────────── │
│ PostgresAnalyzerSkill │
│ .book_analysis() │
└─────────────┬───────────────┘
┌─────────────────────────────┐
│ 3. MCP Tool Execution │
│ ───────────────────── │
│ ctx.tools.mcp.postgres │
│ .execute_query(...) │
└─────────────┬───────────────┘
┌─────────────────────────────┐
│ 4. Result Synthesis │
│ ───────────────── │
│ Return formatted response │
└─────────────────────────────┘
```
---
## Comparison: Kimi CLI vs Nanobot Skills
| Aspect | Kimi CLI | Nanobot |
|--------|----------|---------|
| **Format** | `SKILL.md` (markdown) | Python code |
| **Trigger** | Frontmatter description matching | `@intent()` decorators |
| **Logic** | LLM decides based on instructions | Python code + LLM hybrid |
| **MCP Usage** | Via tool descriptions | Via `ctx.tools.mcp` |
| **Flexibility** | Text-based guidance | Code-based, programmatic |
| **Installation** | Copy to `~/.config/agents/skills/` | `nanobot skill install` or copy to `~/.nanobot/skills/` |
---
## Recommended Approach
For your PostgreSQL analyzer:
1. **Keep the MCP server** (`pg_mcp_server/server.py`) - this is **portable** across Kimi CLI, Nanobot, Claude Desktop, etc.
2. **Choose skill approach based on your needs**:
- **Quick setup**: Use system prompt (Option 1)
- **More control**: Create Python skill (Option 2)
- **Share with community**: ClawHub format (Option 3)
3. **The MCP server is the reusable part** - skills are the interface layer that differs between platforms.

404
README.md Normal file
View File

@ -0,0 +1,404 @@
# PostgreSQL Analyzer - Practical MCP + Skills Demo
A production-ready database analysis tool using **MCP (Model Context Protocol)** and **Skills**. Connect to any PostgreSQL database, explore schemas, query data, and generate insights.
## What This Does
```
┌────────────────────────────────────────────────────────────────────┐
│ User Request │
│ "Analyze my orders table and find revenue trends" │
└────────────────────────────────────┬───────────────────────────────┘
┌────────────────────────────────▼─────────────────────────────┐
│ Skill: postgres-analyzer │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Triggers on: database, analysis, query, insights │ │
│ │ │ │
│ │ Workflow: │ │
│ │ 1. get_schema() → Understand table structure │ │
│ │ 2. get_table_stats() → Get row counts, samples │ │
│ │ 3. execute_query() → Run revenue analysis SQL │ │
│ │ 4. analyze_column() → Check date ranges, distributions │ │
│ │ 5. Synthesize → Generate insights report │ │
│ └─────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬─────────────────────────────┘
│ MCP Protocol
┌────────────────────────────────▼─────────────────────────────┐
│ MCP Server: postgres-analyzer │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Tools: │ │
│ │ • get_schema() - List tables/columns │ │
│ │ • execute_query() - Run SELECT queries │ │
│ │ • get_table_stats() - Stats + sample data │ │
│ │ • analyze_column() - Deep column analysis │ │
│ │ │ │
│ │ Safety: Read-only, query limits, injection protection │ │
│ └─────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬─────────────────────────────┘
│ psycopg2
┌────────────────────────────────▼─────────────────────────────┐
│ PostgreSQL Database │
│ (Any accessible PostgreSQL instance) │
└──────────────────────────────────────────────────────────────┘
```
## Quick Start
### 1. Install Dependencies
```bash
cd pg_analyzer_demo
pip install -r pg_mcp_server/requirements.txt
```
### 2. Set Database Connection
```bash
export PG_CONNECTION_STRING="postgresql://user:password@host:port/database"
# Examples:
# Local database:
export PG_CONNECTION_STRING="postgresql://postgres:secret@localhost:5432/myapp"
# Supabase:
export PG_CONNECTION_STRING="postgresql://postgres.xxxx:password@aws-0-region.pooler.supabase.com:5432/postgres"
# Railway/Render:
export PG_CONNECTION_STRING="postgresql://user:pass@host.render.com:5432/dbname"
```
### 3. Test with Demo Client
```bash
python demo.py
```
This interactively guides you through:
- Schema discovery
- Table analysis
- Custom queries
- Column deep-dives
## Components
### MCP Server (`pg_mcp_server/server.py`)
Exposes 4 tools for safe database access:
| Tool | Parameters | Returns |
|------|------------|---------|
| `get_schema` | `table_name` (optional) | All tables or specific table schema |
| `execute_query` | `query`, `limit` | Query results as markdown table |
| `get_table_stats` | `table_name`, `sample_size` | Row count, column stats, sample rows |
| `analyze_column` | `table_name`, `column_name` | Distribution, nulls, top values |
**Safety Features:**
- Read-only: Rejects INSERT/UPDATE/DELETE/DROP/CREATE
- Query limits: Auto-limits to 100 rows (max 1000)
- Connection pooling: Proper cleanup
- SQL injection protection: Uses parameterized queries
### Skill (`pg_analyzer_skill/SKILL.md`)
Teaches the AI:
1. **When to use**: Database questions, analysis needs
2. **Workflow**: Discovery → Deep Dive → Insights
3. **SQL Patterns**: Common analysis queries
4. **Safety Rules**: Read-only, performance, PII warnings
5. **Output Format**: Structured insights with recommendations
### Helper Script (`pg_analyzer_skill/scripts/generate_report.py`)
Generates formatted markdown reports from analysis results.
## Using with Kimi Code CLI
### Setup
**1. Configure MCP Server**
Add to `~/.kimi/mcp.json`:
```json
{
"mcpServers": {
"postgres": {
"command": "python3",
"args": ["/absolute/path/to/pg_mcp_server/server.py"],
"env": {
"PG_CONNECTION_STRING": "postgresql://user:pass@host:5432/db"
}
}
}
}
```
**2. Install Skill**
```bash
mkdir -p ~/.config/agents/skills/
cp -r pg_analyzer_skill ~/.config/agents/skills/
```
Or use a local skill path:
```bash
kimi --skills-dir ./pg_analyzer_skill
```
**3. Start Kimi**
```bash
kimi
```
### Example Sessions
#### Schema Exploration
```
User: What tables are in my database?
→ get_schema()
← Shows all tables with column counts
Kimi: Your database has 4 tables:
• users (8 columns) - User accounts
• orders (10 columns) - Purchase orders
• products (6 columns) - Product catalog
• order_items (5 columns) - Line items
```
#### Data Analysis
```
User: Analyze the orders table
→ get_table_stats("orders", sample_size=5)
← Row count: 15,420 | Columns: 10 | Sample data...
→ analyze_column("orders", "total_amount")
← Min: $12.50 | Max: $2,450.00 | Avg: $187.30
→ analyze_column("orders", "status")
← completed: 9,252 (60%) | pending: 4,626 (30%) | cancelled: 1,542 (10%)
→ execute_query("SELECT DATE_TRUNC('month', created_at) as month,
SUM(total_amount) as revenue
FROM orders WHERE status = 'completed'
GROUP BY 1 ORDER BY 1 LIMIT 12")
← Monthly revenue trend data
Kimi: ## Orders Table Analysis
**Overview**: 15,420 orders with healthy volume
**Key Findings**:
- Average order value: $187.30 (healthy for retail)
- 10% cancellation rate - investigate why
- Revenue trending up 15% MoM
**Recommendations**:
1. Review cancellation process - rate is above industry avg (5-7%)
2. Promote products in $150-250 range (sweet spot)
3. Follow up on pending orders (30% of total)
```
#### Custom Queries
```
User: Who are my top 10 customers by revenue?
→ execute_query("""
SELECT u.email, SUM(o.total_amount) as total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.status = 'completed'
GROUP BY u.id, u.email
ORDER BY total DESC
LIMIT 10
""")
Kimi: Here are your top 10 customers:
| Email | Total Revenue |
|-------|---------------|
| john@example.com | $12,450 |
| sarah@example.com | $11,230 |
...
```
## Real-World Use Cases
### 1. Data Quality Audit
```
User: Check data quality in the users table
Kimi runs:
1. get_table_stats("users") - Overview
2. analyze_column("users", "email") - Check for nulls, duplicates
3. analyze_column("users", "created_at") - Date range validation
4. execute_query("SELECT COUNT(*) FROM users WHERE email NOT LIKE '%@%'")
Output: Data quality report with issues and recommendations
```
### 2. Business Metrics Dashboard
```
User: Give me a business overview
Kimi analyzes:
- User growth (signups by month)
- Revenue trends (completed orders)
- Product performance (top sellers)
- Churn indicators (inactive users)
Output: Executive summary with charts (as markdown tables)
```
### 3. Anomaly Detection
```
User: Find any unusual patterns in orders
Kimi checks:
- Orders with extreme amounts (outliers)
- Sudden spikes in cancellations
- Unusual time patterns (3am orders)
- Duplicate transactions
Output: Anomaly report with investigation queries
```
## Configuration Reference
### Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `PG_CONNECTION_STRING` | Yes | PostgreSQL connection URI |
| `PG_POOL_SIZE` | No | Connection pool size (default: 5) |
| `PG_QUERY_TIMEOUT` | No | Query timeout in seconds (default: 30) |
### Connection String Format
```
postgresql://[user[:password]@][host][:port][/dbname][?param1=value1&...]
Examples:
postgresql://localhost/mydb
postgresql://user:secret@localhost:5432/mydb?sslmode=require
postgresql://user:pass@host.supabase.co:5432/postgres?sslmode=require
```
## Security Considerations
### MCP Server Safety
1. **Read-Only Enforcement**: Only SELECT queries allowed
2. **Query Limits**: Max 1000 rows returned
3. **No DDL**: CREATE/ALTER/DROP rejected
4. **Connection Isolation**: Per-request connections
### Best Practices
- Use read-only database users
- Enable SSL for remote connections
- Monitor query logs
- Set appropriate query timeouts
## Extending
### Adding New Tools
Edit `pg_mcp_server/server.py`:
```python
Tool(
name="get_slow_queries",
description="Find slow running queries from pg_stat_statements",
inputSchema={
"type": "object",
"properties": {
"limit": {"type": "integer", "default": 10}
}
},
)
```
### Adding Analysis Patterns
Edit `pg_analyzer_skill/SKILL.md`:
```markdown
### Cohort Analysis
```sql
SELECT
DATE_TRUNC('month', first_order) as cohort,
COUNT(*) as users
FROM (
SELECT user_id, MIN(created_at) as first_order
FROM orders GROUP BY user_id
) first_orders
GROUP BY 1
```
```
## Troubleshooting
### Connection Issues
```bash
# Test connection manually
psql "$PG_CONNECTION_STRING" -c "SELECT 1"
# Check server is running
pg_isready -h localhost -p 5432
```
### Permission Errors
Create a read-only user:
```sql
CREATE USER analyst WITH PASSWORD 'safe_password';
GRANT CONNECT ON DATABASE mydb TO analyst;
GRANT USAGE ON SCHEMA public TO analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO analyst;
```
### Performance
For large tables, add WHERE clauses:
```sql
-- Good: Limited time range
SELECT * FROM orders WHERE created_at > NOW() - INTERVAL '30 days'
-- Bad: Full table scan
SELECT * FROM orders
```
## Comparison: MCP vs Direct Connection
| Aspect | MCP + Skills | Direct SQL |
|--------|--------------|------------|
| **Safety** | ✅ Read-only enforced | ⚠️ User responsibility |
| **Guidance** | ✅ AI knows analysis patterns | ❌ Manual SQL writing |
| **Insights** | ✅ Automatic synthesis | ❌ Raw data only |
| **Reusability** | ✅ Skill applies to any DB | ❌ Custom each time |
| **Setup** | ⚠️ Requires configuration | ✅ Direct access |
## Resources
- [MCP Documentation](https://modelcontextprotocol.io/)
- [PostgreSQL Docs](https://www.postgresql.org/docs/)
- [psycopg2 Guide](https://www.psycopg.org/docs/)
## License
MIT - Use this as a foundation for your own database analysis tools!

93
bestseller.py Normal file
View File

@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""Find best selling books."""
import asyncio
import os
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
    """Analyze book sales through the PostgreSQL MCP server.

    Spawns the server as a stdio child process and prints three reports:
    best sellers by units, best sellers by revenue, and category totals.
    """
    params = StdioServerParameters(
        command="python3",
        args=["pg_mcp_server/server.py"],
        env=os.environ.copy(),
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            async def show(heading: str, sql_text: str) -> None:
                """Run one read-only query via the server and print its table."""
                reply = await session.call_tool("execute_query", {
                    "query": sql_text,
                    "limit": 10
                })
                print(heading)
                print(reply.content[0].text)

            divider = "=" * 70
            print(divider)
            print("📚 BEST SELLING BOOKS ANALYSIS")
            print(divider)

            # Ranking 1: units moved per book.
            await show("\n🏆 By Quantity Sold:", """
            SELECT
                p.name as book_name,
                p.price,
                SUM(oi.quantity) as total_sold,
                COUNT(DISTINCT oi.order_id) as orders,
                SUM(oi.quantity * oi.unit_price) as total_revenue
            FROM products p
            JOIN order_items oi ON p.id = oi.product_id
            WHERE p.category = 'Books'
            GROUP BY p.id, p.name, p.price
            ORDER BY total_sold DESC
            """)

            # Ranking 2: revenue earned per book.
            await show("\n💰 By Revenue:", """
            SELECT
                p.name as book_name,
                p.price,
                SUM(oi.quantity) as total_sold,
                SUM(oi.quantity * oi.unit_price) as total_revenue,
                ROUND(AVG(oi.unit_price), 2) as avg_selling_price
            FROM products p
            JOIN order_items oi ON p.id = oi.product_id
            WHERE p.category = 'Books'
            GROUP BY p.id, p.name, p.price
            ORDER BY total_revenue DESC
            """)

            # Context: how Books compares with every other category.
            await show("\n📊 Category Comparison:", """
            SELECT
                p.category,
                COUNT(DISTINCT p.id) as num_products,
                SUM(oi.quantity) as total_units_sold,
                SUM(oi.quantity * oi.unit_price) as total_revenue,
                ROUND(AVG(oi.quantity), 1) as avg_qty_per_order
            FROM products p
            JOIN order_items oi ON p.id = oi.product_id
            GROUP BY p.category
            ORDER BY total_revenue DESC
            """)
if __name__ == "__main__":
    # Default to the local demo database, but honor an existing
    # PG_CONNECTION_STRING so users can point at their own server
    # (the original unconditionally overwrote it).
    os.environ.setdefault(
        "PG_CONNECTION_STRING",
        "postgresql://postgres:demo@localhost:5432/shop",
    )
    asyncio.run(main())

173
demo.py Normal file
View File

@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""
PostgreSQL Analyzer Demo - Practical database analysis with MCP.
Usage:
# Set your database connection
export PG_CONNECTION_STRING="postgresql://user:pass@localhost:5432/mydb"
# Run the demo
python demo.py
This demo connects to a real PostgreSQL database and performs
automated analysis using the MCP server and skill guidance.
"""
import asyncio
import os
import sys
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def analyze_database():
    """Run the interactive analysis workflow over an MCP stdio session.

    Steps: schema discovery, table statistics, an optional user-supplied
    SELECT query, and an optional single-column deep dive.  Prompts on
    stdin between steps.  Exits the process with status 1 when
    PG_CONNECTION_STRING is not set.
    """
    # Fail fast with setup instructions if no connection string is configured.
    if not os.environ.get("PG_CONNECTION_STRING"):
        print("""
PG_CONNECTION_STRING not set!
Please set your database connection string:
export PG_CONNECTION_STRING="postgresql://user:password@host:port/database"
For local testing with Docker:
docker run -d --name pg-demo -e POSTGRES_PASSWORD=demo -p 5432:5432 postgres:15
export PG_CONNECTION_STRING="postgresql://postgres:demo@localhost:5432/postgres"
""")
        sys.exit(1)
    print("=" * 70)
    print("PostgreSQL Database Analyzer Demo")
    print("=" * 70)
    # The MCP server runs as a stdio child process; it inherits our
    # environment and therefore sees PG_CONNECTION_STRING.
    server_params = StdioServerParameters(
        command="python3",
        args=["pg_mcp_server/server.py"],
        env=os.environ.copy(),
    )
    print("\n📡 Connecting to PostgreSQL MCP server...")
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            print("✅ Connected to database via MCP server\n")
            # Step 1: full schema listing so the user can pick a table.
            print("🔍 Step 1: Discovering database schema...")
            print("-" * 70)
            schema_result = await session.call_tool("get_schema", {})
            print(schema_result.content[0].text)
            # Step 2: ask which table to analyze; 'skip' ends the session.
            table_name = input("\n📋 Enter table name to analyze (or 'skip' to exit): ").strip()
            if table_name.lower() == 'skip':
                print("\nSkipping table analysis.")
                return
            # Step 3: table-level statistics with a small sample.
            print(f"\n📊 Step 2: Analyzing table '{table_name}'...")
            print("-" * 70)
            stats_result = await session.call_tool("get_table_stats", {
                "table_name": table_name,
                "sample_size": 5
            })
            print(stats_result.content[0].text)
            # Step 4: optional free-form SELECT from the user.
            print("\n📝 Step 3: Running custom analysis queries...")
            print("-" * 70)
            # Suggest a couple of starter queries against the chosen table.
            print("\nExample queries you can run:")
            print(f" SELECT COUNT(*) FROM {table_name}")
            print(f" SELECT * FROM {table_name} LIMIT 10")
            custom_query = input("\nEnter a SELECT query (or press Enter to skip): ").strip()
            if custom_query:
                query_result = await session.call_tool("execute_query", {
                    "query": custom_query,
                    "limit": 100
                })
                print("\nQuery Results:")
                print(query_result.content[0].text)
            # Step 5: optional deep dive on a single column.
            column_name = input("\n🔎 Enter column name for deep analysis (or press Enter to skip): ").strip()
            if column_name:
                print(f"\nAnalyzing column '{column_name}'...")
                print("-" * 70)
                col_result = await session.call_tool("analyze_column", {
                    "table_name": table_name,
                    "column_name": column_name
                })
                print(col_result.content[0].text)
            print("\n" + "=" * 70)
            print("Analysis complete!")
            print("=" * 70)
def print_usage_examples():
    """Print example prompts showing how the skill drives the MCP tools."""
    examples = """
🎯 Usage Examples with Kimi Code CLI:
Once configured, you can ask Kimi:
1. Schema Exploration:
"Show me all tables in my database"
"What columns does the users table have?"
2. Data Querying:
"How many orders were placed last month?"
"Show me the top 10 customers by revenue"
3. Data Analysis:
"Analyze the orders table"
"What's the distribution of user signups by month?"
4. Column Investigation:
"Tell me about the status column in orders"
"Are there any data quality issues in the email column?"
5. Business Insights:
"What's our monthly revenue trend?"
"Which products have the highest return rate?"
The skill will guide Kimi to:
- Use get_schema() to understand structure
- Use get_table_stats() for overview
- Use execute_query() for custom analysis
- Use analyze_column() for deep dives
- Synthesize insights and recommendations
"""
    print(examples)
async def main():
    """Entry point: show usage examples for --examples, else run the demo."""
    wants_examples = len(sys.argv) > 1 and sys.argv[1] == '--examples'
    if wants_examples:
        print_usage_examples()
        return
    try:
        await analyze_database()
    except Exception as exc:
        # One consolidated troubleshooting message, then a non-zero exit.
        hints = (
            f"\n❌ Error: {exc}\n"
            "\nMake sure:\n"
            "1. PostgreSQL is running and accessible\n"
            "2. PG_CONNECTION_STRING is set correctly\n"
            "3. The MCP server dependencies are installed:\n"
            " pip install -r pg_mcp_server/requirements.txt"
        )
        print(hints)
        sys.exit(1)
if __name__ == "__main__":
    # Run the async CLI entry point when executed as a script.
    asyncio.run(main())

193
pg_analyzer_skill/SKILL.md Normal file
View File

@ -0,0 +1,193 @@
---
name: postgres-analyzer
description: PostgreSQL database analysis and querying. Use when the user needs to explore database schema, query data, analyze table statistics, or get insights from PostgreSQL databases. Requires PG_CONNECTION_STRING environment variable to be set.
---
# PostgreSQL Analyzer Skill
Guide for analyzing PostgreSQL databases and extracting insights.
## When to Use
- Exploring database structure and schema
- Querying data with SQL
- Analyzing table statistics and data quality
- Understanding column distributions
- Generating data insights and reports
## Available Tools
| Tool | Purpose | Use When |
|------|---------|----------|
| `get_schema` | List tables and columns | Starting analysis, understanding structure |
| `execute_query` | Run SELECT queries | Getting specific data, custom analysis |
| `get_table_stats` | Table statistics + sample | Understanding a table's data profile |
| `analyze_column` | Deep column analysis | Investigating specific column patterns |
## Analysis Workflow
### 1. Discovery Phase
Always start with schema exploration:
```
User: "Analyze my database"
Step 1: get_schema() → See all tables
Step 2: For each interesting table, get_table_stats()
```
### 2. Deep Dive Phase
Focus on specific areas:
```
Step 3: analyze_column() for key columns
Step 4: execute_query() for custom analysis
```
### 3. Insight Phase
Synthesize findings:
```
Step 5: Identify patterns, anomalies, recommendations
```
## Query Patterns
### Common Analysis Queries
**Time-based aggregation:**
```sql
SELECT
DATE_TRUNC('month', created_at) as month,
COUNT(*) as count,
AVG(amount) as avg_amount
FROM orders
GROUP BY 1
ORDER BY 1
```
**Distribution analysis:**
```sql
SELECT
CASE
WHEN age < 18 THEN 'Under 18'
WHEN age BETWEEN 18 AND 30 THEN '18-30'
WHEN age BETWEEN 31 AND 50 THEN '31-50'
ELSE '50+'
END as age_group,
COUNT(*) as count
FROM users
GROUP BY 1
```
**Correlation check:**
```sql
SELECT
category,
CORR(price, quantity) as price_qty_correlation
FROM sales
GROUP BY category
```
## Safety Rules
1. **Read-Only**: The MCP server only allows SELECT queries
2. **Row Limits**: Queries auto-limit to 100 rows (max 1000)
3. **No PII**: Warn users if analyzing tables with potential PII
4. **Performance**: Add appropriate WHERE clauses for large tables
## Analysis Templates
### Data Quality Report
```
For table X:
1. get_table_stats(table_name="X")
2. analyze_column() for each key column
3. Check for:
- High null percentages
- Duplicate values
- Outliers in numeric columns
- Date ranges
```
### User Activity Analysis
```sql
-- Active users over time
SELECT
DATE_TRUNC('week', last_login) as week,
COUNT(DISTINCT user_id) as active_users
FROM users
WHERE last_login >= NOW() - INTERVAL '90 days'
GROUP BY 1
ORDER BY 1
```
### Revenue Analysis
```sql
-- Monthly revenue trends
SELECT
DATE_TRUNC('month', order_date) as month,
SUM(total_amount) as revenue,
COUNT(*) as orders,
AVG(total_amount) as aov
FROM orders
WHERE status = 'completed'
GROUP BY 1
ORDER BY 1
```
## Common Insights to Look For
### Data Quality Issues
- Columns with >50% nulls
- Duplicate primary keys
- Outliers (use percentiles)
- Stale data (old max dates)
### Business Patterns
- Growth trends (time series)
- Seasonality (day of week, month)
- User segmentation
- Product/category performance
### Anomalies
- Sudden spikes/drops
- Unexpected distributions
- Missing expected data
## Output Format
Present findings with:
1. **Executive Summary**: Key findings in 2-3 sentences
2. **Data Overview**: Tables analyzed, row counts
3. **Key Insights**: Bullet points with supporting numbers
4. **Recommendations**: Actionable next steps
5. **Queries Used**: For reproducibility
## Example Session
```
User: "What's in my database?"
→ get_schema()
← Shows 3 tables: users, orders, products
→ get_table_stats("users")
← 10,000 users, created 2020-2024
→ analyze_column("users", "created_at")
← Growth peaked in 2022, slowed in 2023
→ execute_query("SELECT status, COUNT(*) FROM orders GROUP BY status")
← 60% completed, 30% pending, 10% cancelled
Insight: "Your database has healthy order flow but
cancellation rate (10%) is above industry
average (5-7%). Consider investigating
cancellation reasons."
```

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Generate a formatted data analysis report from database statistics.
Usage:
python generate_report.py <table_name> [--output report.md]
"""
import argparse
import json
import sys
from datetime import datetime
from typing import Any
def format_number(n: int | float) -> str:
"""Format large numbers with commas."""
if isinstance(n, float):
return f"{n:,.2f}"
return f"{n:,}"
def generate_report(table_name: str, stats: dict[str, Any]) -> str:
"""Generate a markdown report from table statistics."""
report = f"""# Data Analysis Report: {table_name}
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}
## Overview
- **Total Rows**: {format_number(stats.get('row_count', 0))}
- **Columns**: {len(stats.get('columns', []))}
- **Analysis Type**: Automated Data Profiling
## Column Summary
| Column | Type | Nulls | Unique | Notes |
|--------|------|-------|--------|-------|
"""
for col in stats.get('columns', []):
null_pct = col.get('null_pct', 0)
null_badge = "⚠️ " if null_pct > 50 else ""
unique_badge = "🎲 " if col.get('unique_ratio', 0) > 0.9 else ""
notes = []
if null_pct > 50:
notes.append("High nulls")
if col.get('unique_ratio', 0) > 0.9:
notes.append("Near-unique")
if col.get('is_date'):
notes.append("Date range: {} to {}".format(
col.get('min_date', '?'), col.get('max_date', '?')))
note_str = ", ".join(notes) if notes else "-"
report += f"| {col['name']} | {col['type']} | {null_pct:.1f}% {null_badge}| {format_number(col.get('unique', 0))} {unique_badge}| {note_str} |\n"
# Data quality section
report += "\n## Data Quality Assessment\n\n"
issues = []
warnings = []
for col in stats.get('columns', []):
if col.get('null_pct', 0) > 50:
issues.append(f"- **{col['name']}**: {col['null_pct']:.1f}% null values")
elif col.get('null_pct', 0) > 20:
warnings.append(f"- **{col['name']}**: {col['null_pct']:.1f}% null values")
if issues:
report += "### ⚠️ Issues Found\n\n"
report += "\n".join(issues) + "\n\n"
if warnings:
report += "### 📋 Warnings\n\n"
report += "\n".join(warnings) + "\n\n"
if not issues and not warnings:
report += "✅ No major data quality issues detected.\n\n"
# Recommendations
report += """## Recommendations
1. **Review high-null columns** for data collection issues
2. **Check date ranges** are within expected bounds
3. **Validate unique constraints** on ID columns
4. **Consider indexing** frequently queried columns
---
*Report generated by PostgreSQL Analyzer Skill*
"""
return report
def main():
    """Parse CLI arguments, load the stats JSON, and emit the report.

    '-' (the default) means stdin for --stats and stdout for --output.
    """
    parser = argparse.ArgumentParser(description='Generate data analysis report')
    parser.add_argument('table_name', help='Table name analyzed')
    parser.add_argument('--stats', help='JSON file with statistics', default='-')
    parser.add_argument('--output', '-o', help='Output file', default='-')
    args = parser.parse_args()

    # Load statistics from stdin or a file.
    source = sys.stdin if args.stats == '-' else open(args.stats)
    try:
        stats = json.load(source)
    finally:
        if source is not sys.stdin:
            source.close()

    report = generate_report(args.table_name, stats)

    # Emit to stdout or a file.
    if args.output == '-':
        print(report)
        return
    with open(args.output, 'w') as sink:
        sink.write(report)
    print(f"Report written to {args.output}")
if __name__ == '__main__':
    # CLI entry point.
    main()

View File

@ -0,0 +1,2 @@
mcp>=1.0.0
psycopg2-binary>=2.9.0

426
pg_mcp_server/server.py Normal file
View File

@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""
PostgreSQL MCP Server - Database analysis and querying tools.
Provides safe, read-only database access for analysis.
Environment variable PG_CONNECTION_STRING required.
"""
import asyncio
import os
import json
from urllib.parse import urlparse
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import mcp.types as types
# Database imports
import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor
# Single MCP server instance; the tool list/call handlers below register on it.
server = Server("postgres-analyzer")
# Sanitized "scheme://user@***:port/db" summary of the DSN, set by
# get_connection() so error paths can reference the target without the password.
_connection_info = None
def get_connection():
    """Open a psycopg2 connection using the PG_CONNECTION_STRING env var.

    Raises:
        ValueError: if PG_CONNECTION_STRING is not set.
    """
    global _connection_info
    conn_str = os.environ.get("PG_CONNECTION_STRING")
    if not conn_str:
        raise ValueError("PG_CONNECTION_STRING environment variable not set")
    # Remember a password-free summary of the DSN for diagnostics.
    parts = urlparse(conn_str)
    _connection_info = f"{parts.scheme}://{parts.username}@***:{parts.port}{parts.path}"
    return psycopg2.connect(conn_str)
def check_read_only(query: str) -> bool:
    """Return True if *query* is a read-only statement.

    Two checks are applied:
    1. After skipping leading ``--`` line comments, the statement must
       start with SELECT or WITH (CTE form).
    2. No data/schema-modifying keyword may appear as a whole word.

    Whole-word matching fixes the original substring scan, which falsely
    blocked legitimate queries such as ``SELECT created_at FROM orders``
    ("create" is a substring of "created_at").
    """
    import re  # stdlib; local import keeps the block self-contained

    text = query.strip()
    # Skip leading SQL line comments so "-- note\nSELECT ..." still passes.
    while text.startswith("--"):
        newline = text.find("\n")
        if newline == -1:
            return False  # comment only, nothing runnable
        text = text[newline + 1:].lstrip()
    lowered = text.lower()
    if not lowered.startswith(("select", "with")):
        return False
    forbidden = ('insert', 'update', 'delete', 'drop', 'create', 'alter',
                 'truncate', 'grant', 'revoke')
    return not any(re.search(rf"\b{kw}\b", lowered) for kw in forbidden)
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Advertise the four database analysis tools to the MCP client.

    Each Tool's ``inputSchema`` is a JSON Schema the client uses to
    validate and construct tool-call arguments.
    """
    return [
        # Schema discovery: all public tables, or one table's columns.
        Tool(
            name="get_schema",
            description="Get database schema - lists all tables and their columns",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "description": "Optional: specific table name. If omitted, returns all tables."
                    }
                }
            },
        ),
        # Ad-hoc read-only SQL; rows are capped server-side.
        Tool(
            name="execute_query",
            description="Execute a read-only SQL query and return results (max 1000 rows)",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "SQL SELECT query to execute"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Maximum rows to return (default 100, max 1000)",
                        "default": 100
                    }
                },
                "required": ["query"]
            },
        ),
        # Table profile: row count, per-column stats, and sample rows.
        Tool(
            name="get_table_stats",
            description="Get statistics for a table: row count, column stats, sample data",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "description": "Table name to analyze"
                    },
                    "sample_size": {
                        "type": "integer",
                        "description": "Number of sample rows (default 5)",
                        "default": 5
                    }
                },
                "required": ["table_name"]
            },
        ),
        # Single-column deep dive: distribution, nulls, uniqueness.
        Tool(
            name="analyze_column",
            description="Analyze a specific column: distribution, nulls, unique values",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "description": "Table name"
                    },
                    "column_name": {
                        "type": "string",
                        "description": "Column name to analyze"
                    }
                },
                "required": ["table_name", "column_name"]
            },
        ),
    ]
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict | None) -> list[types.TextContent]:
    """Dispatch a tool invocation to the matching handler.

    A fresh connection is opened per call and always closed, even on
    error.  Failures are returned as readable text content instead of
    being raised, so the MCP client gets a usable message.
    """
    if arguments is None:
        arguments = {}
    # Initialize before the try so the finally can test them safely
    # (replaces the fragile "'cursor' in locals()" idiom).
    conn = None
    cursor = None
    try:
        conn = get_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        if name == "get_schema":
            return _get_schema(cursor, arguments.get("table_name"))
        elif name == "execute_query":
            query = arguments.get("query", "")
            # Clamp the row cap to the documented maximum of 1000.
            limit = min(arguments.get("limit", 100), 1000)
            return _execute_query(cursor, query, limit)
        elif name == "get_table_stats":
            return _get_table_stats(
                cursor,
                arguments.get("table_name", ""),
                arguments.get("sample_size", 5),
            )
        elif name == "analyze_column":
            return _analyze_column(
                cursor,
                arguments.get("table_name", ""),
                arguments.get("column_name", ""),
            )
        else:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]
    except ValueError as e:
        # Missing/invalid PG_CONNECTION_STRING.
        return [TextContent(type="text", text=f"Configuration error: {str(e)}")]
    except psycopg2.Error as e:
        return [TextContent(type="text", text=f"Database error: {str(e)}")]
    except Exception as e:
        return [TextContent(type="text", text=f"Error: {str(e)}")]
    finally:
        # Close in reverse acquisition order; either may never have been set.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
def _get_schema(cursor, table_name: str | None) -> list[TextContent]:
    """Describe the public schema, or a single table when *table_name* is given.

    Both branches now filter on ``table_schema = 'public'``; previously the
    single-table branch (and the column-count subquery) matched any schema,
    so a same-named table in e.g. ``pg_catalog`` could pollute the output.
    """
    if table_name:
        # Specific table: list its columns in declaration order.
        cursor.execute("""
            SELECT
                column_name,
                data_type,
                is_nullable,
                column_default
            FROM information_schema.columns
            WHERE table_name = %s AND table_schema = 'public'
            ORDER BY ordinal_position
        """, (table_name,))
        columns = cursor.fetchall()
        if not columns:
            return [TextContent(type="text", text=f"Table '{table_name}' not found.")]
        result = f"Table: {table_name}\n"
        result += "-" * 60 + "\n"
        for col in columns:
            nullable = "NULL" if col['is_nullable'] == 'YES' else "NOT NULL"
            default = f" DEFAULT {col['column_default']}" if col['column_default'] else ""
            result += f" {col['column_name']}: {col['data_type']} {nullable}{default}\n"
        return [TextContent(type="text", text=result)]
    # All tables: name plus column count, then a per-table column listing.
    cursor.execute("""
        SELECT
            table_name,
            (SELECT COUNT(*) FROM information_schema.columns c
             WHERE c.table_name = t.table_name
               AND c.table_schema = 'public') as column_count
        FROM information_schema.tables t
        WHERE table_schema = 'public'
        ORDER BY table_name
    """)
    tables = cursor.fetchall()
    if not tables:
        return [TextContent(type="text", text="No tables found in public schema.")]
    result = "Database Schema\n"
    result += "=" * 60 + "\n\n"
    for table in tables:
        result += f"📋 {table['table_name']} ({table['column_count']} columns)\n"
        # One query per table; fine for the handful of tables in a demo DB.
        cursor.execute("""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_name = %s AND table_schema = 'public'
            ORDER BY ordinal_position
        """, (table['table_name'],))
        for col in cursor.fetchall():
            result += f"{col['column_name']}: {col['data_type']}\n"
        result += "\n"
    return [TextContent(type="text", text=result)]
def _execute_query(cursor, query: str, limit: int) -> list[TextContent]:
    """Run a read-only query and format the rows as a markdown table.

    Rejects anything check_read_only() flags, appends a LIMIT when the
    query doesn't already have one, and truncates cell values to keep
    the rendered table readable.
    """
    import re  # stdlib; used only for the LIMIT keyword check

    if not check_read_only(query):
        return [TextContent(type="text",
                            text="Error: Only SELECT queries are allowed for safety.")]
    # Only append a LIMIT when the keyword is genuinely absent — the old
    # substring test misfired on identifiers like a "limits" column — and
    # strip a trailing semicolon first so the suffix stays valid SQL.
    trimmed = query.rstrip().rstrip(";")
    if not re.search(r"\blimit\b", trimmed, re.IGNORECASE):
        trimmed = f"{trimmed} LIMIT {limit}"
    cursor.execute(trimmed)
    rows = cursor.fetchall()
    if not rows:
        return [TextContent(type="text", text="Query returned no results.")]
    # Format as markdown table
    columns = list(rows[0].keys())
    result = "| " + " | ".join(columns) + " |\n"
    result += "| " + " | ".join(["---"] * len(columns)) + " |\n"
    for row in rows[:limit]:
        # Truncate long values to 50 chars per cell.
        values = [str(row.get(col, "NULL"))[:50] for col in columns]
        result += "| " + " | ".join(values) + " |\n"
    if len(rows) > limit:
        result += f"\n... and {len(rows) - limit} more rows"
    return [TextContent(type="text", text=result)]
def _get_table_stats(cursor, table_name: str, sample_size: int) -> list[TextContent]:
    """Summarize a table: row count, per-column null/distinct stats, sample rows.

    Issues one COUNT query per column, so cost grows with both row count
    and column count — acceptable for demo-sized databases.
    """
    # Verify the table exists in the public schema before querying it.
    cursor.execute("""
        SELECT COUNT(*) as count
        FROM information_schema.tables
        WHERE table_name = %s AND table_schema = 'public'
    """, (table_name,))
    if cursor.fetchone()['count'] == 0:
        return [TextContent(type="text", text=f"Table '{table_name}' not found.")]
    result = f"📊 Table Analysis: {table_name}\n"
    result += "=" * 60 + "\n\n"
    # Total row count; identifiers are quoted via sql.Identifier so a
    # hostile table name can't inject SQL.
    cursor.execute(sql.SQL("SELECT COUNT(*) as count FROM {}").format(
        sql.Identifier(table_name)))
    row_count = cursor.fetchone()['count']
    result += f"Total Rows: {row_count:,}\n\n"
    # Column list in declaration order.
    cursor.execute("""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = %s
        ORDER BY ordinal_position
    """, (table_name,))
    columns = cursor.fetchall()
    result += "Column Statistics:\n"
    result += "-" * 60 + "\n"
    for col in columns:
        col_name = col['column_name']
        data_type = col['data_type']
        # Null count = total minus non-null; distinct count per column.
        cursor.execute(sql.SQL("""
            SELECT
                COUNT(*) - COUNT({col}) as null_count,
                COUNT(DISTINCT {col}) as distinct_count
            FROM {table}
        """).format(col=sql.Identifier(col_name), table=sql.Identifier(table_name)))
        stats = cursor.fetchone()
        # Guard against division by zero on an empty table.
        null_pct = (stats['null_count'] / row_count * 100) if row_count > 0 else 0
        result += f" {col_name} ({data_type}):\n"
        result += f" • Nulls: {stats['null_count']} ({null_pct:.1f}%)\n"
        result += f" • Unique values: {stats['distinct_count']:,}\n"
    # Sample rows, rendered as a markdown table.
    result += f"\n📝 Sample Data ({min(sample_size, row_count)} rows):\n"
    result += "-" * 60 + "\n"
    cursor.execute(sql.SQL("SELECT * FROM {} LIMIT %s").format(
        sql.Identifier(table_name)), (sample_size,))
    samples = cursor.fetchall()
    if samples:
        col_names = list(samples[0].keys())
        result += "| " + " | ".join(col_names) + " |\n"
        result += "| " + " | ".join(["---"] * len(col_names)) + " |\n"
        for row in samples:
            # Truncate each cell to 30 chars to keep the table readable.
            values = [str(row.get(col, "NULL"))[:30] for col in col_names]
            result += "| " + " | ".join(values) + " |\n"
    return [TextContent(type="text", text=result)]
def _analyze_column(cursor, table_name: str, column_name: str) -> list[TextContent]:
    """Deep analysis of a single column: nulls, uniqueness, range, top values.

    Fixes over the original: empty tables no longer trigger a
    ZeroDivisionError; non-string values no longer raise TypeError when
    truncated for display; an average of exactly 0 is still reported.
    """
    result = f"🔍 Column Analysis: {table_name}.{column_name}\n"
    result += "=" * 60 + "\n\n"
    # Basic stats in a single pass over the table; identifiers are quoted
    # via sql.Identifier to prevent injection through names.
    cursor.execute(sql.SQL("""
        SELECT
            COUNT(*) as total,
            COUNT({col}) as non_null,
            COUNT(*) - COUNT({col}) as null_count,
            COUNT(DISTINCT {col}) as unique_count,
            MIN({col}) as min_val,
            MAX({col}) as max_val
        FROM {table}
    """).format(col=sql.Identifier(column_name), table=sql.Identifier(table_name)))
    stats = cursor.fetchone()
    if stats['total'] == 0:
        # Avoid the divisions by zero below; nothing to analyze anyway.
        return [TextContent(type="text", text=result + "Table is empty.\n")]
    result += f"Total Rows: {stats['total']:,}\n"
    result += f"Non-Null: {stats['non_null']:,}\n"
    result += f"Null: {stats['null_count']:,} ({stats['null_count']/stats['total']*100:.1f}%)\n"
    result += f"Unique Values: {stats['unique_count']:,}\n"
    if stats['min_val'] is not None:
        result += f"Min: {stats['min_val']}\n"
        result += f"Max: {stats['max_val']}\n"
    # Mean/stddev only make sense for numeric column types.
    cursor.execute("""
        SELECT data_type
        FROM information_schema.columns
        WHERE table_name = %s AND column_name = %s
    """, (table_name, column_name))
    type_info = cursor.fetchone()
    if type_info and any(t in type_info['data_type'].lower()
                         for t in ['int', 'float', 'double', 'decimal', 'numeric', 'real']):
        cursor.execute(sql.SQL("""
            SELECT
                AVG({col})::numeric(10,2) as avg_val,
                STDDEV({col})::numeric(10,2) as stddev_val
            FROM {table}
        """).format(col=sql.Identifier(column_name), table=sql.Identifier(table_name)))
        num_stats = cursor.fetchone()
        # 'is not None' so a legitimate average of 0 is still reported.
        if num_stats['avg_val'] is not None:
            result += f"\n📈 Numeric Statistics:\n"
            result += f" Average: {num_stats['avg_val']}\n"
            result += f" Std Dev: {num_stats['stddev_val']}\n"
    # Most frequent non-null values.
    cursor.execute(sql.SQL("""
        SELECT {col} as value, COUNT(*) as count
        FROM {table}
        WHERE {col} IS NOT NULL
        GROUP BY {col}
        ORDER BY count DESC
        LIMIT 10
    """).format(col=sql.Identifier(column_name), table=sql.Identifier(table_name)))
    top_values = cursor.fetchall()
    if top_values:
        result += f"\n🏆 Top Values:\n"
        for i, row in enumerate(top_values, 1):
            pct = row['count'] / stats['total'] * 100
            # str() first: slicing raised TypeError on non-string values (e.g. ints).
            result += f" {i}. {str(row['value'])[:50]} ({row['count']:,}, {pct:.1f}%)\n"
    return [TextContent(type="text", text=result)]
async def main():
    """Run the MCP server over stdio until the client disconnects."""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="postgres-analyzer",
                server_version="0.1.0",
                # Capabilities are derived from the registered handlers.
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
if __name__ == "__main__":
    # Launch the stdio server loop when executed as a script.
    asyncio.run(main())

222
setup_test_db.py Normal file
View File

@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
Setup test database with sample data for demo purposes.
Usage:
# Start PostgreSQL (Docker)
docker run -d --name pg-analyzer-demo \
-e POSTGRES_PASSWORD=demo \
-e POSTGRES_DB=shop \
-p 5432:5432 postgres:15
# Setup test data
export PG_CONNECTION_STRING="postgresql://postgres:demo@localhost:5432/shop"
python setup_test_db.py
# Run demo
python demo.py
"""
import os
import sys
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
import random
def create_tables(conn):
    """Drop and recreate the four demo tables.

    Destructive: CASCADE removes dependent objects first, so any prior
    demo data is lost.  Tables are created parent-first (users, products)
    so the foreign keys on orders/order_items resolve.
    """
    cursor = conn.cursor()
    cursor.execute("""
        DROP TABLE IF EXISTS order_items CASCADE;
        DROP TABLE IF EXISTS orders CASCADE;
        DROP TABLE IF EXISTS products CASCADE;
        DROP TABLE IF EXISTS users CASCADE;
        CREATE TABLE users (
            id SERIAL PRIMARY KEY,
            email VARCHAR(255) UNIQUE NOT NULL,
            name VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_login TIMESTAMP,
            country VARCHAR(50)
        );
        CREATE TABLE products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(200) NOT NULL,
            category VARCHAR(50),
            price DECIMAL(10,2) NOT NULL,
            stock_quantity INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE orders (
            id SERIAL PRIMARY KEY,
            user_id INTEGER REFERENCES users(id),
            total_amount DECIMAL(10,2) NOT NULL,
            status VARCHAR(20) DEFAULT 'pending',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            shipping_country VARCHAR(50)
        );
        CREATE TABLE order_items (
            id SERIAL PRIMARY KEY,
            order_id INTEGER REFERENCES orders(id),
            product_id INTEGER REFERENCES products(id),
            quantity INTEGER NOT NULL,
            unit_price DECIMAL(10,2) NOT NULL
        );
    """)
    conn.commit()
    cursor.close()
    print("✅ Tables created")
def insert_sample_data(conn):
    """Insert realistic sample data: 100 users, 15 products, 500 orders.

    Data is randomized per run (no fixed seed), so each setup produces a
    different but similarly shaped dataset.  IDs are assumed to be the
    SERIAL values 1..N assigned in insertion order.
    """
    cursor = conn.cursor()
    # Value pools for the generators below.
    countries = ['USA', 'UK', 'Canada', 'Germany', 'France', 'Japan', 'Australia']
    categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports']
    statuses = ['completed', 'completed', 'completed', 'pending', 'cancelled']  # 60/20/20 split
    # Generate users; ~30% have never logged in (last_login NULL) so the
    # analyzer has some nulls to report.
    users_data = []
    for i in range(100):
        email = f"user{i+1}@example.com"
        name = f"User {i+1}"
        created = datetime.now() - timedelta(days=random.randint(1, 365))
        last_login = created + timedelta(days=random.randint(0, 100)) if random.random() > 0.3 else None
        country = random.choice(countries)
        users_data.append((email, name, created, last_login, country))
    execute_values(cursor, """
        INSERT INTO users (email, name, created_at, last_login, country)
        VALUES %s
    """, users_data)
    # Fixed product catalog: (name, category, price, stock_quantity).
    products_data = [
        ('Wireless Headphones', 'Electronics', 89.99, 150),
        ('Running Shoes', 'Sports', 129.50, 80),
        ('Python Programming Book', 'Books', 45.00, 200),
        ('Coffee Maker', 'Home', 79.99, 45),
        ('Yoga Mat', 'Sports', 35.00, 120),
        ('Smart Watch', 'Electronics', 249.99, 60),
        ('Winter Jacket', 'Clothing', 189.00, 40),
        ('Desk Lamp', 'Home', 34.99, 90),
        ('Novel Collection', 'Books', 65.00, 75),
        ('Tennis Racket', 'Sports', 159.00, 30),
        ('Bluetooth Speaker', 'Electronics', 59.99, 110),
        ('Sneakers', 'Clothing', 95.00, 65),
        ('Cookware Set', 'Home', 149.99, 25),
        ('Science Fiction Set', 'Books', 55.00, 85),
        ('Basketball', 'Sports', 29.99, 150),
    ]
    execute_values(cursor, """
        INSERT INTO products (name, category, price, stock_quantity)
        VALUES %s
    """, products_data)
    # Generate 500 orders spread over the last ~6 months.
    orders_data = []
    for i in range(500):
        user_id = random.randint(1, 100)
        total = round(random.uniform(25, 500), 2)
        status = random.choice(statuses)
        created = datetime.now() - timedelta(days=random.randint(1, 180))
        country = random.choice(countries)
        orders_data.append((user_id, total, status, created, country))
    execute_values(cursor, """
        INSERT INTO orders (user_id, total_amount, status, created_at, shipping_country)
        VALUES %s
    """, orders_data)
    # 1-3 line items per order; unit_price is random and intentionally
    # independent of the catalog price (demo data only).
    items_data = []
    for order_id in range(1, 501):
        num_items = random.randint(1, 3)
        for _ in range(num_items):
            product_id = random.randint(1, 15)
            quantity = random.randint(1, 5)
            unit_price = round(random.uniform(15, 250), 2)
            items_data.append((order_id, product_id, quantity, unit_price))
    execute_values(cursor, """
        INSERT INTO order_items (order_id, product_id, quantity, unit_price)
        VALUES %s
    """, items_data)
    conn.commit()
    cursor.close()
    print(f"✅ Inserted: 100 users, 15 products, 500 orders, {len(items_data)} order items")
def verify_data(conn):
    """Print row counts plus status and category breakdowns for the demo data."""
    cursor = conn.cursor()
    print("\n📊 Test Database Summary:")
    print("-" * 50)
    # Row counts for every demo table, in creation order.
    for label, table in (("Users", "users"), ("Products", "products"),
                         ("Orders", "orders"), ("Order Items", "order_items")):
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        print(f"{label}: {cursor.fetchone()[0]:,}")
    cursor.execute("SELECT status, COUNT(*) FROM orders GROUP BY status")
    print("\nOrder Status Distribution:")
    for status, count in cursor.fetchall():
        print(f"{status}: {count}")
    cursor.execute("SELECT category, COUNT(*) FROM products GROUP BY category")
    print("\nProduct Categories:")
    for category, count in cursor.fetchall():
        print(f"{category}: {count}")
    cursor.close()
def main():
    """Create and populate the demo database, then print a summary.

    Exits with status 1 when PG_CONNECTION_STRING is missing or any
    database step fails.
    """
    conn_str = os.environ.get("PG_CONNECTION_STRING")
    if not conn_str:
        print("Error: PG_CONNECTION_STRING not set")
        print("\nExample:")
        print(" export PG_CONNECTION_STRING=\"postgresql://postgres:demo@localhost:5432/shop\"")
        sys.exit(1)
    try:
        conn = psycopg2.connect(conn_str)
        print("🔌 Connected to database\n")
        create_tables(conn)
        insert_sample_data(conn)
        verify_data(conn)
        print("\n✅ Test database setup complete!")
        # Mask the password: the original echoed the raw DSN (including
        # credentials) to the terminal/logs.
        import re
        safe_conn_str = re.sub(r"(//[^:/@]+):[^@]*@", r"\1:***@", conn_str)
        print(f"\nConnection string: {safe_conn_str}")
        print("\nNext: Run 'python demo.py' to analyze the data")
        conn.close()
    except Exception as e:
        print(f"❌ Error: {e}")
        sys.exit(1)
if __name__ == "__main__":
    # CLI entry point.
    main()

46
show_tables.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
"""Quick script to show all tables and their details."""
import asyncio
import os
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
    """Print the full schema, then a stats profile for each demo table."""
    params = StdioServerParameters(
        command="python3",
        args=["pg_mcp_server/server.py"],
        env=os.environ.copy(),
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            banner = "=" * 70
            print(banner)
            print("📊 DATABASE TABLES")
            print(banner)
            # Full schema listing first ...
            schema = await session.call_tool("get_schema", {})
            print(schema.content[0].text)
            # ... then per-table statistics with a tiny sample.
            for table in ("users", "products", "orders", "order_items"):
                print(f"\n{banner}")
                print(f"📈 TABLE STATS: {table.upper()}")
                print(banner)
                stats = await session.call_tool("get_table_stats", {
                    "table_name": table,
                    "sample_size": 3
                })
                print(stats.content[0].text)
if __name__ == "__main__":
    # Default to the local demo database, but honor an existing
    # PG_CONNECTION_STRING so users can point at their own server
    # (the original unconditionally overwrote it).
    os.environ.setdefault(
        "PG_CONNECTION_STRING",
        "postgresql://postgres:demo@localhost:5432/shop",
    )
    asyncio.run(main())