655 lines
21 KiB
Python
655 lines
21 KiB
Python
from fastapi import FastAPI, Request, HTTPException, Query, Depends
|
|
from fastapi.responses import HTMLResponse, Response
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
from fastapi.staticfiles import StaticFiles
|
|
from datetime import datetime
|
|
import sqlite3
|
|
import json
|
|
import httpx
|
|
import os
|
|
import csv
|
|
import base64
|
|
from io import StringIO
|
|
from urllib.parse import urlparse
|
|
import asyncio
|
|
import logging
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
app = FastAPI(title="Webhook Catcher")
|
|
|
|
app.mount(
|
|
"/static",
|
|
StaticFiles(directory=os.path.join(BASE_DIR, "static")),
|
|
name="static"
|
|
)
|
|
|
|
templates = Jinja2Templates(
|
|
directory=os.path.join(BASE_DIR, "templates")
|
|
)
|
|
|
|
templates.env.filters["tojson"] = lambda value, indent=2: json.dumps(value, indent=indent)
|
|
|
|
SENSITIVE_HEADERS = {'authorization', 'cookie', 'x-api-key', 'api-key'}
|
|
security = HTTPBasic()
|
|
|
|
FORWARD_WEBHOOK_URL = os.getenv("FORWARD_WEBHOOK_URL")
|
|
FORWARD_WEBHOOK_TOKEN = os.getenv("FORWARD_WEBHOOK_TOKEN")
|
|
ADMIN_TOKEN = os.getenv("ADMIN_TOKEN")
|
|
FRONTEND_PASSWORD = os.getenv("FRONTEND_PASSWORD")
|
|
|
|
# Database configuration - single source of truth for DB path
|
|
DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
|
|
DB_PATH = os.path.join(DATA_DIR, "webhooks.db")
|
|
|
|
def verify_admin_token(request: Request) -> bool:
|
|
"""Verify admin token from header or query parameter"""
|
|
if not ADMIN_TOKEN or ADMIN_TOKEN.strip() == "":
|
|
logger.info("No ADMIN_TOKEN set or empty, allowing access")
|
|
return True
|
|
|
|
token = request.headers.get("X-Admin-Token")
|
|
if token and token == ADMIN_TOKEN:
|
|
logger.info("Valid admin token from header")
|
|
return True
|
|
|
|
token = request.query_params.get("admin_token")
|
|
if token and token == ADMIN_TOKEN:
|
|
logger.info("Valid admin token from query")
|
|
return True
|
|
|
|
logger.warning("Admin token verification failed")
|
|
return False
|
|
|
|
def require_admin(request: Request):
|
|
"""Dependency to require admin authentication"""
|
|
if not verify_admin_token(request):
|
|
logger.warning("Admin access denied")
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Admin token required. Set X-Admin-Token header or admin_token query parameter.",
|
|
headers={"WWW-Authenticate": "Bearer"}
|
|
)
|
|
return True
|
|
|
|
def get_optional_credentials(request: Request):
|
|
"""Get HTTP Basic credentials if provided, or None if not."""
|
|
auth_header = request.headers.get("Authorization")
|
|
if not auth_header or not auth_header.startswith("Basic "):
|
|
return None
|
|
try:
|
|
credentials = base64.b64decode(auth_header[6:]).decode("utf-8")
|
|
username, _, password = credentials.partition(":")
|
|
return HTTPBasicCredentials(username=username, password=password)
|
|
except Exception:
|
|
return None
|
|
|
|
def verify_frontend_password(request: Request):
|
|
"""Dependency to require frontend password via HTTP Basic Auth.
|
|
|
|
If FRONTEND_PASSWORD is not set, access is allowed without authentication.
|
|
Username can be anything; only the password is checked.
|
|
"""
|
|
if not FRONTEND_PASSWORD or FRONTEND_PASSWORD.strip() == "":
|
|
return True # No protection if password not set
|
|
|
|
credentials = get_optional_credentials(request)
|
|
if credentials and credentials.password == FRONTEND_PASSWORD:
|
|
return True
|
|
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Invalid password",
|
|
headers={"WWW-Authenticate": "Basic realm=\"Webhook Catcher\""}
|
|
)
|
|
|
|
def sanitize_headers(headers):
|
|
"""Redact sensitive header values"""
|
|
return {k: '***REDACTED***' if k.lower() in SENSITIVE_HEADERS else v
|
|
for k, v in headers.items()}
|
|
|
|
def validate_url(url: str) -> bool:
|
|
"""Basic URL validation to prevent SSRF"""
|
|
try:
|
|
result = urlparse(url)
|
|
return all([result.scheme in ['http', 'https'], result.netloc])
|
|
except:
|
|
return False
|
|
|
|
def try_json(data: str):
|
|
"""Safely parse JSON data"""
|
|
try:
|
|
return json.loads(data)
|
|
except:
|
|
return None
|
|
|
|
def extract_metadata(headers: dict) -> dict:
|
|
"""Extract useful metadata from headers"""
|
|
return {
|
|
"ip": headers.get("x-real-ip") or headers.get("x-forwarded-for") or "Unknown",
|
|
"user_agent": headers.get("user-agent", "Unknown"),
|
|
"source": headers.get("x-webhook-source", "Unknown"),
|
|
"timestamp": headers.get("x-request-start", None)
|
|
}
|
|
|
|
def format_timestamp(timestamp):
|
|
"""Format timestamp consistently with relative time"""
|
|
try:
|
|
dt = datetime.fromisoformat(timestamp)
|
|
return {
|
|
"iso": dt.isoformat(),
|
|
"display": dt.strftime("%Y-%m-%d %H:%M:%S")
|
|
}
|
|
except:
|
|
return {
|
|
"iso": timestamp,
|
|
"display": timestamp
|
|
}
|
|
|
|
def init_db():
|
|
# Ensure data directory exists
|
|
os.makedirs(DATA_DIR, exist_ok=True)
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
c.execute('''
|
|
CREATE TABLE IF NOT EXISTS webhooks
|
|
(id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT,
|
|
headers TEXT,
|
|
body TEXT)
|
|
''')
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
init_db()
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def home(request: Request, _: bool = Depends(verify_frontend_password)):
|
|
return templates.TemplateResponse("index.html", {"request": request})
|
|
|
|
@app.get("/healthz")
|
|
async def health_check():
|
|
"""Simple health check endpoint"""
|
|
try:
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.close()
|
|
return {"status": "ok", "admin_protected": bool(ADMIN_TOKEN and ADMIN_TOKEN.strip())}
|
|
except Exception as e:
|
|
logger.error(f"Health check failed: {e}")
|
|
return {"status": "error", "error": str(e)}
|
|
|
|
async def forward_webhook(headers: dict, body: str, original_url: str) -> dict:
|
|
"""Forward webhook to configured endpoint if enabled"""
|
|
if not FORWARD_WEBHOOK_URL:
|
|
return {"status": "disabled", "message": "Forwarding not configured"}
|
|
|
|
try:
|
|
forward_headers = {
|
|
"Content-Type": "application/json",
|
|
"X-Forwarded-From": original_url,
|
|
"X-Original-Timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
if FORWARD_WEBHOOK_TOKEN:
|
|
forward_headers["Authorization"] = f"Bearer {FORWARD_WEBHOOK_TOKEN}"
|
|
|
|
safe_headers = {k: v for k, v in headers.items()
|
|
if k.lower() not in SENSITIVE_HEADERS}
|
|
forward_headers.update({f"X-Original-{k}": v for k, v in safe_headers.items()})
|
|
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
response = await client.post(
|
|
FORWARD_WEBHOOK_URL,
|
|
content=body,
|
|
headers=forward_headers
|
|
)
|
|
|
|
return {
|
|
"status": "success",
|
|
"target_url": FORWARD_WEBHOOK_URL,
|
|
"response_status": response.status_code,
|
|
"response_time_ms": int(response.elapsed.total_seconds() * 1000)
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"status": "error",
|
|
"target_url": FORWARD_WEBHOOK_URL,
|
|
"error": str(e)
|
|
}
|
|
|
|
@app.post("/webhook")
|
|
async def webhook(request: Request):
|
|
try:
|
|
body = await request.body()
|
|
headers = dict(request.headers)
|
|
|
|
body_str = ""
|
|
if body:
|
|
try:
|
|
body_str = body.decode('utf-8')
|
|
except UnicodeDecodeError:
|
|
body_str = body.decode('utf-8', errors='replace')
|
|
else:
|
|
body_str = "{}"
|
|
|
|
parsed_json = None
|
|
try:
|
|
if body_str.strip():
|
|
parsed_json = json.loads(body_str)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
print(f"Received webhook: {body_str}")
|
|
print(f"Headers: {headers}")
|
|
|
|
forward_task = None
|
|
if FORWARD_WEBHOOK_URL:
|
|
forward_task = asyncio.create_task(
|
|
forward_webhook(headers, body_str, str(request.url))
|
|
)
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
|
|
print("Inserting into database...")
|
|
c.execute(
|
|
"INSERT INTO webhooks (timestamp, headers, body) VALUES (?, ?, ?)",
|
|
(datetime.now().isoformat(), json.dumps(headers), body_str)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
print("Database insert complete")
|
|
|
|
forwarding_result = None
|
|
if forward_task:
|
|
try:
|
|
forwarding_result = await forward_task
|
|
print(f"Forwarding result: {forwarding_result}")
|
|
except Exception as e:
|
|
print(f"Forwarding failed: {e}")
|
|
forwarding_result = {"status": "error", "error": str(e)}
|
|
|
|
response_data = {
|
|
"status": "success",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"received_body": body_str,
|
|
"is_json": parsed_json is not None,
|
|
"forwarding": forwarding_result
|
|
}
|
|
|
|
return response_data
|
|
except Exception as e:
|
|
print(f"Error processing webhook: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to process webhook: {str(e)}"
|
|
)
|
|
|
|
def get_webhook_logs(offset: int = 0, limit: int = 20, search: str = None):
|
|
"""Helper function to fetch webhook logs"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
|
|
try:
|
|
if search:
|
|
search = search.strip()
|
|
search_terms = [f"%{term}%" for term in search.split()]
|
|
where_clauses = []
|
|
params = []
|
|
|
|
for term in search_terms:
|
|
where_clauses.extend([
|
|
"body LIKE ?",
|
|
"headers LIKE ?",
|
|
"timestamp LIKE ?"
|
|
])
|
|
params.extend([term, term, term])
|
|
|
|
where_sql = " OR ".join(where_clauses)
|
|
|
|
c.execute(f"SELECT COUNT(*) FROM webhooks WHERE {where_sql}", params)
|
|
total_count = c.fetchone()[0]
|
|
|
|
c.execute(f"""
|
|
SELECT * FROM webhooks
|
|
WHERE {where_sql}
|
|
ORDER BY timestamp DESC LIMIT ? OFFSET ?
|
|
""", params + [limit, offset])
|
|
else:
|
|
c.execute("SELECT COUNT(*) FROM webhooks")
|
|
total_count = c.fetchone()[0]
|
|
c.execute("""
|
|
SELECT * FROM webhooks
|
|
ORDER BY timestamp DESC
|
|
LIMIT ? OFFSET ?
|
|
""", (limit, offset))
|
|
|
|
logs = [
|
|
{
|
|
"id": row[0],
|
|
"timestamp": format_timestamp(row[1]),
|
|
"headers": sanitize_headers(json.loads(row[2])),
|
|
"metadata": extract_metadata(json.loads(row[2])),
|
|
"body": row[3],
|
|
"parsed_body": try_json(row[3]),
|
|
"matches": highlight_search_matches(row[3], search) if search else None
|
|
}
|
|
for row in c.fetchall()
|
|
]
|
|
|
|
return logs, total_count, (offset + len(logs)) < total_count
|
|
|
|
finally:
|
|
conn.close()
|
|
|
|
def highlight_search_matches(text: str, search: str) -> list:
|
|
"""Find and highlight search matches in text"""
|
|
if not search:
|
|
return []
|
|
|
|
matches = []
|
|
try:
|
|
search_terms = search.lower().split()
|
|
text_lower = text.lower()
|
|
|
|
for term in search_terms:
|
|
start = 0
|
|
while True:
|
|
pos = text_lower.find(term, start)
|
|
if pos == -1:
|
|
break
|
|
|
|
context_start = max(0, pos - 20)
|
|
context_end = min(len(text), pos + len(term) + 20)
|
|
|
|
matches.append({
|
|
"term": term,
|
|
"context": f"...{text[context_start:context_end]}..."
|
|
})
|
|
|
|
start = pos + len(term)
|
|
|
|
return matches
|
|
except:
|
|
return []
|
|
|
|
@app.get("/logs/view", response_class=HTMLResponse)
|
|
async def view_logs(request: Request, _: bool = Depends(verify_frontend_password)):
|
|
"""Full logs page view"""
|
|
logs, total_count, has_more = get_webhook_logs(limit=10)
|
|
return templates.TemplateResponse("logs.html", {
|
|
"request": request,
|
|
"logs": logs,
|
|
"count": len(logs),
|
|
"total_count": total_count,
|
|
"has_more": has_more,
|
|
"offset": 0,
|
|
"limit": 10
|
|
})
|
|
|
|
@app.get("/logs")
|
|
async def get_logs(
|
|
request: Request,
|
|
search: str = Query(None),
|
|
offset: int = Query(0),
|
|
limit: int = Query(20),
|
|
_: bool = Depends(verify_frontend_password)
|
|
):
|
|
"""Partial logs view for HTMX updates"""
|
|
try:
|
|
logs, total_count, has_more = get_webhook_logs(offset, limit, search)
|
|
print(f"Retrieved logs: count={len(logs)}, total={total_count}")
|
|
|
|
template_data = {
|
|
"request": request,
|
|
"logs": logs,
|
|
"count": total_count,
|
|
"total_count": total_count,
|
|
"has_more": has_more,
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"empty": len(logs) == 0
|
|
}
|
|
|
|
if "application/json" in request.headers.get("accept", ""):
|
|
return {
|
|
"logs": logs,
|
|
"count": total_count,
|
|
"total_count": total_count,
|
|
"has_more": has_more,
|
|
"empty": len(logs) == 0
|
|
}
|
|
|
|
return templates.TemplateResponse("logs_list.html", template_data)
|
|
except Exception as e:
|
|
print(f"Error retrieving logs: {str(e)}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.get("/export")
|
|
async def export_logs(request: Request, format: str = Query("json", enum=["json", "csv"]), _: bool = Depends(verify_frontend_password)):
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
c.execute("SELECT * FROM webhooks ORDER BY timestamp DESC")
|
|
logs = [{"id": row[0], "timestamp": row[1], "headers": json.loads(row[2]), "body": row[3]}
|
|
for row in c.fetchall()]
|
|
conn.close()
|
|
|
|
if format == "csv":
|
|
output = StringIO()
|
|
writer = csv.DictWriter(output, fieldnames=["id", "timestamp", "headers", "body"])
|
|
writer.writeheader()
|
|
writer.writerows(logs)
|
|
return Response(
|
|
content=output.getvalue(),
|
|
media_type="text/csv",
|
|
headers={"Content-Disposition": "attachment; filename=webhooks.csv"}
|
|
)
|
|
|
|
return Response(
|
|
content=json.dumps(logs, indent=2, ensure_ascii=False),
|
|
media_type="application/json",
|
|
headers={
|
|
"Content-Disposition": "attachment; filename=webhooks.json",
|
|
"X-Total-Count": str(len(logs))
|
|
}
|
|
)
|
|
|
|
@app.post("/test")
|
|
async def test_webhook(request: Request):
|
|
try:
|
|
body = await request.body()
|
|
|
|
if not body or body == b'{}':
|
|
payload = {
|
|
"event": "test",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"message": "Test webhook payload"
|
|
}
|
|
else:
|
|
try:
|
|
payload = json.loads(body)
|
|
except json.JSONDecodeError:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Invalid JSON payload. Example: {'event': 'test', 'message': 'Hello'}"
|
|
)
|
|
|
|
base_url = str(request.base_url)
|
|
if base_url.startswith('http:'):
|
|
base_url = 'https:' + base_url[5:]
|
|
webhook_url = base_url + "webhook"
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.post(
|
|
webhook_url,
|
|
json=payload,
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
|
|
return {
|
|
"status": "sent",
|
|
"response_status": response.status_code,
|
|
"url": webhook_url,
|
|
"payload": payload
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to send test webhook: {str(e)}"
|
|
)
|
|
|
|
@app.post("/replay/{webhook_id}")
|
|
async def replay_webhook(webhook_id: int, request: Request, target_url: str = Query(None)):
|
|
"""Replay webhook with optional admin protection - accepts target_url from query or body"""
|
|
if ADMIN_TOKEN and not verify_admin_token(request):
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Admin token required for replay operations",
|
|
headers={"WWW-Authenticate": "Bearer"}
|
|
)
|
|
|
|
if not target_url:
|
|
try:
|
|
body = await request.body()
|
|
if body:
|
|
body_data = json.loads(body)
|
|
target_url = body_data.get("target_url")
|
|
except (json.JSONDecodeError, AttributeError):
|
|
pass
|
|
|
|
if not target_url:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="target_url is required. Provide it as a query parameter (?target_url=...) or in request body as JSON {\"target_url\": \"...\"}"
|
|
)
|
|
|
|
if not validate_url(target_url):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Invalid target URL. Must be http(s)://..."
|
|
)
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
c.execute("SELECT headers, body FROM webhooks WHERE id = ?", (webhook_id,))
|
|
row = c.fetchone()
|
|
conn.close()
|
|
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail=f"Webhook {webhook_id} not found")
|
|
|
|
headers = json.loads(row[0])
|
|
body = row[1]
|
|
|
|
replay_headers = {k: v for k, v in headers.items()
|
|
if k.lower() not in ['host', 'content-length', 'connection']}
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
response = await client.post(
|
|
target_url,
|
|
headers=replay_headers,
|
|
content=body
|
|
)
|
|
|
|
return {
|
|
"status": "replayed",
|
|
"response_status": response.status_code,
|
|
"target_url": target_url,
|
|
"webhook_id": webhook_id
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to replay webhook: {str(e)}"
|
|
)
|
|
|
|
@app.post("/clear")
|
|
async def clear_logs(request: Request):
|
|
"""Clear logs with optional admin protection"""
|
|
if ADMIN_TOKEN and not verify_admin_token(request):
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Admin token required for clear operations",
|
|
headers={"WWW-Authenticate": "Bearer"}
|
|
)
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
c.execute("DELETE FROM webhooks")
|
|
conn.commit()
|
|
conn.close()
|
|
return {"status": "cleared"}
|
|
|
|
@app.get("/config")
|
|
async def get_config():
|
|
"""Get current configuration status"""
|
|
return {
|
|
"forwarding_enabled": bool(FORWARD_WEBHOOK_URL),
|
|
"forwarding_url": FORWARD_WEBHOOK_URL if FORWARD_WEBHOOK_URL else None,
|
|
"authentication_enabled": bool(FORWARD_WEBHOOK_TOKEN),
|
|
"admin_protection_enabled": bool(ADMIN_TOKEN and ADMIN_TOKEN.strip()),
|
|
"total_webhooks": get_total_webhook_count()
|
|
}
|
|
|
|
def get_total_webhook_count():
|
|
"""Get total count of webhooks received"""
|
|
try:
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
c.execute("SELECT COUNT(*) FROM webhooks")
|
|
count = c.fetchone()[0]
|
|
conn.close()
|
|
return count
|
|
except:
|
|
return 0
|
|
|
|
@app.get("/webhooks")
|
|
async def list_webhooks(request: Request, limit: int = Query(50), _: bool = Depends(verify_frontend_password)):
|
|
"""List available webhooks for replay testing"""
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
|
|
c.execute("SELECT COUNT(*) FROM webhooks")
|
|
total_count = c.fetchone()[0]
|
|
|
|
c.execute("""
|
|
SELECT id, timestamp, body, headers
|
|
FROM webhooks
|
|
ORDER BY timestamp DESC
|
|
LIMIT ?
|
|
""", (limit,))
|
|
|
|
webhooks = []
|
|
for row in c.fetchall():
|
|
webhook_id, timestamp, body, headers = row
|
|
|
|
body_preview = body[:100] + "..." if len(body) > 100 else body
|
|
|
|
headers_dict = json.loads(headers)
|
|
content_type = headers_dict.get("content-type", "unknown")
|
|
|
|
webhooks.append({
|
|
"id": webhook_id,
|
|
"timestamp": timestamp,
|
|
"body_preview": body_preview,
|
|
"content_type": content_type,
|
|
"size_bytes": len(body)
|
|
})
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
"webhooks": webhooks,
|
|
"count": len(webhooks),
|
|
"total_count": total_count
|
|
}
|