Developing Kinesis/KCL applications locally

One of the challenges of developing Kinesis consumers is setting up proper development and testing environments. Unlike DynamoDB, which has an official (though buggy) local implementation, Kinesis has none. Kinesalite, an unofficial implementation of the Kinesis API, is a perfect choice for local development and integration tests. If you plan to use the KCL (Kinesis Client Library) in your application, you need both a local DynamoDB and a local Kinesis. Unfortunately, I couldn't find any documentation on how to configure the KCL for a local environment, so after some trial and error I came up with this post.
I will use both Kinesalite and Dynalite (a local DynamoDB implementation from the authors of Kinesalite).
Examples are written in Scala, but they are a straightforward port of the Java examples from the official documentation.

Prerequisites

You will need Node.js with npm (to run Kinesalite and Dynalite) and sbt with Scala (to build and run the KCL application).

Step 1. Set up Kinesalite & Dynalite

Create a subfolder in your project and initialize an npm package.json inside it, then install Kinesalite, Dynalite, and the other dependencies:

cd ~/local-kinesis-app
mkdir kinesis
cd kinesis
npm init
npm install --save aws-sdk kinesalite dynalite kinesis

Create a file kinesis/start-kinesis.js and add this code to it:

var kinesalite = require('kinesalite');
var kinesaliteServer = kinesalite({ createStreamMs: 0 });
var AWS = require('aws-sdk');

var config = {
  "accessKeyId": "FAKE",
  "secretAccessKey": "FAKE",
  "region": "FAKE",
  "kinesisEndpoint": "http://localhost:4567",
  "kinesisPort": 4567,
  "StreamName": "stream-name",
  "ShardCount": 1
}

kinesaliteServer.listen(config.kinesisPort, function(err) {
    if (err) throw err;
    console.log('Kinesalite listens on port ' + config.kinesisPort);

    console.log('Creating stream: ', config.StreamName);

    var kinesis = new AWS.Kinesis({
      endpoint: config.kinesisEndpoint,
      accessKeyId: config.accessKeyId,
      secretAccessKey: config.secretAccessKey,
      region: config.region
    });

    kinesis.createStream({ StreamName: config.StreamName, ShardCount: config.ShardCount }, function (err) {
        if (err) throw err;

        kinesis.describeStream({ StreamName: config.StreamName }, function(err, data) {
            if (err) throw err;

            console.log('Stream ready: ', data);
        });
    });
});

This will start a Kinesalite server on localhost:4567 and create the stream stream-name.

Add a start script to the package.json file:

{
  ...
  "scripts": {
    ...
    "start": "node start-kinesis.js ./node_modules/.bin/dynalite --port 4568"
  }
}

Now when you run cd kinesis && npm start from the project root, it will start Kinesalite and Dynalite on ports 4567 and 4568 respectively.

Step 2. Implement KCL record processor

I won't go into the details here; the official KCL documentation and examples cover writing a record processor.
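
For reference, here is a minimal sketch of a record processor and its factory using the KCL 1.x v2 interfaces. The class names are placeholders of my own, and checkpointing is omitted to keep it short:

import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.types.{InitializationInput, ProcessRecordsInput, ShutdownInput}
import scala.collection.JavaConverters._

// Minimal processor that just logs every record it receives.
class PrintingRecordProcessor extends IRecordProcessor {
  override def initialize(input: InitializationInput): Unit =
    println(s"Initialized for shard ${input.getShardId}")

  override def processRecords(input: ProcessRecordsInput): Unit =
    input.getRecords.asScala.foreach { record =>
      val data = StandardCharsets.UTF_8.decode(record.getData).toString
      println(s"Got record ${record.getSequenceNumber}: $data")
      // A real processor should also checkpoint periodically via input.getCheckpointer
    }

  override def shutdown(input: ShutdownInput): Unit =
    println(s"Shutting down, reason: ${input.getShutdownReason}")
}

// The KCL Worker creates one processor per shard through this factory.
class PrintingRecordProcessorFactory extends IRecordProcessorFactory {
  override def createProcessor(): IRecordProcessor = new PrintingRecordProcessor
}

An instance of this factory is what gets passed to the Worker in Step 3.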

A complete example containing everything from this post is on GitHub: https://github.com/avovsya/local-kinesis-app-example. Step 4 walks through running it.

Step 3. Configure KCL to use local Kinesis & DynamoDB

Amazon provides a good example of how to configure the KCL. A few things need to change to make it work with Kinesalite and Dynalite.

First, when configuring KinesisClientLibConfiguration, we need to disable CloudWatch metrics reporting, because Kinesalite doesn't support it:

val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(ApplicationName,
    ApplicationStreamName,
    credentialsProvider,
    workerId)
    .withInitialPositionInStream(PositionInStream)
    .withMetricsLevel(MetricsLevel.NONE) // Disable CloudWatch metrics
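
The names used in that snippet (ApplicationName, ApplicationStreamName, PositionInStream, workerId, credentialsProvider) come from the surrounding application code. As a sketch, they could be defined like this; the application name is an assumption of mine, and the stream name has to match the one created by start-kinesis.js:

import java.net.InetAddress
import java.util.UUID
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val ApplicationName = "local-kinesis-app"    // also used as the name of the DynamoDB lease table
val ApplicationStreamName = "stream-name"    // must match the stream created by start-kinesis.js
val PositionInStream = InitialPositionInStream.TRIM_HORIZON
val workerId = InetAddress.getLocalHost.getCanonicalHostName + ":" + UUID.randomUUID()
// Picks up the fake credentials from the environment variables set in build.sbt below
val credentialsProvider = new DefaultAWSCredentialsProviderChain()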

Second, we need to create custom Kinesis and DynamoDB clients pointing at the local host and port:

val kinesisClient: AmazonKinesisClient = new AmazonKinesisClient().withEndpoint("http://localhost:4567/")
val dynamoClient: AmazonDynamoDBClient = new AmazonDynamoDBClient().withEndpoint("http://localhost:4568/")

and inject them into the KCL Worker:

val worker: Worker = new Worker.Builder()
  .recordProcessorFactory(recordProcessorFactory)
  .config(kinesisClientLibConfiguration)
  .kinesisClient(kinesisClient)
  .dynamoDBClient(dynamoClient)
  .build()
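
Once built, the worker still has to be started. Worker implements Runnable and its run() method blocks, so one common approach (sketched here) is to start it on a dedicated thread:

// Worker.run() blocks until the worker is shut down,
// so start it on its own thread.
val workerThread = new Thread(worker)
workerThread.start()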

The last thing we need is to provide the KCL with fake credentials. You could just put them into environment variables, but having them right in your build.sbt is quite convenient for development:

fork := true
envVars := Map(
  "AWS_ACCESS_KEY_ID" -> "FAKE",
  "AWS_SECRET_ACCESS_KEY" -> "FAKE",
  "AWS_REGION" -> "FAKE",
  "AWS_CBOR_DISABLE" -> "true")

CBOR stands for Concise Binary Object Representation. It's a data format that the AWS SDK tries to use by default, but it's not supported by Kinesalite.

Step 4. Run it

Now start the app with sbt run (don't forget to start Kinesalite/Dynalite first) and put some data into Kinesalite.
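
If you want to push a test record from Scala rather than from Node, here is a sketch using the same AWS SDK client; it assumes the stream name and endpoint from Step 1 and the fake credentials/CBOR settings from build.sbt:

import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

// Talks directly to Kinesalite on localhost:4567
val producer: AmazonKinesisClient = new AmazonKinesisClient().withEndpoint("http://localhost:4567/")

val request = new PutRecordRequest()
  .withStreamName("stream-name") // the stream created by start-kinesis.js
  .withPartitionKey("test-partition-key")
  .withData(ByteBuffer.wrap("hello from local Kinesis".getBytes("UTF-8")))

producer.putRecord(request)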

You can find a fully functional example on my GitHub: https://github.com/avovsya/local-kinesis-app-example.
It also has a sample record producer.
To run the example:

  1. Clone repo - git clone https://github.com/avovsya/local-kinesis-app-example local-kinesis-app && cd local-kinesis-app
  2. Install Kinesalite & Dynalite - (cd kinesis ; npm install)
  3. Run them in background - (cd kinesis ; npm start&)
  4. Run application - sbt run
  5. Send some data to local Kinesis - (cd kinesis ; node send-records.js)
