from flask import Flask, request, jsonify import requests, json, hashlib, logging, sys, traceback from datetime import datetime, timezone app = Flask(__name__) # === Logging setup === logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s - %(message)s", stream=sys.stdout, ) logger = logging.getLogger("superna->swimlane") SWIMLANE_WEBHOOK_URL = ( "https://usn.swimlane.app/account/xxxxxxxxxxxxx" ) # MITRE (explicit label as requested) MITRE_LABEL = "Impact / Data Encrypted for Impact" # Optional severity → numeric risk SEVERITY_TO_RISK = {"WARNING": 30, "MAJOR": 60, "CRITICAL": 90} # Hardcoded alert categories (only these will be used) HARDCODED_CATEGORIES = ["ransomware", "malware", "malicious user behavior"] def now_iso(): return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") def to_iso(ts): if ts is None: return now_iso() if isinstance(ts, str) and ("T" in ts or ts.endswith("Z")): return ts if isinstance(ts, str) and "," in ts and ":" in ts: try: dt = datetime.strptime(ts.strip(), "%b %d, %Y, %I:%M:%S %p") return dt.replace(tzinfo=timezone.utc).isoformat().replace("+00:00", "Z") except Exception: pass try: num = int(ts) if num > 10**12: num /= 1000.0 return datetime.fromtimestamp(num, tz=timezone.utc).isoformat().replace("+00:00", "Z") except Exception: return now_iso() def ensure_list(v): return v if isinstance(v, list) else ([] if v is None else [v]) def make_uid(p): # Prefer provided id; else deterministic hash if p.get("id"): return str(p["id"]) h = hashlib.sha256(json.dumps(p, sort_keys=True).encode("utf-8")).hexdigest()[:16] return f"superna-{h}" def build_description(p, severity, impacted_ips): bits = [] if p.get("Description"): bits.append(str(p["Description"])) if severity: bits.append(f"Severity: {severity}") nes_vals = ensure_list(p.get("nes")) if nes_vals: bits.append("Storage device(s): " + ", ".join(nes_vals)) if impacted_ips: bits.append("IP(s): " + ", ".join(impacted_ips)) if p.get("numFiles") is not None: bits.append(f"numFiles: {p['numFiles']}") if p.get("rswExtensions"): bits.append("Rules: " + ", ".join(ensure_list(p["rswExtensions"]))) return " | ".join(bits) if bits else "Superna alert" def compute_risk_score(p, severity_str): # Use numeric if you already send it for key in ("risk_score", "risk"): v = p.get(key) try: return int(v) except Exception: pass # Else derive from your three severities sev = (severity_str or "").strip().upper() return SEVERITY_TO_RISK.get(sev) @app.route('/webhook', methods=['POST']) def webhook(): try: p = request.json or {} # Resolve alert UID early and log receipt alert_uid = make_uid(p) logger.info("Webhook received for processing. alert_uid=%s", alert_uid) # Core fields impacted_ips = [ip for ip in ensure_list(p.get("clientIPs")) if ip] impacted_usernames = sorted({u for u in ensure_list(p.get("userName")) + ensure_list(p.get("user")) if u}) originating_files = [f for f in ensure_list(p.get("files")) if f] severity = (p.get("severity") or p.get("state") or "").strip() created_ts = to_iso(p.get("detected") or p.get("firstSignalTimeStamp")) start_ts = to_iso(p.get("detected") or p.get("firstSignalTimeStamp")) end_ts = to_iso(p.get("lockedOutTime")) if p.get("lockedOutTime") else None permalink = p.get("url", "") title_user = impacted_usernames[0] if impacted_usernames else "unknown" # Title per your format title = f"Superna Storage Behavior Incident for user {title_user}" # Observables incl. NES as "storage device name" and hardcoded categories observables = [] observables += [f"ip:{ip}" for ip in impacted_ips] observables += [f"user:{u}" for u in impacted_usernames] observables += [f"file:{f}" for f in originating_files] for name in ensure_list(p.get("nes")): observables.append(f"storage device name:{name}") for cat in HARDCODED_CATEGORIES: observables.append(f"category:{cat}") risk_score = compute_risk_score(p, severity) # Build Swimlane fields swimlane_fields = { "alert_category": HARDCODED_CATEGORIES[:], # only hardcoded categories "alert_created_timestamp": created_ts, "description": build_description(p, severity, impacted_ips), "alert_end_timestamp": end_ts, "alert_impacted_hostnames": impacted_ips[:], # host fields from client IPs "alert_impacted_ip_addresses": impacted_ips, "alert_impacted_usernames": impacted_usernames, "alert_ingested_timestamp": now_iso(), "MITRE Attack Tactic/Technique": [MITRE_LABEL], # exact string you requested "alert_organization": "Superna", # hardcoded org "alert_originating_files": originating_files, "alert_permalink": permalink, # from payload.url "alert_provider": "Superna Data Security Edition", "alert_rules": [*ensure_list(p.get("rswExtensions")), *( [p.get("state")] if p.get("state") else [] )], "alert_severity": severity, "alert_start_timestamp": start_ts, "title": title, "alert_uid": alert_uid, "observables": observables } if risk_score is not None: swimlane_fields["alert_risk_score"] = risk_score # Merge Swimlane fields into raw_alert on top of original payload p merged_raw = dict(p) # original payload # Avoid raw_alert recursion if p already had a raw_alert merged_raw.pop("raw_alert", None) merged_raw.update(swimlane_fields) # overlay Swimlane fields # Final payload to Swimlane (with merged raw_alert) swimlane_alert = dict(swimlane_fields) swimlane_alert["raw_alert"] = merged_raw # Pretty-print outgoing payload to console logger.info("Prepared Swimlane payload (to be POSTed):\n%s", json.dumps(swimlane_alert, indent=2, ensure_ascii=False)) # POST to Swimlane headers = {"Content-Type": "application/json"} resp = requests.post( SWIMLANE_WEBHOOK_URL, headers=headers, data=json.dumps(swimlane_alert), timeout=15 ) # Log HTTP details (status + raw body) before raising for status logger.info("Swimlane response received. status=%s", resp.status_code) logger.info("Swimlane raw response body:\n%s", resp.text) resp.raise_for_status() logger.info("Alert %s successfully ingested by Swimlane.", alert_uid) return jsonify({"message": "Alert sent to Swimlane SOC webhook successfully", "alert_uid": alert_uid}), 200 except requests.exceptions.HTTPError as err: status = err.response.status_code if getattr(err, "response", None) else "n/a" body = getattr(err.response, "text", "") logger.error("HTTP error posting to Swimlane. status=%s body=%s", status, body) logger.exception("Traceback:") return jsonify({"error": str(err), "status": status, "body": body}), status if status != "n/a" else 502 except Exception as e: logger.exception("Unhandled exception while processing webhook:") return jsonify({"error": str(e)}), 500 if __name__ == '__main__': # debug of flask app is enabled app.run(host='0.0.0.0', port=5000, debug=True)