Data Engineering using Apache Airflow

Uttasarga Singh
4 min read · Dec 14, 2020


This will be a long read, so let's not spend our time on an introduction to Airflow.

I was working on a data engineering project with several tasks in my bucket: parsing a JSON catalog that described many different data sources, picking out the sources related to COVID-19, downloading the data (which was in JSON format), converting it into a Pandas DataFrame, and storing it in a Postgres database. These activities then had to be scheduled periodically using Apache Airflow (a workflow orchestrator commonly used for ETL automation). So let's get started by pulling each task out of the bucket, one at a time.

If you go to https://healthdata.gov/data.json, you will see a total of around 1,757 data sources, of which 47 contain data related to COVID-19. I started looking for keywords related to coronavirus and found around five different ones. After that, I downloaded the required datasets with a custom function I first built in a Jupyter Notebook.

COVID-19 Keywords
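The original screenshot isn't reproduced here, but the step itself is simple. Roughly, it looks like this (the exact keyword list and helper names are my own reconstruction, not the original code; the catalog follows the standard data.json schema with a top-level "dataset" array):

```python
import requests

# Catalog of every dataset published on healthdata.gov
CATALOG_URL = "https://healthdata.gov/data.json"

# Keywords used to spot COVID-19 related datasets (assumed list)
COVID_KEYWORDS = ["covid-19", "covid", "coronavirus", "sars-cov-2", "covid19"]

def get_covid_datasets(catalog_url=CATALOG_URL):
    """Download the data.json catalog and keep only COVID-19 related entries."""
    catalog = requests.get(catalog_url).json()
    matches = []
    for dataset in catalog.get("dataset", []):
        keywords = [kw.lower() for kw in dataset.get("keyword", [])]
        if any(kw in keywords for kw in COVID_KEYWORDS):
            matches.append(dataset)
    return matches
```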

Now that I had the keywords, I separated out the matching URLs and downloaded the data behind each one.

Links with the Keywords

The full list is huge, so I have shown only the top 6 URLs from which I download the files required for this project.
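A sketch of how those URLs can be pulled out of the matching catalog entries (the distribution/downloadURL fields come from the standard data.json schema; filtering on the .json extension is my assumption):

```python
def get_download_urls(covid_datasets):
    """Collect the download URL listed under each dataset's distributions."""
    urls = []
    for dataset in covid_datasets:
        for dist in dataset.get("distribution", []):
            url = dist.get("downloadURL")
            if url and url.endswith(".json"):
                urls.append(url)
    return urls

def download_json(url):
    """Fetch one dataset and return the parsed JSON document."""
    return requests.get(url).json()
```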

A JSON file consists of key:value pairs, and in my case they were nested. So I had to explore the data file first, before writing the code to extract the JSON data properly.

Keys Present in the JSON Dataset (Downloaded using 1st URL)
Part of a Columns Key
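Exploration can be as simple as printing the keys at each level. The nesting shown below (meta → view → columns) is an assumption based on a typical Socrata-style export; adjust the path to whatever keys your file actually contains:

```python
urls = get_download_urls(get_covid_datasets())
raw = download_json(urls[0])

# Top-level keys of the document
print(raw.keys())

# In my copy, the column descriptors sat under a nested "columns" key
columns_meta = raw["meta"]["view"]["columns"]
print(columns_meta[0])  # one descriptor, including its fieldName
```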

The picture above shows that each variable in the dataset is described by a key (its fieldName). To reconstruct the dataset as we expect it, we first have to extract those names.

Extracted the Columns
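Pulling the names out is then a one-liner over the column descriptors (again assuming the meta → view → columns layout from the previous step):

```python
def extract_column_names(raw):
    """Return the fieldName of every column descriptor in the metadata."""
    return [col["fieldName"] for col in raw["meta"]["view"]["columns"]]

column_names = extract_column_names(raw)
print(column_names)
```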

After extracting the column names from the dataset, we can narrow them down to the columns/variables that are significant for the analysis, and then fetch the data only for the columns we need.

Data extracted and converted in a Data Frame
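Something along these lines turns the rows into a DataFrame; the selected columns at the end are placeholders, not the actual fields used in the project:

```python
import pandas as pd

def to_dataframe(raw, wanted_columns):
    """Build a DataFrame from the raw rows, keeping only the columns we care about."""
    all_columns = extract_column_names(raw)
    # Assumes the rows live under a top-level "data" key, aligned with the column list
    df = pd.DataFrame(raw["data"], columns=all_columns)
    return df[wanted_columns]

# Hypothetical selection -- pick whichever fields matter for your analysis
df = to_dataframe(raw, ["state", "date", "cases", "deaths"])
```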

This gives us a Pandas DataFrame built from the JSON dataset. Now that the data is ready, I can write a DAG that wraps the code snippets above into a single file and executes them periodically. Please visit my GitHub repository for this project to see the whole end-to-end workflow.

An Airflow DAG consists of various operators, each of which represents a particular task that needs to be performed. Here, I have used a PythonOperator to run the code described above, and a Postgres connection to load the downloaded data into the database.
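The load step itself can also be done straight from Pandas; the connection string and table name below are placeholders for your own setup, not the project's actual configuration:

```python
from sqlalchemy import create_engine

def load_to_postgres(df, table_name="covid_data"):
    """Write the DataFrame into a Postgres table (swap in your own credentials)."""
    engine = create_engine("postgresql://airflow:airflow@localhost:5432/airflow")
    df.to_sql(table_name, engine, if_exists="replace", index=False)
```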

Airflow is a widely used data engineering tool originally developed at Airbnb and now an Apache project. It lets you schedule and re-run the scripts that load data from a particular source into the destination of your choice (a local machine, or cloud platforms such as Azure, GCP, and AWS).

There are various details to keep in mind while writing a Python script to run as an ETL job in Airflow.

1. Importing the Necessary Libraries.

We import the PythonOperator to tell Airflow that the function we want to run is Python code.
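A minimal set of imports for an Airflow 1.10-style DAG file (the version current when this post was written) looks roughly like this; note that in Airflow 2.x the operator import paths have changed (e.g. airflow.operators.python):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
```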

Next we provide some default arguments, along with a schedule that controls how often this job runs.
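The exact values below are only an illustration of what such a default-arguments block can contain:

```python
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 12, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
```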

**Please note that the dag_id passed to the DAG should match the name of the Python script in which you are writing these functions.**
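Putting that note into practice, the DAG object for a script called data.py could be declared like this (the daily schedule is just an example):

```python
dag = DAG(
    dag_id="data",                # matches the name of the script, data.py
    default_args=default_args,
    schedule_interval="@daily",   # run once a day
    catchup=False,
)
```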

Create a function that wraps the end-to-end code you already have working in your Jupyter Notebook.

**You can find this function in the data.py file uploaded to the repository.**
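As an illustration only (the real function lives in data.py in the repository), a condensed version reusing the helpers sketched earlier might look like this:

```python
def run_covid_etl():
    """End-to-end job: find COVID datasets, download them, and load them into Postgres."""
    datasets = get_covid_datasets()
    for url in get_download_urls(datasets)[:6]:   # top URLs, as in the post
        raw = download_json(url)
        df = to_dataframe(raw, extract_column_names(raw))
        load_to_postgres(df)
```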

The next step is to create the PythonOperator and attach the function to the associated DAG (Directed Acyclic Graph).

**Several operators can be included, and they can be chained in the order we want them to execute. For example: run_etl >> dummy_etl >> dummy_etl2**
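Wired together, the operators and the ordering from the example above look like this (the task ids are mine):

```python
run_etl = PythonOperator(
    task_id="run_covid_etl",
    python_callable=run_covid_etl,
    dag=dag,
)

dummy_etl = DummyOperator(task_id="dummy_etl", dag=dag)
dummy_etl2 = DummyOperator(task_id="dummy_etl2", dag=dag)

# Tasks execute left to right along this chain
run_etl >> dummy_etl >> dummy_etl2
```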

Finally, trigger the DAG you created and check its status and logs.

The DAG was running successfully
