"""Flask webhook that converts incoming security events to OCSF and forwards them to SentinelOne."""

import json
import logging
import os
from datetime import datetime, timezone

import requests
from flask import Flask, jsonify, request

app = Flask(__name__)

# SentinelOne HEC token and URL.
# Set the SENTINELONE_HEC_TOKEN / SENTINELONE_HEC_URL environment variables,
# or replace the fallback values below.
SENTINELONE_HEC_TOKEN = os.environ.get('SENTINELONE_HEC_TOKEN', 'xxxxxxx')
SENTINELONE_HEC_URL = os.environ.get(
    'SENTINELONE_HEC_URL',
    'https://ingest.us1.sentinelone.net/services/collector/event',
)

# Configure logging
logging.basicConfig(level=logging.DEBUG)  # You can change this to INFO, WARNING, ERROR as needed
logger = logging.getLogger(__name__)

# --------------------------
# OCSF UTILITY FUNCTIONS
# --------------------------

# Severity ids follow the OCSF severity_id enumeration, where 1 is reserved
# for "Informational" and Low/Medium/High/Critical are 2/3/4/5.
_OCSF_SEVERITIES = {
    "LOW": {"id": 2, "label": "Low"},
    "MEDIUM": {"id": 3, "label": "Medium"},
    "HIGH": {"id": 4, "label": "High"},
    "CRITICAL": {"id": 5, "label": "Critical"},
}
_UNKNOWN_SEVERITY = {"id": 0, "label": "Unknown"}

_OCSF_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

# Epoch seconds of 9999-12-31T23:59:59Z, the largest instant ``datetime`` can
# represent. Anything larger cannot be a seconds value and is read as
# milliseconds instead.
_MAX_EPOCH_SECONDS = 253402300799


def severity_to_ocsf(sev):
    """Map a severity string onto an OCSF severity.

    Args:
        sev: Severity name such as "low" or "CRITICAL". Matching ignores
            case and surrounding whitespace.

    Returns:
        A new ``{"id": int, "label": str}`` dict. Empty or unrecognised
        values map to ``{"id": 0, "label": "Unknown"}``.
    """
    if not sev:
        return dict(_UNKNOWN_SEVERITY)
    key = str(sev).strip().upper()
    # Return a copy so callers cannot mutate the shared lookup table.
    return dict(_OCSF_SEVERITIES.get(key, _UNKNOWN_SEVERITY))


def epoch_to_iso(epoch):
    """Convert an epoch timestamp to a UTC ``YYYY-MM-DDTHH:MM:SS.ffffffZ`` string.

    Args:
        epoch: Epoch seconds as a number or numeric string. Values whose
            magnitude exceeds the year-9999 limit are treated as
            milliseconds.

    Returns:
        The formatted timestamp. If ``epoch`` is missing or cannot be
        converted, a warning is logged and the current UTC time is used.
    """
    try:
        seconds = float(epoch)
        if abs(seconds) > _MAX_EPOCH_SECONDS:
            seconds /= 1000.0
        moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        logger.warning(
            "Invalid or missing epoch %r; using current UTC time instead", epoch
        )
        moment = datetime.now(timezone.utc)
    return moment.strftime(_OCSF_TIME_FORMAT)


def extract_client_ip(value):
    """Extract a single client IP from a ``clientIPs`` value.

    Handles:
      - list like ["172.31.1.45"] (the first entry is returned)
      - string like "172.31.1.45"

    Returns:
        The IP, or ``None`` when the value is empty or of another type.
    """
    if isinstance(value, str):
        return value or None
    if isinstance(value, list) and value:
        return value[0]
    return None


# --------------------------
# OCSF MAPPING LOGIC
# --------------------------

def convert_to_ocsf(payload: dict) -> dict:
    """Convert an incoming DASM/ZeroTrust payload into an OCSF Security Finding.

    Only these payload keys are read: ``id``, ``detectedTime``, ``severity``,
    ``numFiles``, ``state``, ``userName``, ``user``, ``protocol``,
    ``clientIPs`` and ``files``. Missing keys yield ``None`` (or an empty
    list for ``files``) in the result.

    Args:
        payload: Decoded JSON object received by the webhook.

    Returns:
        A JSON-serialisable dict describing the finding.
    """
    event_id = payload.get("id")
    severity = payload.get("severity")

    # "files" may be absent, null, or a single path; normalise to a list so
    # a null does not raise and a bare string is not split into characters.
    files = payload.get("files") or []
    if isinstance(files, str):
        files = [files]

    return {
        "class_name": "Security Finding",
        "class_uid": 2001,
        "category_name": "Findings",
        "severity": severity_to_ocsf(severity),
        "metadata": {
            "vendor_name": "Superna",
            "product_name": "Data Security Edition",
            "product_version": "1.0",
            # event_code is intended to be the vendor's alert/detection ID
            "event_code": event_id,
            # A missing detectedTime falls back to the current time rather
            # than being reported as 1970-01-01.
            "time": epoch_to_iso(payload.get("detectedTime")),
        },
        "finding": {
            "uid": event_id,
            "title": f"Data Security {severity} event",
            "description": f"Detected event involving {payload.get('numFiles')} files.",
            "status": payload.get("state"),
        },
        "actor": {
            "user": {
                "name": payload.get("userName"),
                "uid": payload.get("user"),
            }
        },
        "network": {
            "application_protocol": payload.get("protocol"),
            "client": {
                "ip": extract_client_ip(payload.get("clientIPs")),
            },
        },
        # Files involved in the event
        "file": [{"path": path} for path in files],
    }


# --------------------------
# FLASK WEBHOOK ENDPOINT
# --------------------------

@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive a JSON event, convert it to OCSF and forward it to SentinelOne.

    Returns:
        A ``(json_response, status)`` pair:
          - 200 when SentinelOne accepted the event,
          - 400 when the body is missing, empty, or not a JSON object,
          - SentinelOne's own status code when it answered with an HTTP error,
          - 502 when SentinelOne could not be reached (connection, timeout),
          - 500 for any other unexpected failure.
    """
    try:
        logger.debug("Incoming request payload: %s", request.data)
        payload = request.get_json(silent=True)

        # Only a non-empty JSON object can be mapped; lists and scalars are
        # rejected here instead of failing later on ``.get``.
        if not payload or not isinstance(payload, dict):
            logger.error("Missing or invalid JSON payload")
            return jsonify({"error": "Missing or invalid JSON payload"}), 400

        # Build OCSF-compliant event
        ocsf_event = convert_to_ocsf(payload)
        pretty_event = json.dumps(ocsf_event, indent=2)

        # Pretty print OCSF payload to console
        logger.info(
            "\n==== OCSF EVENT ====\n%s\n====================\n",
            pretty_event,
        )

        # Send OCSF event to SentinelOne
        headers = {
            'Authorization': f'Bearer {SENTINELONE_HEC_TOKEN}',
            'Content-Type': 'application/json',
        }

        logger.debug(
            "Sending OCSF event to SentinelOne %s:\n%s",
            SENTINELONE_HEC_URL,
            pretty_event,
        )

        response = requests.post(
            SENTINELONE_HEC_URL,
            headers=headers,
            data=json.dumps(ocsf_event),
            verify=True,
            timeout=10,
        )

        logger.debug(
            "Response from SentinelOne: %s - %s",
            response.status_code,
            response.text,
        )
        response.raise_for_status()

        return jsonify({"message": "OCSF event sent to SentinelOne successfully"}), 200

    except requests.exceptions.HTTPError as err:
        logger.error("HTTPError: %s", err)
        status_code = err.response.status_code if err.response is not None else 502
        return jsonify({"error": str(err)}), status_code

    except requests.exceptions.RequestException as err:
        # Connection failures and timeouts are upstream problems, not bugs
        # in this service, so report them as a bad gateway.
        logger.error("Error reaching SentinelOne: %s", err, exc_info=True)
        return jsonify({"error": str(err)}), 502

    except Exception as e:
        logger.error("Error: %s", e, exc_info=True)
        return jsonify({"error": str(e)}), 500


# --------------------------
# MAIN ENTRYPOINT
# --------------------------

if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger
    # (which allows arbitrary code execution) stays off unless explicitly
    # enabled with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)