from flask import Flask, request import socket import logging from datetime import datetime import json import pprint from google.oauth2 import service_account from google_auth_httplib2 import AuthorizedHttp # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger('googleapiclient.discovery_cache') logger.setLevel(logging.ERROR) app = Flask(__name__) SCOPES = ['https://www.googleapis.com/auth/malachite-ingestion'] SERVICE_ACCOUNT_FILE = '/opt/superna/cgi-bin/nfrsuperna_IngestAPI_creds.json' credentials = service_account.Credentials.from_service_account_file( SERVICE_ACCOUNT_FILE, scopes=SCOPES ) http_client = AuthorizedHttp(credentials) # ------------------------ # Severity Mapping (UDM text only) # ------------------------ def map_severity(product_severity): """ Map product severity (any case) to official UDM textual severity. Returns: UDM text severity (CRITICAL, HIGH, MEDIUM, LOW, INFORMATIONAL) """ severity_map = { "critical": "CRITICAL", "major": "HIGH", "high": "HIGH", "warning": "MEDIUM", "medium": "MEDIUM", "minor": "LOW", "low": "LOW", "info": "INFORMATIONAL", "informational": "INFORMATIONAL" } if not isinstance(product_severity, str): product_severity = str(product_severity or "").strip() sev_key = product_severity.lower().strip() return severity_map.get(sev_key, "LOW") # Default to LOW if unknown def test_authentication(http_client): url = 'https://malachiteingestion-pa.googleapis.com/v2/udmevents:batchCreate' headers = {'Content-Type': 'application/json'} test_event = { "customer_id": "0a0d6ba3-9a06-4916-8a09-8103a4617a08", "events": [{ "metadata": { "event_timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), "event_type": "SCAN_HOST", "product_name": "Superna Test", "vendor_name": "Superna" }, "principal": { "ip": ["127.0.0.1"], "user": {"userid": "testuser"} } }] } try: response, content = http_client.request( uri=url, method='POST', body=json.dumps(test_event), headers=headers ) if response.status == 200: logging.info("Test Authentication successful with Chronicle API sent test event.") else: logging.error("Failed Chronicle auth. HTTP %s, Response: %s", response.status, content.decode('utf-8')) except Exception as e: logging.exception("Error during Chronicle authentication test: %s", str(e)) def get_host_ip(): try: host_name = socket.gethostname() host_ip = socket.gethostbyname(host_name) return host_ip except Exception as e: pprint.pprint(f"Error getting host IP: {e}") return "Unknown" def format_udm(payload, source_ip): udm_text = map_severity(payload.get('severity', 'unknown')) # Extract only the share names share_names = [] shares = payload.get("shares", []) if isinstance(shares, list): for share in shares: name = share.get("name") if name and name not in share_names: share_names.append(name) # Combine share names into comma-separated string share_summary = ", ".join(share_names) if share_names else "None" event = { "customer_id": "0a0d6ba3-9a06-4916-8a09-8103a4617a08", "events": [{ "metadata": { "event_timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), "description": "Superna Zero Trust Cyber Storage Threat Detection", "event_type": "SCAN_HOST", "product_name": "Eyeglass Zero Trust", "vendor_name": "Superna", "product_version": "V2.8.0", "product_log_id": "Zero Trust Webhook" }, "principal": { "ip": [payload.get('clientIPs', ['Unknown'])[0]], "user": {"userid": payload.get('userName', 'Unknown')} }, "observer": { "hostname": "Superna-Eyeglass", "ip": source_ip }, "target": { "application": "Unknown Application", "ip": [payload.get('clientIPs', ['Unknown'])[0]] }, "security_result": { "category": "SOFTWARE_MALICIOUS", "summary": "Affected files: " + json.dumps(payload.get('files', [])), "category_details": "Ransomware", "priority_details": share_summary, # ✅ share names only "threat_name": "Superna Threat Detector", "severity": udm_text, # ✅ Only UDM text severity "alert_state": "ALERTING", "action": "BLOCK", "action_details": payload.get('actions', [{'action': 'Unknown'}])[0].get('action', 'Unknown'), "confidence": "HIGH_CONFIDENCE" } }] } # ✅ Pretty-print output before sending pretty = json.dumps(event, indent=4) print("\n--- JSON sent to Chronicle ---") print(pretty) print("-----------------------------\n") return json.dumps(event) def send_to_chronicle(udm_events, http_client): url = 'https://malachiteingestion-pa.googleapis.com/v2/udmevents:batchCreate' headers = {'Content-Type': 'application/json'} logging.info(f"Sending request to {url} with body: {udm_events}") response, content = http_client.request(uri=url, method='POST', body=udm_events, headers=headers) logging.debug(f"Response status: {response.status}") try: response_body = json.loads(content) if response.status == 200: logging.info("Successfully sent to Chronicle") else: logging.error(f"Failed to send to Chronicle: HTTP {response.status}, Response: {response_body}") except json.JSONDecodeError: logging.error(f"Failed to parse Chronicle response: {content}") @app.route('/webhook', methods=['POST']) def webhook(): try: payload = request.json source_ip = get_host_ip() test_authentication(http_client) udm_message = format_udm(payload, source_ip) pprint.pprint(json.loads(udm_message)) send_to_chronicle(udm_message, http_client) return "Success", 200 except Exception as e: logging.exception("Webhook processing failed") return str(e), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=5001, debug=True)