import win32evtlog
import win32evtlogutil  # noqa: F401 -- kept from original; may be used by extensions
import json
import requests
import os
import sys
import logging
from datetime import datetime

# File to store the last processed record number, datetime, and event ID
LAST_PROCESSED_FILE = 'last_processed_record.txt'


def setup_logging():
    """Configure DEBUG-level logging to both stdout and a file beside the script.

    The log file is opened with mode='w', so it is truncated on every run and
    only ever contains the most recent execution.
    """
    # Get the directory where the script is running
    script_directory = os.path.dirname(os.path.abspath(__file__))
    log_file_path = os.path.join(script_directory, "dse-servicenow-sir-integration.log")

    # Set up logging to both console and file
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file_path, mode='w'),
            logging.StreamHandler(sys.stdout),
        ],
    )

    # Log the script start time
    logging.info(f"Script run on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logging.info(f"Logging to file: {log_file_path}")


def get_last_processed_record():
    """Retrieve the last processed event record number, time, and event ID from a file.

    Returns:
        tuple: (record_number, "time | EventID: id") read from the state file,
        or (0, None) when the file is absent, incomplete, or corrupt.
    """
    if os.path.exists(LAST_PROCESSED_FILE):
        try:
            with open(LAST_PROCESSED_FILE, 'r') as f:
                lines = f.readlines()
            if len(lines) >= 2:
                last_record = int(lines[0].strip())
                last_time_and_event_id = lines[1].strip()
                return last_record, last_time_and_event_id
        except (OSError, ValueError) as e:
            # Fix: a corrupt or unreadable state file previously crashed the
            # script; fall back to reprocessing from record 0 instead.
            logging.error(f"Could not read state file {LAST_PROCESSED_FILE}: {e}")
    return 0, None


def save_last_processed_record(record_number, event_time, event_id):
    """Save the last processed event record number, time, and event ID to a file."""
    with open(LAST_PROCESSED_FILE, 'w') as f:
        f.write(f"{record_number}\n")
        f.write(f"{event_time} | EventID: {event_id}\n")


def refresh_event_log_view(server, log_name):
    """Open the named Windows event log on *server* and return its handle.

    Caller is responsible for closing the handle with win32evtlog.CloseEventLog.
    """
    logging.debug(f"Refreshing event log view: {log_name} on server: {server}")
    h = win32evtlog.OpenEventLog(server, log_name)
    return h


def send_to_servicenow(event_data, servicenow_url, api_key):
    """POST one event payload to the ServiceNow scripted REST endpoint.

    Network errors are logged and swallowed so one failed delivery does not
    abort the whole event-log sweep (otherwise persisted state would be lost).
    A 201 response is treated as success; anything else is logged as an error.
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Basic {api_key}',
    }
    # Output the JSON payload to the log and console
    logging.debug(f"Webhook Payload to be sent:\n{json.dumps(event_data, indent=2)}")
    try:
        # Fix: the original call had no timeout and could hang indefinitely.
        response = requests.post(servicenow_url, headers=headers,
                                 data=json.dumps(event_data), timeout=30)
    except requests.RequestException as e:
        logging.error(f"Failed to send data to ServiceNow: {e}")
        return
    if response.status_code == 201:
        logging.debug(f"Successfully sent data to ServiceNow. Response payload: {response.text}")
    else:
        # Fix: delivery failures were previously logged at DEBUG level only.
        logging.error(f"Failed to send data to ServiceNow: {response.status_code}, {response.text}")


def _build_payload(event_data_json):
    """Map the decoded EventData JSON onto the ServiceNow webhook payload shape.

    Keys and defaults mirror the integration's expected JSON structure exactly.
    """
    return {
        "id": event_data_json.get("id"),
        "severity": event_data_json.get("severity"),
        "state": event_data_json.get("state"),
        "files": event_data_json.get("files", []),
        "rowKeys": event_data_json.get("rowKeys", []),
        "nes": event_data_json.get("nes", []),
        "user": event_data_json.get("user"),
        "userName": event_data_json.get("userName"),
        "shares": event_data_json.get("shares", []),
        "detected": event_data_json.get("detected"),
        "detectedTime": event_data_json.get("detectedTime"),
        "firstSignalTimeStamp": event_data_json.get("firstSignalTimeStamp"),
        "archivedTime": event_data_json.get("archivedTime"),
        "lockedOut": event_data_json.get("lockedOut"),
        "lockedOutTime": event_data_json.get("lockedOutTime"),
        "expiryTime": event_data_json.get("expiryTime"),
        "expiry": event_data_json.get("expiry"),
        "actions": event_data_json.get("actions", []),
        "possibleActions": event_data_json.get("possibleActions", []),
        "signalStrengths": event_data_json.get("signalStrengths", {}),
        "predicted": event_data_json.get("predicted", {}),
        "isRoot": event_data_json.get("isRoot", False),
        "monitorOnly": event_data_json.get("monitorOnly", False),
        "peakMonitor": event_data_json.get("peakMonitor", 0),
        "peakWarning": event_data_json.get("peakWarning", 0),
        "peakMajor": event_data_json.get("peakMajor", 0),
        "peakCritical": event_data_json.get("peakCritical", 0),
        "clientIPs": event_data_json.get("clientIPs", []),
        "numFiles": event_data_json.get("numFiles", 0),
        "isAudit": event_data_json.get("isAudit", False),
        "isRSW": event_data_json.get("isRSW", False),
        "isNFSMonitorMode": event_data_json.get("isNFSMonitorMode", False),
        "isSMBSnapshotEnabled": event_data_json.get("isSMBSnapshotEnabled", False),
        "isFilePolicy": event_data_json.get("isFilePolicy", False),
        "eventSource": event_data_json.get("eventSource"),
        "displayUserActivity": event_data_json.get("displayUserActivity", False),
        "protocol": event_data_json.get("protocol", ""),
        "snapshots": event_data_json.get("snapshots", {}),
        "deletedSnapshots": event_data_json.get("deletedSnapshots", []),
        "nfsProtocols": event_data_json.get("nfsProtocols", []),
        "isAPIEvent": event_data_json.get("isAPIEvent", False),
        "rswExtensions": event_data_json.get("rswExtensions", []),
        "extraParams": event_data_json.get("extraParams", {}),
    }


def read_custom_event_logs(server, log_name, source_names, servicenow_url, api_key):
    """Sweep the event log, forward matching Error/Warning events to ServiceNow.

    Events are filtered to EventType 1 (Error) or 2 (Warning) from the given
    source names, skipping records already processed in a previous run. The
    high-water mark (record number, time, event ID) is persisted at the end.

    Returns:
        list: the payload dicts that were forwarded to ServiceNow.
    """
    h = refresh_event_log_view(server, log_name)
    flags = win32evtlog.EVENTLOG_FORWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ
    events = []

    last_processed_record, last_time_and_event_id = get_last_processed_record()
    logging.debug(f"Last processed record number: {last_processed_record}")
    if last_time_and_event_id:
        logging.debug(f"Last processed event: {last_time_and_event_id}")

    logging.debug(f"Starting to read event logs from sources: {source_names}...")
    total_events_processed = 0
    matched_events = 0
    latest_record = last_processed_record
    latest_event_time = None
    latest_event_id = None

    try:
        while True:
            events_chunk = win32evtlog.ReadEventLog(h, flags, 0)
            if not events_chunk:
                logging.debug("No more events to read.")
                break
            total_events_processed += len(events_chunk)
            logging.debug(f"Read {len(events_chunk)} events.")

            for event in events_chunk:
                record_number = event.RecordNumber
                # Skip already processed events
                if record_number <= last_processed_record:
                    continue
                event_time = event.TimeGenerated.Format()
                event_id = event.EventID & 0xFFFF  # Get the EventID as a 16-bit value
                # Filter for Error (1) and Warning (2) levels
                if event.EventType not in [1, 2]:
                    continue
                if event.SourceName not in source_names:
                    continue

                matched_events += 1
                logging.debug(f"Matched Event from Source: {event.SourceName} on {event_time} with EventID: {event_id}")

                # Fix: events with no binary data previously raised
                # AttributeError on .decode() and crashed the sweep.
                if not event.Data:
                    logging.debug(f"Event {record_number} has no EventData; skipping.")
                    continue
                try:
                    # Decode the EventData field assuming it contains JSON
                    event_data_json = json.loads(event.Data.decode('utf-8', errors='replace'))
                    logging.debug(f"Extracted Event Data JSON: {json.dumps(event_data_json, indent=2)}")
                    # Construct the payload according to the expected JSON structure
                    payload = _build_payload(event_data_json)
                    # Send the constructed payload to ServiceNow
                    send_to_servicenow(payload, servicenow_url, api_key)
                    # Fix: the returned list was never populated in the original.
                    events.append(payload)
                except json.JSONDecodeError as e:
                    logging.error(f"Failed to decode JSON from EventData: {str(e)}")
                    continue

                # Update the latest processed record number, event time, and event ID
                latest_record = max(latest_record, record_number)
                latest_event_time = event_time
                latest_event_id = event_id
    finally:
        # Fix: the event log handle was previously leaked.
        win32evtlog.CloseEventLog(h)

    logging.debug(f"Total events processed: {total_events_processed}")
    logging.debug(f"Total matched events: {matched_events}")

    # Save the latest processed record number, time, and event ID to persist state
    if latest_event_time and latest_event_id:
        save_last_processed_record(latest_record, latest_event_time, latest_event_id)
        logging.debug(f"Last processed record saved: {latest_record} | {latest_event_time} | EventID: {latest_event_id}")

    return events


if __name__ == "__main__":
    setup_logging()

    # Example usage
    server = 'localhost'
    log_name = 'Data Security Essentials'
    source_names = ['Superna Data Security Essentials BOT Service',
                    'Superna Data Security Policy Engine']

    # Your ServiceNow webhook details
    # edit the servicenow url to match your instance url and scripted REST API resource, this is example below.
    servicenow_url = 'https://dev285558.service-now.com/api/xxxx/supernazt/zt'
    api_key = 'your_api_key'

    logging.debug(f"Retrieving events from {log_name} with sources {source_names}...")
    events = read_custom_event_logs(server, log_name, source_names, servicenow_url, api_key)