from flask import Flask, request, jsonify
import requests
import logging
import json
import os
# === CONFIGURATION VARIABLES ===
# REST endpoints (api.samanage.com) used for incidents, incident comments
# and categories.
SOLARWINDS_URL = 'https://api.samanage.com/incidents.json'
COMMENTS_URL_TEMPLATE = 'https://api.samanage.com/incidents/{incident_id}/comments.json'
CATEGORIES_URL = 'https://api.samanage.com/categories.json'
# Local JSON file used to remember category name -> category ID pairs.
CATEGORY_CACHE_FILE = '.category_cache.json'
# API token. Read from the SAMANAGE_AUTH_TOKEN environment variable when it is
# set, so the real secret does not have to be committed to source control; the
# original placeholder remains the fallback value.
AUTH_TOKEN = os.environ.get('SAMANAGE_AUTH_TOKEN', 'xxxxxxxxxxxx')
# Requester used when an incoming alert carries no usable e-mail address.
DEFAULT_REQUESTER_EMAIL = "email here"
# Category name applied to tickets created by this service.
DEFAULT_CATEGORY_NAME = "Security Alert"
# Headers sent with every API request in this file.
HEADERS = {
    'X-Samanage-Authorization': f'Bearer {AUTH_TOKEN}',
    'Content-Type': 'application/json',
    'Accept': 'application/vnd.samanage.v2.1+json',
}
# Configure the root logger first so INFO-level messages emitted through the
# module-level ``logging.*`` calls in this file are shown.
logging.basicConfig(level=logging.INFO)
# Flask application object; the ``@app.route`` decorator further down
# registers the webhook endpoint on it.
app = Flask(__name__)
def load_cached_categories():
    """Load the category cache stored in ``CATEGORY_CACHE_FILE``.

    Returns:
        dict: The parsed file contents. An empty dict is returned when the
        file does not exist, cannot be read, does not contain valid JSON, or
        contains JSON that is not an object (callers index the result by
        category name and assign into it, so anything but a dict is unusable).
    """
    # Open directly instead of checking os.path.exists() first: the file can
    # disappear between the check and the open.
    try:
        with open(CATEGORY_CACHE_FILE, 'r', encoding='utf-8') as f:
            cache = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # The cache is only an optimisation, so a damaged or unreadable file
        # is treated the same as a missing one.
        logging.warning("[Category] Could not read cache file '%s'; ignoring it.", CATEGORY_CACHE_FILE)
        return {}
    if not isinstance(cache, dict):
        logging.warning("[Category] Cache file '%s' does not hold a JSON object; ignoring it.", CATEGORY_CACHE_FILE)
        return {}
    return cache
def save_cached_categories(cache):
    """Write ``cache`` to ``CATEGORY_CACHE_FILE`` as indented JSON.

    Args:
        cache: JSON-serialisable object to persist; callers in this file pass
            a dict keyed by category name.

    Raises:
        TypeError, ValueError: If ``cache`` cannot be serialised to JSON.
        OSError: If the cache file cannot be written.
    """
    # Serialise before opening the file: mode 'w' truncates immediately, so
    # a serialisation error raised after the open would wipe the old cache.
    serialized = json.dumps(cache, indent=2)
    with open(CATEGORY_CACHE_FILE, 'w', encoding='utf-8') as f:
        f.write(serialized)
def ensure_category_exists(category_name):
    """Look up ``category_name`` and try to create it if it is not found.

    The lookup order is: the local cache file, then the category listing
    returned by ``CATEGORIES_URL`` (names compared case-insensitively), and
    finally a POST to ``CATEGORIES_URL`` to create the category. An ID that is
    found or created is stored in the cache file.

    Args:
        category_name: Name of the category to look up or create.

    Returns:
        ``category_name`` unchanged, on every path. Failures of the listing
        or creation requests are logged and never raised to the caller.
    """
    # requests waits indefinitely unless a timeout is given; bound the wait so
    # a stalled API cannot hang the caller.
    timeout_seconds = 10

    logging.info(f"[Category] Ensuring category exists: '{category_name}'")
    cache = load_cached_categories()
    if category_name in cache:
        logging.info(f"[Category] Found in cache: {category_name} = {cache[category_name]}")
        return category_name

    # NOTE(review): only the categories contained in this single response are
    # inspected - confirm whether the API paginates this listing.
    try:
        response = requests.get(CATEGORIES_URL, headers=HEADERS, timeout=timeout_seconds)
        if response.status_code == 200:
            for cat in response.json():
                if cat["name"].lower() == category_name.lower():
                    cache[category_name] = cat["id"]
                    save_cached_categories(cache)
                    logging.info(f"[Category] Found online: {category_name} = {cat['id']}")
                    return category_name
        else:
            logging.warning("[Category] Listing returned HTTP status: %s", response.status_code)
    except Exception:
        # Best effort: fall through to the creation attempt below.
        logging.exception("❌ Exception during category listing.")

    try:
        payload = {"category": {"name": category_name, "state": "active"}}
        response = requests.post(CATEGORIES_URL, headers=HEADERS, json=payload, timeout=timeout_seconds)
        if response.status_code in (200, 201):
            category_id = response.json()["id"]
            cache[category_name] = category_id
            save_cached_categories(cache)
            logging.info(f"[Category] Created: {category_name} = {category_id}")
            return category_name
        logging.warning("[Category] Creation returned HTTP status: %s", response.status_code)
    except Exception:
        logging.exception("❌ Exception during category creation.")

    # The name is returned even when the lookup and the creation both failed.
    return category_name
def create_servicedesk_ticket(subject, description, priority='Low', requester_email=None, category_name=None):
    """Create an incident by POSTing to ``SOLARWINDS_URL``.

    Args:
        subject: Sent as the incident ``name``.
        description: Sent as the incident ``description``.
        priority: Sent as the incident ``priority``; defaults to ``'Low'``.
        requester_email: Requester e-mail; ``DEFAULT_REQUESTER_EMAIL`` is used
            when this is empty or ``None``.
        category_name: Sent as the incident category name.

    Returns:
        The decoded JSON response body when the API answers 200 or 201,
        otherwise ``None``. ``None`` is also returned when the request raises
        or the response body is not valid JSON; nothing is raised.
    """
    # requests waits indefinitely unless a timeout is given.
    timeout_seconds = 10

    # NOTE(review): when category_name is None the payload still carries
    # {"name": null} - confirm the API accepts that.
    payload = {
        "incident": {
            "name": subject,
            "description": description,
            "priority": priority,
            "category": {
                "name": category_name
            },
            "requester": {
                "email": requester_email or DEFAULT_REQUESTER_EMAIL
            }
        }
    }
    logging.info("[ServiceDesk] Creating new ticket...")
    logging.info(json.dumps(payload, indent=2))
    try:
        response = requests.post(SOLARWINDS_URL, headers=HEADERS, json=payload, timeout=timeout_seconds)
        logging.info(f"[ServiceDesk] HTTP status: {response.status_code}")
        # Decode the body once; a non-JSON body (e.g. an HTML error page) is
        # reported as such instead of as an unexplained traceback.
        try:
            body = response.json()
        except ValueError:
            logging.warning("[ServiceDesk] Response body is not valid JSON: %s", response.text)
            return None
        logging.info(json.dumps(body, indent=2))
        return body if response.status_code in (200, 201) else None
    except Exception:
        # Callers rely on None rather than an exception to signal failure.
        logging.exception("❌ Exception during ticket creation.")
        return None
def search_ticket_by_event_id(event_id):
    """Search incidents for the text ``eventID <event_id>``.

    Args:
        event_id: Event identifier placed into the search query.

    Returns:
        The decoded JSON response body when the API answers 200, otherwise an
        empty list. An empty list is also returned when the request raises or
        the response body is not valid JSON; nothing is raised.
    """
    # requests waits indefinitely unless a timeout is given.
    timeout_seconds = 10

    query = f"eventID {event_id}"
    params = {"search": query}
    logging.info(f"[ServiceDesk Search] Searching for: {query}")
    try:
        response = requests.get(SOLARWINDS_URL, headers=HEADERS, params=params, timeout=timeout_seconds)
        logging.info(f"[ServiceDesk Search] HTTP status: {response.status_code}")
        # Decode the body once and tolerate a non-JSON answer.
        try:
            body = response.json()
        except ValueError:
            logging.warning("[ServiceDesk Search] Response body is not valid JSON: %s", response.text)
            return []
        logging.info(json.dumps(body, indent=2))
        return body if response.status_code == 200 else []
    except Exception:
        # Callers rely on an empty result rather than an exception.
        logging.exception("❌ Exception during search request.")
        return []
def add_comment_to_ticket(incident_id, description):
    """Post a public comment on an existing incident.

    Args:
        incident_id: ID substituted into ``COMMENTS_URL_TEMPLATE``.
        description: Text sent as the comment body.

    Returns:
        ``True`` when the API answers 200 or 201, ``False`` otherwise
        (including when the request itself raises); nothing is raised.
    """
    # requests waits indefinitely unless a timeout is given.
    timeout_seconds = 10

    url = COMMENTS_URL_TEMPLATE.format(incident_id=incident_id)
    payload = {
        "comment": {
            "body": description,
            "is_private": False
        }
    }
    logging.info(f"[ServiceDesk] Adding comment to incident ID {incident_id}")
    logging.info(json.dumps(payload, indent=2))
    try:
        response = requests.post(url, headers=HEADERS, json=payload, timeout=timeout_seconds)
        logging.info(f"[Comment API] HTTP status: {response.status_code}")
        # Logging the body is best effort: an empty or non-JSON body must not
        # turn a successful status code into a reported failure.
        try:
            logging.info(json.dumps(response.json(), indent=2))
        except ValueError:
            logging.info("[Comment API] Response body is not valid JSON: %s", response.text)
        return response.status_code in (200, 201)
    except Exception:
        logging.exception("❌ Exception during comment creation.")
        return False
@app.route('/webhook', methods=['POST'])
def webhook():
logging.info("📩 Incoming webhook received")
data = request.json
logging.info(json.dumps(data, indent=2))
severity = data.get("severity", "Unspecified").lower()
raw_event_id = data.get("id", "no-eventid")
cleaned_event_id = raw_event_id.replace(":", "_")
user = data.get("userName", "N/A")
client_ip = data.get("client_ip", "N/A")
timestamp = data.get("detected", "N/A")
state = data.get("state", "N/A")
actions = data.get("actions", "N/A")
shares = data.get("shares", [])
files = data.get("files", "N/A")
alert_url = data.get("url", "N/A")
priority_map = {"warning": "Low", "major": "Medium", "critical": "High"}
priority = priority_map.get(severity, "Low")
requester_email = data.get("email")
if not requester_email or '@' not in requester_email:
logging.warning("⚠️ Invalid or missing email; using default.")
requester_email = DEFAULT_REQUESTER_EMAIL
category_name = ensure_category_exists(DEFAULT_CATEGORY_NAME)
share_names = [s.get("name") for s in shares if isinstance(s, dict) and "name" in s]
share_list = ", ".join(sorted(set(share_names))) if share_names else "N/A"
logging.info(f"🔎 Normalized Event ID: {cleaned_event_id}")
logging.info(f"🔎 User: {user}, IP: {client_ip}, Severity: {severity}, Timestamp: {timestamp}")
logging.info(f"🔎 Requester Email: {requester_email}")
subject = f"Storage Anomaly Security Alert for {user}, on host {client_ip} - eventID {cleaned_event_id}"
description = f"""A security alert was detected: