dom/notification_client.py

119 lines
4.4 KiB
Python

import logging
from typing import Dict, Any, Optional, List
from urllib.parse import quote

import aiohttp

from config import Config
logger = logging.getLogger(__name__)
class NotificationClient:
    """Client for sending notifications to both Bark and Ntfy services.

    The client can be used as an async context manager (``async with``), in
    which case the HTTP session is closed on exit. When used without a context
    manager, a session is created on first use and the caller should await
    :meth:`close` when done.
    """

    def __init__(self, config: "Config"):
        """Store the configuration; no network resources are created here."""
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open the HTTP session (reusing a live one) and return the client."""
        self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session when leaving the ``async with`` block."""
        await self.close()

    async def close(self) -> None:
        """Close the HTTP session, if any, and forget it.

        Safe to call repeatedly. The attribute is reset so that a later call
        creates a fresh session instead of reusing a closed one.
        """
        session, self.session = self.session, None
        if session is not None and not session.closed:
            await session.close()

    def _ensure_session(self) -> "aiohttp.ClientSession":
        """Return a usable session, creating one if missing or closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def send_notification(self, message: str, title: str = "AI Thought") -> bool:
        """Send a notification to every configured service.

        Args:
            message: Notification body.
            title: Notification title.

        Returns:
            True if at least one configured service accepted the notification,
            or if no service is configured at all; False if every configured
            service failed.
        """
        results = []

        # Bark needs both the server URL and the device key.
        if self.config.bark_api_url and self.config.bark_device_key:
            results.append(await self._send_bark_notification(message, title))

        # Ntfy needs both the server URL and the topic.
        if self.config.ntfy_api_url and self.config.ntfy_topic:
            results.append(await self._send_ntfy_notification(message, title))

        # Nothing configured counts as success (nothing could fail).
        return any(results) if results else True

    def _build_bark_url(self, message: str, title: str) -> str:
        """Build the Bark push URL with every path segment percent-encoded.

        Bark API format: https://api.day.app/{device_key}/{title}/{body}.
        Title and body are free text, so characters such as "/", "?", "#" and
        "%" must be escaped or they would change the URL's structure.
        """
        base = self.config.bark_api_url.rstrip("/")
        segments = (self.config.bark_device_key, title, message)
        return base + "/" + "/".join(quote(part, safe="") for part in segments)

    async def _send_bark_notification(self, message: str, title: str) -> bool:
        """Send a notification to the Bark service.

        Returns:
            True when the HTTP status is 200 and the JSON payload's "code"
            field is 200; False on any other response or on any exception
            (which is logged, not raised).
        """
        url = self._build_bark_url(message, title)
        try:
            session = self._ensure_session()
            async with session.get(url) as response:
                if response.status != 200:
                    logger.error("Bark HTTP error: %s", response.status)
                    return False
                data = await response.json()
                if data.get("code") == 200:
                    logger.info("Bark notification sent successfully")
                    return True
                logger.error("Bark API error: %s", data)
                return False
        except Exception as e:
            # Best-effort delivery: report failure to the caller via the
            # return value instead of propagating.
            logger.error("Error sending Bark notification: %s", e)
            return False

    async def _send_ntfy_notification(self, message: str, title: str) -> bool:
        """Send a notification to the Ntfy service.

        The message is POSTed as the plain-text body to
        ``{ntfy_api_url}/{ntfy_topic}``; the title travels in the ``Title``
        header, and a bearer token is added when ``ntfy_access_token`` is set.

        Returns:
            True when the HTTP status is 200; False on any other status or on
            any exception (which is logged, not raised).
        """
        url = f"{self.config.ntfy_api_url.rstrip('/')}/{self.config.ntfy_topic}"
        headers = {
            "Title": title,
            "Content-Type": "text/plain",
            "User-Agent": "Ollama-Notification-System/1.0",
        }
        if self.config.ntfy_access_token:
            headers["Authorization"] = f"Bearer {self.config.ntfy_access_token}"

        try:
            session = self._ensure_session()
            async with session.post(url, data=message, headers=headers) as response:
                if response.status == 200:
                    logger.info("Ntfy notification sent successfully")
                    return True
                logger.error("Ntfy HTTP error: %s", response.status)
                return False
        except Exception as e:
            # Best-effort delivery: report failure to the caller via the
            # return value instead of propagating.
            logger.error("Error sending Ntfy notification: %s", e)
            return False

    async def _probe(self, url: str) -> bool:
        """Return True if a GET to *url* answers with a status below 400."""
        try:
            session = self._ensure_session()
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                return response.status < 400
        except Exception:
            # Any failure (connection error, timeout, bad URL) means the
            # service is not reachable.
            return False

    async def test_connections(self) -> Dict[str, Optional[bool]]:
        """Test connections to both notification services.

        Returns:
            A dict with keys "bark" and "ntfy". Each value is True when the
            service's base URL answered with a status below 400, False when
            the request failed or returned an error status, and None when
            that service's URL is not configured.
        """
        results: Dict[str, Optional[bool]] = {}
        targets = (
            ("bark", self.config.bark_api_url),
            ("ntfy", self.config.ntfy_api_url),
        )
        for name, url in targets:
            results[name] = await self._probe(url) if url else None
        return results