Ship Amazon CloudWatch logs to Amazon OpenSearch Serverless

Amazon CloudWatch Logs collects, aggregates, and analyzes logs from different systems in one place. CloudWatch provides subscriptions as a real-time feed of these logs to other services like Amazon Kinesis Data Streams, AWS Lambda, and Amazon OpenSearch Service. These subscriptions are a popular mechanism to enable custom processing and advanced analysis of log data to gain additional valuable insights. At the time of publishing this blog post, these subscription filters support delivering logs to Amazon OpenSearch Service provisioned clusters only. Customers are increasingly adopting Amazon OpenSearch Serverless as a cost-effective option for infrequent, intermittent, and unpredictable workloads.

In this blog post, we show how to use Amazon OpenSearch Ingestion to deliver CloudWatch logs to OpenSearch Serverless in near real-time. We outline a mechanism to connect a Lambda subscription filter with OpenSearch Ingestion and deliver logs to OpenSearch Serverless without explicitly needing a separate subscription filter for it.

Solution overview

The following diagram illustrates the solution architecture.

  1. CloudWatch Logs: Collects and stores logs from various AWS resources and applications. It serves as the source of log data in this solution.
  2. Subscription filter: A CloudWatch Logs subscription filter filters and routes specific log data from CloudWatch Logs to the next component in the pipeline.
  3. CloudWatch exporter Lambda function: This is a Lambda function that receives the filtered log data from the subscription filter. Its purpose is to transform and prepare the log data for ingestion into the OpenSearch Ingestion pipeline.
  4. OpenSearch Ingestion: This is a component of OpenSearch Service. The Ingestion pipeline is responsible for processing and enriching the log data received from the CloudWatch exporter Lambda function before storing it in the OpenSearch Serverless collection.
  5. OpenSearch Service: This is a fully managed service that stores and indexes log data, making it searchable and available for analysis and visualization. OpenSearch Service offers two configurations: provisioned domains and serverless. In this setup, we use serverless, which is an auto-scaling configuration for OpenSearch Service.
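
To make the hand-off between the subscription filter and the Lambda function concrete, the following sketch shows the envelope that CloudWatch Logs delivers to Lambda: a base64-encoded, gzip-compressed JSON document under awslogs.data. The helper names and every sample value (account ID, log group, log stream, filter name) are made up for illustration.

```python
import base64
import gzip
import json


def encode_subscription_event(record: dict) -> dict:
    """Wrap a log record the way a subscription filter delivers it to Lambda."""
    compressed = gzip.compress(json.dumps(record).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(compressed).decode("ascii")}}


def decode_subscription_event(event: dict) -> dict:
    """Reverse the envelope: base64-decode, gunzip, then parse the JSON."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


# Hypothetical sample record; all values are placeholders.
sample_record = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/example/log-group",
    "logStream": "example-stream",
    "subscriptionFilters": ["example-filter"],
    "logEvents": [
        {"id": "1", "timestamp": 1700000000000, "message": "Simple Lambda Test"}
    ],
}

event = encode_subscription_event(sample_record)
decoded = decode_subscription_event(event)
print(decoded["logGroup"], len(decoded["logEvents"]))
```

An event built this way can be fed to the Lambda handler locally, which lets you test the decoding and transformation steps without creating a subscription first.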

Prerequisites

Deploy the solution

With the prerequisites in place, you can create and deploy the pieces of the solution.

Step 1: Create PipelineRole for ingestion

  • Open the AWS Management Console for AWS Identity and Access Management (IAM).
  • Choose Policies, and then choose Create policy.
  • Select JSON and paste the following policy into the editor:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "aoss:BatchGetCollection",
                "aoss:APIAccessAll"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:aoss:us-east-1:{accountId}:collection/{collectionId}"
        },
        {
            "Action": [
                "aoss:CreateSecurityPolicy",
                "aoss:GetSecurityPolicy",
                "aoss:UpdateSecurityPolicy"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aoss:collection": "{collection}"
                }
            }
        }
    ]
}

// Replace {accountId}, {collectionId}, and {collection} with your own values

  • Choose Next, choose Next, and name your policy collection-pipeline-policy.
  • Choose Create policy.
  • Next, create a role and attach the policy to it. Choose Roles, and then choose Create role.
  • Select Custom trust policy and paste the following policy into the editor:
{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Effect":"Allow",
         "Principal":{
            "Service":"osis-pipelines.amazonaws.com"
         },
         "Action":"sts:AssumeRole"
      }
   ]
}

  • Choose Next, and then search for and select the collection-pipeline-policy you just created.
  • Choose Next and name the role PipelineRole.
  • Choose Create role.
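
If you prefer not to edit the placeholders by hand, the permissions policy from this step can be generated with a few lines of Python. This is a minimal sketch: the function name build_pipeline_policy and the sample argument values are hypothetical, and the Region defaults to us-east-1 only because that is what the policy in this post uses.

```python
import json


def build_pipeline_policy(account_id: str, collection_id: str,
                          collection_name: str, region: str = "us-east-1") -> dict:
    """Return the collection-pipeline-policy document with placeholders filled in."""
    collection_arn = f"arn:aws:aoss:{region}:{account_id}:collection/{collection_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Lets the pipeline role look up and call the collection.
                "Action": ["aoss:BatchGetCollection", "aoss:APIAccessAll"],
                "Effect": "Allow",
                "Resource": collection_arn,
            },
            {
                # Lets the pipeline role manage security policies for this collection only.
                "Action": [
                    "aoss:CreateSecurityPolicy",
                    "aoss:GetSecurityPolicy",
                    "aoss:UpdateSecurityPolicy",
                ],
                "Effect": "Allow",
                "Resource": "*",
                "Condition": {"StringEquals": {"aoss:collection": collection_name}},
            },
        ],
    }


# Placeholder values for illustration only.
policy = build_pipeline_policy("123456789012", "abc123example", "my-collection")
print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the IAM policy editor as is.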

Step 2: Configure the network and data policy for OpenSearch collection

  • In the OpenSearch Service console, navigate to the Serverless menu.
  • Create a VPC endpoint by following the instructions in Create an interface endpoint for OpenSearch Serverless.
  • Go to Security and choose Network policies.
  • Choose Create network policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "collection"
      }
    ],
    "AllowFromPublic": false,
    "SourceVPCEs": [
      "{VPC Endpoint Id}"
    ]
  },
  {
    "Rules": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "dashboard"
      }
    ],
    "AllowFromPublic": true
  }
]

  • Go to Security and choose Data access policies.
  • Choose Create access policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "index/{collection name}/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ],
        "ResourceType": "index"
      }
    ],
    "Principal": [
      "arn:aws:iam::{accountId}:role/PipelineRole",
      "arn:aws:iam::{accountId}:role/Admin"
    ],
    "Description": "Rule 1"
  }
]
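
Both policies in this step contain placeholders such as {collection name} and {accountId}, and a forgotten placeholder is easy to miss. The following sketch, with a hypothetical helper named find_placeholders, walks a policy document and reports every string that still contains a {placeholder}. The abbreviated policy text is sample input, not a complete policy.

```python
import json
import re

# Matches any unreplaced {placeholder} token inside a string value.
PLACEHOLDER = re.compile(r"\{[^{}]+\}")


def find_placeholders(node, path="$"):
    """Return (json_path, placeholder) pairs for every unreplaced placeholder."""
    found = []
    if isinstance(node, dict):
        for key, value in node.items():
            found.extend(find_placeholders(value, f"{path}.{key}"))
    elif isinstance(node, list):
        for i, value in enumerate(node):
            found.extend(find_placeholders(value, f"{path}[{i}]"))
    elif isinstance(node, str):
        found.extend((path, match) for match in PLACEHOLDER.findall(node))
    return found


# Abbreviated sample policy with one placeholder left in on purpose.
policy_text = """
[
  {
    "Rules": [
      {
        "Resource": ["index/{collection name}/*"],
        "Permission": ["aoss:ReadDocument", "aoss:WriteDocument"],
        "ResourceType": "index"
      }
    ],
    "Principal": ["arn:aws:iam::123456789012:role/PipelineRole"]
  }
]
"""

for where, placeholder in find_placeholders(json.loads(policy_text)):
    print(f"{where}: still contains {placeholder}")
```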

Step 3: Create an OpenSearch Ingestion pipeline

  • Navigate to the OpenSearch Service.
  • Go to the Ingestion pipelines section.
  • Choose Create pipeline.
  • Define the pipeline configuration.
version: "2"
cwlogs-ingestion-pipeline:
  source:
    http:
      path: /logs/ingest
  sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: ["https://{collectionId}.{region}.aoss.amazonaws.com"]
        index: "cwl-%{yyyy-MM-dd}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "arn:aws:iam::{accountId}:role/PipelineRole"
          # Provide the region of the domain.
          region: "{region}"
          serverless: true
          serverless_options:
            network_policy_name: "{Network policy name}"
# To get the values for the placeholders:
# 1. {collectionId}: You can find the collection ID by navigating to the Amazon OpenSearch Serverless collection in the AWS Management Console, and then choosing the collection. The collection ID is listed under the "Overview" section.
# 2. {region}: This is the AWS Region where your Amazon OpenSearch Service domain is located. You can find this information in the AWS Management Console when you navigate to the domain.
# 3. {accountId}: This is your AWS account ID. You can find your account ID by choosing your username in the top-right corner of the AWS Management Console and selecting "My Account" from the dropdown menu.
# 4. {Network policy name}: This is the name of the network policy you have configured for your Amazon OpenSearch Serverless collection. If you haven't configured a network policy, you can leave this placeholder as is or remove it from the configuration.
# After obtaining the necessary values, replace the placeholders in the configuration with the actual values.
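
The index setting cwl-%{yyyy-MM-dd} produces one index per day. The following sketch shows the index name you would expect for a given moment, assuming the pipeline resolves the date pattern from the current time in UTC when it writes the document. That assumption, and the helper name daily_index_name, are not taken from this post, so verify the behavior against your own collection.

```python
from datetime import datetime, timezone


def daily_index_name(now: datetime, prefix: str = "cwl-") -> str:
    """Mirror the cwl-%{yyyy-MM-dd} pattern for a timezone-aware datetime."""
    return prefix + now.astimezone(timezone.utc).strftime("%Y-%m-%d")


# Example: the index expected for documents written late on 5 March 2024 (UTC).
print(daily_index_name(datetime(2024, 3, 5, 23, 30, tzinfo=timezone.utc)))
```

Knowing the expected index name makes it easier to find the test documents in Step 6.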

Step 4: Create a Lambda function

  • Create a Lambda layer for requests and sigv4 packages. Run the following commands in AWS CloudShell.
mkdir lambda_layers
cd lambda_layers
mkdir python
cd python
pip install requests -t ./
pip install requests_auth_aws_sigv4 -t ./
cd ..
zip -r python_modules.zip .

aws lambda publish-layer-version --layer-name Data-requests --description "My Python layer" --zip-file fileb://python_modules.zip --compatible-runtimes python3.x

  • Create the Lambda function with the following code, and add the layer you just published to it.
import base64
import gzip
import json
import logging
from datetime import datetime

import boto3
import jmespath
import requests
from requests_auth_aws_sigv4 import AWSSigV4

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)


def lambda_handler(event, context):
    # Extract the data from the event
    data = jmespath.search("awslogs.data", event)
    # Decompress the logs
    cwLogs = decompress_json_data(data)
    # Construct the payload to send to OpenSearch Ingestion
    payload = prepare_payload(cwLogs)
    print(payload)
    # Ingest the set of events to the pipeline
    response = ingestData(payload)
    return {
        'statusCode': 200
    }


def decompress_json_data(data):
    # Subscription data arrives base64-encoded and gzip-compressed
    compressed_data = base64.b64decode(data)
    uncompressed_data = gzip.decompress(compressed_data)
    return json.loads(uncompressed_data)


def prepare_payload(cwLogs):
    # Build one document per log event, carrying the log group metadata along
    payload = []
    logEvents = cwLogs['logEvents']
    for logEvent in logEvents:
        request = {}
        request['id'] = logEvent['id']
        # CloudWatch timestamps are in milliseconds since the epoch
        dt = datetime.fromtimestamp(logEvent['timestamp'] / 1000)
        request['timestamp'] = dt.isoformat()
        request['message'] = logEvent['message']
        request['owner'] = cwLogs['owner']
        request['log_group'] = cwLogs['logGroup']
        request['log_stream'] = cwLogs['logStream']
        payload.append(request)
    return payload


def ingestData(payload):
    ingestionEndpoint = "{OpenSearch Pipeline Endpoint}"
    endpoint = "https://" + ingestionEndpoint
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    # Sign the request with SigV4 for the OpenSearch Ingestion service (osis)
    r = requests.request('POST', f'{endpoint}/logs/ingest', json=payload, auth=AWSSigV4('osis'), headers=headers)
    LOGGER.info('Response received: ' + r.text)
    return r

  • Replace {OpenSearch Pipeline Endpoint} with the endpoint of your OpenSearch Ingestion pipeline.
  • Attach the following inline policy to the execution role.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PermitsWriteAccessToPipeline",
            "Effect": "Allow",
            "Action": "osis:Ingest",
            "Resource": "arn:aws:osis:{region}:{accountId}:pipeline/{OpenSearch Pipeline Name}"
        }
    ]
}
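
One detail of the Lambda code in this step is worth a closer look: prepare_payload converts the millisecond timestamp with datetime.fromtimestamp, which uses the local time zone of the runtime and emits no UTC offset. The following variant is a sketch, not the code from this post, that makes the time zone explicit. The function name prepare_payload_utc and the sample values are hypothetical.

```python
from datetime import datetime, timezone


def prepare_payload_utc(cw_logs: dict) -> list:
    """Build one document per log event with an explicit UTC ISO 8601 timestamp."""
    payload = []
    for log_event in cw_logs["logEvents"]:
        # CloudWatch timestamps are milliseconds since the epoch.
        ts = datetime.fromtimestamp(log_event["timestamp"] / 1000, tz=timezone.utc)
        payload.append({
            "id": log_event["id"],
            "timestamp": ts.isoformat(),
            "message": log_event["message"],
            "owner": cw_logs["owner"],
            "log_group": cw_logs["logGroup"],
            "log_stream": cw_logs["logStream"],
        })
    return payload


# Placeholder record for illustration only.
sample = {
    "owner": "123456789012",
    "logGroup": "/example/log-group",
    "logStream": "example-stream",
    "logEvents": [{"id": "1", "timestamp": 0, "message": "Simple Lambda Test"}],
}
print(prepare_payload_utc(sample))
```

With an explicit offset in the timestamp string, the value does not depend on where the function runs, which helps when you compare documents against the original CloudWatch events.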

Step 5: Set up a CloudWatch Logs subscription

  • Grant permission to a specific AWS service or AWS account to invoke the specified Lambda function. The following command grants permission to the CloudWatch Logs service to invoke the cloud-logs Lambda function for the specified log group. This is necessary because CloudWatch Logs can't directly invoke a Lambda function without being granted permission. Run the following command in CloudShell to add the permission.
aws lambda add-permission \
  --function-name "{function name}" \
  --statement-id "{function name}" \
  --principal "logs.amazonaws.com" \
  --action "lambda:InvokeFunction" \
  --source-arn "arn:aws:logs:{region}:{accountId}:log-group:{log_group}:*" \
  --source-account "{accountId}"

  • Create a subscription filter for a log group. The following command creates a subscription filter on the log group, which forwards all log events (because the filter pattern is an empty string) to the Lambda function. Run the following command in CloudShell to create the subscription filter.
aws logs put-subscription-filter \
  --log-group-name "{log_group}" \
  --filter-name "{filter name}" \
  --filter-pattern "" \
  --destination-arn "arn:aws:lambda:{region}:{accountId}:function:{function name}"
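
If you would rather script this step than run the CLI commands, the same two calls are available in boto3 as add_permission on the Lambda client and put_subscription_filter on the CloudWatch Logs client. The following is a sketch under stated assumptions: the helper names and sample values are hypothetical, and boto3 is imported only inside the function that actually calls AWS so the argument builders can be checked offline.

```python
def add_permission_args(function_name, region, account_id, log_group):
    """Arguments for lambda add_permission, mirroring the CLI command."""
    return {
        "FunctionName": function_name,
        "StatementId": function_name,
        "Action": "lambda:InvokeFunction",
        "Principal": "logs.amazonaws.com",
        "SourceArn": f"arn:aws:logs:{region}:{account_id}:log-group:{log_group}:*",
        "SourceAccount": account_id,
    }


def subscription_filter_args(function_name, region, account_id, log_group, filter_name):
    """Arguments for logs put_subscription_filter; an empty pattern forwards all events."""
    return {
        "logGroupName": log_group,
        "filterName": filter_name,
        "filterPattern": "",
        "destinationArn": f"arn:aws:lambda:{region}:{account_id}:function:{function_name}",
    }


def create_subscription(function_name, region, account_id, log_group, filter_name):
    """Grant the invoke permission first, then create the subscription filter."""
    import boto3  # imported here so the helpers above work without AWS access

    boto3.client("lambda", region_name=region).add_permission(
        **add_permission_args(function_name, region, account_id, log_group)
    )
    boto3.client("logs", region_name=region).put_subscription_filter(
        **subscription_filter_args(function_name, region, account_id, log_group, filter_name)
    )


# Placeholder values for illustration only; nothing is sent to AWS here.
print(subscription_filter_args("cloud-logs", "us-east-1", "123456789012",
                               "/example/log-group", "example-filter"))
```

The order matters: the permission should exist before the subscription filter is created, because CloudWatch Logs needs to be allowed to invoke the function.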

Step 6: Testing and verification

  • Generate some logs in your CloudWatch log group. Run the following command in CloudShell to create sample logs in the log group.
aws logs put-log-events --log-group-name "{log_group}" --log-stream-name "{stream_name}" --log-events '[{"timestamp": {timestamp in millis}, "message": "Simple Lambda Test"}]'

  • Check the OpenSearch collection to ensure logs are indexed correctly.
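
The put-log-events command in this step needs a timestamp in milliseconds since the epoch. The following sketch generates that value and prints a JSON array suitable for the --log-events argument. The helper name build_log_events is hypothetical.

```python
import json
import time


def build_log_events(messages, now_ms=None):
    """Return a list of log events stamped with the current time in milliseconds."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return [{"timestamp": now_ms, "message": message} for message in messages]


# Prints a JSON array that can be passed, single-quoted, as the --log-events value.
print(json.dumps(build_log_events(["Simple Lambda Test"])))
```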

Clean up

Remove the infrastructure for this solution when not in use to avoid incurring unnecessary costs.

Conclusion

You saw how to set up a pipeline to deliver CloudWatch logs to an OpenSearch Serverless collection within a VPC. This integration uses CloudWatch for log aggregation, Lambda for log processing, and OpenSearch Serverless for querying and visualization. You can use this solution to take advantage of the pay-as-you-go pricing model for OpenSearch Serverless to optimize operational costs for log analysis.

To further explore, you can:


About the Authors

Balaji Mohan is a senior modernization architect specializing in application and data modernization to the cloud. His business-first approach ensures seamless transitions, aligning technology with organizational goals. Using cloud-native architectures, he delivers scalable, agile, and cost-effective solutions, driving innovation and growth.

Souvik Bose is a Software Development Engineer working on Amazon OpenSearch Service.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.
