Building a Data Pipeline Using Apache Airflow and AWS S3
This is a simple walk-through of how I extracted YouTube comments as JSON, cleaned and transformed the data into CSV format, and then loaded the cleaned data into an AWS S3 bucket using Apache Airflow.
What is a data pipeline?
Simply put, a data pipeline is a series of connected steps that move data from one location to another. Data travels through a data pipeline much like water travels through pipes to reach your faucet.
In my case, I extracted the data using the Google Cloud API, transformed it with Python (pandas), then deployed the code on Airflow running on an EC2 instance and stored the final result in an Amazon S3 bucket.
Below is a visual representation of the steps taken in this project:
Data Source
Initially, the idea was to work with tweets using the Twitter API, but this was not possible as Twitter announced that it will no longer support free access to its API. Thanks, Elon! Thankfully, the YouTube Data API, accessed through Google Cloud, still offers free access.
I chose a random video from a famous YouTuber to extract comments from.
Data Extraction and Normalization
I wrote a Python script that extracts comments from a YouTube video using the YouTube Data API (v3) and stores them in a list called comments.
# Necessary imports
import googleapiclient.discovery
import googleapiclient.errors
import pandas as pd

# The function to extract the data
def run_youtube_etl():
    api_service_name = "youtube"
    api_version = "v3"
    key = "--insert your api key here--"

    # Initializing the API client
    youtube = googleapiclient.discovery.build(
        api_service_name, api_version, developerKey=key
    )

    # Creating the API request (up to 100 top-level comments for one video)
    request = youtube.commentThreads().list(
        part="snippet", videoId="KrLj6nc516A", maxResults=100
    )

    # Executing the request and storing the returned value in 'response'
    response = request.execute()

    # Processing the response (JSON data)
    comments = []
    for item in response['items']:
        comment = item['snippet']['topLevelComment']['snippet']
        comments.append([
            comment['authorDisplayName'],
            comment['publishedAt'],
            comment['updatedAt'],
            comment['likeCount'],
            comment['textDisplay']
        ])

    # Using pandas to transform the data into a dataframe
    df = pd.DataFrame(comments, columns=['author', 'publishedDate', 'updateDate', 'likeCount', 'text'])

    # Writing the dataframe as a CSV file directly to S3
    # (relies on s3fs and on AWS credentials, e.g. an IAM role on the EC2 instance)
    df.to_csv("s3://bashir-aiflow-youtube-bucket/mrBeastVideo.csv")
Here is a sample of the CSV file after extraction:
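As an aside: a single commentThreads().list call returns at most 100 top-level comments, which was enough for my pipeline. If you wanted the full comment section, the API exposes a nextPageToken you can loop on. Here is a minimal sketch, reusing the youtube client built in the script above:

# Minimal pagination sketch (not part of the pipeline above):
# keep requesting pages until the API stops returning a nextPageToken.
all_comments = []
page_token = None
while True:
    params = {"part": "snippet", "videoId": "KrLj6nc516A", "maxResults": 100}
    if page_token:
        params["pageToken"] = page_token
    response = youtube.commentThreads().list(**params).execute()
    for item in response['items']:
        comment = item['snippet']['topLevelComment']['snippet']
        all_comments.append(comment['textDisplay'])
    page_token = response.get('nextPageToken')
    if not page_token:
        break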
Airflow/EC2
Next, we need to set up an Apache Airflow DAG (Directed Acyclic Graph) for the ETL (Extract, Transform, Load) process that runs the run_youtube_etl function from the data extraction script.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from youtube_etl import run_youtube_etl

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 11, 8),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'etl_dag',
    default_args=default_args,
    description='First DAG with ETL process!',
    schedule_interval=timedelta(days=1),
)

run_etl = PythonOperator(
    task_id='complete_youtube_etl',
    python_callable=run_youtube_etl,
    dag=dag,
)

run_etl
Here's a breakdown of what's happening:
Imports: The code imports the necessary modules and classes from Airflow and Python's datetime module. It also imports the run_youtube_etl function from an external module named youtube_etl, which contains the data extraction code described earlier.
Default Arguments: It defines a dictionary called default_args that holds default configuration parameters for the DAG: the owner, start date, email settings, retry settings, and so on.
DAG Configuration: It creates a DAG object named dag with the following parameters: 'etl_dag' is the DAG's name, default_args sets its default configuration parameters, 'First DAG with ETL process!' is its description, and schedule_interval=timedelta(days=1) specifies that the DAG should run on a daily schedule.
PythonOperator: It defines a PythonOperator named run_etl. This operator is a task within the DAG that executes Python code when the DAG runs: task_id='complete_youtube_etl' is the unique identifier for the task, python_callable=run_youtube_etl specifies the function to execute, and dag=dag attaches the run_etl task to the dag defined above.
Run ETL Task: Finally, the run_etl task is part of the DAG (it was attached via dag=dag; the bare run_etl line at the end simply references it and is where dependencies would be declared if there were more tasks). It executes the run_youtube_etl function every time the DAG is triggered on its daily schedule.
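Since this DAG has only one task, there are no dependencies to declare. If the pipeline were split into separate extract and load tasks, the end of the same DAG file could declare the ordering with the >> operator. A hypothetical sketch (the task names and placeholder callables below are for illustration only and are not part of my pipeline; PythonOperator and dag are assumed to be in scope from the DAG file above):

# Hypothetical: splitting the pipeline into two tasks in the same DAG file,
# then declaring that extraction must finish before the upload starts.
def extract_comments():
    ...  # placeholder for the extraction logic

def load_to_s3():
    ...  # placeholder for the upload logic

extract_task = PythonOperator(
    task_id='extract_comments',
    python_callable=extract_comments,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_to_s3',
    python_callable=load_to_s3,
    dag=dag,
)

extract_task >> load_task   # Airflow runs extract_task first, then load_task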
Loading Data
Finally, I used Apache Airflow to create a set of instructions (the DAG) whose task writes the transformed data to AWS S3. Airflow takes care of executing these instructions on schedule, ensuring the data is safely stored in the S3 bucket.
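In the extraction script, df.to_csv("s3://...") writes straight to S3, which relies on pandas' s3fs support and on AWS credentials being available on the machine (for example, an IAM role attached to the EC2 instance). If you prefer to keep the load step explicit, you can write the CSV locally and upload it with boto3 instead. A minimal sketch, assuming the file has already been written to a local path (the path below is just an example):

import boto3

# Upload a locally written CSV to the S3 bucket.
# Credentials are picked up from the environment or the EC2 instance's IAM role.
s3 = boto3.client("s3")
s3.upload_file(
    Filename="/tmp/mrBeastVideo.csv",        # local file (example path)
    Bucket="bashir-aiflow-youtube-bucket",   # the bucket used in this project
    Key="mrBeastVideo.csv"                   # object name in the bucket
)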