Golden Copy Integrations


Overview

Golden Copy produces rich data that customers can leverage to build integrations or automations with external systems. This guide provides examples of how to extract that data from Kafka.

NOTE: All sample code is provided "as is", with no coverage under a support contract and no express or implied warranty.

Job History Sync to S3

Overview

  1. The examples provided use Python sample code to show how easy it is to subscribe to updates within Golden Copy data sets and then store the information in an S3 bucket. The example below can be used and adapted to copy job history data directly into an S3 bucket, providing a centralized record of all activity in Golden Copy.
  2. This example is built with AWS S3 as the target; the sample code has not been tested with any other S3 targets.


Setup

  1. On a host separate from Golden Copy, install Python (NOTE: using the Golden Copy nodes for automation tasks is not supported).
  2. Allow the automation host to connect to the Kafka service by updating the firewall.
    1. Log in as ecaadmin.
    2. nano /opt/superna/eca/scripts/eca_iptables.sh
    3. Under rule #5, add the line below and replace x.x.x.x with the IP address of the automation host.
    4. sudo /usr/sbin/iptables -A $CHAIN -p tcp --dport 9022 -s x.x.x.x -j ACCEPT
    5. Control + X (save and exit).
  3. Create a cron job to run the Python script on a schedule of your choosing. (NOTE: how to create a cron job is not covered in this guide.)

Sample Python Code


This sample code is designed to run on a cron schedule, connect to the Kafka service, and extract any new jobs since the last time the script ran. Kafka tracks which records the consumer group has already processed, so the script can run at any interval and every record since the previous run will always be processed correctly; the interval only determines how often job history is synced into S3. Each execution runs for 5 minutes and then exits on its own, because the script is built around a continuous polling loop that can allow near real-time job history syncing into S3.

The script can be modified to remove the 5-minute limit per execution and run as a continuous polling loop.
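
As a minimal sketch of that continuous variant (using the same Kafka configuration as the full script below; the message handling is omitted here and would be identical to the full script):

from confluent_kafka import Consumer

# Same configuration as the full script below, but without the 5-minute
# time check around the poll loop, so the script polls indefinitely.
consumer = Consumer({
    'bootstrap.servers': 'kafka.node1.gcdemo.eca.local:9092',
    'group.id': 'automation-cg',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['jobshistory'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # ... handle msg and upload to S3 exactly as in the full script below ...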

How to edit this script
  1. Ensure the automation host can resolve the DNS name of Golden Copy node 1, and replace the hostname below with the DNS name used for your node 1.
    1. kafka.node1.gcdemo.eca.local
  2. The client registers with Kafka using the consumer group name automation-cg; any text name can be used, and this name identifies the automation host's registration in the Kafka GUI.
  3. The Kafka topic that contains all the job history data is named jobshistory. This does not need to be changed, but it shows how to request data from a specific topic in Golden Copy.
  4. The AWS keys section shows where to enter the access and secret keys. NOTE: This code is provided "as is"; credentials should be stored in boto credential files rather than embedded in the script (see the sketch after this list). This code is example code only and is not intended to be a complete guide.
    1. # Set AWS credentials -- find this section and update the placeholder access key and secret key values (xxxxxxxxx and yyyyyyyy).
  5. The S3 bucket is referenced in the script as gc-jobhistory-bucket; update the bucket name to match your configuration.
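
As noted in step 4, a minimal sketch of creating the S3 client from a boto credentials file instead of embedding keys (the profile name gc-automation is a hypothetical example):

import boto3

# Credentials are resolved from the standard boto locations (for example
# ~/.aws/credentials) instead of being embedded in the script.
# The profile name below is a hypothetical example.
session = boto3.session.Session(profile_name='gc-automation')
s3 = session.client('s3')

The rest of the sample script can then use this s3 client unchanged in place of the one created with explicit keys.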


Code
from confluent_kafka import Consumer, KafkaError
from datetime import datetime
import boto3
import time


# Kafka consumer configuration
conf = {
    'bootstrap.servers': 'kafka.node1.gcdemo.eca.local:9092',
    'group.id': 'automation-cg',
    'auto.offset.reset': 'earliest'
}

# Create Kafka consumer instance
consumer = Consumer(conf)

# Subscribe to Kafka topic
consumer.subscribe(['jobshistory'])

# Set AWS credentials
aws_access_key_id = 'xxxxxxxxx'
aws_secret_access_key = 'yyyyyyyy'

# Create S3 client instance
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

# Reset the offset to the beginning for any partitions already assigned
# (for a new consumer group, 'auto.offset.reset' already starts at the earliest record)
for topic_partition in consumer.assignment():
    topic_partition.offset = 0
    consumer.seek(topic_partition)

# Initialize record number
record_num = 0
# Get the start time for the 5-minute processing window
start_time = time.time()

# Poll for new messages from Kafka for up to 5 minutes
while True:
    current_time = time.time()
    if (current_time - start_time) >= 300:  # 300 seconds = 5 minutes
        break
    msg = consumer.poll(1.0)
    print('poll loop running')
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition event')
        else:
            print('Kafka error: {}'.format(msg.error()))
        continue

    # Build a unique S3 object key from the timestamp and record number
    now = datetime.now()
    date_time = now.strftime("%m-%d-%Y, %H:%M:%S")
    key = date_time + '-' + str(record_num) + '.json'
    value = msg.value()

    # Skip messages with no value
    if value is None:
        continue

    # Print message key and value
    print('Received message: Key={}'.format(str(key)))
    print('Received message: Value={}'.format(str(value)))

    # Upload message to S3
    try:
        s3.put_object(Bucket='gc-jobhistory-bucket', Key=key, Body=value)
    except Exception as e:
        print('Error uploading message to S3: {}'.format(e))

    # Increment record number
    record_num += 1

# Close the consumer so committed offsets are flushed before exit
consumer.close()
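
To confirm that job history records are arriving, a quick check along these lines can list the most recent objects written to the bucket (a sketch only, using the example bucket name from the script; credentials are resolved from the boto credential chain):

import boto3

s3 = boto3.client('s3')

# List up to 10 objects in the example bucket to confirm job history records landed
response = s3.list_objects_v2(Bucket='gc-jobhistory-bucket', MaxKeys=10)
for obj in response.get('Contents', []):
    print(obj['Key'], obj['LastModified'])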