"""Flask webhook relaying Superna Zero Trust alerts to Microsoft Defender XDR.

For each POST to ``/defenderxdr`` the handler:

1. restructures the incoming JSON payload and prints it,
2. requests OAuth2 client-credential tokens for the Security Center and
   Graph APIs,
3. looks up the Defender machine ID for the first client IP in the payload,
4. optionally isolates that machine (``LOCKED_OUT`` state AND the
   ``host_isolation`` flag enabled),
5. starts an automated investigation on the machine, and
6. updates / re-reads the alert created by that investigation through the
   Graph Security ``alerts_v2`` API.
"""
import json
import os
from datetime import datetime, timezone

import requests
from flask import Flask, jsonify, request

app = Flask(__name__)

######################## Integration Option Settings ###################
# Default setting 'false': the host is NOT isolated in Microsoft Defender XDR.
# Set to 'true' (or True) so that a LOCKED_OUT Zero Trust event asks Defender
# to isolate the host from the network.
host_isolation = 'false'
#######################################################################

# XDR application authentication material (tenant ID, client ID, client
# secret). Read from the environment when set; the original in-file
# placeholders remain as fallbacks so existing deployments keep working.
TENANT_ID = os.environ.get("DEFENDER_TENANT_ID", "xxxxx")
CLIENT_ID = os.environ.get("DEFENDER_CLIENT_ID", "yyyy")
CLIENT_SECRET = os.environ.get("DEFENDER_CLIENT_SECRET", "zzzzzz")

# Seconds to wait on any outbound HTTP call; without a timeout a stalled
# endpoint would block the webhook worker indefinitely.
REQUEST_TIMEOUT = 30

# Scopes for the two different APIs.
SCOPE_SECURITY_CENTER = "https://api.securitycenter.microsoft.com/.default"
SCOPE_GRAPH_API = "https://graph.microsoft.com/.default"

# Payload keys copied verbatim into the full event.
EVENT_FIELDS = (
    "user",
    "state",
    "userName",
    "protocol",
    "eventSource",
    "numFiles",
    "nes",
    "detected",
    "clientIPs",
    "shares",
    "files",
)


def _flag_enabled(value):
    """Interpret a setting that may be a bool or a string such as 'false'.

    A plain truthiness test is wrong for strings: ``'false'`` is non-empty
    and therefore truthy.
    """
    if isinstance(value, str):
        return value.strip().lower() in ("true", "1", "yes", "on")
    return bool(value)


def _auth_headers(access_token):
    """Return the bearer-token JSON headers used by every API call below."""
    return {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }


def _json_or_empty(response):
    """Return the response body parsed as a JSON object, or ``{}``.

    Bodies that are empty, not valid JSON, or not a JSON object yield ``{}``
    so callers can use ``.get`` without further checks.
    """
    try:
        parsed = response.json()
    except ValueError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _error(message):
    """Print *message* and build the handler's error response.

    NOTE(review): the original handler answered errors with the implicit
    HTTP 200; that is preserved here because the webhook sender's retry
    behaviour on non-2xx codes is not visible from this file.
    """
    print(f"An error occurred: {message}")
    return jsonify({"status": "error", "message": message})


def build_event(payload):
    """Return the full event: the ``EVENT_FIELDS`` keys of *payload* under "event"."""
    return {"event": {field: payload.get(field) for field in EVENT_FIELDS}}


def build_summary_event(payload):
    """Return the reduced event used as the investigation comment.

    Only ``state``, ``userName`` and ``shares`` are kept, and ``shares`` is
    flattened to a comma-separated string of the share names. The original
    author's note says this payload is meant to fit a 4000-byte comment.
    """
    # "shares" may be absent or explicitly null; treat both as "no shares".
    shares = payload.get("shares") or []
    share_names = [
        str(item.get("name") or "") for item in shares if isinstance(item, dict)
    ]
    return {
        "event": {
            "state": payload.get("state"),
            "userName": payload.get("userName"),
            "shares": ", ".join(share_names),
        }
    }


def count_bytes_in_object(obj):
    """Print and return the UTF-8 byte length of *obj* serialized as JSON."""
    byte_count = len(json.dumps(obj).encode('utf-8'))
    print(f"Byte count of the summary event object: {byte_count}")
    return byte_count


def first_client_ip(zt_event):
    """Return the first entry of the event's ``clientIPs``, or ``None``.

    A bare string is treated as a single IP address rather than indexed
    character by character.
    """
    ips = zt_event["event"].get("clientIPs")
    if isinstance(ips, str):
        return ips or None
    if isinstance(ips, (list, tuple)) and ips:
        return ips[0]
    return None


def get_token(url, client_id, client_secret, scope):
    """Request an OAuth2 client-credentials access token.

    Parameters:
    url (str): The token endpoint.
    client_id (str): The application (client) ID.
    client_secret (str): The application secret.
    scope (str): The scope to request the token for.

    Returns:
    str | None: The access token, or None when the request fails or the
    response carries no ``access_token``.
    """
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    body = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    }
    print(f"Requesting token for scope: {scope}")
    response = requests.post(url, headers=headers, data=body, timeout=REQUEST_TIMEOUT)
    # Compare the status code explicitly: a requests.Response is falsy for
    # 4xx/5xx, so `if response:` would hide the real failure details.
    print("Received response for scope:", scope)
    print("Response Status Code:", response.status_code)
    if response.status_code != 200:
        print("Failed to obtain token. Response content:", response.text)
        return None
    access_token = _json_or_empty(response).get("access_token")
    if not access_token:
        print("No access token in response. Response content:", response.text)
        return None
    print("Access token obtained for scope:", scope)
    return access_token


def list_machines(access_token):
    """Print ID, name and last IPs of the machines Defender returns.

    Returns:
    list: The ``value`` array of the response, or ``[]`` on failure.
    """
    list_machines_url = "https://api.securitycenter.microsoft.com/api/machines"
    response = requests.get(
        list_machines_url, headers=_auth_headers(access_token), timeout=REQUEST_TIMEOUT
    )
    if response.status_code != 200:
        print(f"Failed to retrieve machines: {response.status_code}, {response.text}")
        return []
    machines = _json_or_empty(response).get("value") or []
    for machine in machines:
        # .get() so that a record lacking one field does not abort the run.
        print(f"Machine ID: {machine.get('id')}")
        print(f"Machine Name: {machine.get('computerDnsName')}")
        print(f"Last Internal IP: {machine.get('lastIpAddress')}")
        print(f"Last External IP: {machine.get('lastExternalIpAddress')}")
    return machines


def find_machine_id_by_ip_and_timestamp(security_center_token, ip_address_to_search):
    """Find a Defender machine ID by IP address at the current UTC time.

    The full response is pretty-printed. The first machine of the returned
    ``value`` list is assumed to be the one of interest.

    Returns:
    str | None: The machine ID, or None when nothing matched or the request
    failed.
    """
    # Current time in ISO 8601 format (timezone-aware; utcnow() is deprecated).
    current_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    find_machine_url = (
        "https://api.securitycenter.microsoft.com/api/machines/"
        f"findbyip(ip='{ip_address_to_search}',timestamp={current_timestamp})"
    )
    print(f"Searching for machine ID with IP: {ip_address_to_search} at timestamp: {current_timestamp}")
    print(f"Endpoint URL: {find_machine_url}")
    response = requests.get(
        find_machine_url,
        headers=_auth_headers(security_center_token),
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        print(f"Failed to find machine: {response.status_code}, {response.text}")
        return None
    response_data = _json_or_empty(response)
    print("\nAll returned data (pretty-printed):")
    print(json.dumps(response_data, indent=4))
    machines = response_data.get('value') or []
    if not machines:
        print("No machines found with the specified IP address and timestamp.")
        return None
    return machines[0].get('id')


def isolate_machine(access_token, machine_id, comment, isolation_type):
    """Isolate a machine using the Microsoft Defender for Endpoint API.

    https://learn.microsoft.com/en-us/microsoft-365/security/defender-endpoint/api/isolate-machine?view=o365-worldwide

    Parameters:
    access_token (str): The OAuth2 access token for Defender API.
    machine_id (str): The ID of the machine to isolate.
    comment (str): A comment about the isolation action.
    isolation_type (str): Type of isolation, e.g., 'Full' or 'Selective'.

    Returns:
    dict | None: The parsed API response on HTTP 200/201, otherwise None.
    """
    # NOTE(review): this host (securitycenter.windows.com) differs from the
    # hosts used by the other Defender calls in this file — confirm it is
    # still the intended endpoint for the token audience in use.
    url = f"https://api.securitycenter.windows.com/api/machines/{machine_id}/isolate"
    body = {"Comment": comment, "IsolationType": isolation_type}
    response = requests.post(
        url,
        headers=_auth_headers(access_token),
        data=json.dumps(body),
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code in (200, 201):
        return _json_or_empty(response)
    print(f"Failed to isolate machine: {response.status_code}, {response.text}")
    return None


def initiate_investigation(access_token, machine_id, comment):
    """Start an investigation on a machine in Microsoft Defender.

    Parameters:
    access_token (str): The access token for Microsoft Defender API.
    machine_id (str): The ID of the machine to investigate.
    comment (str): The comment to include in the investigation.

    Returns:
    dict | None: The parsed API response on HTTP 200/201/202, otherwise None.
    """
    url = f"https://api.security.microsoft.com/api/machines/{machine_id}/startInvestigation"
    data = {"Comment": comment}
    # Authenticate with the token passed in, not a module-level global.
    response = requests.post(
        url,
        headers=_auth_headers(access_token),
        data=json.dumps(data),
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code not in (200, 201, 202):
        print(f"Failed to initiate investigation on machine {machine_id}: {response.status_code}, {response.text}")
        return None
    response_data = _json_or_empty(response)
    # The key name depends on the actual API response structure.
    alert_id = response_data.get('triggeringAlertId', None)
    print(f"Alert ID created from investigation: {alert_id}")
    return response_data


def update_alert(graph_api_token, alert_id, alert_update_data):
    """Update an alert through the Graph Security ``alerts_v2`` API.

    Parameters:
    graph_api_token (str): The access token for Microsoft Graph API.
    alert_id (str): The ID of the alert to update.
    alert_update_data (dict): The properties to set on the alert.

    Returns:
    dict | None: The parsed response on HTTP 200, ``{}`` on HTTP 204,
    None on any other status.
    """
    url = f"https://graph.microsoft.com/v1.0/security/alerts_v2/{alert_id}"
    print(f"Endpoint for alertv2 update URL: {url}")
    response = requests.patch(
        url,
        headers=_auth_headers(graph_api_token),
        data=json.dumps(alert_update_data),
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code not in (200, 204):
        print(f"Failed to update alert: {response.status_code}, {response.text}")
        return None
    print("Alert updated successfully.")
    return _json_or_empty(response) if response.status_code == 200 else {}


def get_alert_details(access_token, alert_id):
    """Fetch one alert using the Microsoft Graph ``alerts_v2`` API.

    Parameters:
    access_token (str): The OAuth2 access token for Microsoft Graph API.
    alert_id (str): The ID of the alert to retrieve.

    Returns:
    dict | None: The parsed alert on HTTP 200, otherwise None.
    """
    url = f"https://graph.microsoft.com/v1.0/security/alerts_v2/{alert_id}"
    response = requests.get(
        url, headers=_auth_headers(access_token), timeout=REQUEST_TIMEOUT
    )
    if response.status_code == 200:
        return _json_or_empty(response)
    print(f"Failed to get alert details: {response.status_code}, {response.text}")
    return None


@app.route('/defenderxdr', methods=['POST'])
def webhook():
    """Handle one Zero Trust webhook call and drive the Defender XDR actions.

    Returns a JSON body ``{"status": "successful ..."}`` on success, or
    ``{"status": "error", "message": ...}`` when a step cannot proceed.
    """
    # Retained from the original implementation, which published these
    # per-request values as module globals.
    global client_ips, security_center_token, graph_api_token
    try:
        # Parse the incoming JSON payload; silent=True yields None instead of
        # raising when the body is missing or is not JSON.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return _error("Request body must be a JSON object")

        # Filter and restructure the payload, then pretty print both forms.
        zt_event = build_event(payload)
        print(json.dumps(zt_event, indent=4))
        zt_summary_event = build_summary_event(payload)
        print(json.dumps(zt_summary_event, indent=4))
        count_bytes_in_object(zt_summary_event)

        # Extract the first client IP of the alert.
        client_ip = first_client_ip(zt_event)
        client_ips = client_ip if client_ip is not None else "No client IP found"
        print("Client IP of zero trust alert:", client_ips)
        if client_ip is None:
            # Without an IP there is nothing to look up in Defender.
            return _error("No client IP found in the Zero Trust payload")

        # Token endpoint and tokens for the two APIs.
        token_url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
        print("URL for token request:", token_url)
        security_center_token = get_token(
            token_url, CLIENT_ID, CLIENT_SECRET, SCOPE_SECURITY_CENTER
        )
        graph_api_token = get_token(token_url, CLIENT_ID, CLIENT_SECRET, SCOPE_GRAPH_API)
        if not security_center_token:
            return _error("Could not obtain a Microsoft Security Center access token")

        # List the hosts known to Defender (console output only).
        list_machines(security_center_token)

        # Use the IP from the webhook payload to look up a machine in XDR.
        machine_id = find_machine_id_by_ip_and_timestamp(security_center_token, client_ip)
        print(f"Machine ID located from zero trust query: {machine_id}")
        if not machine_id:
            return _error(f"No Defender machine found for client IP {client_ip}")

        # Isolate the host only if the alert state is LOCKED_OUT AND the
        # global isolation flag is enabled.
        print(f"Machine/ Host Isolation global flag is : {host_isolation}")
        locked_out = zt_summary_event["event"].get("state") == "LOCKED_OUT"
        if locked_out and _flag_enabled(host_isolation):
            isolation = isolate_machine(
                security_center_token,
                machine_id,
                "Isolating host/machine due to locked out state on NAS storage by Superna Security Edition",
                "Full",  # Can be 'Full' or 'Selective'
            )
            if isolation is not None:
                print("Machine isolation initiated:", json.dumps(isolation, indent=4))
            else:
                print("Failed to initiate machine isolation.")

        # Create an investigation for the machine; the summary event is the comment.
        print("create investigation for a machine ID based on ZT alert")
        comment = json.dumps(zt_summary_event, indent=4)
        investigation = initiate_investigation(security_center_token, machine_id, comment)
        if investigation is None:
            return _error(f"Failed to initiate investigation on machine {machine_id}")
        alert_id = investigation.get('triggeringAlertId', None)
        print(f"Alert ID to be used in the update request to the alert: {alert_id}")
        print(investigation)

        # Update the alert with extra data using the Graph Security v2 alert
        # API, then read it back. Both need an alert ID and a Graph token.
        if alert_id and graph_api_token:
            print(f"Updating alert (using v2 alert api) created from automated investigation using alert id {alert_id}")
            alert_update_data = {
                "status": "inProgress",
                "assignedTo": "Superna ZeroTrust Alert",
                "classification": "truePositive",
                # Graph security API value, see
                # https://learn.microsoft.com/en-us/graph/api/security-alert-update?view=graph-rest-1.0&tabs=http
                "determination": "malware",
                "comment": "Updated by Superna Zero Trust Microsoft Defender XDR integration",
            }
            print(update_alert(graph_api_token, alert_id, alert_update_data))

            print(f"Listing alert details after setting properties using v2 alert api {alert_id}")
            alert_details = get_alert_details(graph_api_token, alert_id)
            if alert_details:
                print(json.dumps(alert_details, indent=4))
        else:
            print("Skipping alert update: no alert ID or no Graph API token available.")

        # The result is returned to the calling application, not the console.
        return jsonify({"status": "successful Zero Trust integration execution!"})
    except Exception as e:
        # Top-level boundary: network errors and anything unexpected are
        # reported to the caller instead of crashing the worker.
        return _error(str(e))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)