import json
import logging
import pprint
import socket
from datetime import datetime, timezone

from flask import Flask, request

# Imports required for the sample - Google Auth and API Client Library imports.
# Get these packages from https://pypi.org/project/google-api-python-client/ or run
#   $ pip install google-api-python-client
# from your terminal.
from google.oauth2 import service_account
from googleapiclient import _auth
from google_auth_httplib2 import AuthorizedHttp

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('googleapiclient.discovery_cache')
logger.setLevel(logging.ERROR)  # To avoid logging discovery cache warnings

app = Flask(__name__)

################ Google Chronicle SIEM Integration section ################
SCOPES = ['https://www.googleapis.com/auth/malachite-ingestion']
SERVICE_ACCOUNT_FILE = '/opt/superna/cgi-bin/nfrsuperna_IngestAPI_creds.json'  # Update with the actual path
CHRONICLE_CUSTOMER_ID = "0a0d6ba3-9a06-4916-8a09-8103a4617a08"  # Replace with your actual Chronicle customer ID
CHRONICLE_INGEST_URL = 'https://malachiteingestion-pa.googleapis.com/v2/udmevents:batchCreate'

# Create credentials (reads the service-account key file at import time).
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=SCOPES
)

# Use AuthorizedHttp to create an HTTP client that takes care of authorization
http_client = AuthorizedHttp(credentials)
################ Google Chronicle SIEM Integration section ################


def get_host_ip():
    """Return the IP address that this machine's hostname resolves to.

    This is a best-effort lookup: if the hostname cannot be resolved the
    failure is logged and the placeholder string "Unable to get Host IP"
    is returned instead of raising.
    """
    try:
        host_name = socket.gethostname()
        logging.debug("Hostname: %s", host_name)
        host_ip = socket.gethostbyname(host_name)
        logging.debug("Host IP: %s", host_ip)
        return host_ip
    except (OSError, UnicodeError) as e:
        # socket.gaierror/herror are OSError subclasses; UnicodeError can be
        # raised while encoding an unusual hostname.
        logging.warning("Error getting host IP: %s", e)
        return "Unable to get Host IP"


def _client_ips(payload):
    """Return the payload's 'clientIPs' value as a flat list.

    A list/tuple value is copied element-wise (so it is not nested inside
    another list), a scalar is wrapped in a one-element list, and a missing
    or empty value becomes ['Unknown'].
    """
    raw = payload.get('clientIPs')
    if not raw:
        return ['Unknown']
    if isinstance(raw, (list, tuple)):
        return [str(ip) for ip in raw]
    return [raw]


def _first_action_name(payload):
    """Return the 'action' of the first entry in payload['actions'], or 'Unknown'."""
    actions = payload.get('actions')
    if isinstance(actions, list) and actions and isinstance(actions[0], dict):
        return actions[0].get('action', 'Unknown')
    return 'Unknown'


def format_udm(payload, source_ip):
    """Build the Chronicle UDM batch for one webhook payload.

    Args:
        payload: Dict parsed from the webhook request body. The keys read are
            'clientIPs', 'userName', 'files', 'shares', 'severity' and
            'actions'; each one is optional and falls back to a placeholder.
        source_ip: Value reported as the observer's IP (the webhook passes
            the result of get_host_ip()).

    Returns:
        A JSON string containing the customer ID and a single-element
        'events' list, ready to be used as the batchCreate request body.

    This is a simplified example event; expand it according to your data
    and the UDM schema.
    """
    client_ips = _client_ips(payload)

    event = {
        "customer_id": CHRONICLE_CUSTOMER_ID,
        "events": [
            {
                "metadata": {
                    # Timezone-aware replacement for the deprecated
                    # datetime.utcnow(); the formatted string is identical.
                    "event_timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
                    "description": "Superna Zero Trust Cyber Storage Threat Detection",
                    "event_type": "SCAN_HOST",
                    "product_name": "Eyeglass Zero Trust",
                    "vendor_name": "Superna",
                    # NOTE(review): "Zerot" looks like a typo, but the value is
                    # kept as-is because downstream rules may match on it.
                    "product_log_id": "Zerot Trust Webhook",
                    "product_version": "V2.8.0"
                },
                "principal": {
                    "ip": client_ips,
                    "user": {
                        "userid": payload.get('userName', 'Unknown')
                    }
                },
                # NOTE(review): observer.ip is sent as a scalar and
                # security_result as a single object, exactly as before;
                # confirm against the UDM schema whether these should be lists.
                "observer": {
                    "hostname": "Superna-Eyeglass",
                    "ip": source_ip
                },
                "target": {
                    "application": "Unknown Application",  # Provide the specific application name if known.
                    "ip": client_ips
                },
                "security_result": {
                    "category": "SOFTWARE_MALICIOUS",
                    "summary": "The list of affected files from the storage layer incident: " + json.dumps(payload.get('files', [])),
                    "category_details": "Ransomware",
                    "priority_details": json.dumps(payload.get('shares', [])),
                    "threat_name": "Superna Threat detector",
                    # Optional fields intentionally not sent yet: threat_status,
                    # verdict_type, threat_verdict, and a verdictinfo object
                    # (category_details, source_provider, malicious_count taken
                    # from payload['peakCritical']).
                    "severity": payload.get('severity', 'Unknown'),
                    "severity_details": "Critical",
                    "alert_state": "ALERTING",
                    "action": "BLOCK",
                    "action_details": _first_action_name(payload),
                    "confidence": "HIGH_CONFIDENCE",
                },
                "extensions": {
                    # Include any additional context or metadata related to the security event here.
                }
            }
        ]
    }
    return json.dumps(event)


def send_to_chronicle(udm_events, http_client):
    """POST a UDM batch to the Chronicle ingestion endpoint.

    Args:
        udm_events: Request body to send; the webhook passes the JSON string
            produced by format_udm().
        http_client: A correctly initialized AuthorizedHttp instance; its
            request() call returns a (response, content) pair.

    Returns:
        True when the endpoint answered with HTTP 200, False otherwise.
        Exceptions raised by the HTTP client itself are not caught here.
    """
    url = CHRONICLE_INGEST_URL
    headers = {'Content-Type': 'application/json'}

    logging.info("Sending request to %s", url)
    logging.debug("Request headers: %s, body: %s", headers, udm_events)
    response, content = http_client.request(uri=url, method='POST', body=udm_events, headers=headers)

    # The response object is a httplib2.Response; the status code is in .status
    logging.debug("Response status: %s", response.status)
    logging.debug("Response headers: %s", response)

    # Parse the body for logging only; an empty or non-JSON body must not hide
    # the real outcome, which is decided by the status code below.
    response_body = None
    if content:
        try:
            response_body = json.loads(content)
        except ValueError:
            response_body = content
            logging.warning("Failed to parse response body as JSON. Raw response: %s", content)
    logging.debug("Response body: %s", response_body)

    if response.status == 200:
        logging.info("Successfully sent to Chronicle")
        return True

    logging.error(
        "Failed to send to Chronicle: HTTP status %s, Response body: %s",
        response.status, response_body,
    )
    return False
@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive a webhook POST, convert it to UDM and forward it to Chronicle.

    Returns:
        ("Success", 200) once the event has been handed to send_to_chronicle,
        a 400 response when the request body is not a JSON object, or
        (error text, 500) when formatting or sending raises.
    """
    # silent=True yields None instead of raising when the body is missing,
    # malformed, or not sent as JSON, so bad requests are reported as 400
    # rather than surfacing later as an unrelated 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        logging.warning("Rejected webhook call: body is not a JSON object")
        return "Request body must be a JSON object", 400

    try:
        source_ip = get_host_ip()  # Reported as the observer IP
        udm_message = format_udm(payload, source_ip)  # Format the payload as UDM
        logging.debug("UDM payload: %s", udm_message)
        send_to_chronicle(udm_message, http_client)
    except Exception as e:
        # Top-level boundary: record the traceback, then report the failure.
        logging.exception("Failed to forward webhook event to Chronicle")
        return str(e), 500

    return "Success", 200


if __name__ == '__main__':
    import os

    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # enabled by default on a server bound to every interface. Opt in
    # explicitly with FLASK_DEBUG=1 during local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5001, debug=debug_enabled)