Data Pipeline for Data Science, Part 2: TMDb API Data “Crawler”

⚡️Hudson Ⓜ️endes
7 min read · Sep 16, 2020

Distributed TMDb API Data Download using AWS Lambda.

Wanna hear occasional rants about Tensorflow, Keras, DeepLearning4J, Python and Java?

Join me on twitter @ twitter.com/hudsonmendes!

Taking Machine Learning models to production is a battle. I share my learnings (and my sorrows) there, so we can learn together!

Data Pipeline for Data Science Series

This is a large tutorial that we tried to keep conveniently small for the occasional reader, and is divided into the following parts:

Part 1: Problem/Solution Fit
Part 2: TMDb Data “Crawler”

Part 3: Infrastructure As Code
Part 4: Airflow & Data Pipelines
(soon available) Part 5: DAG, Film Review Sentiment Classifier Model
(soon available) Part 6: DAG, Data Warehouse Building
(soon available) Part 7: Scheduling and Landings

The Problem: Linking IMDb ids and TMDb ids

This project has the following problem statement:

Data Analysts must be able to produce reports on-demand, as well as run several roll-up and drill-down queries into what the Review Sentiment is for both IMDb films and IMDb actors/actresses, based on their TMDb Film Reviews; and the Sentiment Classifier must be our own.

Looking at the TMDb API specification, we find that they have links to the IMDb Film ids:

However, the "find" endpoint only supports one ID per request:

Since we need as many links as possible between IMDb films and TMDb films (TMDb being where we will get the reviews from), we will need to call the API a large number of times.
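As a rough sketch of what one such lookup looks like, the snippet below builds the request URL for a single IMDb id. It assumes the v3 "find" endpoint with external_source=imdb_id; the helper name build_find_url, the sample ids and the placeholder API key are illustrative only, and no request is actually sent.

```python
from urllib.parse import urlencode

# Assumed base URL of the TMDb v3 API.
TMDB_API_BASE = "https://api.themoviedb.org/3"


def build_find_url(imdb_id: str, api_key: str) -> str:
    """Build the URL that looks up ONE IMDb id on TMDb (hypothetical helper)."""
    query = urlencode({"api_key": api_key, "external_source": "imdb_id"})
    return f"{TMDB_API_BASE}/find/{imdb_id}?{query}"


# One URL (and therefore one HTTP request) per IMDb id.
imdb_ids = ["tt0133093", "tt0234215"]
urls = [build_find_url(imdb_id, api_key="YOUR_API_KEY") for imdb_id in imdb_ids]

for url in urls:
    print(url)
```

Because each id needs its own URL, the number of requests grows linearly with the number of IMDb films we want to link.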

Technical Challenge: Network Latency

Photo by Brett Sayles from Pexels

The TMDb API does indeed have a “fast, consistent and reliable way to get third party data”, as they claim in their documentation.

However, every time we request an endpoint, we are subject to network latency.

Requested one after another, millions of requests (7,167,768) would amount to approximately 35 days.
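To make the arithmetic behind that estimate explicit, here is a small back-of-the-envelope calculation. The per-request latency is not a measured value: it is simply what the 35-day figure implies if every request waits for the previous one to finish.

```python
TOTAL_REQUESTS = 7_167_768
SECONDS_PER_DAY = 24 * 60 * 60


def serial_days(total_requests: int, seconds_per_request: float) -> float:
    """Days needed when requests run strictly one after another."""
    return total_requests * seconds_per_request / SECONDS_PER_DAY


# Latency implied by "35 days for 7,167,768 requests" (an inference, not a measurement).
implied_latency = 35 * SECONDS_PER_DAY / TOTAL_REQUESTS

print(f"{implied_latency:.2f} s per request")
print(f"{serial_days(TOTAL_REQUESTS, implied_latency):.1f} days in total")
```

In other words, well under half a second per request is already enough to push a serial download past a month.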

Solution: Parallelism & Distributed Data Download

Network Latency only hurts us if we request the data serially. In simpler terms, it only affects us if the second request waits until the first request is completely finished.

The terminology for the options we have varies greatly, but in simple terms, the approaches we could take are:

  1. CPU-level Parallelism: based on a multithreading (or similar) approach, where all the connections are handled by a single computer's networking infrastructure, and the requests are parallelised by the CPU.
  2. Distributed Machine Parallelism: based on multiple machines, each one with its own networking infrastructure, running requests independently.
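The first approach can be illustrated with a minimal sketch that compares a serial loop against a thread pool on a single machine. The fetch_one function is hypothetical and only sleeps to simulate a network round trip, so the numbers show the shape of the effect rather than real TMDb timings.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SIMULATED_LATENCY = 0.05  # seconds; stands in for a real network round trip


def fetch_one(imdb_id: str) -> dict:
    """Hypothetical download: sleeps instead of calling the API."""
    time.sleep(SIMULATED_LATENCY)
    return {"imdb_id": imdb_id}


imdb_ids = [f"tt{n:07d}" for n in range(20)]

# Serial: each request waits for the previous one to finish.
start = time.perf_counter()
serial_results = [fetch_one(imdb_id) for imdb_id in imdb_ids]
serial_elapsed = time.perf_counter() - start

# Threaded: up to 10 requests wait on the "network" at the same time.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:
    parallel_results = list(pool.map(fetch_one, imdb_ids))
parallel_elapsed = time.perf_counter() - start

print(f"serial: {serial_elapsed:.2f}s, threaded: {parallel_elapsed:.2f}s")
```

The second approach applies the same idea across machines: instead of threads sharing one network interface, independent workers each take a slice of the ids.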

Given the vast number of requests we are going to make (millions), here are some constraints that we face when trying to download the data:

  • Large number of threads (millions)
  • Large number of connections to the API (millions)

These constraints basically lead us to go for the logical choice of Distributed Machine Parallelism to solve our problem.

Infrastructure: Serverless with AWS Lambda

We don't want to be setting up tons of servers just for this task, especially because after finishing this job we want to tear down our whole infrastructure.

For that reason, the obvious choice is Cloud Computing.

Each of our download tasks can be coded to be a simple “download one film JSON”; in other words, a single “download” can be a simple function.

AWS Lambda: https://aws.amazon.com/lambda/

That function is simple enough to be run by AWS Lambda, which removes our need to set up an entire server.

We give AWS Lambda a Python function, and it spawns a number of instances and executes the function for us.

Given our requirements, our function could be something as simple as the following:

import json
from pipeline import IMDb, TMDb
from infra import Config


def lambda_handler(event, context):
    """
    Downloads 'movies' for a particular {year}, with names that
    start with a particular {initial} character

    Parameters
    ----------
    - event  : 'Records' have the messages received from SQS (full body)
    - context: lambda context wrapper

    Message Body
    ------------
    - year   : the year for which movies will be downloaded
    - initial: the first non-blank character of the name of the movie
    """

    config = Config()

    # Reads the IMDb movie references from the data lake bucket
    imdb = IMDb(
        bucket_name=config.get_datalake_bucket_name())

    # Queries the TMDb API and saves the results into the data lake bucket
    tmdb = TMDb(
        bucket_name=config.get_datalake_bucket_name(),
        api_key=config.get_tmdb_api_key())

    # Last message body processed; stays None if the event has no records
    body = None

    for record in event['Records']:
        body = json.loads(record['body'])
        year = int(body['year'])
        initial = body['initial']

        print(f'Lambda, processing partition ({year}, {initial})')

        imdb_movies_stream = imdb.get_movie_refs_stream(
            year=year,
            initial=initial)

        tmdb_movie_and_reviews_generator = tmdb.get_movies_related_to(
            imdb_movies_stream=imdb_movies_stream)

        processed_count = 0
        for tmdb_movie, tmdb_reviews in tmdb_movie_and_reviews_generator:
            tmdb_movie.save()
            tmdb_reviews.save()
            processed_count += 1

        print(f'Lambda, completed processing {processed_count}')

    return {
        'statusCode': 200,
        'body': json.dumps(body)
    }

The full source code for that module can be found here:

AWS Lambda Function: Installing

To make our life easier, we wrapped our entire code in a click CLI program:

import os
import json
import click


@click.group()
def cli():
    """
    Command line group, allowing us to run `python tdd` commands
    in the root folder of this repository.
    """
    pass


@cli.command()
@click.option('--datalake_bucket_name', prompt='DataLake, Bucket Name', help='The S3 BucketName to which you will dump your files', default='hudsonmendes-datalake')
@click.option('--tmdb_api_key', prompt='TMDB, API Key', help='Find it in https://www.themoviedb.org/settings/api', default=lambda: os.environ.get('TMDB_API_KEY', None))
def development(datalake_bucket_name: str, tmdb_api_key: str):
    """
    Setup the development environment locally, with the required configuration.
    `python tdd development --datalake_bucket_name [bucket_name] --tmdb_api_key [api_key]`
    """
    from infra import Config
    Config().update(
        datalake_bucket_name=datalake_bucket_name,
        tmdb_api_key=tmdb_api_key)


@cli.command()
@click.option('--year', prompt='IMDB, Year', default=2004, help='Year of movies that will be downloaded')
@click.option('--initial', prompt='IMDB, Initial', default='AD', help='First letter of the films that will be downloaded')
@click.option('--queue_name', prompt='AWS SQS, Queue', default='hudsonmendes-tmdb-downloader-queue', help='The name of the queue to which we will send the message')
def simulate(year: int, initial: str, queue_name: str):
    """
    Simulates the system by sending a one-off message to the SQS queue,
    so that the Lambda Function can pick it up and we can evaluate that
    the whole system is functioning.
    """
    import boto3
    messages = [{'year': year, 'initial': initial}]
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    for message in messages:
        body = json.dumps(message)
        queue.send_message(MessageBody=body)


@cli.command()
@click.option('--year', prompt='IMDB, Year', default=2004, help='Year of movies that will be downloaded')
@click.option('--initial', prompt='IMDB, Initial', default='AD', help='First letter of the films that will be downloaded')
def download(year: int, initial: str):
    """
    Invokes the lambda_function manually for a one-off download.
    Can be used for debug purposes
    """
    import lambda_function
    event = {'Records': [{'body': json.dumps({'year': year, 'initial': initial})}]}
    lambda_function.lambda_handler(event=event, context=None)


@cli.command()
@click.option('--lambda_name', prompt='AWS Lambda, Function Name', default='hudsonmendes-tmdb-downloader-lambda', help='The name of the function to which we will deploy')
@click.option('--queue_name', prompt='AWS SQS, Queue', default='hudsonmendes-tmdb-downloader-queue', help='The name of the queue to which we will send the message')
@click.option('--datalake_bucket_name', prompt='DataLake, Bucket Name', help='The S3 BucketName to which you will dump your files', default='hudsonmendes-datalake')
def deploy(lambda_name, queue_name, datalake_bucket_name):
    """
    Deploy the system into lambda, creating everything that is necessary to run.
    """
    from infra import Deploy
    Deploy(
        lambda_name=lambda_name,
        queue_name=queue_name,
        datalake_bucket_name=datalake_bucket_name).deploy()


if __name__ == "__main__":
    cli()

That allows us to run our code using an elegant CLI tool, with the following commands:

# from the root of the git repository
python tdd development # sets up the local development configuration
python tdd download # downloads one (year, initial) partition = one job
python tdd deploy # installs the function into AWS Lambda

The "python tdd deploy" command then deploys our code to AWS Lambda.

Running our Distributed API Data Downloader

Our AWS Lambda configuration waits for messages in an AWS SQS queue. Whenever messages arrive there, it starts spinning up instances of our AWS Lambda function that will consume them.
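To make the message flow more concrete, here is a minimal sketch of how a handler can read the partitions out of an SQS-triggered event. The parse_partitions helper is hypothetical, and the sample event is trimmed down to the "body" field only; real SQS records carry additional metadata that is ignored here.

```python
import json


def parse_partitions(event: dict) -> list:
    """Return the (year, initial) pair carried by each SQS record."""
    partitions = []
    for record in event.get("Records", []):
        body = json.loads(record["body"])  # the body arrives as a JSON string
        partitions.append((int(body["year"]), body["initial"]))
    return partitions


# Trimmed-down sample event with two messages delivered in one batch.
sample_event = {
    "Records": [
        {"body": json.dumps({"year": 2004, "initial": "AD"})},
        {"body": json.dumps({"year": 1999, "initial": "TH"})},
    ]
}

print(parse_partitions(sample_event))  # [(2004, 'AD'), (1999, 'TH')]
```

Each (year, initial) pair is one unit of work, so the more messages sit in the queue, the more function instances can work side by side.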

In order to launch our download process, we must then send messages to AWS SQS, and we do so by running our Jupyter Notebook called "launch_fleet.ipynb".

Github @hudsonmendes: "launch_fleet.ipynb"

This notebook will then schedule messages that partition the download jobs by initial (e.g. "TH" for The Matrix) and year.
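The partitioning idea can be sketched as follows. This is not the notebook's actual code: the build_partition_messages helper, the year range and the choice of two-letter A-Z initials are all assumptions made for illustration (real titles may also start with digits or other characters).

```python
import json
from itertools import product
from string import ascii_uppercase


def build_partition_messages(years, alphabet=ascii_uppercase):
    """Yield one JSON message body per (year, two-letter initial) partition."""
    for year in years:
        for first, second in product(alphabet, repeat=2):
            yield json.dumps({"year": year, "initial": first + second})


# Assumed year range, for illustration only.
messages = list(build_partition_messages(range(2000, 2003)))

print(len(messages))  # 3 years x 26 x 26 initials = 2028
print(messages[0])    # {"year": 2000, "initial": "AA"}
```

Each body could then be sent with queue.send_message(MessageBody=body), the same call the "simulate" command uses for its one-off message.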

Each one of these download jobs must take less than 15 minutes (which is the maximum setting for the Lambda function timeout).
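One way to stay within that limit, shown here only as a sketch and not as part of the original function, is to check how much time the invocation has left before starting another download. The Lambda context object exposes get_remaining_time_in_millis(); the FakeContext class and the 30-second safety margin below are assumptions used to run the example locally.

```python
class FakeContext:
    """Local stand-in for the Lambda context object (for illustration only)."""

    def __init__(self, remaining_ms: int):
        self._remaining_ms = remaining_ms

    def get_remaining_time_in_millis(self) -> int:
        return self._remaining_ms


def has_time_left(context, safety_margin_ms: int = 30_000) -> bool:
    """True while the invocation still has more than the safety margin left."""
    return context.get_remaining_time_in_millis() > safety_margin_ms


print(has_time_left(FakeContext(remaining_ms=60_000)))  # True
print(has_time_left(FakeContext(remaining_ms=10_000)))  # False
```

If a partition regularly runs out of time, splitting it into smaller partitions (for example, longer initials) keeps each job under the limit.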

After running

You should have all the results dropped into the "datalake_bucket_name" that you have defined in your notebook.

Once that is done, we are ready to start building the infrastructure for our data pipeline.

In Summary

By looking at the requirements, we:

  1. Understood the need for Parallelism
  2. Chose Distributed Machine Parallelism
  3. Understood that Cloud Computing was the best choice
  4. Decided that, for this task, going Serverless with AWS Lambda was the best fit
  5. Wrote our lambda_function in Python

We are now ready to create our infrastructure using Infrastructure as Code, with Python.

Next Steps

In the next article, Part 3: Infrastructure As Code, we will deep dive into how we create the infrastructure for our data pipeline using Infrastructure as Code, with Python.

Source Code

Find the end-to-end solution source code at https://github.com/hudsonmendes/nanodataeng-capstone.

Wanna keep in Touch? LinkedIn!

My name is Hudson Mendes (@hudsonmendes), I’m a 38-year-old coder, husband, father of 3, ex Startup Founder, ex-Peloton L7 Staff Tech Lead/Manager, nearly BSc in Computer Science by the University of London & Senior AI/ML Engineering Manager.

I’ve been on the Software Engineering road for 22+ years, and occasionally write about topics of interest to other Senior Engineering Managers and Staff Machine Learning Engineers, with a bit of focus (but not exclusively) on Natural Language Processing.

Join me there, and I will keep you in the loop with my new learnings in AI/ML/LLMs and beyond!
