Reading Data From Kinesis Using Python

I love the Amazon Kinesis real-time data processing service.  The possibilities are endless when thinking about the products that could be created around it.  However, it can be a pain to use.  A stream contains shards, and each shard holds records.  To get the data, you need shard iterators that expire after a short amount of time, and you must make sure not to make too many requests in a small amount of time or Amazon’s API will rate limit you.  That’s a lot to keep in mind when writing a robust piece of code to read from one of these streams.

[Figure: Amazon Kinesis Workflow Diagram]

Recently, at InfoTrust, I had the pleasure of writing such a piece of code in Python using the boto library.  Our use case was to read data from a stream that contained only a single shard for a set period of time. The code is written as a Python generator.  This makes using it much simpler, keeping your code much more readable.

First, the code uses boto to create a connection to Kinesis.  It then requests a description of the stream we want to read.  From that description we take the shards in the stream and extract a list containing just the IDs of these shards.
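The connection and shard lookup described above might look roughly like the sketch below. It assumes the boto 2 Kinesis API (`boto.kinesis.connect_to_region` and `describe_stream`); the helper names `connect` and `list_shard_ids`, the region, and the stream name are hypothetical and not taken from the original code.

```python
def connect(region_name):
    """Open a boto 2 Kinesis connection for the given region."""
    # Imported here so the rest of the sketch can be loaded without boto.
    import boto.kinesis

    return boto.kinesis.connect_to_region(region_name)


def list_shard_ids(conn, stream_name):
    """Return the IDs of the shards that describe_stream reports."""
    description = conn.describe_stream(stream_name)
    shards = description["StreamDescription"]["Shards"]
    return [shard["ShardId"] for shard in shards]


# Hypothetical usage (needs boto and AWS credentials):
#   conn = connect("us-east-1")
#   shard_ids = list_shard_ids(conn, "my-stream")
```

For a stream with many shards the description can be paginated, so this sketch is only adequate for small streams such as the single-shard case discussed here.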

Next, we get an iterator that points at the first piece of data in the stream.  Notice that for our use case, since we only have a single shard in the stream, we only get an iterator for the shard with an ID that is first in the list.  This part could easily be extended to accommodate more shards without changing the code that uses it.
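As a minimal sketch of this step, assuming boto 2's `get_shard_iterator` and the `TRIM_HORIZON` iterator type (which starts at the oldest record still held in the shard), the iterator for the first shard could be obtained as follows. The function name `get_first_iterator` is hypothetical.

```python
def get_first_iterator(conn, stream_name, shard_ids):
    """Return an iterator at the oldest record of the first shard in the list."""
    # Only the first shard is used, matching the single-shard use case.
    response = conn.get_shard_iterator(stream_name, shard_ids[0], "TRIM_HORIZON")
    return response["ShardIterator"]
```

Supporting more shards would mean requesting one iterator per shard ID instead of only the first.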

We calculate when the end time will be so that we know when to stop looping over data and exit the generator.  We then enter a loop that has no exit condition of its own, which is a common way to emulate a traditional do-while loop in Python.  Inside the loop, we get the next set of records using the shard iterator we obtained earlier.  Then we check that the end time has not passed and that we received data.  The loop exits if either check fails.  Otherwise, we iterate over each of the records in the response and yield them to the calling piece of code.  Each record is automatically loaded from JSON before it is yielded, since our data was stored as JSON strings.  The last step in the loop is to take the next iterator that was returned in the response and make it the current iterator so that it will be used in the next request.
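The loop described above can be sketched as a generator like the one below. It assumes boto 2's `get_records`, whose response carries `Records` and `NextShardIterator`; the name `read_records` and the `duration_seconds` parameter are hypothetical, and error handling is left out here.

```python
import json
import time


def read_records(conn, shard_iterator, duration_seconds):
    """Yield decoded records until the time limit passes or a batch is empty."""
    end_time = time.time() + duration_seconds
    while True:  # do-while style: always make at least one request
        response = conn.get_records(shard_iterator)
        records = response["Records"]
        # Stop when the time limit has passed or no data came back.
        if time.time() > end_time or not records:
            break
        for record in records:
            # The data is assumed to be stored as JSON strings.
            yield json.loads(record["Data"])
        # The iterator from this response drives the next request.
        shard_iterator = response["NextShardIterator"]
```

A caller would then simply write `for item in read_records(conn, shard_iterator, 60): ...`. Note that Kinesis can return an empty batch even when more data will arrive later, so stopping on an empty batch suits a bounded read like this one rather than a long-running consumer.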

There is also some error handling provided since a few exceptions can be raised by the boto module.  First is a ProvisionedThroughputExceededException, which means we have been hitting the Kinesis API too often and have been rate limited.  This rate limiting is based on the number of requests per second.  The code simply catches the error and sleeps for half a second before continuing the loop.  The other exception is an ExpiredIteratorException.  Shard iterators expire after five minutes.  If a batch of data cannot be processed within five minutes, then the generator will attempt to use an expired iterator.  Boto will raise this exception, which is easily caught.  We then make a request to get a new iterator and continue on with processing data.
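A hedged sketch of this error handling follows. The exception classes are the ones boto 2 exposes in `boto.kinesis.exceptions`; the stand-in classes, the helper name `get_records_with_retry`, and the choice to resume after the last sequence number seen are assumptions for illustration, since the text does not say which iterator type the original code requests.

```python
import time

try:
    from boto.kinesis.exceptions import (
        ExpiredIteratorException,
        ProvisionedThroughputExceededException,
    )
except ImportError:
    # Stand-ins (assumption) so the sketch still loads without boto installed.
    class ExpiredIteratorException(Exception):
        pass

    class ProvisionedThroughputExceededException(Exception):
        pass


def get_records_with_retry(conn, stream_name, shard_id, shard_iterator,
                           last_sequence_number=None, sleep=time.sleep):
    """Fetch one batch, backing off when throttled and renewing stale iterators."""
    while True:
        try:
            return conn.get_records(shard_iterator)
        except ProvisionedThroughputExceededException:
            # Rate limited: wait half a second, then try again.
            sleep(0.5)
        except ExpiredIteratorException:
            # Assumption: resume after the last record seen, if there is one.
            iterator_type = (
                "TRIM_HORIZON" if last_sequence_number is None
                else "AFTER_SEQUENCE_NUMBER"
            )
            fresh = conn.get_shard_iterator(
                stream_name, shard_id, iterator_type, last_sequence_number
            )
            shard_iterator = fresh["ShardIterator"]
```

The `sleep` parameter exists only so the pause can be swapped out in tests. Restarting from `TRIM_HORIZON` when no sequence number is known would re-read the shard from its oldest record, which may or may not be what an application wants.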

Reading from Kinesis can be strange.  Hopefully the approach described above gives you what you need to make this part of your application much easier to create.

Originally Published: September 29, 2015

October 23, 2023