import boto3 import logging import random from flask import Flask, request, jsonify from pprint import pprint from datetime import datetime app = Flask(__name__) # Configure logging logging.basicConfig(level=logging.ERROR) app.logger.addHandler(logging.StreamHandler()) @app.route('/awssechub', methods=['GET', 'POST']) def webhook(): if request.method == 'GET': # Handle GET request return 'Hello, AWS SecHub webhook to AWS API application is waiting for webooks from Superna ZeroTrust API' else: #Handle post to AWS Sechub # Handle POST request print("Processing webhook request from Defender.") data = request.json # This assumes the incoming request has a JSON payload pprint(data) # dump json payload to console for reviewing what data was recieved # Create a SecurityHub client with the specified region # session = boto3.Session(profile_name='myprofile'). needs variable set on where to get credentials file print('Mapping webhook alarm data to AWS Security Hub AFSS format data') # extract from json payload id = data.get('id') # unique id if not id: id = random.randint(1_000_000_000, 9_999_999_999) print('id value of alarm is '.format(id)) severity = data.get('severity') # severity # map severity to AWS Sec hub values INFORMATIONAL | LOW | MEDIUM | HIGH | CRITICAL if severity == 'CRITICAL': severity = 'CRITICAL' elif severity == 'MAJOR': severity = 'HIGH' elif severity == 'WARNING': severity = 'MEDIUM' elif severity == 'MINOR': severity = 'LOW' elif severity == 'INFORMATIONAL': severity = 'INFORMATIONAL' state = data.get('state') userName = data.get('userName') # friendly name user = data.get('user') # sid protocol = data.get('protocol') # SMB or NFS eventSource = data.get('eventSource') # device type numFiles = data.get('numFiles') # number of affected files numFiles_str = str(numFiles) nes = data.get('nes',[]) # cluster name array of affected clusters nes = nes[0] if nes else None #get first cluster set to none if empty detectedDT = data.get('detected') # date and time of detection string clientIPs = data.get('clientIPs',[]) #client ip address array get all ip's clientIPs = clientIPs[0] if clientIPs else None #if empty array set value to None smbShareList = data.get('shares') # get array of share names # loop through array to get the name of the shares and store in new string of share names sharenames = [share['name'] for share in smbShareList] # Convert the list of names to a comma-separated string sharenames_str = ', '.join(sharenames) # list of comma separated share names in the security incident fileList = data.get('files',[])# get array of files fileList_str = "\r\n".join(map(str, fileList)) fileList_str_truncate = fileList_str[:1024] print('File list as string of affected files ' + fileList_str_truncate) session = boto3.Session( aws_access_key_id='xxxx', aws_secret_access_key='yyyyy', region_name='us-east-2') client = session.client('securityhub', region_name='us-east-2') print('AWS API session created') # Product ARN for your custom integration #product_arn = 'arn:aws:securityhub:us-east-2:617004647878:product/617004647878/default' # Enable the product for findings import (you only need to run this once per product) #try: # client.enable_import_findings_for_product(ProductArn=product_arn) # print(f"Product {product_arn} enabled successfully!") #except Exception as e: # # It's possible the product is already enabled, so handle any exceptions gracefully # print(f"Could not enable product {product_arn}. Reason: {e}") # Define the finding print('Building Zero Trust to AWS Securit Hub Schema object from webhook data') # setting required fields that need date and time in string format. current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') FirstObservedAt = current_time LastObservedAt = current_time finding = { 'SchemaVersion': '2018-10-08', 'Id': id, # Change this for every new finding 'ProductArn': 'arn:aws:securityhub:us-east-2:123456:product/123456/default', # Your product ARN 'GeneratorId': 'SupernaZT', 'AwsAccountId': '617004647878', # Your AWS Account ID 'Types': [ 'Security finding from Defender' ], 'FirstObservedAt': FirstObservedAt, 'LastObservedAt': LastObservedAt, 'CreatedAt': current_time, 'UpdatedAt': current_time, 'Severity': { 'Label': severity }, 'Title': 'Superna Ransomware Defender Security Finding', 'Description': 'This security event should be investigated as possible Ransomware Attack.', 'CompanyName': 'Superna', 'ProductName': 'Defender for AWS', 'Malware': [ { 'Name': 'Ransomware Variant Unknown', 'State': 'OBSERVED', 'Type': 'RANSOMWARE' }], 'Network': { 'SourceIpV4': clientIPs }, 'Note': { 'Text': 'Comment: User locked out of file system', 'UpdatedAt': current_time, 'UpdatedBy': 'Defender by AWS' }, 'Resources': [ { 'Type': 'Other', #'Id': 'arn:aws:ec2:us-east-2:617004647878:instance/i-0b3d6f5b6a461dcab', 'Id': 'Cyber Storage Event', 'Partition': 'aws', 'Details': { 'Other': { 'AD User Name': userName, 'sid of user': user, 'Protocol': protocol, 'Device Type': eventSource, 'Number of files': numFiles_str, 'Cluster Name': nes, 'Detection Time': detectedDT, 'State of Event': state, 'Source Host IP': clientIPs, 'List of Affected SMB Shares': sharenames_str, 'Partial list of Files 1024 char limit': fileList_str_truncate } } } ] } # Import the finding print('Sending AWS secops API') response = client.batch_import_findings(Findings=[finding]) print(response) return('done sending AWS secops API call check for http 200 and success count in response') if __name__ == "__main__": webhook() if __name__ == '__main__': app.run(host='0.0.0.0', port=5000) # debug of flask app is enbled