import json
import logging
import os
import re
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from typing import Any, Dict, Optional, Tuple, List

import requests
from flask import Flask, request

# =============================================================================
# Logging
# =============================================================================
# Every record goes to two places: a size-capped rotating file and the console.
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_file = "ivanti_itsm_ransomware.log"

# Rotate the log file at 5 MiB and keep three rotated backups.
file_handler = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3)
file_handler.setFormatter(log_formatter)
file_handler.setLevel(logging.INFO)

console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
console_handler.setLevel(logging.INFO)

root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Remove handlers that were already attached to the root logger before adding ours.
root_logger.handlers.clear()
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)

app = Flask(__name__)

# =============================================================================
# Config
# =============================================================================
# Both values may be supplied through environment variables of the same name so
# that the tenant URL and API key do not have to live in source control; the
# literals below remain the defaults when the variables are unset.
IVANTI_TENANT_URL = os.environ.get("IVANTI_TENANT_URL", "https://xxxx").rstrip("/")
IVANTI_REST_API_KEY = os.environ.get("IVANTI_REST_API_KEY", "yyyy")

PROBLEMS_URL = f"{IVANTI_TENANT_URL}/api/odata/businessobject/problems"

# The Journal business object name can vary between tenants; the error seen on
# this tenant explicitly mentions "Journal.Notes", so the canonical OData names
# commonly used for that object are tried in turn.
JOURNAL_URL_CANDIDATES = [
    f"{IVANTI_TENANT_URL}/api/odata/businessobject/Journal.Notes",
    f"{IVANTI_TENANT_URL}/api/odata/businessobject/journal.notes",
    f"{IVANTI_TENANT_URL}/api/odata/businessobject/Journal__Notess",
]

# Fixed values written to every Problem / Journal record created by this service.
STATIC_SOURCE = "Incident Management"
STATIC_TEAM_VALUE = "Security"
STATIC_CATEGORY = "Accessibility"
STATIC_IMPACT_VALUE = "High"
STATIC_URGENCY_VALUE = "High"
JOURNAL_CATEGORY_VALUE = "Memo"


def ivanti_headers() -> Dict[str, str]:
    """Return the HTTP headers sent with every Ivanti REST request."""
    return {
        "Authorization": f"rest_api_key={IVANTI_REST_API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }


def log_http(method: str, url: str, r: requests.Response, payload: Optional[dict] = None) -> None:
    """Log one HTTP exchange: method, URL, status, optional request payload, body.

    The payload and the response body are each truncated to 8000 characters.
    """
    try:
        body = r.text
    except Exception:
        # Logging must never break the request flow; fall back to an empty body.
        body = ""
    logging.info("HTTP %s %s -> %s", method, url, r.status_code)
    if payload is not None:
        logging.info("Request payload: %s", json.dumps(payload, indent=2)[:8000])
    logging.info("Response body: %s", body[:8000])


def sanitize_search_term(term: str) -> str:
    """Strip everything except word characters, ``-``, ``:`` and ``.`` from *term*.

    Falsy input (``None``, ``""``) yields the empty string.
    """
    return re.sub(r"[^\w\-:.]", "", str(term or ""))


def odata_value_list(resp_json: Any) -> List[Dict[str, Any]]:
    """Return the record list from a decoded response.

    Accepts either a dict carrying a list under ``"value"`` or a bare list;
    anything else yields an empty list.
    """
    if isinstance(resp_json, dict) and isinstance(resp_json.get("value"), list):
        return resp_json["value"]
    if isinstance(resp_json, list):
        return resp_json
    return []


def escape_html(s: Any) -> str:
    """Escape *s* for inclusion in HTML.

    ``None`` becomes the empty string; any other value is converted with ``str``.
    """
    s = "" if s is None else str(s)
    return (
        s.replace("&", "&amp;")  # must run first so the entities below are not re-escaped
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
        .replace("'", "&#39;")
    )


def to_ivanti_div_html(text_block: str) -> str:
    """Convert newline-separated text into Ivanti rich-text storage format.

    The first line is emitted bare; every following line becomes its own
    ``<div>`` row, with blank lines rendered as ``<div><br></div>``. Spaces are
    turned into ``&nbsp;`` after HTML-escaping. ``None`` yields ``""``.

    NOTE(review): the ``<div>`` / ``<br>`` / ``&nbsp;`` markup was reconstructed
    from this function's name and description -- confirm it matches what the
    tenant stores for rich-text fields.
    """
    if text_block is None:
        return ""

    # Normalise Windows and old-Mac line endings before splitting.
    t = str(text_block).replace("\r\n", "\n").replace("\r", "\n")
    lines = t.split("\n")  # str.split always returns at least one element

    def nbsp(s: str) -> str:
        return escape_html(s).replace(" ", "&nbsp;")

    first = nbsp(lines[0])
    divs: List[str] = []
    for line in lines[1:]:
        if line.strip() == "":
            divs.append("<div><br></div>")
        else:
            divs.append(f"<div>{nbsp(line)}</div>")
    return first + "".join(divs)


# =============================================================================
# Schema discovery by sampling an existing record
# =============================================================================
_cached_problem_fields: Optional[List[str]] = None
_cached_problem_team_field: Optional[str] = None
_cached_journal_notes_field: Optional[str] = None
_cached_journal_summary_field: Optional[str] = None
_cached_journal_endpoint: Optional[str] = None


def get_sample_problem_fields() -> List[str]:
    """Return the field names of one existing Problem record.

    Fetches a single record (``$top=1``) and caches its keys. Only a
    successful, non-empty discovery is cached: a failed request or an empty
    result returns ``[]`` for this call and is retried on the next one, so a
    transient error does not disable field detection for the process lifetime.
    """
    global _cached_problem_fields
    if _cached_problem_fields:
        return _cached_problem_fields

    url = f"{PROBLEMS_URL}?$top=1"
    r = requests.get(url, headers=ivanti_headers(), timeout=30)
    log_http("GET", url, r)
    if r.status_code == 200:
        items = odata_value_list(r.json())
        if items and isinstance(items[0], dict):
            _cached_problem_fields = list(items[0].keys())
            return _cached_problem_fields
    return []


def discover_problem_team_field() -> Optional[str]:
    """Find the name of the team-like field on the Problem object.

    The tenant does not expose a field literally called ``Team``, so a list of
    common variants is checked against the keys of a sample record. Returns
    the first match (cached once found) or ``None`` when nothing matches.
    """
    global _cached_problem_team_field
    if _cached_problem_team_field is not None:
        return _cached_problem_team_field

    fields = get_sample_problem_fields()

    # Common variants seen in ITSM tenants, in order of preference.
    candidates = [
        "Team",
        "team",
        "OwningTeam",
        "OwnerTeam",
        "AssignmentTeam",
        "AssignedTeam",
        "SupportTeam",
        "ResolverTeam",
        "ResponsibleTeam",
        "Group",
        "AssignmentGroup",
        "SupportGroup",
        "OwningGroup",
        "TeamName",
    ]
    for c in candidates:
        if c in fields:
            _cached_problem_team_field = c
            logging.info("✅ Discovered Problem team field: %s", c)
            return c

    logging.warning(
        "⚠️ Could not discover a Team-like field on Problem from sample record. Fields=%s",
        fields,
    )
    return None


def pick_existing_field(fields: List[str], preferred: List[str]) -> Optional[str]:
    """Return the first name in *preferred* that is present in *fields*, else ``None``."""
    for name in preferred:
        if name in fields:
            return name
    return None


def discover_journal_schema() -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Discover the journal endpoint URL, Notes field name and Summary field name.

    Each URL in ``JOURNAL_URL_CANDIDATES`` is probed with ``$top=1``; the first
    one answering 200 is accepted and cached. If it returns a record, the
    field names are picked from that record's keys; otherwise the common
    defaults ``Notes`` / ``Summary`` are used.

    When no candidate answers 200, the first candidate with the default field
    names is returned *without* caching, so the next call probes again.

    Returns:
        ``(endpoint, notes_field, summary_field)``.
    """
    global _cached_journal_notes_field, _cached_journal_summary_field, _cached_journal_endpoint

    if _cached_journal_endpoint is not None:
        return _cached_journal_endpoint, _cached_journal_notes_field, _cached_journal_summary_field

    for base in JOURNAL_URL_CANDIDATES:
        url = f"{base}?$top=1"
        r = requests.get(url, headers=ivanti_headers(), timeout=30)
        log_http("GET", url, r)
        if r.status_code != 200:
            # e.g. 404/400: this name is not valid on the tenant, try the next one.
            continue

        items = odata_value_list(r.json())
        # With no records the endpoint exists but its keys are not visible, so
        # the field names fall back to the common guesses below.
        keys = list(items[0].keys()) if items and isinstance(items[0], dict) else []

        notes_field = pick_existing_field(keys, ["Notes", "Note", "NotesBody", "Body", "Description"]) or "Notes"
        summary_field = pick_existing_field(keys, ["Summary", "Subject", "Title"]) or "Summary"

        _cached_journal_endpoint = base
        _cached_journal_notes_field = notes_field
        _cached_journal_summary_field = summary_field
        logging.info(
            "✅ Using journal endpoint=%s notes_field=%s summary_field=%s",
            base,
            notes_field,
            summary_field,
        )
        return _cached_journal_endpoint, _cached_journal_notes_field, _cached_journal_summary_field

    logging.warning("⚠️ Could not auto-discover a working Journal endpoint from candidates.")
    return JOURNAL_URL_CANDIDATES[0], "Notes", "Summary"


# =============================================================================
# Build Problem payload
# =============================================================================
def build_problem_from_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Turn an incoming webhook payload into the pieces needed for a Problem.

    Missing values are reported as ``"Unknown"``; missing or null ``files`` /
    ``actions`` are rendered as ``(none)``.

    Returns:
        A dict with ``event_id`` (str), ``subject`` and ``description_html``.
    """
    # Only index rowKeys when it really is a non-empty list.
    row_keys = payload.get("rowKeys")
    first_row_key = row_keys[0] if isinstance(row_keys, list) and row_keys else None
    event_id = (
        payload.get("eventID")
        or payload.get("eventId")
        or payload.get("id")
        or first_row_key
        or "Unknown"
    )

    severity = payload.get("severity", "Unknown")
    username = payload.get("userName", payload.get("username", "Unknown"))
    sid = payload.get("user", "Unknown")

    client_ips = payload.get("clientIPs") or []
    client_ip = client_ips[0] if isinstance(client_ips, list) and client_ips else "Unknown"

    protocol = payload.get("protocol", "Unknown")
    state = payload.get("state", payload.get("status", "Unknown"))
    event_url = payload.get("url", payload.get("link", "Unknown"))

    files = payload.get("files")
    if files is None:
        files = []
    elif not isinstance(files, list):
        files = [str(files)]
    files = [str(f) for f in files]

    actions = payload.get("actions")
    if actions is None:
        action_names: List[str] = []
    elif isinstance(actions, list):
        # Entries may be dicts carrying an "action" key or plain values;
        # calling .get() on a plain value would raise AttributeError.
        action_names = [
            str(a.get("action", a) if isinstance(a, dict) else a) for a in actions
        ]
    else:
        action_names = [str(actions)]

    subject = f"Ransomware {severity} detection | eventID={event_id}"

    lines: List[str] = [
        "Ransomware Detection Alert (Problem)",
        "",
        f"eventid: {event_id}",
        f"event_url: {event_url}",
        "",
        f"username: {username}",
        f"sid: {sid}",
        f"clientip: {client_ip}",
        f"protocol: {protocol}",
        f"state: {state}",
        "",
        "actions:",
    ]
    if action_names:
        lines.extend(f" - {a}" for a in action_names)
    else:
        lines.append(" - (none)")

    lines.append("")
    lines.append("files:")
    if files:
        lines.extend(f" - {f}" for f in files)
    else:
        lines.append(" - (none)")

    text_block = "\n".join(lines)
    description_html = to_ivanti_div_html(text_block)

    return {
        "event_id": str(event_id),
        "subject": subject,
        "description_html": description_html,
    }


# =============================================================================
# Ivanti operations
# =============================================================================
def _parse_ivanti_response(r: requests.Response) -> Tuple[bool, Dict[str, Any], int]:
    """Map an Ivanti POST response to ``(ok, body, status_code)``.

    200/201 count as success; the body is the decoded JSON when available,
    otherwise a dict holding the status code and the raw response text.
    """
    raw = {"status": r.status_code, "response": r.text}
    if r.status_code not in (200, 201):
        return False, raw, r.status_code
    try:
        return True, r.json(), r.status_code
    except ValueError:
        # Success status but the body is not JSON; report the raw text instead.
        return True, raw, r.status_code


def search_problem_by_event_id(event_id: str) -> Optional[Dict[str, Any]]:
    """Return the first Problem whose Subject contains the sanitized *event_id*.

    Returns ``None`` when the id is empty or ``"Unknown"`` after sanitizing,
    when the search does not answer 200, or when no returned record matches.
    """
    sanitized = sanitize_search_term(event_id)
    if not sanitized or sanitized == "Unknown":
        return None

    url = f"{PROBLEMS_URL}?$search={sanitized}"
    r = requests.get(url, headers=ivanti_headers(), timeout=30)
    log_http("GET", url, r)

    # Any non-200 answer (including 204 No Content) means "nothing found".
    if r.status_code != 200:
        return None

    for item in odata_value_list(r.json()):
        if sanitized in str(item.get("Subject", "")):
            return item
    return None


def create_problem(subject: str, description_html: str) -> Tuple[bool, Dict[str, Any], int]:
    """Create a Problem record and return ``(ok, response_body, status_code)``.

    Subject, Description, Source and Category are always sent. Impact, Urgency
    and the team field are added only when the sampled Problem record shows
    that the tenant has such fields.
    """
    payload: Dict[str, Any] = {
        "Subject": subject,
        "Description": description_html,
        "Source": STATIC_SOURCE,
        "Category": STATIC_CATEGORY,  # REQUIRED
    }

    # Add Impact/Urgency only if those fields exist on the tenant's Problem object.
    problem_fields = get_sample_problem_fields()
    if "Impact" in problem_fields:
        payload["Impact"] = STATIC_IMPACT_VALUE
    if "Urgency" in problem_fields:
        payload["Urgency"] = STATIC_URGENCY_VALUE

    # Add the team using the field name actually discovered on the tenant.
    team_field = discover_problem_team_field()
    if team_field:
        payload[team_field] = STATIC_TEAM_VALUE
    else:
        logging.warning("Skipping Team: no Team-like field discovered on Problem.")

    r = requests.post(PROBLEMS_URL, headers=ivanti_headers(), json=payload, timeout=30)
    log_http("POST", PROBLEMS_URL, r, payload)
    return _parse_ivanti_response(r)


def create_memo_for_problem(problem_recid: str, event_id: str, notes_html: str) -> Tuple[bool, Dict[str, Any], int]:
    """Attach a Memo journal entry to an existing Problem.

    The endpoint and the Summary/Notes field names come from
    ``discover_journal_schema()``. Returns ``(ok, response_body, status_code)``.
    """
    endpoint, notes_field, summary_field = discover_journal_schema()

    summary_value = f"Update to event ID {event_id}"

    # IMPORTANT: do NOT send PublishToSelfService (the tenant rejected it).
    payload: Dict[str, Any] = {
        "ParentLink_Category": "Problem",
        "ParentLink_RecID": str(problem_recid),
        "Category": JOURNAL_CATEGORY_VALUE,  # Memo
        summary_field: summary_value,
        notes_field: notes_html,
    }

    r = requests.post(endpoint, headers=ivanti_headers(), json=payload, timeout=30)
    log_http("POST", endpoint, r, payload)
    return _parse_ivanti_response(r)


# =============================================================================
# Flask Webhook
# =============================================================================
def _json_response(body: Any, status: int) -> Tuple[str, int, Dict[str, str]]:
    """Build a Flask ``(body, status, headers)`` tuple with a JSON content type."""
    return json.dumps(body), status, {"Content-Type": "application/json"}


@app.route("/webhook", methods=["POST"])
def webhook():
    """Handle a ransomware alert.

    If a Problem whose Subject contains the event id already exists, a Memo is
    appended to it; otherwise a new Problem is created. Successful Ivanti
    calls answer 200, failed ones pass the Ivanti status code through, and
    unexpected errors answer 500 with the error message.
    """
    # Malformed JSON is rejected here by Flask itself (outside the try below,
    # so it is not converted into a 500).
    payload = request.get_json(force=True) or {}
    if not isinstance(payload, dict):
        return _json_response({"error": "JSON object expected"}, 400)

    try:
        problem = build_problem_from_payload(payload)

        existing = search_problem_by_event_id(problem["event_id"])
        if existing and existing.get("RecId"):
            rec_id = existing["RecId"]
            ok, resp, status = create_memo_for_problem(
                problem_recid=rec_id,
                event_id=problem["event_id"],
                notes_html=problem["description_html"],
            )
            return _json_response(resp, 200 if ok else status)

        ok, resp, status = create_problem(problem["subject"], problem["description_html"])
        return _json_response(resp, 200 if ok else status)
    except Exception as e:
        # Top-level boundary: log the traceback and report the failure to the caller.
        logging.exception("Webhook error")
        return _json_response({"error": str(e)}, 500)


if __name__ == "__main__":
    import os

    # The service binds to every interface, so the interactive debugger must
    # not be on by default; enable it explicitly with IVANTI_WEBHOOK_DEBUG=1.
    debug_enabled = os.environ.get("IVANTI_WEBHOOK_DEBUG", "").strip().lower() in ("1", "true", "yes")

    print("🚀 Ivanti ITSM Ransomware webhook listening on port 5000")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)