"""Contain CrowdStrike hosts reported by Superna Data Security Essentials.

The script reads new records from a Windows Event Log, parses the JSON payload
carried in each record's data, collects the client IPs of events whose severity
is in ``trigger_severities`` and asks the CrowdStrike API to network-contain
the device that owns each IP.  The record number of the newest event handled is
persisted to ``LAST_PROCESSED_FILE`` so later runs skip events already seen.
"""
import ipaddress
import json
import logging
import os
import sys
from datetime import datetime

import requests
import win32evtlog

# File to store the last processed record number.
# NOTE(review): this is a relative path, so it is resolved against the current
# working directory, whereas the log file is written next to the script.
LAST_PROCESSED_FILE = 'last_processed_record.txt'

# CrowdStrike API credentials
# NOTE(review): placeholders; avoid committing real secrets to source control.
client_id = 'xxxxxx'
client_secret = 'yyyyy'
base_url = "https://api.us-2.crowdstrike.com"

# Seconds to wait for CrowdStrike to answer before giving up.  Without a
# timeout, requests will block indefinitely on an unresponsive server.
REQUEST_TIMEOUT = 30

# Configuration
server = 'localhost'
log_name = 'Data Security Essentials'
source_names = ['Superna Data Security Essentials BOT Service', 'Superna Data Security Policy Engine']
trigger_severities = ['MAJOR', 'CRITICAL']  # Configure severities that should trigger containment
enable_isolation = True  # Set to False to disable isolation actions


def setup_logging():
    """Set up logging to file and console.

    The log file ``crowdstrike-dse.log`` lives in the script's directory and is
    opened with mode ``'w'``, i.e. it is truncated on every run.
    """
    script_directory = os.path.dirname(os.path.abspath(__file__))
    log_file_path = os.path.join(script_directory, "crowdstrike-dse.log")
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file_path, mode='w'),
            logging.StreamHandler(sys.stdout),
        ],
    )
    logging.info(f"Script run on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logging.info(f"Logging to file: {log_file_path}")


def get_last_processed_record():
    """Retrieve the last processed event record number.

    Returns:
        A ``(record_number, description)`` tuple.  ``description`` is the
        second line of the state file (event time and EventID).  ``(0, None)``
        is returned when the file is missing, unreadable, has fewer than two
        lines, or its first line is not an integer.
    """
    if os.path.exists(LAST_PROCESSED_FILE):
        try:
            with open(LAST_PROCESSED_FILE, 'r') as f:
                lines = f.readlines()
            if len(lines) >= 2:
                last_record = int(lines[0].strip())
                last_time_and_event_id = lines[1].strip()
                logging.info(f"Loaded last processed record: {last_record}, {last_time_and_event_id}")
                return last_record, last_time_and_event_id
        except (OSError, ValueError) as e:
            # OSError: file could not be read; ValueError: bad integer or
            # undecodable text.  Either way fall through and start fresh.
            logging.error(f"Error reading {LAST_PROCESSED_FILE}: {e}")
    logging.info(f"No last processed record file found. Starting fresh.")
    return 0, None


def save_last_processed_record(record_number, event_time, event_id):
    """Save the last processed event record number, time, and EventID to a file.

    Failures to write are logged and otherwise ignored (best effort).
    """
    try:
        with open(LAST_PROCESSED_FILE, 'w') as f:
            f.write(f"{record_number}\n")
            f.write(f"{event_time} | EventID: {event_id}\n")
        logging.info(f"Saved last processed record: {record_number}, {event_time}, EventID: {event_id}")
    except OSError as e:
        logging.error(f"Error saving {LAST_PROCESSED_FILE}: {e}")


def _extract_client_ips(event):
    """Return the client IPs of *event* if its severity triggers containment.

    An empty list is returned when the event has no data, the data is not
    valid JSON, the severity is not in ``trigger_severities`` or no
    ``clientIPs`` are present.
    """
    if not event.Data:
        return []

    data_content = event.Data.decode('utf-8', errors='replace')
    logging.debug(f"Raw JSON Payload: {data_content}")
    try:
        event_json = json.loads(data_content)
    except json.JSONDecodeError as e:
        logging.error(f"Failed to parse JSON from event.Data: {e}")
        return []
    logging.debug(f"Parsed JSON: {json.dumps(event_json, indent=2)}")

    # Filter by severity
    severity = event_json.get('severity')
    if severity not in trigger_severities:
        logging.info(f"Event severity '{severity}' does not match trigger severities: {trigger_severities}. Skipping.")
        return []

    # Extract client IPs from the parsed JSON
    client_ips = event_json.get('clientIPs') or []
    if isinstance(client_ips, str):
        # A bare string would otherwise be split into single characters when
        # the caller extends its list with it.
        client_ips = [client_ips]
    if client_ips:
        logging.info(f"Extracted Client IPs: {client_ips}")
    return list(client_ips)


def collect_windows_event_log(server, log_name, source_names):
    """Collect event logs from the specified Windows Event Log and extract relevant client IPs.

    Args:
        server: Machine whose event log is opened.
        log_name: Name of the event log to read.
        source_names: Only events whose ``SourceName`` is in this collection
            are processed.

    Returns:
        A list with the client IPs of every new event that matched the source
        and severity filters (duplicates are kept).

    Side effects:
        Persists the newest handled record via ``save_last_processed_record``.
    """
    logging.info(f"Collecting Windows Event Logs from {log_name} on server {server}...")
    h = win32evtlog.OpenEventLog(server, log_name)
    flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ

    last_processed_record, _ = get_last_processed_record()
    logs = []
    latest_record = last_processed_record
    latest_event_time = None
    latest_event_id = None

    try:
        while True:
            events = win32evtlog.ReadEventLog(h, flags, 0)
            if not events:
                logging.debug("No more events to read.")
                break

            for event in events:
                record_number = event.RecordNumber

                # Skip already processed events
                if record_number <= last_processed_record:
                    continue

                # Filter by source names
                if event.SourceName not in source_names:
                    continue

                event_time = event.TimeGenerated.Format()
                event_id = event.EventID & 0xFFFF

                # Log event metadata
                logging.info(f"Processing Event - RecordNumber: {record_number}, EventID: {event_id}, TimeGenerated: {event_time}, Source: {event.SourceName}")

                try:
                    logs.extend(_extract_client_ips(event))
                except Exception as e:
                    # Deliberately broad: one malformed record must not stop
                    # the remaining records from being processed.
                    logging.warning(f"Failed to process log entry: {e}")

                # The log is read newest-first, so only overwrite the
                # checkpoint when this record is newer than the one held;
                # otherwise the saved time/EventID would describe the oldest
                # new record instead of the newest.
                if record_number > latest_record:
                    latest_record = record_number
                    latest_event_time = event_time
                    latest_event_id = event_id
    finally:
        win32evtlog.CloseEventLog(h)

    # Save the latest processed record info.  Compare against None because an
    # EventID of 0 is falsy but still a valid value.
    if latest_event_time is not None:
        save_last_processed_record(latest_record, latest_event_time, latest_event_id)

    logging.info(f"Total Extracted Client IPs: {len(logs)} - {logs}")
    return logs


def authenticate_to_crowdstrike():
    """Authenticate to the CrowdStrike API.

    Returns:
        The OAuth2 access token.

    Raises:
        requests.RequestException: on transport errors, timeouts or a non-2xx
            response.
        ValueError: when the response body carries no ``access_token``.
    """
    logging.info("Authenticating to CrowdStrike API...")
    url = f"{base_url}/oauth2/token"
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {'client_id': client_id, 'client_secret': client_secret}
    response = requests.post(url, headers=headers, data=data, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    token = response.json().get('access_token')
    if not token:
        # Fail here instead of sending "Bearer None" on every later request.
        raise ValueError("CrowdStrike token response did not contain an access_token")
    logging.info("Successfully authenticated to CrowdStrike API.")
    return token


def get_device_id_by_ip(access_token, ip_address):
    """Find the device ID for a given IP address.

    Args:
        access_token: Bearer token from ``authenticate_to_crowdstrike``.
        ip_address: IPv4/IPv6 address in text form.

    Returns:
        The first device ID returned by the device query.

    Raises:
        ValueError: if ``ip_address`` is not a valid IP address or no device
            matches it.
        requests.RequestException: on transport errors, timeouts or a non-2xx
            response.
    """
    logging.info(f"Attempting to find device ID for IP address: {ip_address}")

    # The value originates from an event payload and is embedded in a filter
    # expression, so reject anything that is not a plain IP address.
    candidate = str(ip_address).strip()
    try:
        ipaddress.ip_address(candidate)
    except ValueError:
        logging.error(f"Invalid IP address, skipping device lookup: {ip_address!r}")
        raise ValueError(f"Invalid IP address: {ip_address!r}") from None

    url = f"{base_url}/devices/queries/devices/v1"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    params = {
        'filter': f'local_ip:"{candidate}"'
    }
    response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    device_ids = response.json().get('resources', [])
    if not device_ids:
        logging.error("No device found with the specified IP address")
        raise ValueError("No device found with the specified IP address")
    logging.info(f"Device ID found: {device_ids[0]}")
    return device_ids[0]


def isolate_host(access_token, device_id):
    """Contain a host by its device ID if isolation is enabled.

    Returns:
        The decoded JSON response of the containment request, or a dict with a
        ``message`` key when ``enable_isolation`` is false.

    Raises:
        requests.RequestException: on transport errors, timeouts or a non-2xx
            response.
    """
    if not enable_isolation:
        logging.info("Containment is disabled.")
        return {"message": "Containment disabled by configuration."}

    logging.info(f"Initiating containment for device with ID: {device_id}")
    url = f"{base_url}/devices/entities/devices-actions/v2"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    params = {
        'action_name': 'contain'
    }
    body = {
        'ids': [device_id]
    }
    response = requests.post(url, headers=headers, json=body, params=params, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    logging.info("Containment action initiated successfully.")
    return response.json()


def main():
    """Main execution logic."""
    setup_logging()
    client_ips = collect_windows_event_log(server, log_name, source_names)
    if not client_ips:
        logging.warning("No client IPs found in Windows Event Logs.")
        return

    try:
        access_token = authenticate_to_crowdstrike()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a token response that is not JSON or lacks a token.
        logging.error(f"Failed to authenticate to CrowdStrike API: {e}")
        return

    # The same IP may appear in several events; look it up and contain it only
    # once, keeping first-seen order.
    unique_ips = list(dict.fromkeys(str(ip).strip() for ip in client_ips))

    for client_ip in unique_ips:
        try:
            device_id = get_device_id_by_ip(access_token, client_ip)
            isolate_host(access_token, device_id)
        except Exception as e:
            # Keep going: a failure for one IP must not block the others.
            logging.error(f"Failed to process containment for IP {client_ip}: {e}")


if __name__ == "__main__":
    main()