from flask import Flask, request
import socket
import logging
from datetime import datetime
import json
import pprint
# Imports required for the sample - Google Auth and API Client Library Imports.
# Get these packages from https://pypi.org/project/google-api-python-client/ or run $ pip
# install google-api-python-client from your terminal
from google.oauth2 import service_account
from googleapiclient import _auth
from google_auth_httplib2 import AuthorizedHttp
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('googleapiclient.discovery_cache')
logger.setLevel(logging.ERROR) # To avoid logging discovery cache warnings
app = Flask(__name__)
################ Google Chronicle SIEM Integraion section #########################################
SCOPES = ['https://www.googleapis.com/auth/malachite-ingestion']
SERVICE_ACCOUNT_FILE = '/opt/superna/cgi-bin/nfrsuperna_IngestAPI_creds.json' # Update with the actual path
# Create credentials
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES
)
################ Google Chronicle SIEM Integraion section #########################################
# Use AuthorizedHttp to create an HTTP client that takes care of authorization
http_client = AuthorizedHttp(credentials)
def get_host_ip():
try:
# Get the hostname
host_name = socket.gethostname()
(f"Hostname: {host_name}") # Debug print
# Get the IP address of the host
host_ip = socket.gethostbyname(host_name)
(f"Host IP: {host_ip}") # Debug print
return host_ip
except Exception as e:
pprint.pprint(f"Error getting host IP: {e}") # Debug pprint.pprint
return "Unable to get Host IP"
def format_udm(payload, source_ip):
# Example UDM event based on the provided payload and source_ip
# This is a simplified example; you should expand this according to your data and the UDM schema
event = {
"customer_id": "0a0d6ba3-9a06-4916-8a09-8103a4617a08", # Replace with your actual Chronicle customer ID
"events": [
{
"metadata": {
"event_timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
"description": "Superna Zero Trust Cyber Storage Threat Detection",
"event_type": "SCAN_HOST",
"product_name": "Eyeglass Zero Trust",
"vendor_name": "Superna",
"product_log_id": "Zerot Trust Webhook", # Additional note: Ensure this field is supported or needed.
"product_version": "V2.8.0"
},
"principal": {
"ip": [payload.get('clientIPs', 'Unknown')], # Assuming 'clientIPs' can be directly used here.
"user": {
"userid": payload.get('userName', 'Unknown')
}
},
"observer": {
"hostname": "Superna-Eyeglass",
"ip": source_ip
},
"target": {
"application": "Unknown Application", # Specific application name should be provided if known.
"ip": [payload.get('clientIPs', 'Unknown')]
},
"security_result": {
"category": "SOFTWARE_MALICIOUS",
"summary": "The list of affected files from the storage layer incident: " + json.dumps(payload.get('files', [])),
"category_details": "Ransomware",
"priority_details": json.dumps(payload.get('shares', [])),
"threat_name": "Superna Threat detector",
# Ensure correct key names and uncomment as necessary
# "threat_status": 1,
# "verdict_type": 0,
# "threat_verdict": 3,
"severity": payload.get('severity', 'Unknown'),
"severity_details": "Critical",
"alert_state": "ALERTING",
"action": "BLOCK",
"action_details": payload.get('actions', [{'action': 'Unknown'}])[0].get('action', 'Unknown'),
"confidence": "HIGH_CONFIDENCE",
# "verdictinfo": {
# "category_details": "Ransomware",
# "source_provider": "Superna Security Edition",
# "malicious_count": payload.get('peakCritical', 0) # critical signal strength
},
},
"extensions": {
# Include any additional context or metadata related to the security event here.
}
}
]
}
# The return statement should use the 'event' variable, not 'udm_event'
return json.dumps(event)
def send_to_chronicle(udm_events, http_client):
url = 'https://malachiteingestion-pa.googleapis.com/v2/udmevents:batchCreate'
headers = {'Content-Type': 'application/json'}
body = udm_events
# Make the POST request with detailed logging
logging.info(f"Sending request to {url} with body: {body} and headers: {headers}")
response, content = http_client.request(uri=url, method='POST', body=body, headers=headers)
# The response object is a httplib2.Response, status code is accessed with .status
logging.debug(f"Response status: {response.status}")
logging.debug(f"Response headers: {response}")
try:
# Attempt to parse and log the response body.
response_body = json.loads(content)
logging.debug(f"Response body: {response_body}")
# Check the response status code for success/failure
if response.status == 200:
logging.info("Successfully sent to Chronicle")
else:
logging.error(f"Failed to send to Chronicle: HTTP status {response.status}, Response body: {response_body}")
except json.JSONDecodeError:
logging.error(f"Failed to parse response body as JSON. Raw response: {content}")
# To use this function, ensure you pass the correctly initialized `http_client` (AuthorizedHttp instance) and the `udm_events` payload.
@app.route('/webhook', methods=['POST'])
def webhook():
try:
payload = request.json
source_ip = get_host_ip() # Get the host IP
# test_authentication(http_client)
# uef_message = format_uef(payload, source_ip) not used
uef_message = format_udm(payload, source_ip) # Format the payload as UDM
# Convert the dictionary to a JSON string with indentation for formatting
formatted_json = (uef_message)
# Print the formatted JSON string to the console
pprint.pprint(formatted_json)
send_to_chronicle(uef_message,http_client) # Pass source_ip here as well
return "Success", 200
except Exception as e:
return str(e), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001,debug=True)