"""Flask webhook that forwards Superna Zero Trust alerts to Google Chronicle.

The service listens on ``/webhook`` for JSON alerts, converts each alert into
a UDM event batch and POSTs it to the Chronicle ingestion API using a
service-account-authorized HTTP client.
"""

import json
import logging
import os
import pprint
import socket
from datetime import datetime, timezone

from flask import Flask, request

# Imports required for the sample - Google Auth and API Client Library Imports.
# Get these packages from https://pypi.org/project/google-api-python-client/ or run
# $ pip install google-api-python-client
# from your terminal.
from google.oauth2 import service_account
from googleapiclient import _auth
from google_auth_httplib2 import AuthorizedHttp

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('googleapiclient.discovery_cache')
logger.setLevel(logging.ERROR)  # To avoid logging discovery cache warnings

# Logger used by this module's own messages.
log = logging.getLogger(__name__)

app = Flask(__name__)

SCOPES = ['https://www.googleapis.com/auth/malachite-ingestion']
SERVICE_ACCOUNT_FILE = '/opt/superna/cgi-bin/nfrsuperna_IngestAPI_creds.json'  # Update with the actual path

# Endpoint every UDM batch is POSTed to.
CHRONICLE_INGEST_URL = 'https://malachiteingestion-pa.googleapis.com/v2/udmevents:batchCreate'
# Replace with your actual Chronicle customer ID.
CUSTOMER_ID = "customer id"
JSON_HEADERS = {'Content-Type': 'application/json'}

# Create credentials
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=SCOPES
)

# Use AuthorizedHttp to create an HTTP client that takes care of authorization
http_client = AuthorizedHttp(credentials)


def _utc_timestamp() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def _as_string_list(value, fallback: str = 'Unknown') -> list:
    """Return *value* as a flat list of strings.

    A list/tuple/set is converted element by element instead of being wrapped
    in another list, so the result is never a nested array. A missing or empty
    value yields ``[fallback]``.
    """
    if value is None or value == '' or value == [] or value == ():
        return [fallback]
    if isinstance(value, (list, tuple, set)):
        return [str(item) for item in value]
    return [str(value)]


def _first_action(payload: dict) -> str:
    """Return the ``action`` of the first entry in ``payload['actions']``.

    Falls back to ``'Unknown'`` when the key is missing, the list is empty or
    the first entry is not a mapping.
    """
    actions = payload.get('actions')
    if isinstance(actions, list) and actions and isinstance(actions[0], dict):
        return actions[0].get('action', 'Unknown')
    return 'Unknown'


def test_authentication(http_client) -> bool:
    """POST a minimal test event to Chronicle to check that auth works.

    Args:
        http_client: object exposing ``request(uri=, method=, body=, headers=)``
            and returning a ``(response, content)`` pair.

    Returns:
        True when the API answered with HTTP 200, False otherwise (including
        when the request itself raised; the error is logged, not re-raised).
    """
    test_event = {
        "customer_id": CUSTOMER_ID,
        "events": [
            {
                "metadata": {
                    "event_timestamp": _utc_timestamp(),
                    "event_type": "SCAN_HOST",  # valid type
                    "product_name": "Superna Test",
                    "vendor_name": "Superna",
                },
                "principal": {
                    "ip": ["127.0.0.1"],  # array of strings, not nested arrays
                    "user": {"userid": "testuser"},
                },
            }
        ],
    }

    try:
        response, content = http_client.request(
            uri=CHRONICLE_INGEST_URL,
            method='POST',
            body=json.dumps(test_event),
            headers=JSON_HEADERS,
        )
    except Exception as e:  # diagnostic helper: report any failure, never raise
        log.exception("Error during Chronicle authentication test: %s", e)
        return False

    if response.status == 200:
        log.info("Authentication successful with Chronicle API.")
        return True

    log.error(
        "Failed Chronicle auth. HTTP %s, Response: %s",
        response.status,
        content.decode('utf-8', errors='replace'),
    )
    return False


def get_host_ip() -> str:
    """Resolve this host's IP address from its hostname.

    Returns:
        The resolved IP address, or the string ``"Unable to get Host IP"``
        when hostname resolution fails.
    """
    try:
        host_name = socket.gethostname()
        log.debug("Hostname: %s", host_name)
        host_ip = socket.gethostbyname(host_name)
        log.debug("Host IP: %s", host_ip)
        return host_ip
    except OSError as e:  # socket.gaierror / socket.herror are OSError subclasses
        log.error("Error getting host IP: %s", e)
        return "Unable to get Host IP"


def format_udm(payload: dict, source_ip: str) -> str:
    """Build a JSON-encoded UDM event batch from a webhook payload.

    This is a simplified mapping; expand it according to your data and the
    UDM schema.

    Args:
        payload: decoded webhook body. The keys read are ``clientIPs``,
            ``userName``, ``files``, ``shares``, ``severity`` and ``actions``;
            all are optional.
        source_ip: IP address reported as the observer.

    Returns:
        A JSON string containing ``customer_id`` and a single-element
        ``events`` list.
    """
    # 'clientIPs' may be a single value or already a list; flatten it so the
    # field is an array of strings rather than a nested array.
    # NOTE(review): the 'Unknown' placeholder is kept from the original code;
    # confirm the ingestion API accepts non-IP text in ip fields.
    client_ips = _as_string_list(payload.get('clientIPs'))

    event = {
        "customer_id": CUSTOMER_ID,
        "events": [
            {
                "metadata": {
                    "event_timestamp": _utc_timestamp(),
                    "description": "Superna Zero Trust Cyber Storage Threat Detection",
                    "event_type": "SCAN_HOST",
                    "product_name": "Eyeglass Zero Trust",
                    "vendor_name": "Superna",
                    "product_log_id": "Zerot Trust Webhook",  # Ensure this field is supported or needed.
                    "product_version": "V2.8.0",
                },
                "principal": {
                    "ip": client_ips,
                    "user": {
                        "userid": payload.get('userName', 'Unknown'),
                    },
                },
                "observer": {
                    "hostname": "Superna-Eyeglass",
                    # Emitted as a list, like the other ip fields in this event.
                    "ip": _as_string_list(source_ip),
                },
                "target": {
                    # Specific application name should be provided if known.
                    "application": "Unknown Application",
                    "ip": client_ips,
                },
                "security_result": {
                    "category": "SOFTWARE_MALICIOUS",
                    "summary": "The list of affected files from the storage layer incident: "
                               + json.dumps(payload.get('files', [])),
                    "category_details": "Ransomware",
                    "priority_details": json.dumps(payload.get('shares', [])),
                    "threat_name": "Superna Threat detector",
                    # Optional fields intentionally left out for now:
                    # threat_status, verdict_type, threat_verdict, verdictinfo
                    # (category_details / source_provider / malicious_count
                    # from payload 'peakCritical').
                    "severity": payload.get('severity', 'Unknown'),
                    "severity_details": "Critical",
                    "alert_state": "ALERTING",
                    "action": "BLOCK",
                    "action_details": _first_action(payload),
                    "confidence": "HIGH_CONFIDENCE",
                },
                "extensions": {
                    # Include any additional context or metadata related to
                    # the security event here.
                },
            }
        ],
    }

    return json.dumps(event)


def send_to_chronicle(udm_events: str, http_client) -> bool:
    """POST a JSON-encoded UDM batch to the Chronicle ingestion endpoint.

    Args:
        udm_events: request body, already serialized to JSON.
        http_client: object exposing ``request(uri=, method=, body=, headers=)``
            and returning a ``(response, content)`` pair (e.g. the module-level
            ``AuthorizedHttp`` instance).

    Returns:
        True when the endpoint answered with HTTP 200, False otherwise.

    Raises:
        Whatever ``http_client.request`` raises; transport errors are left to
        the caller.
    """
    log.info(
        "Sending request to %s with body: %s and headers: %s",
        CHRONICLE_INGEST_URL, udm_events, JSON_HEADERS,
    )
    response, content = http_client.request(
        uri=CHRONICLE_INGEST_URL, method='POST', body=udm_events, headers=JSON_HEADERS
    )

    # The response object exposes the status code as .status
    log.debug("Response status: %s", response.status)
    log.debug("Response headers: %s", response)

    # Parsing the body is best-effort and only feeds the logs; the outcome is
    # decided by the status code alone.
    try:
        response_body = json.loads(content) if content else None
    except ValueError:  # JSONDecodeError and UnicodeDecodeError
        log.error("Failed to parse response body as JSON. Raw response: %s", content)
        response_body = content
    else:
        log.debug("Response body: %s", response_body)

    if response.status == 200:
        log.info("Successfully sent to Chronicle")
        return True

    log.error(
        "Failed to send to Chronicle: HTTP status %s, Response body: %s",
        response.status, response_body,
    )
    return False


@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive an alert, convert it to UDM and forward it to Chronicle.

    Returns:
        ``("Success", 200)`` when Chronicle accepted the event, a 400 when the
        request body is not a JSON object, a 502 when Chronicle answered with
        a non-200 status, and a 500 on any unexpected error.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return "Request body must be a JSON object", 400

    try:
        source_ip = get_host_ip()
        udm_json = format_udm(payload, source_ip)
        log.debug("Formatted UDM payload: %s", udm_json)
        delivered = send_to_chronicle(udm_json, http_client)
    except Exception:  # request boundary: log the details, answer generically
        log.exception("Unexpected error while handling webhook")
        return "Internal error while forwarding event", 500

    if not delivered:
        return "Chronicle did not accept the event", 502
    return "Success", 200


if __name__ == '__main__':
    # The interactive debugger lets a client execute arbitrary code, so it is
    # opt-in (FLASK_DEBUG=1) rather than always on for a 0.0.0.0 listener.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(host='0.0.0.0', port=5001, debug=debug_enabled)