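"""Superna Zero Trust -> PagerDuty Events API v2 bridge.

Receives Superna alert webhooks on ``POST /webhook``, reshapes them into a
PagerDuty Events API v2 event and forwards it to PagerDuty.

Configuration is taken from the environment so that no secret has to live in
source control:

    PAGERDUTY_ROUTING_KEY   Events API v2 integration (routing) key.
    WEBHOOK_SHARED_SECRET   Optional. When set, every POST to /webhook must
                            carry the same value in the X-Webhook-Secret
                            header or it is rejected with 401. When unset the
                            endpoint is unauthenticated (a warning is logged);
                            anyone who can reach the port can page on-call.
    FLASK_DEBUG             Set to "1" to enable the Werkzeug debugger for
                            local development only: the interactive debugger
                            allows arbitrary code execution on the host.
"""
import hmac
import json
import logging
import os
from datetime import datetime, timezone

import requests
from flask import Flask, request

app = Flask(__name__)
# Alert payloads are small; refuse oversized bodies before they are parsed.
app.config["MAX_CONTENT_LENGTH"] = 1 * 1024 * 1024

# === PagerDuty Configuration ===
# The literal is only a placeholder for local experiments. Deployments must set
# PAGERDUTY_ROUTING_KEY in the environment; a key committed here ends up in VCS
# history and in every copy of the repository.
PAGERDUTY_ROUTING_KEY = os.environ.get("PAGERDUTY_ROUTING_KEY", "yyyyyyyy")
PAGERDUTY_API_URL = "https://events.pagerduty.com/v2/enqueue"
# Without a timeout a stalled PagerDuty connection blocks the worker forever.
PAGERDUTY_TIMEOUT_SECONDS = 10
# Events API v2 field limits (dedup_key 255 chars, summary 1024 chars).
PD_DEDUP_KEY_MAX_LEN = 255
PD_SUMMARY_MAX_LEN = 1024

# === Webhook authentication ===
# NOTE(review): header name is this bridge's own convention; confirm the
# Superna webhook sender can be configured to send it.
WEBHOOK_SECRET_HEADER = "X-Webhook-Secret"
WEBHOOK_SHARED_SECRET = os.environ.get("WEBHOOK_SHARED_SECRET", "")

# === Logging Configuration ===
logging.basicConfig(
    filename="pagerduty_integration.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

if not WEBHOOK_SHARED_SECRET:
    logging.warning(
        "WEBHOOK_SHARED_SECRET is not set; /webhook accepts unauthenticated requests"
    )


def map_severity_to_pd_action(severity):
    """Map a Superna severity value to a PagerDuty ``event_action``.

    Args:
        severity: the ``severity`` field of the Superna alert. Any type is
            tolerated; only the string ``"resolved"`` (case-insensitive)
            is special.

    Returns:
        ``"resolve"`` for ``"resolved"``; ``"trigger"`` for everything else
        (``"critical"``, ``"warning"``, unknown values, ...).
    """
    if str(severity).lower() == "resolved":
        return "resolve"
    return "trigger"


def _as_list(value):
    """Return *value* if it is a list, otherwise an empty list.

    Superna fields such as ``actions``, ``shares`` and ``files`` are expected
    to be lists; anything else (missing, ``None``, a scalar) is treated as
    "nothing to report" instead of raising inside the formatter.
    """
    return value if isinstance(value, list) else []


def _redact_routing_key(event):
    """Return a copy of a PagerDuty event dict with ``routing_key`` masked.

    The routing key authorises paging the service; it must not be written to
    stdout or the log file.
    """
    redacted = dict(event)
    if "routing_key" in redacted:
        redacted["routing_key"] = "***REDACTED***"
    return redacted


def format_pd_payload(payload):
    """Normalise a Superna alert into the intermediate event dict.

    The result is consumed by :func:`send_event_to_pagerduty`. Every field is
    defensively read: the webhook body is untrusted input, so missing keys,
    wrong types and empty lists all degrade to ``"Unknown"`` / empty strings
    rather than raising.

    Args:
        payload: the decoded JSON body of the Superna webhook. Must be a dict.

    Returns:
        A dict with ``summary``, ``source``, ``severity``, ``custom_details``
        and ``dedup_key``.

    Raises:
        ValueError: if *payload* is not a dict.
    """
    if not isinstance(payload, dict):
        raise ValueError("webhook body must be a JSON object")

    event_id = payload.get("id", "Unknown")
    severity = payload.get("severity", "Unknown")
    state = payload.get("state", "Unknown")
    detected_time = payload.get("detectedTime", 0)

    try:
        # detectedTime is treated as epoch milliseconds and rendered as an
        # ISO-8601 UTC string. Non-numeric or out-of-range values fall back
        # to "Unknown" so one bad field cannot suppress the whole alert.
        timestamp = datetime.fromtimestamp(
            detected_time / 1000, tz=timezone.utc
        ).isoformat()
    except (TypeError, ValueError, OverflowError, OSError) as e:
        timestamp = "Unknown"
        logging.error("Error converting detectedTime: %s", str(e))

    # clientIPs is expected to be a list; an empty list must not IndexError.
    client_ips = payload.get("clientIPs", ["Unknown"])
    if isinstance(client_ips, list):
        client_ip = str(client_ips[0]) if client_ips else "Unknown"
    else:
        client_ip = str(client_ips)

    user = payload.get("userName", "Unknown")

    actions = " | ".join(
        str(a.get("action", "Unknown")) if isinstance(a, dict) else "Unknown"
        for a in _as_list(payload.get("actions"))
    )
    shares = "; ".join(
        str(s.get("name", "Unknown")) if isinstance(s, dict) else "Unknown"
        for s in _as_list(payload.get("shares"))
    )
    files = "; ".join(str(f) for f in _as_list(payload.get("files")))
    alert_url = payload.get("url", "Unknown")

    # Keep summary and dedup_key within the Events API limits; PagerDuty
    # rejects events that exceed them and the alert would be lost.
    summary = f"Superna Zero Trust alert for user {user}"[:PD_SUMMARY_MAX_LEN]
    dedup_key = f"superna-{event_id}"[:PD_DEDUP_KEY_MAX_LEN]

    return {
        "summary": summary,
        "source": "Superna DSE",
        "severity": severity,
        "custom_details": {
            "User": user,
            "Client IP": client_ip,
            "Timestamp": timestamp,
            "Detected State": state,
            "Action": actions,
            "Shares": shares,
            "Files": files,
            "Alert URL": alert_url,
        },
        "dedup_key": dedup_key,
    }


def send_event_to_pagerduty(uef):
    """Post one event to the PagerDuty Events API v2 enqueue endpoint.

    Args:
        uef: the dict produced by :func:`format_pd_payload`.

    Returns:
        ``{"message": ...}`` on HTTP 202 (accepted), otherwise ``{"error": ...}``
        describing the failure. Network errors are caught and reported the
        same way; this function does not raise for them.
    """
    action = map_severity_to_pd_action(uef.get("severity", "Unknown"))
    # PagerDuty only accepts a fixed severity vocabulary; anything that is not
    # "critical" is downgraded to "warning" so the event is not rejected.
    pd_severity = "critical" if str(uef["severity"]).lower() == "critical" else "warning"

    event = {
        "routing_key": PAGERDUTY_ROUTING_KEY,
        "event_action": action,
        "dedup_key": uef["dedup_key"],
        "payload": {
            "summary": uef["summary"],
            "source": uef["source"],
            "severity": pd_severity,
            "custom_details": uef["custom_details"],
        },
    }

    # Never print or log the routing key: it is the credential that lets
    # anyone page this service.
    loggable = json.dumps(_redact_routing_key(event))
    print("📤 Sending event to PagerDuty:")
    print(json.dumps(_redact_routing_key(event), indent=2))
    logging.info("Sending to PagerDuty: %s", loggable)

    try:
        response = requests.post(
            PAGERDUTY_API_URL, json=event, timeout=PAGERDUTY_TIMEOUT_SECONDS
        )
    except requests.RequestException as e:
        logging.error("Exception sending to PagerDuty: %s", str(e))
        return {"error": f"❌ Exception: {str(e)}"}

    print(f"⬅️ PagerDuty Response: {response.status_code} {response.text}")
    logging.info(
        "PagerDuty response: %s - %s", response.status_code, response.text
    )

    if response.status_code == 202:
        return {"message": "✅ Event sent to PagerDuty"}
    # NOTE(review): the 403 match is inherited from the original; confirm the
    # status code PagerDuty currently returns for a bad routing key.
    if response.status_code == 403 and "Invalid routing key" in response.text:
        return {
            "error": "❌ Invalid routing key. Please verify your PagerDuty Events API key."
        }
    return {
        "error": f"❌ Failed to send event: {response.status_code} - {response.text}"
    }


def _json_response(body, status):
    """Build a Flask response tuple with an explicit JSON content type."""
    return json.dumps(body), status, {"Content-Type": "application/json"}


@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive a Superna alert and relay it to PagerDuty.

    Request handling order:
      1. If ``WEBHOOK_SHARED_SECRET`` is configured, the ``X-Webhook-Secret``
         header must match it (constant-time compare) -> otherwise 401.
      2. The body must be a JSON object -> otherwise 400.
      3. The event is formatted and sent; 200 on success, 500 on failure.

    Exception details are logged but not echoed to the caller, so internal
    state is not disclosed to whoever can reach this endpoint.
    """
    if WEBHOOK_SHARED_SECRET:
        presented = request.headers.get(WEBHOOK_SECRET_HEADER, "")
        # compare_digest avoids leaking the secret through timing differences.
        if not hmac.compare_digest(presented, WEBHOOK_SHARED_SECRET):
            logging.warning(
                "Rejected webhook from %s: missing or invalid %s header",
                request.remote_addr,
                WEBHOOK_SECRET_HEADER,
            )
            return _json_response({"error": "unauthorized"}, 401)

    # silent=True turns a malformed body into None instead of a 400 raised by
    # Flask, so we can answer with a consistent JSON error ourselves.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        logging.warning("Rejected webhook from %s: body is not a JSON object", request.remote_addr)
        return _json_response({"error": "request body must be a JSON object"}, 400)

    try:
        print("📩 Webhook received:")
        print(json.dumps(payload, indent=2))
        logging.info("Received webhook: %s", json.dumps(payload))

        uef = format_pd_payload(payload)
        result = send_event_to_pagerduty(uef)
        return _json_response(result, 200 if "message" in result else 500)
    except Exception:
        # Boundary handler: log the traceback, return a generic error.
        logging.exception("Webhook error")
        print("❌ Error in webhook handler (see log)")
        return _json_response({"error": "❌ Exception in webhook handler"}, 500)


if __name__ == '__main__':
    # The Werkzeug debugger must never be enabled on a network-reachable
    # listener; it is opt-in via FLASK_DEBUG=1 for local development only.
    debug = os.environ.get("FLASK_DEBUG", "0") == "1"
    if debug:
        logging.warning("Running with debug=True; do not expose this listener")
    # Binding 0.0.0.0 on plain HTTP is intended for a webhook receiver, but in
    # production this should sit behind a TLS-terminating reverse proxy.
    print("🚀 Starting Flask app on port 5000")
    app.run(host='0.0.0.0', port=5000, debug=debug)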