119 lines
4.4 KiB
Python
119 lines
4.4 KiB
Python
import logging
from typing import Dict, Any, Optional, List
from urllib.parse import quote

import aiohttp

from config import Config
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
class NotificationClient:
    """Client for sending notifications to both Bark and Ntfy services.

    The client can be used as an async context manager, which opens an
    ``aiohttp.ClientSession`` on entry and closes it on exit::

        async with NotificationClient(config) as client:
            await client.send_notification("hello")

    It also works without ``async with``: a session is then created on first
    use, and the caller should ``await client.close()`` when done.

    The configuration object is read for these attributes: ``bark_api_url``,
    ``bark_device_key``, ``ntfy_api_url``, ``ntfy_topic`` and
    ``ntfy_access_token``.
    """

    # Upper bound, in seconds, for each connectivity probe in test_connections.
    _PROBE_TIMEOUT_SECONDS = 10

    def __init__(self, config: "Config"):
        """Store *config*; no network resources are allocated yet."""
        self.config = config
        self.session: Optional["aiohttp.ClientSession"] = None

    async def __aenter__(self) -> "NotificationClient":
        """Make sure an open session exists and return the client."""
        self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session; exceptions from the ``with`` body propagate."""
        await self.close()

    async def close(self) -> None:
        """Close the HTTP session, if any, and forget it.

        Safe to call repeatedly. The reference is cleared so that a later
        send creates a fresh session instead of reusing a closed one.
        """
        session, self.session = self.session, None
        if session is not None:
            await session.close()

    def _ensure_session(self):
        """Return a usable session, creating one if missing or closed."""
        # getattr keeps injected session doubles without a ``closed``
        # attribute working.
        if self.session is None or getattr(self.session, "closed", False):
            self.session = aiohttp.ClientSession()
        return self.session

    async def send_notification(self, message: str, title: str = "AI Thought") -> bool:
        """Send a notification to every configured service.

        Bark is used when both ``bark_api_url`` and ``bark_device_key`` are
        set; Ntfy is used when both ``ntfy_api_url`` and ``ntfy_topic`` are
        set. The services are contacted one after the other.

        Args:
            message: Notification body.
            title: Notification title.

        Returns:
            ``True`` if at least one configured service accepted the
            notification, ``False`` if all configured services failed.
            When no service is configured at all the result is ``True``
            (there was nothing to deliver).
        """
        results = []

        if self.config.bark_api_url and self.config.bark_device_key:
            results.append(await self._send_bark_notification(message, title))

        if self.config.ntfy_api_url and self.config.ntfy_topic:
            results.append(await self._send_ntfy_notification(message, title))

        return any(results) if results else True

    def _build_bark_url(self, message: str, title: str) -> str:
        """Build the Bark request URL: ``{base}/{device_key}/{title}/{body}``.

        Title and body are percent-encoded as single path segments
        (``safe=""``), so characters such as ``/``, ``?``, ``#`` and ``%`` in
        user text cannot add segments or cut the URL short.
        """
        base = str(self.config.bark_api_url).rstrip("/")
        return "/".join(
            (
                base,
                str(self.config.bark_device_key),
                quote(title, safe=""),
                quote(message, safe=""),
            )
        )

    async def _send_bark_notification(self, message: str, title: str) -> bool:
        """Send a notification to the Bark service.

        Returns:
            ``True`` when the HTTP status is 200 and the JSON payload's
            ``code`` field is 200; ``False`` otherwise, including on any
            exception (which is logged, not raised).
        """
        url = self._build_bark_url(message, title)

        # Delivery is best-effort: every failure is logged and reported as
        # False so that one broken service cannot break the caller.
        try:
            session = self._ensure_session()
            async with session.get(url) as response:
                if response.status != 200:
                    logger.error("Bark HTTP error: %s", response.status)
                    return False

                data = await response.json()
                if data.get("code") == 200:
                    logger.info("Bark notification sent successfully")
                    return True

                logger.error("Bark API error: %s", data)
                return False
        except Exception as e:
            logger.error("Error sending Bark notification: %s", e)
            return False

    async def _send_ntfy_notification(self, message: str, title: str) -> bool:
        """Send a notification to the Ntfy service.

        The message is POSTed as the request body to
        ``{ntfy_api_url}/{ntfy_topic}``, with the title in the ``Title``
        header. A bearer token is attached when ``ntfy_access_token`` is set.

        Returns:
            ``True`` when the HTTP status is 200; ``False`` otherwise,
            including on any exception (which is logged, not raised).
        """
        base = str(self.config.ntfy_api_url).rstrip("/")
        url = f"{base}/{self.config.ntfy_topic}"

        headers = {
            "Title": title,
            "Content-Type": "text/plain",
            "User-Agent": "Ollama-Notification-System/1.0",
        }
        if self.config.ntfy_access_token:
            headers["Authorization"] = f"Bearer {self.config.ntfy_access_token}"

        # Best-effort delivery, same policy as for Bark.
        try:
            session = self._ensure_session()
            async with session.post(url, data=message, headers=headers) as response:
                if response.status == 200:
                    logger.info("Ntfy notification sent successfully")
                    return True

                logger.error("Ntfy HTTP error: %s", response.status)
                return False
        except Exception as e:
            logger.error("Error sending Ntfy notification: %s", e)
            return False

    async def _probe(self, service: str, base_url) -> Optional[bool]:
        """Issue a GET against *base_url* and report whether it responded.

        Returns ``None`` when *base_url* is empty, ``True`` when the response
        status is below 400, and ``False`` on a status of 400 or above or on
        any exception.
        """
        if not base_url:
            return None

        try:
            # Create the session on demand so the probe also works outside
            # ``async with``.
            session = self._ensure_session()
            timeout = aiohttp.ClientTimeout(total=self._PROBE_TIMEOUT_SECONDS)
            async with session.get(base_url, timeout=timeout) as response:
                return response.status < 400
        except Exception as e:
            logger.warning("Connection test for %s failed: %s", service, e)
            return False

    async def test_connections(self) -> Dict[str, Optional[bool]]:
        """Test connections to both notification services.

        Returns:
            A dict with the keys ``"bark"`` and ``"ntfy"``. Each value is
            ``True`` if the service's base URL answered with a status below
            400, ``False`` if it did not or the request failed, and ``None``
            if the service has no URL configured.
        """
        return {
            "bark": await self._probe("bark", self.config.bark_api_url),
            "ntfy": await self._probe("ntfy", self.config.ntfy_api_url),
        }