import requests import logging from flask import Flask, request, jsonify import json app = Flask(__name__) # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Flags to enable/disable features enable_isolation = True # Set this to False to disable isolation enable_broadcast = False # Set this to False to disable broadcast messages enable_full_scan = False # Set this to False to disable full disk scan # SentinelOne instance URL and API token instance_url = "https://xxxxx.sentinelone.net" api_token = "yyyyyyyy" # Headers including Authorization token headers = { "Authorization": f"Bearer {api_token}", "Content-Type": "application/json" } def get_device_id_by_ip(host_ip): # Step 1: Get a list of all agents and their IP addresses query_agents_url = f"{instance_url}/web/api/v2.1/agents" try: response = requests.get(query_agents_url, headers=headers) if response.status_code == 200: agents = response.json().get("data", []) # Search for the agent ID with the given IP address matching_agents = [agent["id"] for agent in agents if host_ip in [ip for ni in agent.get("networkInterfaces", []) for ip in ni.get("inet", [])]] if matching_agents: return matching_agents[0] # Return the first matching agent ID else: logging.error(f"No agents found with IP address {host_ip}") return None else: logging.error(f"Failed to query agents: {response.status_code} - {response.text}") return None except Exception as e: logging.error(f"An error occurred while querying agents: {str(e)}") return None def isolate_host(device_id): # Step 2: Isolate the host based on the agent ID if device_id: isolate_url = f"{instance_url}/web/api/v2.1/agents/actions/disconnect" payload = { "filter": { "ids": [device_id] } } try: isolate_response = requests.post(isolate_url, json=payload, headers=headers) if isolate_response.status_code == 200: logging.info(f"Host isolated successfully: {isolate_response.json()}") return isolate_response.json() else: logging.error(f"Failed to isolate host: {isolate_response.status_code} - {isolate_response.text}") return None except Exception as e: logging.error(f"An error occurred while isolating the host: {str(e)}") return None else: logging.error("No valid device ID provided for isolation.") return None def broadcast_message(device_id, message): # Step 3: Broadcast a message to the host based on the agent ID if device_id and len(message) <= 140: broadcast_url = f"{instance_url}/web/api/v2.1/agents/actions/broadcast" payload = { "filter": { "ids": [device_id] }, "data": { "message": message } } try: broadcast_response = requests.post(broadcast_url, json=payload, headers=headers) if broadcast_response.status_code == 200: logging.info(f"Message broadcasted successfully: {broadcast_response.json()}") return broadcast_response.json() else: logging.error(f"Failed to broadcast message: {broadcast_response.status_code} - {broadcast_response.text}") return None except Exception as e: logging.error(f"An error occurred while broadcasting the message: {str(e)}") return None else: logging.error("No valid device ID or message too long.") return None def initiate_full_scan(device_id): # Step 4: Initiate a full disk scan on the host based on the agent ID if device_id: scan_url = f"{instance_url}/web/api/v2.1/agents/actions/initiate-scan" payload = { "filter": { "ids": [device_id] } } try: scan_response = requests.post(scan_url, json=payload, headers=headers) if scan_response.status_code == 200: logging.info(f"Full scan initiated successfully: {scan_response.json()}") return scan_response.json() else: logging.error(f"Failed to initiate full scan: {scan_response.status_code} - {scan_response.text}") return None except Exception as e: logging.error(f"An error occurred while initiating the full scan: {str(e)}") return None else: logging.error("No valid device ID provided for full scan.") return None @app.route('/webhook', methods=['POST']) def webhook(): try: payload = request.json logging.info(f"Received webhook: {json.dumps(payload)}") # Extract the host IP from the payload host_ip = payload.get('clientIPs', [None])[0] if not host_ip: logging.error("No host IP address provided in the payload.") return jsonify({'error': 'No host IP address provided'}), 400 logging.info(f"Compromised host from Webhook payload: {host_ip}") # Extract severity from payload severity = payload.get('severity') # Proceed only if severity is CRITICAL if severity != 'CRITICAL': logging.info(f"Severity is {severity}, not CRITICAL. No isolation, broadcast, or scan performed.") return jsonify({'message': f'Severity is {severity}, no isolation, broadcast, or scan performed'}), 200 # Find the device by IP device_id = get_device_id_by_ip(host_ip) if not device_id: return jsonify({'error': 'No matching agent found for the provided IP address'}), 404 # Initialize variables for results isolation_result = None broadcast_result = None scan_result = None # Send broadcast message if enabled if enable_broadcast: message = "Suspicious data access behavior detected on this machine, mitigation actions have been taken. Contact IT support." broadcast_result = broadcast_message(device_id, message) if broadcast_result: logging.info("Broadcast message sent successfully") else: return jsonify({'error': 'Failed to send broadcast message'}), 500 # Perform isolation if enabled if enable_isolation: isolation_result = isolate_host(device_id) if isolation_result: logging.info("Isolation successful") else: return jsonify({'error': 'Failed to isolate host'}), 500 # Initiate full scan if enabled if enable_full_scan: scan_result = initiate_full_scan(device_id) if scan_result: logging.info("Full scan initiated successfully") else: return jsonify({'error': 'Failed to initiate full scan'}), 500 return jsonify({ 'message': 'Action completed', 'isolation_result': isolation_result, 'broadcast_result': broadcast_result, 'scan_result': scan_result }), 200 except Exception as e: logging.error(f"An error occurred: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=True)