"""Webhook service that contains a compromised host through the CrowdStrike API.

A POST to ``/webhook`` carries a JSON payload whose ``clientIPs`` list names
the offending host. The first IP is resolved to a CrowdStrike device ID and,
when ``enable_isolation`` is true, a ``contain`` action is issued for it.
"""
import ipaddress
import json
import logging
import socket  # Unused in this module; kept because other code may rely on it.
from datetime import datetime, timezone

import requests
from flask import Flask, request, jsonify

app = Flask(__name__)

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Your CrowdStrike API credentials
client_id = 'xxxxx'
client_secret = 'yyyyyy'

# The API base URL for CrowdStrike
base_url = "https://api.us-2.crowdstrike.com"

# host ip of eyeglass
source_ip = 'zzzzzzz'

# Module-level placeholder; webhook() works on its own local value extracted
# from the payload and never writes this global.
host_ip = ''

# Flag to enable/disable host isolation
enable_isolation = True  # Set this to False to disable isolation

# Seconds to wait on any single CrowdStrike API call before giving up, so a
# stalled connection cannot hang a webhook worker indefinitely.
REQUEST_TIMEOUT = 30


def get_bearer_token():
    """Obtain an OAuth2 bearer token using the client credentials.

    Returns:
        The ``access_token`` string from the token endpoint response.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout or
            a non-2xx response.
        KeyError: If the response carries no ``access_token`` field.
    """
    url = f"{base_url}/oauth2/token"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'client_id': client_id,
        'client_secret': client_secret,
    }
    response = requests.post(url, headers=headers, data=data, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    logging.info("Bearer token obtained successfully.")
    return response.json()['access_token']


def _client_ip_list(payload):
    """Return the payload's ``clientIPs`` as a flat list (``['Unknown']`` if absent)."""
    raw_ips = payload.get('clientIPs')
    if not raw_ips:
        return ['Unknown']
    if isinstance(raw_ips, (list, tuple)):
        return list(raw_ips)
    # A single scalar value: wrap it so the field is always a list.
    return [raw_ips]


def _first_action_name(payload):
    """Return the ``action`` of the first entry in ``actions``, or ``'Unknown'``."""
    actions = payload.get('actions')
    if not isinstance(actions, (list, tuple)) or not actions:
        return 'Unknown'
    first = actions[0]
    if not isinstance(first, dict):
        return 'Unknown'
    return first.get('action', 'Unknown')


def format_udm(payload, source_ip):
    """Build a simplified UDM event from a webhook payload.

    This is a minimal example; extend it according to your data and the UDM
    schema. NOTE: webhook() does not currently call this function.

    Args:
        payload: Decoded webhook JSON (a dict). ``None`` is treated as empty.
        source_ip: IP address reported as the observer of the event.

    Returns:
        The event serialized as a JSON string.
    """
    payload = payload or {}
    # clientIPs is consumed as a list by webhook(); flatten it here instead of
    # wrapping it in a second list.
    client_ips = _client_ip_list(payload)

    event = {
        "customer_id": "0a0d6ba3-9a06-4916-8a09-8103a4617a08",  # Replace with your actual Chronicle customer ID
        "events": [
            {
                "metadata": {
                    # Timezone-aware UTC timestamp; same format as before.
                    "event_timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
                    "description": "Superna Zero Trust Cyber Storage Threat Detection",
                    "event_type": "SCAN_HOST",
                    "product_name": "Eyeglass Zero Trust",
                    "vendor_name": "Superna",
                    "product_log_id": "Zerot Trust Webhook",  # Ensure this field is supported or needed.
                    "product_version": "V2.8.0",
                },
                "principal": {
                    "ip": list(client_ips),
                    "user": {
                        "userid": payload.get('userName', 'Unknown'),
                    },
                },
                "observer": {
                    "hostname": "Superna-Eyeglass",
                    "ip": source_ip,
                },
                "target": {
                    "application": "Unknown Application",  # Provide the specific application name if known.
                    "ip": list(client_ips),
                },
                "security_result": {
                    "category": "SOFTWARE_MALICIOUS",
                    "summary": "The list of affected files from the storage layer incident: "
                               + json.dumps(payload.get('files', [])),
                    "category_details": "Ransomware",
                    "priority_details": json.dumps(payload.get('shares', [])),
                    "threat_name": "Superna Threat detector",
                    "severity": payload.get('severity', 'Unknown'),
                    "severity_details": "Critical",
                    "alert_state": "ALERTING",
                    "action": "BLOCK",
                    # Safe against a missing or empty 'actions' list.
                    "action_details": _first_action_name(payload),
                    "confidence": "HIGH_CONFIDENCE",
                    "verdictinfo": {
                        "category_details": "Ransomware",
                        "source_provider": "Superna Security Edition",
                        "malicious_count": payload.get('peakCritical', 0),  # critical signal strength
                    },
                },
                "extensions": {
                    # Include any additional context or metadata related to the security event here.
                },
            }
        ],
    }
    return json.dumps(event)


def get_device_id_by_ip(access_token, ip_address):
    """Find the device ID for a given IP address.

    Args:
        access_token: Bearer token from get_bearer_token().
        ip_address: IPv4/IPv6 address of the host, as text.

    Returns:
        The first device ID returned by the device query.

    Raises:
        ValueError: If ``ip_address`` is not a valid IP address, or no device
            matches it.
        requests.exceptions.RequestException: On network failure, timeout or
            a non-2xx response.
    """
    # The address originates from an external webhook payload and is embedded
    # in a filter expression, so reject anything that is not a plain IP.
    try:
        validated_ip = str(ipaddress.ip_address(str(ip_address).strip()))
    except ValueError:
        logging.error(f"Invalid IP address supplied: {ip_address!r}")
        raise ValueError(f"Invalid IP address: {ip_address!r}") from None

    logging.info(f"Attempting to find device ID for IP address: {validated_ip}")
    url = f"{base_url}/devices/queries/devices/v1"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    params = {
        'filter': f'local_ip:"{validated_ip}"',
    }
    response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    # 'resources' may be absent or null in the response body.
    device_ids = response.json().get('resources') or []
    if not device_ids:
        logging.error("No device found with the specified IP address")
        raise ValueError("No device found with the specified IP address")

    logging.info(f"Device ID found: {device_ids[0]}")
    return device_ids[0]


def isolate_host(access_token, device_id):
    """Contain a host by its device ID if isolation is enabled.

    Args:
        access_token: Bearer token from get_bearer_token().
        device_id: CrowdStrike device ID to contain.

    Returns:
        The decoded JSON response of the containment call, or a dict with a
        ``message`` key when ``enable_isolation`` is false.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout or
            a non-2xx response.
    """
    if not enable_isolation:
        logging.info("Containment is disabled.")
        return {"message": "Containment disabled by configuration."}

    logging.info(f"Initiating containment for device with ID: {device_id}")
    url = f"{base_url}/devices/entities/devices-actions/v2"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    params = {
        'action_name': 'contain',
    }
    body = {
        'ids': [device_id],
    }
    response = requests.post(
        url, headers=headers, json=body, params=params, timeout=REQUEST_TIMEOUT
    )
    response.raise_for_status()
    logging.info("Containment action initiated successfully.")
    return response.json()


def query_all_vulnerabilities(access_token):
    """Fetch all vulnerabilities by following the API's ``after`` pagination token.

    Errors are logged and end the loop; whatever was collected up to that
    point is returned.

    Args:
        access_token: Bearer token from get_bearer_token().

    Returns:
        A list of every resource collected across the fetched pages.
    """
    url = f"{base_url}/spotlight/queries/vulnerabilities/v1"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    all_vulns = []
    after = None

    while True:
        params = {'limit': 100}
        if after:
            params['after'] = after

        try:
            # params must be sent, otherwise every request returns page one.
            response = requests.get(
                url, headers=headers, params=params, timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.HTTPError as e:
            failed_url = getattr(e.request, 'url', url)
            logging.error(f"HTTP Error: {e} for URL: {failed_url}")
            break
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed: {e}")
            break

        all_vulns.extend(data.get('resources') or [])

        pagination = (data.get('meta') or {}).get('pagination') or {}
        next_after = pagination.get('after')
        # Stop on a missing/empty token, or one that did not advance; either
        # would otherwise re-request the same page forever.
        if not next_after or next_after == after:
            break
        after = next_after

    logging.info(f"Total vulnerabilities found: {len(all_vulns)}")
    return all_vulns


@app.route('/webhook', methods=['POST'])
def webhook():
    """Handle an incoming alert and isolate the first host listed in ``clientIPs``.

    Returns:
        A ``(json, status)`` pair: 200 on success or when isolation is
        disabled, 400 when the body is not a JSON object or carries no host
        IP, 500 on any other failure.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead of
        # raising, so a malformed request is answered with 400 rather than 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            logging.error("Webhook request body is not a JSON object.")
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        client_ips = payload.get('clientIPs')
        if isinstance(client_ips, str):
            client_ips = [client_ips]
        # Covers a missing key, null, an empty list and non-list values.
        host_ip = client_ips[0] if isinstance(client_ips, (list, tuple)) and client_ips else None
        if not host_ip:
            logging.error("No host IP address provided in the payload.")
            return jsonify({'error': 'No host IP address provided'}), 400

        logging.info(f"Compromised host from Webhook payload: {host_ip}")

        if not enable_isolation:
            logging.info("Isolation is disabled. No action taken.")
            return jsonify({'message': 'Isolation is disabled, no action taken'}), 200

        # Request a token only once the payload is known to be actionable.
        access_token = get_bearer_token()
        device_id = get_device_id_by_ip(access_token, host_ip)
        isolation_result = isolate_host(access_token, device_id)
        logging.info(f"Isolation successful: {isolation_result}")
        return jsonify({'message': 'Isolation successful', 'result': isolation_result}), 200
    except Exception as e:
        # Top-level boundary: record the traceback and report a generic 500.
        logging.exception(f"An error occurred: {str(e)}")
        return jsonify({"status": "error", "message": str(e)}), 500


if __name__ == '__main__':
    # Debug mode is off: the interactive debugger allows arbitrary code
    # execution and must not be reachable on a server bound to all interfaces.
    app.run(host='0.0.0.0', port=5000, debug=False)