#!/usr/bin/env python3
"""
Splunk SOAR (Phantom) webhook receiver

Keeps your ORIGINAL field mappings (artifacts + container fields) and ONLY adds:
- state (top-level payload["state"])
- action (derived from latest payload["actions"][].action by dateInLong)
- nes (payload["nes"] device name(s))

Key behavior:
- Normalizes payload.id to numeric-only for Phantom filter safety (31:24 -> 3124)
- Container description includes a machine-search token: event_id=<normalized id>
- Find existing container via Phantom filters (quoted values required by your build)
- If found: add a NOTE with a compact JSON using the SAME schema/fields you already
  send (NOT the entire payload), plus state/action/nes
- If not found: create container + artifacts using your existing mappings
- Clean logging, no tracebacks, no urllib3 TLS warning spam
"""

import json
import logging
import re
import sys
from typing import Any, Dict, Optional, List

import requests
import urllib3
from flask import Flask, jsonify, request
from werkzeug.exceptions import HTTPException

# ==========================================================
# SPLUNK SOAR (PHANTOM) CONFIG — HARDCODED
# ==========================================================
# NOTE(review): the auth token is embedded in source and TLS verification is
# disabled. Both are kept as-is to preserve the current deployment; consider
# moving the token to an environment variable / secret store.
PHANTOM_SERVER = "https://y.y.y.y:8443"
PH_AUTH_TOKEN = "xxxxxx"
VERIFY_SSL = False
PHANTOM_LABEL = "events"

# ==========================================================
# LOGGING
# ==========================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
log = logging.getLogger("splunksoar")
logging.getLogger("werkzeug").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

if not VERIFY_SSL:
    # Without this every request logs an InsecureRequestWarning.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ==========================================================
# FLASK
# ==========================================================
app = Flask(__name__)
# Route every exception through handle_errors() instead of re-raising it.
app.config["PROPAGATE_EXCEPTIONS"] = False

# ==========================================================
# SESSION (TOKEN ALWAYS INCLUDED)
# ==========================================================
SESSION = requests.Session()
SESSION.headers.update(
    {
        "ph-auth-token": PH_AUTH_TOKEN,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
)


# ==========================================================
# HELPERS
# ==========================================================
def phantom_quote(value: str) -> str:
    """
    Return *value* as a double-quoted literal for use in a _filter_* param.

    Your Phantom build parses _filter_* values as Python literals, so the
    value sent must itself include the surrounding quotes; json.dumps
    produces exactly that (with proper escaping).
    """
    return json.dumps(value)


def normalize_event_id_numeric(event_id: Any) -> str:
    """
    Digits-only normalization to keep Phantom filters safe.

    Accepts strings as well as JSON numbers (payload.id may arrive as an
    int); returns "" for None or when no digit is present.
    """
    if event_id is None:
        return ""
    return re.sub(r"\D", "", str(event_id))


def splunk_normalize_severity(severity: Optional[str]) -> str:
    """
    Map a source severity onto "high" / "medium" / "low".

    Missing or empty severity defaults to "medium"; unknown values map to "low".
    """
    if not severity:
        return "medium"
    # str() guards against a non-string severity (e.g. a number) in the payload.
    s = str(severity).strip().upper()
    if s in ("CRITICAL", "MAJOR", "HIGH"):
        return "high"
    if s in ("WARNING", "MEDIUM"):
        return "medium"
    return "low"


def _action_timestamp(action: Dict[str, Any]) -> float:
    """Return action["dateInLong"] as a number, or 0.0 if missing/unparseable."""
    try:
        return float(action.get("dateInLong"))
    except (TypeError, ValueError):
        return 0.0


def _extract_share_paths(payload: Dict[str, Any]) -> List[str]:
    """Return the non-empty "path" of every dict entry in payload["shares"]."""
    shares = payload.get("shares") or []  # key may be present but null
    if not isinstance(shares, (list, tuple)):
        return []
    return [s["path"] for s in shares if isinstance(s, dict) and s.get("path")]


def derive_latest_action(payload: Dict[str, Any]) -> str:
    """
    Pull latest action from payload["actions"] based on dateInLong.

    Falls back to the first action if no entry carries dateInLong, and to
    "unknown" when there is no usable action. Entries that are not dicts are
    ignored, and a missing/None/non-numeric dateInLong sorts as 0.
    """
    actions = payload.get("actions") or []
    if not isinstance(actions, (list, tuple)):
        return "unknown"

    entries = [a for a in actions if isinstance(a, dict)]
    if not entries:
        return "unknown"

    if any("dateInLong" in a for a in entries):
        latest = max(entries, key=_action_timestamp)
    else:
        latest = entries[0]
    return latest.get("action") or "unknown"


def phantom_message(resp: requests.Response) -> str:
    """Return the "message" field of a JSON error body, else the raw body text."""
    try:
        j = resp.json()
    except ValueError:
        # Body is not JSON (all JSON decode errors derive from ValueError).
        return resp.text
    if isinstance(j, dict):
        return j.get("message") or resp.text
    return resp.text


def error_response(code: int, where: str, message: str):
    """Build the (JSON body, status code) pair used for every error reply."""
    return (
        jsonify({"status": "error", "http_status": code, "where": where, "message": message}),
        code,
    )


def splunk_request(method: str, path: str, where: str, **kwargs) -> requests.Response:
    """
    Send a request to Phantom through the shared SESSION.

    method/path: HTTP verb and REST path appended to PHANTOM_SERVER.
    where:       short label of the calling step, used in the log line.
    kwargs:      forwarded to SESSION.request (params=, json=, timeout=, ...).
                 timeout defaults to 30 and verify to VERIFY_SSL when the
                 caller does not supply them.

    Raises RuntimeError("<status> <message>") on an HTTP status >= 400 and
    RuntimeError("0 <message>") on a transport failure; handle_errors()
    parses that leading number.
    """
    url = f"{PHANTOM_SERVER}{path}"
    # Defaults, not fixed values: passing timeout= both explicitly and via
    # **kwargs raised TypeError (this broke /health, which sends timeout=15).
    kwargs.setdefault("timeout", 30)
    kwargs.setdefault("verify", VERIFY_SSL)

    log.info("PHANTOM %s %s (%s)", method, path, where)
    try:
        r = SESSION.request(method=method, url=url, **kwargs)
    except requests.exceptions.RequestException as e:
        log.error("PHANTOM REQUEST FAILED %s %s: %s", method, path, e)
        raise RuntimeError(f"0 {str(e)}") from None

    if r.status_code >= 400:
        msg = phantom_message(r)
        log.error("PHANTOM ERROR %s %s [%s]: %s", method, path, r.status_code, msg)
        raise RuntimeError(f"{r.status_code} {msg}")

    log.info("PHANTOM OK %s %s [%s]", method, path, r.status_code)
    return r


# ==========================================================
# DEDUP / SEARCH
# ==========================================================
def find_container_by_event_id(event_id_norm: str) -> Optional[int]:
    """
    Find container by description token event_id=<normalized id>.

    Uses quoted filter params required by your Phantom build. Because the
    server-side filter is a substring match ("event_id=3124" is also contained
    in "event_id=31245"), each candidate's description is re-checked here for
    the exact token. Returns the container id, or None when nothing matches.
    """
    token = f"event_id={event_id_norm}"
    params = {
        "_filter_label": phantom_quote(PHANTOM_LABEL),
        "_filter_description__contains": phantom_quote(token),
        # Larger page so substring false positives cannot crowd out the real match.
        "page_size": 50,
        # NOTE(review): confirm your build honors "-create_time" here (some
        # builds expect sort=create_time plus order=desc).
        "sort": "-create_time",
    }
    r = splunk_request("GET", "/rest/container", "find_container", params=params)
    data = r.json().get("data", [])

    # The token must not be followed by another digit.
    exact_token = re.compile(rf"{re.escape(token)}(?!\d)")
    for record in data:
        if not isinstance(record, dict) or "id" not in record:
            continue
        description = record.get("description")
        # If the listing carries no description, trust the server-side filter.
        if description is None or exact_token.search(str(description)):
            return int(record["id"])
    return None


# ==========================================================
# PAYLOAD -> "SCHEMA" (used for notes so we don't dump everything)
# Keep the same fields you previously sent + add state/action/nes.
# ==========================================================
def build_update_schema(payload: Dict[str, Any], event_id_norm: str) -> Dict[str, Any]:
    """Build the compact dict that is serialized into an update note."""
    share_paths: List[str] = _extract_share_paths(payload)
    return {
        # identity / keys
        "event_id": event_id_norm,
        "original_event_id": payload.get("id"),
        "userName": payload.get("userName"),
        "user": payload.get("user"),
        "clientIPs": payload.get("clientIPs"),
        "protocol": payload.get("protocol"),
        "url": payload.get("url"),
        # existing content you were mapping
        "files": payload.get("files"),
        "rswExtensions": payload.get("rswExtensions"),
        "numFiles": payload.get("numFiles"),
        "shares": share_paths,
        # additions requested
        "state": payload.get("state"),
        "action": derive_latest_action(payload),
        "nes": payload.get("nes"),
        # optional but useful
        "severity": payload.get("severity"),
        "detected": payload.get("detected"),
        "lockedOut": payload.get("lockedOut"),
    }


# ==========================================================
# CONTAINER / NOTES / ARTIFACTS
# ==========================================================
def create_container(payload: Dict[str, Any], event_id_norm: str) -> int:
    """Create the Phantom container for a new event and return its id."""
    severity = splunk_normalize_severity(payload.get("severity"))
    user = payload.get("userName") or payload.get("user") or "unknown"
    url = payload.get("url") or ""
    state = payload.get("state")
    action = derive_latest_action(payload)
    nes = payload.get("nes")
    original_id = payload.get("id") or ""

    # Keep your original “Ransomware User behavior detection <user>, event id <id>”
    # Add GUI URL + state/action/nes + machine-search token.
    # The "event_id=..." line is what find_container_by_event_id() searches for.
    description = (
        f"Ransomware User behavior detection {user}, event id {event_id_norm}\n"
        f"event_id={event_id_norm}\n"
        f"State: {state}\n"
        f"Action: {action}\n"
        f"NES: {nes}\n"
        f"Original event id: {original_id}\n"
        f"GUI URL: {url}"
    )

    container = {
        "name": "Ransomware User Behavior Detection",
        "label": PHANTOM_LABEL,
        "severity": severity,
        "description": description,
        # Store state/action/nes in source_data as well (easy playbook access)
        "source_data": {
            "event_id": event_id_norm,
            "original_event_id": original_id,
            "state": state,
            "action": action,
            "nes": nes,
            "userName": payload.get("userName"),
            "user": payload.get("user"),
            "clientIPs": payload.get("clientIPs"),
            "protocol": payload.get("protocol"),
            "url": payload.get("url"),
        },
    }
    r = splunk_request("POST", "/rest/container", "create_container", json=container)
    return int(r.json()["id"])


def add_case_note(container_id: int, payload: Dict[str, Any], event_id_norm: str):
    """
    Add an update note to an existing container.

    Note uses the same schema/fields you already send (compact JSON),
    plus state/action/nes. No full payload dump.
    """
    note_obj = build_update_schema(payload, event_id_norm)
    note = {
        "container_id": container_id,
        "title": "Ransomware Event Update",
        "content": json.dumps(note_obj, indent=2),
        # "json" is accepted by some builds; if yours rejects, change to "markdown"
        "format": "json",
    }
    splunk_request("POST", "/rest/container_note", "add_note", json=note)


def create_artifact(container_id: int, name: str, cef: Dict[str, Any], data=None):
    """
    Create one artifact on the container.

    cef is sent as the artifact's CEF fields; data, when given, is attached
    as the artifact's "data" field. Severity is always "high".
    """
    artifact = {
        "container_id": container_id,
        "name": name,
        "label": PHANTOM_LABEL,
        "severity": "high",
        "cef": cef,
    }
    if data is not None:
        artifact["data"] = data
    splunk_request("POST", "/rest/artifact", "create_artifact", json=artifact)


def create_initial_artifacts(container_id: int, payload: Dict[str, Any], event_id_norm: str):
    """Create the full artifact set for a freshly created container."""
    # User + Network Context (same as you had)
    create_artifact(
        container_id,
        "User and Network Context",
        {
            "userName": payload.get("userName"),
            "userSid": payload.get("user"),
            "clientIPs": payload.get("clientIPs"),
            "transportProtocol": payload.get("protocol"),
            "requestURL": payload.get("url"),
        },
    )

    # Ransomware File Activity (same as you had)
    create_artifact(
        container_id,
        "Ransomware File Activity",
        {
            "filePath": payload.get("files"),
            "fileExtension": payload.get("rswExtensions"),
            "fileCount": payload.get("numFiles"),
        },
    )

    # Affected Shares (same as you had previously)
    share_paths = _extract_share_paths(payload)
    if share_paths:
        create_artifact(
            container_id,
            "Affected File Shares",
            {"fileSharePath": share_paths},
        )

    # Keep Raw Payload artifact on create (as you had before)
    create_artifact(
        container_id,
        "Raw Superna Payload",
        {},
        data=payload,
    )

    # NEW: Add a small “Event State/Action” artifact so playbooks can key off
    # it easily (optional but useful)
    create_artifact(
        container_id,
        "Event State/Action",
        {},
        data={
            "event_id": event_id_norm,
            "original_event_id": payload.get("id"),
            "state": payload.get("state"),
            "action": derive_latest_action(payload),
            "nes": payload.get("nes"),
        },
    )


# ==========================================================
# ERROR HANDLER
# ==========================================================
@app.errorhandler(Exception)
def handle_errors(e):
    """
    Turn any exception into a JSON error reply (no traceback is logged).

    - HTTPException: replied with its own status code, where="flask".
    - RuntimeError "<code> <msg>" (raised by splunk_request): replied with
      that code, or 502 when the code is 0 (transport failure).
    - Anything else: 500.
    """
    if isinstance(e, HTTPException):
        log.error("HTTPException: %s", e)
        # e.code can be None on a bare HTTPException; fall back to 500.
        return error_response(e.code or 500, "flask", str(e))

    if isinstance(e, RuntimeError):
        log.error("RuntimeError: %s", e)
        parts = str(e).split(" ", 1)
        if len(parts) == 2 and parts[0].isdigit():
            code = int(parts[0])
            msg = parts[1]
            return error_response(502 if code == 0 else code, "phantom", msg)
        return error_response(500, "runtime", str(e))

    log.error("Unhandled exception: %s", e)
    return error_response(500, "unknown", "Unhandled error")


# ==========================================================
# ROUTES
# ==========================================================
@app.route("/health", methods=["GET"])
def health():
    """Report liveness by fetching Phantom's /rest/system_info."""
    r = splunk_request("GET", "/rest/system_info", "health", timeout=15)
    return jsonify({"status": "ok", "system_info": r.json()}), 200


@app.route("/webhook", methods=["POST"])
def webhook():
    """
    Receive one event: add a note to the existing case or create a new one.

    Replies 200 when an existing container was updated, 201 when a new one
    was created, and 400 when the body is not a JSON object or payload.id is
    missing / has no digits.
    """
    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, null, ...) has no fields to read.
        return error_response(400, "webhook", "Payload must be a JSON object")

    raw_id = payload.get("id")
    log.info("Webhook received (event_id=%s)", raw_id)

    if not raw_id:
        return error_response(400, "webhook", "Missing payload.id")

    event_id_norm = normalize_event_id_numeric(raw_id)
    if not event_id_norm:
        return error_response(400, "webhook", f"Could not normalize payload.id '{raw_id}' to digits")

    # Dedup by description token
    container_id = find_container_by_event_id(event_id_norm)
    if container_id is not None:
        log.info("Existing case found (container_id=%s, event_id_norm=%s)", container_id, event_id_norm)
        add_case_note(container_id, payload, event_id_norm)
        log.info("Case note added (container_id=%s)", container_id)
        return jsonify({"status": "updated_existing_case", "container_id": container_id}), 200

    # Create new case
    container_id = create_container(payload, event_id_norm)
    log.info("New case created (container_id=%s, event_id_norm=%s)", container_id, event_id_norm)

    # Add your original artifacts + new state/action/nes artifact
    create_initial_artifacts(container_id, payload, event_id_norm)
    return jsonify({"status": "created_new_case", "container_id": container_id}), 201


# ==========================================================
# MAIN
# ==========================================================
if __name__ == "__main__":
    log.info("Starting webhook listener on 0.0.0.0:5000 (VERIFY_SSL=%s)", VERIFY_SSL)
    app.run(host="0.0.0.0", port=5000, debug=False)