Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose


Introduction

When sending data from Internet of Things (IoT) devices to a data lake, you might need to enrich the device data payload with additional metadata in the cloud for further data processing and visualization. There are multiple reasons this data might not exist in the device payload, such as minimizing the device payload in limited bandwidth environments or modifying it with business inputs in the cloud. For example, a machine on the factory floor might be assigned to different operators during the day. This variable business data would be stored in a database. In your data lake, you might need this information to be stored along with the payload.

In this blog post, you will learn how to ingest enriched IoT data to a data lake in near real-time.

Prerequisites

  • An AWS account
  • AWS Command Line Interface (AWS CLI). See AWS CLI quick setup for configuration.

Use case definition

Let's assume that in your logistics company, you have containers equipped with sensor-enabled IoT devices. When a container is loaded onto a ship, the container ID is associated with the ship ID. You need to store the IoT device payload with the ship ID in your data lake.

In such a use case, the sensor payload comes from the IoT device attached to the container. However, the associated ship ID is only stored in the metadata store. Therefore, the payload must be enriched with the ship ID before putting it into the data lake.

Solution architecture

Architecture diagram for ingesting enriched IoT data into Amazon S3 by using Amazon Kinesis Data Firehose

In the architecture diagram,

  1. The IoT devices stream payloads to the AWS IoT Core message broker on a specific MQTT topic device/data/DEVICE_ID. The AWS IoT Core message broker allows devices to publish and subscribe to messages by using supported protocols.
  2. The AWS IoT rule is triggered when there is a payload in its topic. It is configured with an Amazon Kinesis Data Firehose action in this use case. You can use AWS IoT rules to interact with AWS services by calling them when there is a message in a specific MQTT topic or directly by using the Basic Ingest feature.
  3. Amazon Kinesis Data Firehose buffers the device payloads before delivering them to the data store based on the size or the time, whichever happens first. Kinesis Data Firehose delivers real-time streaming data to destinations for storing or processing.
  4. Once the buffer hits the size or the time threshold, Kinesis Data Firehose calls an AWS Lambda function to enrich the device payloads in batches with the metadata retrieved from an Amazon DynamoDB table. AWS Lambda is a serverless compute service that runs your code for any type of application. Amazon DynamoDB is a fully managed NoSQL database that provides fast performance.
  5. The enriched payloads are returned to Kinesis Data Firehose to deliver to the destination.
  6. The enriched payloads are put into an Amazon Simple Storage Service (Amazon S3) bucket as a destination. Amazon S3 is an object storage service which stores any amount of data for a range of use cases.
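
The "size or time, whichever happens first" behavior in step 3 can be pictured with a small toy model. This is only a sketch for intuition, not how Kinesis Data Firehose is implemented: the Buffer class and its fields are hypothetical names, and the time threshold is checked only when a record arrives, whereas a real service would also flush on a timer.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Buffer:
    """Toy model of size-or-time buffering; not the Firehose implementation."""
    max_bytes: int
    max_seconds: float
    opened_at: float = 0.0
    records: List[bytes] = field(default_factory=list)

    def add(self, record: bytes, now: float) -> List[bytes]:
        """Add a record; return the flushed batch if a threshold was hit, else []."""
        if not self.records:
            self.opened_at = now  # the interval starts with the first record
        self.records.append(record)
        size = sum(len(r) for r in self.records)
        if size >= self.max_bytes or now - self.opened_at >= self.max_seconds:
            batch, self.records = self.records, []
            return batch
        return []


buf = Buffer(max_bytes=100, max_seconds=60)
print(len(buf.add(b"x" * 40, now=0)))    # 0: below both thresholds
print(len(buf.add(b"x" * 70, now=5)))    # 2: size threshold reached
print(len(buf.add(b"x" * 10, now=10)))   # 0: new buffer opened at t=10
print(len(buf.add(b"x" * 10, now=75)))   # 2: time threshold reached
```

Each flushed batch corresponds to one invocation of the enrichment function in step 4.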

AWS CloudFormation template

Download the AWS CloudFormation template from the code repository.

The AWS CloudFormation template deploys all the necessary resources to run this example use case. Let's have a closer look at the AWS IoT rule, Kinesis Data Firehose, and AWS Lambda function resources.

AWS IoT rules resource

IoTToFirehoseRule:
  Type: AWS::IoT::TopicRule
  Properties:
    TopicRulePayload:
      Actions:
        -
          Firehose:
            RoleArn: !GetAtt IoTFirehosePutRecordRole.Arn
            DeliveryStreamName: !Ref FirehoseDeliveryStream
            Separator: "\n"
      AwsIotSqlVersion: '2016-03-23'
      Description: This rule logs IoT payloads to S3 Bucket by aggregating in Kinesis Firehose.
      RuleDisabled: false
      Sql: !Ref IotKinesisRuleSQL

The AWS IoT rule takes a SQL parameter which defines the IoT topic that triggers the rule and the data to extract from the payload.

  • In the example, the SQL parameter is set to SELECT *, topic(3) as containerId FROM 'device/data/+' by default. SELECT * means the whole payload is taken as it is. containerId is generated from the third segment of the MQTT topic (the device ID) and added to the payload.
  • FROM 'device/data/+' describes the IoT topic that will trigger the AWS IoT rule. + is a single-level wildcard character for MQTT topics, and the IoT devices will publish data payloads to the device/data/DEVICE_ID topic to trigger this rule.
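
To make the effect of the rule concrete, the following sketch mimics it in plain Python. It is an illustration only and does not use the AWS IoT rules engine: topic_segment, matches, and apply_rule are hypothetical helper names, and only the + wildcard is handled.

```python
import json


def topic_segment(topic: str, n: int) -> str:
    """Return the n-th topic segment, counting from 1 as topic(n) does in AWS IoT SQL."""
    return topic.split("/")[n - 1]


def matches(topic_filter: str, topic: str) -> bool:
    """Match a topic against a filter where '+' stands for exactly one level.

    Only the '+' wildcard is handled here; '#' is left out to keep the sketch short.
    """
    f_parts, t_parts = topic_filter.split("/"), topic.split("/")
    return len(f_parts) == len(t_parts) and all(
        f in ("+", t) for f, t in zip(f_parts, t_parts)
    )


def apply_rule(topic: str, payload: str):
    """Mimic SELECT *, topic(3) as containerId FROM 'device/data/+'."""
    if not matches("device/data/+", topic):
        return None  # the rule is not triggered
    selected = json.loads(payload)                      # SELECT *
    selected["containerId"] = topic_segment(topic, 3)   # topic(3) as containerId
    return selected


print(apply_rule("device/data/cont1", '{"temperature": 20}'))
# {'temperature': 20, 'containerId': 'cont1'}
print(apply_rule("device/status/cont1", '{"temperature": 20}'))
# None
```

The device never sends its own ID in the payload; the ID travels in the topic name and the rule copies it into the record.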

The AWS IoT rule also defines actions. In the example, you can see a Kinesis Data Firehose action which defines the target Kinesis Data Firehose delivery stream and the AWS Identity and Access Management (IAM) role needed to put records into this delivery stream. A separator can be chosen to separate each record; in the given example it is a new line character.

Kinesis Data Firehose delivery stream resource

FirehoseDeliveryStream:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt IoTLogBucket.Arn
      BufferingHints:
        IntervalInSeconds: 60
        SizeInMBs: 1
      Prefix: device-data/
      RoleARN: !GetAtt FirehosePutS3Role.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Type: Lambda
            Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !Sub '${FirehoseTransformLambda.Arn}:$LATEST'
              - ParameterName: RoleArn
                ParameterValue: !GetAtt FirehoseLambdaInvokeRole.Arn

A Kinesis Data Firehose delivery stream must define a destination to put the stream into. It supports different types of destinations. You can find the available destination types and their usage in this documentation. In this example, you are going to use Amazon S3 as the destination.

The example delivery stream resource defines the following properties:

  • BucketARN: the destination bucket which will store the aggregated data. The destination bucket is created by the CloudFormation stack.
  • BufferingHints: the size and time thresholds for data buffering. In this example, they are set to 1 MB and 60 seconds respectively to see the results faster. They can be adjusted according to the business needs. Keeping these thresholds low will cause the Lambda function to be invoked more frequently. If the thresholds are high, the data will be ingested to the data store less frequently, so it will take longer to see the latest data in the data store.
  • Prefix: the created objects will be put under this prefix. Kinesis Data Firehose partitions the data based on the timestamp by default. In this example, the objects will be put under the device-data/YYYY/MM/dd/HH folder. Kinesis Data Firehose has advanced features for data partitioning such as dynamic partitioning. The partitioning of the data is important when querying the data lake. For example, if you need to query the data on a per-device basis by using Amazon Athena, scanning only the partition of the relevant device ID will significantly reduce the scan time and the cost. You can find details on partitioning in this documentation.
  • RoleARN: this is the IAM role that gives PutObject permission to Kinesis Data Firehose to be able to put aggregated data into the Amazon S3 bucket.
  • ProcessingConfiguration: As described in the use case, a transform Lambda function will enrich the IoT data with the metadata. ProcessingConfiguration defines the processor, which is a Lambda function in the example. For each batch of data, Kinesis Data Firehose will call this Lambda function for the transformation of the data. You can read more about data processing in this documentation.
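
As a quick illustration of the default timestamp partitioning described for Prefix, the sketch below builds the folder path for a given arrival time. It assumes the timestamp is expressed in UTC; default_partition_prefix is a hypothetical helper, and the object name that Kinesis Data Firehose appends after the folder is not modeled.

```python
from datetime import datetime, timezone


def default_partition_prefix(prefix: str, arrival: datetime) -> str:
    """Build '<prefix>YYYY/MM/dd/HH/' from a record's arrival time in UTC."""
    return prefix + arrival.astimezone(timezone.utc).strftime("%Y/%m/%d/%H/")


arrival = datetime(2025, 1, 22, 14, 30, tzinfo=timezone.utc)
print(default_partition_prefix("device-data/", arrival))
# device-data/2025/01/22/14/
```

A query that filters on a date or hour only has to scan the matching folders, which is why the partition layout matters for scan time and cost.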

Transformation Lambda Function

As you can see in the following Python code, Kinesis Data Firehose passes a batch of records to the function, where each record is a payload from the IoT devices. First, the base64 encoded payload data is decoded. Then, the corresponding ship ID is retrieved from the DynamoDB table based on the container ID. The payload is enriched with the ship ID and encoded back to base64. Finally, the record list is returned to Kinesis Data Firehose.

Once Kinesis Data Firehose receives the records, it puts them as an aggregated file into the Amazon S3 bucket.

import os
import boto3
import json
import base64

# Created once per execution environment and reused across invocations
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['METADATA_TABLE'])

def function_handler(event, context):
  # Build the output list per invocation so that records from a previous
  # batch are not returned again when the execution environment is reused.
  records = []
  for record in event["records"]:
    # Get data field of the record in json format. It is a base64 encoded string.
    json_data = json.loads(base64.b64decode(record["data"]))
    container_id = json_data["containerId"]

    # Get corresponding shipId from the DynamoDB table
    res = table.get_item(Key={'containerId': container_id})
    ddb_item = res["Item"]
    ship_id = ddb_item["shipId"]

    # Append shipId to the actual record data
    enriched_data = json_data
    enriched_data["shipId"] = ship_id

    # Encode the enriched record to base64
    json_string = json.dumps(enriched_data).encode("ascii")
    b64_encoded_data = base64.b64encode(json_string).decode("ascii")

    # Create a record with enriched data and return back to Firehose
    rec = {'recordId': record["recordId"], 'result': 'Ok', 'data': b64_encoded_data}
    records.append(rec)
  return {'records': records}
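
The handler above assumes that every container ID has a metadata item. A possible variation, sketched below under stated assumptions, marks records without metadata as ProcessingFailed instead of raising an error for the whole batch. To keep the sketch runnable without AWS access, a plain dictionary (ship_by_container) stands in for the DynamoDB table, and enrich_records and make_record are hypothetical names that are not part of the original template.

```python
import base64
import json


def enrich_records(event: dict, ship_by_container: dict) -> dict:
    """Enrich each record with shipId; mark records without metadata as failed."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        ship_id = ship_by_container.get(payload.get("containerId"))
        if ship_id is None:
            # No metadata found: hand the original data back, flagged as failed.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        payload["shipId"] = ship_id
        data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
        output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
    return {"records": output}


def make_record(record_id: str, payload: dict) -> dict:
    """Build a record shaped like the ones in a Firehose transformation event."""
    data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return {"recordId": record_id, "data": data}


event = {"records": [make_record("1", {"temperature": 20, "containerId": "cont1"}),
                     make_record("2", {"temperature": 21, "containerId": "cont9"})]}
result = enrich_records(event, {"cont1": "ship1"})
print([r["result"] for r in result["records"]])
# ['Ok', 'ProcessingFailed']
print(json.loads(base64.b64decode(result["records"][0]["data"])))
# {'temperature': 20, 'containerId': 'cont1', 'shipId': 'ship1'}
```

Keeping the enrichment logic in a function that takes the lookup as a parameter also makes it straightforward to test locally before wiring it to the real table.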

Deployment

Run the following command in a terminal to deploy the stack.

aws cloudformation deploy --stack-name IoTKinesisDataPath --template-file IoTKinesisDataPath.yml --parameter-overrides IotKinesisRuleSQL="SELECT *, topic(3) as containerId FROM 'device/data/+'" --capabilities CAPABILITY_NAMED_IAM

After the deployment is complete, run the following command in a terminal to see the output of the deployment.

aws cloudformation describe-stacks --stack-name IoTKinesisDataPath

Note the IoTLogS3BucketName and MetadataTableName output parameters.

Testing

After the deployment is complete, the first thing you need to do is to create a metadata item for data enrichment. Run the following command to create an item in the DynamoDB table. It will create an item with cont1 as containerId and ship1 as shipId. Replace the IoTKinesisDataPath-MetadataTable-SAMPLE parameter with the DynamoDB table output parameter from the CloudFormation stack deployment.

aws dynamodb put-item --table-name IoTKinesisDataPath-MetadataTable-SAMPLE --item '{"containerId":{"S":"cont1"},"shipId":{"S":"ship1"}}'

In a real-life scenario, the devices publish the payloads to a specific MQTT topic. In this example, instead of creating IoT devices, you will use the AWS CLI to publish payloads to MQTT topics. Run the following command in a terminal to publish a sample data payload to AWS IoT Core. Pay attention to the payload field of the command: the only data provided by the device is the dynamic data.

aws iot-data publish --topic "device/data/cont1" --payload '{"temperature":20,"humidity":80,"latitude":0,"longitude":0}' --cli-binary-format raw-in-base64-out

Now, navigate to Amazon S3 from the AWS Management Console and select the bucket that has been created with the CloudFormation stack. You should see the device-data folder in this bucket. It may take up to 1 minute for the data to appear due to the buffering configuration that is set for the Firehose delivery stream. If you navigate into the device-data/YYYY/MM/dd/HH folder, you will see that an object has been created. Go ahead and open this file. You will see that the content of the file is the data payload with the enriched shipId field.

{"temperature": 20, "humidity": 80, "latitude": 0, "longitude": 0, "containerId": "cont1", "shipId": "ship1"}
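
When several payloads arrive within one buffering interval, the delivered object holds more than one JSON document. Whether they end up separated by new lines depends on what the transform function writes back, so a tolerant reader is a reasonable precaution. The sketch below is one way to read such a file after downloading it; parse_json_objects is a hypothetical helper and the sample string is made-up data in the shape of the output above.

```python
import json


def parse_json_objects(text: str) -> list:
    """Parse JSON objects that are newline-delimited or simply concatenated."""
    decoder = json.JSONDecoder()
    objects, pos = [], 0
    while pos < len(text):
        # Skip whitespace (including newlines) between objects.
        if text[pos].isspace():
            pos += 1
            continue
        obj, pos = decoder.raw_decode(text, pos)
        objects.append(obj)
    return objects


sample = (
    '{"temperature": 20, "containerId": "cont1", "shipId": "ship1"}'
    '{"temperature": 22, "containerId": "cont1", "shipId": "ship1"}\n'
)
for item in parse_json_objects(sample):
    print(item["shipId"], item["temperature"])
# ship1 20
# ship1 22
```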

Troubleshooting

In case of failure in the system, the following resources can be useful for analyzing the source of the problem.

To monitor the AWS IoT Core Rules Engine, you need to enable AWS IoT Core logging. This will give detailed information about the events happening in AWS IoT Core.

AWS Lambda can be monitored by using Amazon CloudWatch. The example CloudFormation template has the necessary permissions to create a log group for the Lambda function logging.

In case of failure in the AWS IoT Rules Engine action, the transform Lambda function, or the Amazon S3 bucket, Kinesis Data Firehose will create a processing-failed folder under the device-data prefix. The details of the failure can be read as JSON objects. You can find more information in this documentation.

Clean up

To clean up the resources that have been created, first empty the Amazon S3 bucket. Run the following command, replacing the bucket-name parameter with the name of the bucket deployed by the CloudFormation stack. Important: this command will delete all the data inside the bucket irreversibly.

aws s3 rm s3://bucket-name --recursive

Then, you can delete the CloudFormation stack by running the following command in a terminal.

aws cloudformation delete-stack --stack-name IoTKinesisDataPath

Conclusion

In this blog, you have learned a common pattern of enriching IoT payloads with metadata and storing them cost effectively in a data lake in near real-time by using the AWS IoT Rules Engine and an Amazon Kinesis Data Firehose delivery stream. The proposed solution and the CloudFormation template can be used as a baseline for a scalable IoT data ingestion architecture.

You can read further about the AWS IoT Core Rules Engine and Amazon Kinesis Data Firehose. Best practices for using MQTT topics in the AWS IoT Rules Engine will guide you in defining your topic structures.

Ozan Cihangir


Ozan is a Prototyping Engineer at AWS. He helps customers to build innovative solutions for their emerging technology projects in the cloud.
