AWS Glue mutual TLS authentication for Amazon MSK

In today’s landscape, data streams continuously from sources ranging from social media interactions to Internet of Things (IoT) device readings. This torrent of real-time information presents both a challenge and an opportunity for businesses. To harness the power of this data effectively, organizations need robust systems for ingesting, processing, and analyzing streaming data at scale. Enter Apache Kafka: a distributed streaming platform that has revolutionized how companies handle real-time data pipelines and build responsive, event-driven applications. AWS Glue is used to process and analyze large volumes of real-time data and perform complex transformations on the streaming data from Apache Kafka.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service. You can turn on a combination of authentication modes on new or existing MSK clusters. The supported authentication modes are AWS Identity and Access Management (IAM) access control, mutual Transport Layer Security (TLS), and Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM). For more information about using IAM authentication, refer to Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication.

Mutual TLS authentication requires both the server and the client to present certificates to prove their identity. It’s ideal for hybrid applications that need a common authentication model. It’s also a commonly used authentication mechanism for business-to-business applications and is used in standards such as open banking, which enables secure open API integrations for financial institutions. For Amazon MSK, AWS Private Certificate Authority (AWS Private CA) is used to issue the X.509 certificates and to authenticate clients.

This post describes how to set up AWS Glue jobs to produce, consume, and process messages on an MSK cluster using mutual TLS authentication. AWS Glue automatically infers the schema from the streaming data and stores the metadata in the AWS Glue Data Catalog for analysis using analytics tools such as Amazon Athena.

Example use case

In our example use case, a hospital facility continuously monitors the body temperatures of patients admitted to the emergency ward using smart thermometers. Each device automatically records the patients’ temperature readings and posts the data to a central monitoring application API. Each posted record is a JSON formatted message that contains the deviceId that uniquely identifies the thermometer, a patientId to identify the patient, the patient’s temperature reading, and the eventTime when the temperature was recorded.
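A posted record might look like the following example (the values shown are illustrative and follow the format produced by the simulator job later in this post):

{
    "deviceId": "3e278e63-8bd2-47e7-8ea9-2b1e3f4c5a6d",
    "patientId": "PI00007",
    "temperature": 37.4,
    "eventTime": "2024-05-21 11:23:45.123456"
}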

Record schema

The central monitoring application checks the hourly average temperature readings for each patient and notifies the hospital’s healthcare workers when a patient’s average temperature falls outside the accepted range (36.1–37.2°C). In our case, we use the Athena console to analyze the readings.

Overview of the solution

In this post, we use an AWS Glue Python shell job to simulate incoming data from the hospital thermometers. This job produces messages that are securely written to an MSK cluster using mutual TLS authentication.

To process the streaming data from the MSK cluster, we deploy an AWS Glue streaming extract, transform, and load (ETL) job. This job automatically infers the schema from the incoming data, stores the schema metadata in the Data Catalog, and then stores the processed data as efficient Parquet files in Amazon Simple Storage Service (Amazon S3). We use Athena to query the output table in the Data Catalog and uncover insights.

The following diagram illustrates the architecture of the solution.

Solution architecture

The solution workflow consists of the following steps:

  1. Create a private certificate authority (CA) using AWS Certificate Manager (ACM).
  2. Set up an MSK cluster with mutual TLS authentication.
  3. Create a Java keystore (JKS) file and generate a client certificate and private key.
  4. Create a Kafka connection in AWS Glue.
  5. Create a Python shell job in AWS Glue to create a topic and push messages to Kafka.
  6. Create an AWS Glue streaming job to consume and process the messages.
  7. Analyze the processed data in Athena.

Prerequisites

You should have the following prerequisites:

CloudFormation stack set

This template creates two NAT gateways as shown in the following diagram. However, it’s possible to route the traffic to a single NAT gateway in one Availability Zone for test and development workloads. For redundancy in production workloads, it’s recommended to have one NAT gateway available in each Availability Zone.

VPC setup

The stack also creates a security group with a self-referencing rule to allow communication between AWS Glue components.
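For reference, a self-referencing rule is simply an inbound rule whose source is the security group itself. A minimal boto3 sketch of such a rule, using a placeholder security group ID, looks like the following:

import boto3

ec2 = boto3.client('ec2')

# Placeholder ID of the security group created by the CloudFormation stack
sg_id = 'sg-0123456789abcdef0'

# Allow all traffic whose source is the same security group, which is what
# AWS Glue needs for communication between its components
ec2.authorize_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=[{
        'IpProtocol': '-1',                        # all protocols and ports
        'UserIdGroupPairs': [{'GroupId': sg_id}]   # source = the group itself
    }]
)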

Create a private CA using ACM

Complete the following steps to create a root CA. For more details, refer to Creating a private CA.

  1. On the AWS Private CA console, choose Create a private CA.
  2. For Mode options, select either General-purpose or Short-lived certificate for lower pricing.
  3. For CA type options, select Root.
  4. Provide the certificate details by specifying at least one distinguished name.

Create private CA

  5. Leave the remaining options as default and select the acknowledgement check box.
  6. Choose Create CA.
  7. On the Actions menu, choose Install CA certificate and choose Confirm and install.

Install certificate
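If you prefer to script these console steps, the following boto3 sketch performs the equivalent AWS Private CA API calls to create a root CA and install its self-signed certificate. The common name and validity period below are placeholders:

import boto3

acmpca = boto3.client('acm-pca')

# Create the root CA (subject, key algorithm, and signing algorithm are illustrative)
ca_arn = acmpca.create_certificate_authority(
    CertificateAuthorityConfiguration={
        'KeyAlgorithm': 'RSA_2048',
        'SigningAlgorithm': 'SHA256WITHRSA',
        'Subject': {'CommonName': 'example-msk-root-ca'},
    },
    CertificateAuthorityType='ROOT',
)['CertificateAuthorityArn']

# Wait for the CA CSR, then self-sign it using the root CA certificate template
acmpca.get_waiter('certificate_authority_csr_created').wait(CertificateAuthorityArn=ca_arn)
csr = acmpca.get_certificate_authority_csr(CertificateAuthorityArn=ca_arn)['Csr']
cert_arn = acmpca.issue_certificate(
    CertificateAuthorityArn=ca_arn,
    Csr=csr.encode(),
    SigningAlgorithm='SHA256WITHRSA',
    TemplateArn='arn:aws:acm-pca:::template/RootCACertificate/V1',
    Validity={'Value': 10, 'Type': 'YEARS'},
)['CertificateArn']

# Install the signed certificate on the CA (the console's "Install CA certificate" action)
acmpca.get_waiter('certificate_issued').wait(CertificateAuthorityArn=ca_arn, CertificateArn=cert_arn)
root_cert = acmpca.get_certificate(CertificateAuthorityArn=ca_arn, CertificateArn=cert_arn)['Certificate']
acmpca.import_certificate_authority_certificate(CertificateAuthorityArn=ca_arn, Certificate=root_cert.encode())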

Set up an MSK cluster with mutual TLS authentication

Before setting up the MSK cluster, make sure you have a VPC with at least two private subnets in different Availability Zones and a NAT gateway with a route to the internet. A CloudFormation template is provided in the prerequisites section.

Complete the following steps to set up your cluster:

  1. On the Amazon MSK console, choose Create cluster.
  2. For Creation method, select Custom create.
  3. For Cluster type, select Provisioned.
  4. For Broker size, you can choose kafka.t3.small for the purposes of this post.
  5. For Number of zones, choose 2.
  6. Choose Next.
  7. In the Networking section, select the VPC, private subnets, and security group you created in the prerequisites section.
  8. In the Security settings section, under Access control methods, select TLS client authentication through AWS Certificate Manager (ACM).
  9. For AWS Private CAs, choose the AWS Private CA you created earlier.

The MSK cluster creation can take up to 30 minutes to complete.
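While you wait, you can check the cluster state and, when it becomes ACTIVE, retrieve the TLS bootstrap broker string (port 9094) with a short boto3 sketch. The cluster ARN below is a placeholder; the AWS Glue connection you create later fetches the bootstrap servers for you automatically:

import boto3

kafka = boto3.client('kafka')

# Placeholder ARN of the MSK cluster created above
cluster_arn = 'arn:aws:kafka:us-east-1:111111112222:cluster/cluster-name/cluster-id'

# Cluster state moves from CREATING to ACTIVE when provisioning completes
state = kafka.describe_cluster_v2(ClusterArn=cluster_arn)['ClusterInfo']['State']
print(state)

# TLS endpoints used for mutual TLS authentication
brokers = kafka.get_bootstrap_brokers(ClusterArn=cluster_arn)['BootstrapBrokerStringTls']
print(brokers)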

Create a JKS file and generate a client certificate and private key

Using the root CA, you generate client certificates to use for authentication. The following instructions are for CloudShell, but they can also be adapted for a client machine with Java and the AWS CLI installed.

  1. Open a new CloudShell session and run the following commands to create the certs directory and install Java:
mkdir certs
cd certs
sudo yum -y install java-11-amazon-corretto-headless

  2. Run the following command to create a keystore file with a private key in JKS format. Replace Distinguished-Name, Example-Alias, Your-Store-Pass, and Your-Key-Pass with strings of your choice:

keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass Your-Store-Pass -keypass Your-Key-Pass -dname "CN=Distinguished-Name" -alias Example-Alias -storetype pkcs12

  3. Generate a certificate signing request (CSR) with the private key created in the preceding step:

keytool -keystore kafka.client.keystore.jks -certreq -file csr.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass

  4. Run the following command to remove the word NEW (and the single space that follows it) from the beginning and end of the file:

sed -i -E '1,$ s/NEW //' csr.pem

The file should start with -----BEGIN CERTIFICATE REQUEST----- and end with -----END CERTIFICATE REQUEST-----.

  5. Using the CSR file, create a client certificate with the following command. Replace Private-CA-ARN with the ARN of the private CA you created.

aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://csr.pem --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"

The command should print out the ARN of the issued certificate. Save the CertificateArn value for use in the next step.

{
"CertificateArn": "arn:aws:acm-pca:region:account:certificate-authority/CA_ID/certificate/certificate_ID"
}

  6. Use the Private-CA-ARN together with the CertificateArn (arn:aws:acm-pca:<region>:...) generated in the preceding step to retrieve the signed client certificate. This creates a client-cert.pem file.

aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN | jq -r '.Certificate + "\n" + .CertificateChain' >> client-cert.pem

  7. Add the certificate into the Java keystore so you can present it when you talk to the MSK brokers:

keytool -keystore kafka.client.keystore.jks -import -file client-cert.pem -alias Example-Alias -storepass Your-Store-Pass -keypass Your-Key-Pass -noprompt

  8. Extract the private key from the JKS file. Provide the same destkeypass and deststorepass and enter the keystore password when prompted.

keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore keystore.p12 -srcalias Example-Alias -deststorepass Your-Store-Pass -destkeypass Your-Key-Pass -deststoretype PKCS12

  9. Convert the private key to PEM format. Enter the keystore password you provided in the previous step when prompted.

openssl pkcs12 -in keystore.p12 -nodes -nocerts -out private-key.pem

  10. Remove the lines that begin with Bag Attributes from the top of the file:

sed -i -ne '/-BEGIN PRIVATE KEY-/,/-END PRIVATE KEY-/p' private-key.pem

  11. Upload the client-cert.pem, kafka.client.keystore.jks, and private-key.pem files to Amazon S3. You can either create a new S3 bucket or use an existing bucket to store the following objects. Replace s3://aws-glue-assets-11111111222222-us-east-1/certs/ with your S3 location.

aws s3 sync ~/certs s3://aws-glue-assets-11111111222222-us-east-1/certs/ --exclude '*' --include 'client-cert.pem' --include 'private-key.pem' --include 'kafka.client.keystore.jks'

Create a Kafka connection in AWS Glue

Complete the following steps to create a Kafka connection:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Select Apache Kafka and choose Next.
  4. For Amazon Managed Streaming for Apache Kafka Cluster, choose the MSK cluster you created earlier.

Create Glue Kafka connection

  5. Choose TLS client authentication for Authentication method.
  6. Enter the S3 path to the keystore you created earlier and provide the keystore and client key passwords you used for the -storepass and -keypass options.

Add authentication method to connection

  7. Under Networking options, choose your VPC, a private subnet, and a security group. The security group should contain a self-referencing rule.
  8. On the next page, provide a name for the connection (for example, Kafka-connection) and choose Create connection.
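The same connection can also be defined programmatically. The following is a minimal boto3 sketch using the Kafka connection properties of the AWS Glue CreateConnection API; the bootstrap servers, S3 path, passwords, subnet, and security group values are placeholders:

import boto3

glue = boto3.client('glue')

glue.create_connection(
    ConnectionInput={
        'Name': 'Kafka-connection',
        'ConnectionType': 'KAFKA',
        'ConnectionProperties': {
            'KAFKA_BOOTSTRAP_SERVERS': 'b-1.example:9094,b-2.example:9094',
            'KAFKA_SSL_ENABLED': 'true',
            'KAFKA_CLIENT_KEYSTORE': 's3://aws-glue-assets-11111111222222-us-east-1/certs/kafka.client.keystore.jks',
            'KAFKA_CLIENT_KEYSTORE_PASSWORD': 'Your-Store-Pass',
            'KAFKA_CLIENT_KEY_PASSWORD': 'Your-Key-Pass',
        },
        'PhysicalConnectionRequirements': {
            'SubnetId': 'subnet-0123456789abcdef0',
            'SecurityGroupIdList': ['sg-0123456789abcdef0'],
            'AvailabilityZone': 'us-east-1a',
        },
    }
)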

Create a Python shell job in AWS Glue to create a topic and push messages to Kafka

In this section, you create a Python shell job to create a new Kafka topic and push JSON messages to the topic. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs.
  2. In the Script section, for Engine, choose Python shell.
  3. Choose Create script.

Create Python shell job

  4. Enter the following script in the editor:
import sys
from awsglue.utils import getResolvedOptions
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
from kafka.errors import TopicAlreadyExistsError
from urllib.parse import urlparse

import json
import uuid
import datetime
import boto3
import time
import random

# Fetch job parameters
args = getResolvedOptions(sys.argv, ['connection-names', 'client-cert', 'private-key'])

# Download the client certificate and private key files from S3
TOPIC = 'example_topic'
client_cert = urlparse(args['client_cert'])
private_key = urlparse(args['private_key'])

s3 = boto3.client('s3')
s3.download_file(client_cert.netloc, client_cert.path.lstrip('/'), client_cert.path.split('/')[-1])
s3.download_file(private_key.netloc, private_key.path.lstrip('/'), private_key.path.split('/')[-1])

# Fetch the bootstrap servers from the attached connection
if ',' in args['connection_names']:
    raise ValueError("Choose only one connection name on the job details tab!")
glue_client = boto3.client('glue')
response = glue_client.get_connection(Name=args['connection_names'], HidePassword=True)
bootstrapServers = response['Connection']['ConnectionProperties']['KAFKA_BOOTSTRAP_SERVERS']

# Create the topic and push messages
admin_client = KafkaAdminClient(bootstrap_servers=bootstrapServers, security_protocol="SSL", ssl_certfile=client_cert.path.split('/')[-1], ssl_keyfile=private_key.path.split('/')[-1])
try:
    admin_client.create_topics(new_topics=[NewTopic(name=TOPIC, num_partitions=1, replication_factor=1)], validate_only=False)
except TopicAlreadyExistsError:
    # Topic already exists
    pass
admin_client.close()

# Generate JSON messages for the new topic
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'), bootstrap_servers=bootstrapServers, security_protocol="SSL",
                         ssl_check_hostname=True, ssl_certfile=client_cert.path.split('/')[-1], ssl_keyfile=private_key.path.split('/')[-1])

for i in range(1200):
    _event = {
        "deviceId": str(uuid.uuid4()),
        "patientId": "PI" + str(random.randint(1, 15)).rjust(5, '0'),
        "temperature": round(random.uniform(32.1, 40.9), 1),
        "eventTime": str(datetime.datetime.now())
    }
    producer.send(TOPIC, _event)
    time.sleep(3)

producer.close()

  5. On the Job details tab, provide a name for your job, such as Kafka-msk-producer.
  6. Choose an IAM role. If you don’t have one, create one following the instructions in Configuring IAM permissions for AWS Glue.
  7. Under Advanced properties, for Connections, choose the Kafka-connection connection you created.
  8. Under Job parameters, add the following parameters and values:
    1. Key: --additional-python-modules, value: kafka-python.
    2. Key: --client-cert, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/client-cert.pem. Replace this with your client-cert.pem Amazon S3 location from earlier.
    3. Key: --private-key, value: s3://aws-glue-assets-11111111222222-us-east-1/certs/private-key.pem. Replace this with your private-key.pem Amazon S3 location from earlier.
      AWS Glue Job parameters
  9. Save and run the job.

You can confirm that the job run status is Running on the Runs tab.
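You can also check the run state programmatically with a quick boto3 call (assuming the job name used in this post):

import boto3

glue = boto3.client('glue')

# Inspect the most recent run of the producer job
runs = glue.get_job_runs(JobName='Kafka-msk-producer', MaxResults=1)['JobRuns']
if runs:
    print(runs[0]['JobRunState'])  # expect RUNNING while messages are being produced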

At this point, we have successfully created a Python shell job to simulate the thermometers sending temperature readings to the monitoring application. The job runs for about 1 hour and pushes 1,200 records to Amazon MSK.

Alternatively, you can replace the Python shell job with a Scala ETL job that acts as a producer and sends messages to the MSK cluster. In that case, use the JKS file for authentication with ssl.keystore.type=JKS. If you’re using PEM format keys, note that the current version of the Kafka client libraries (2.4.1) installed in AWS Glue version 4.0 doesn’t yet support authentication through certificates in PEM format (as of this writing).

Create an AWS Glue streaming job to consume and process the messages

You can now create an AWS Glue ETL job to consume and process the messages in the Kafka topic. AWS Glue automatically infers the schema from the records. Complete the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. Choose Visual ETL to author a new job.
  3. For Sources, choose Apache Kafka.
  4. For Connection name, choose the node and the connection name you created earlier.
  5. For Topic name, enter the topic name (example_topic) you created earlier.
  6. Leave the rest of the options as default.

Kafka data source

  7. Add a new target node called Amazon S3 to store the output Parquet files generated from the streaming data.
  8. Choose Parquet as the data format and provide an S3 output location for the generated files.
  9. Select the option to allow AWS Glue to create a table in the Data Catalog and provide the database and table names.

S3 Output node

  10. On the Job details tab, provide the following options:
    1. For Requested number of workers, enter 2.
    2. For IAM Role, choose an IAM role with permissions to read and write to the S3 output location.
    3. For Job timeout, enter 60 (for the job to stop after 60 minutes).
    4. Under Advanced properties, for Connections, choose the connection you created.
  11. Save and run the job.

You can check the S3 output location for new Parquet files created under the prefixes s3://<output-location>/ingest_year=XXXX/ingest_month=XX/ingest_day=XX/ingest_hour=XX/.

At this point, you have created a streaming job to process events from Amazon MSK and store the JSON formatted records as Parquet files in Amazon S3. AWS Glue streaming jobs are meant to run continuously to process streaming data. We have set the timeout to stop the job after 60 minutes. You can also stop the job manually after the records have been processed to Amazon S3.
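For reference, the visual editor generates a PySpark streaming script behind the scenes. The following is a trimmed-down sketch of an equivalent script, using the connection and topic names from this post and a placeholder output path. The generated job additionally adds the ingest_year, ingest_month, ingest_day, and ingest_hour partition columns and updates the Data Catalog table, which this sketch omits for brevity:

import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the Kafka topic through the mutual TLS connection created earlier
kafka_df = glueContext.create_data_frame.from_options(
    connection_type='kafka',
    connection_options={
        'connectionName': 'Kafka-connection',
        'topicName': 'example_topic',
        'classification': 'json',
        'startingOffsets': 'earliest',
        'inferSchema': 'true',
    },
    transformation_ctx='kafka_source',
)

def process_batch(data_frame, batch_id):
    # Write each micro-batch as Parquet files to the S3 output location
    if data_frame.count() > 0:
        dyf = DynamicFrame.fromDF(data_frame, glueContext, 'batch')
        glueContext.write_dynamic_frame.from_options(
            frame=dyf,
            connection_type='s3',
            connection_options={'path': 's3://<output-location>/'},
            format='parquet',
        )

glueContext.forEachBatch(
    frame=kafka_df,
    batch_function=process_batch,
    options={
        'windowSize': '100 seconds',
        'checkpointLocation': 's3://<output-location>/checkpoint/',
    },
)
job.commit()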

Analyze the data in Athena

Going back to our example use case, you can run the following query in Athena to monitor the hourly average temperature readings for patients whose readings fall outside the normal range (36.1–37.2°C):

SELECT
date_format(parse_datetime(eventTime, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), '%h %p') hour,
patientId,
round(avg(temperature), 1) average_temperature,
count(temperature) readings
FROM "default"."devices_data"
GROUP BY 1, 2
HAVING avg(temperature) > 37.2 or avg(temperature) < 36.1
ORDER BY 2, 1 DESC

Amazon Athena Console

Run the query several times and observe how the average_temperature and the number of readings change with new incoming data from the AWS Glue streaming job. In our example scenario, healthcare workers can use this information to identify patients who are experiencing consistently high or low body temperatures and give them the required attention.

At this point, we have successfully created and ingested streaming data into our MSK cluster using mutual TLS authentication. We only needed the certificates generated by AWS Private CA to authenticate our AWS Glue clients to the MSK cluster and process the streaming data with an AWS Glue streaming job. Finally, we used Athena to visualize the data and observed how it changes in near real time.

Clean up

To clean up the resources created in this post, complete the following steps:

  1. Delete the private CA you created.
  2. Delete the MSK cluster you created.
  3. Delete the AWS Glue connection you created.
  4. Stop the jobs if they are still running and delete the jobs you created.
  5. If you used the CloudFormation stack provided in the prerequisites, delete it to remove the VPC and other networking components.

Conclusion

This post demonstrated how you can use AWS Glue to consume, process, and store streaming data from Amazon MSK using mutual TLS authentication. AWS Glue streaming automatically infers the schema and creates a table in the Data Catalog. You can then query the table using other data analysis tools like Athena, Amazon Redshift, and Amazon QuickSight to gain insights into the streaming data.

Try out the solution for yourself, and let us know your questions and feedback in the comments section.


About the Authors

Edward Okemwa is a Big Data Cloud Support Engineer (ETL) at AWS Nairobi specializing in AWS Glue and Amazon Athena. He is dedicated to providing customers with technical guidance and resolving issues related to processing and analyzing large volumes of data. In his free time, he enjoys singing choral music and playing soccer.

Emmanuel Mashandudze is a Senior Big Data Cloud Engineer specializing in AWS Glue. He collaborates with product teams to help customers efficiently transform data in the cloud. He helps customers design and implement robust data pipelines. Outside of work, Emmanuel is an avid marathon runner and sports enthusiast, and enjoys creating memories with his family.
