Data Ingestion to Cloud SQL from GCS using Google’s Dataflow with Apache Beam (Python)
Apache Beam was open sourced by Google in 2016 through the Apache Software Foundation. Apache Beam (batch + stream) is a unified model and a set of SDKs for doing both batch and streaming data processing. Beam pipelines are defined using one of the provided SDKs and executed on one of Beam's supported runners, the distributed processing back ends such as Spark, Flink, Google's Dataflow, or the local "direct runner" on your own machine.
In this exercise we will use Google's Dataflow, a cloud-based data processing service for both batch and real-time streaming applications. The service lets developers set up Beam processing pipelines to integrate, clean and transform large data sets, such as those found in big data analytics applications.
We can work with any of the languages that Apache Beam supports, such as Go, Scala, Java and Python. For today's example we will use the Dataflow Python SDK, given that Python is an easy language to grasp and is also quite popular here in Peru when it comes to data processing. Note that the Dataflow Java SDK appeared first and is the best supported one; nevertheless, our choice is still the Python SDK.
We will start by creating a Google Cloud Storage bucket where we will place the files to be ingested into Cloud SQL. In the Cloud SQL service we will create an instance (a MySQL instance this time) and then create a database, and we will use the corresponding credentials to connect Dataflow to Cloud SQL. Given that Beam has no Cloud SQL pipeline I/O transform for writing data, we will build one using a ParDo function, and that is actually the fun part of this article!
Create a Cloud Storage Bucket
There are several ways to create a Cloud Storage bucket; this time we will use the Cloud Shell command line:
$ gsutil mb gs://mybucket
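With the bucket in place we can upload the data file to be ingested. As a hypothetical example, assuming the source data lives in a local CSV file named beers.csv (matching the beer_data table we will create later):
$ gsutil cp beers.csv gs://mybucket/beers.csv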
Create a Cloud SQL Instance
In order to create a Cloud SQL instance we go to the Google Cloud console, open the menu and choose SQL. We can choose between different flavors of RDBMS such as MySQL, PostgreSQL or SQL Server. This time let's create a MySQL instance: choose a name for your instance, a root password and a location, and leave the rest of the parameters at their defaults.
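Alternatively, the same instance can be created from Cloud Shell. A minimal sketch, where the instance name, MySQL version, tier, region and password below are placeholders you should adjust to your own setup:
$ gcloud sql instances create myinstance --database-version=MYSQL_8_0 --tier=db-f1-micro --region=us-central1 --root-password=<YOUR_PASSWORD>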
Create a Database and a Table with the required Schema
Once the instance is created, let's use the command line to access it and create the database. Enable the Cloud SQL Admin API so we can connect, then create a database; in this example we will use the name 'mysqldb':
$ gcloud sql databases create mysqldb --instance=myinstance
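We can verify that the database was created by listing the databases on the instance:
$ gcloud sql databases list --instance=myinstance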
To connect to our brand new MySQL database, we need to enter:
$ gcloud sql connect myinstance --user=root
Then, from the MySQL prompt, create a table with the required schema:
USE mysqldb;
CREATE TABLE beer_data (sr INT, abv FLOAT, name VARCHAR(255), style VARCHAR(255), ounces FLOAT, id INT NOT NULL, PRIMARY KEY(id));
Build a Pipeline to run in Dataflow
In this phase we will use the Apache Beam Python SDK to build our pipeline. A pipeline manages a directed acyclic graph (DAG) of PTransforms and PCollections that is prepared for execution.
- PCollections: Represent a distributed data collection, bounded or unbounded.
- PTransforms: Transform input PCollections into output PCollections.
Beam provides a set of built-in transforms that you will find in the documentation, such as ParDo, GroupByKey, Combine, Flatten and Partition. Beam also provides read and write transforms (Pipeline I/O) for a number of common data storage types.
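As an illustration, here is a minimal sketch of a pipeline that reads the CSV file from our bucket with the built-in ReadFromText I/O transform and parses each line. The bucket and file name are the hypothetical ones used above, and the pipeline runs on the local direct runner:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Local direct runner; switch to DataflowRunner (plus project, region,
# temp_location) to execute the same pipeline on Dataflow.
options = PipelineOptions(runner='DirectRunner')

with beam.Pipeline(options=options) as p:
    (p
     | 'ReadFromGCS' >> beam.io.ReadFromText('gs://mybucket/beers.csv',
                                             skip_header_lines=1)
     | 'ParseCSV' >> beam.Map(lambda line: line.split(','))
     | 'PrintRows' >> beam.Map(print))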
ParDo Function
The ParDo processing paradigm is similar to the "Map" phase of a Map/Shuffle/Reduce-style algorithm: a ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection.
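To make this concrete, here is a small sketch (with hypothetical names) of a DoFn that emits zero, one, or many output elements per input element, applied with beam.ParDo:

import apache_beam as beam

class SplitWords(beam.DoFn):
    # Emits one output element per word: zero for an empty line,
    # several for a line with many words.
    def process(self, element):
        for word in element.split():
            yield word

with beam.Pipeline() as p:
    (p
     | 'CreateLines' >> beam.Create(['two words', '', 'one'])
     | 'SplitWords' >> beam.ParDo(SplitWords())
     | 'Print' >> beam.Map(print))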
Building a ParDo Function to Implement Cloud SQL Write Transform
In our particular case, since we will use Dataflow to connect to Cloud SQL and there is no available pipeline I/O transform for it, we will create a ParDo function to implement our own write transform.
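The following is a minimal sketch of one way to build such a write transform. It assumes the PyMySQL client is installed on the workers, a Cloud SQL instance reachable from Dataflow (for example via its public IP or the Cloud SQL proxy), and the hypothetical bucket, file, credential and table names used above; the host, password, project and region values are placeholders:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import pymysql


class WriteToCloudSQL(beam.DoFn):
    """Hypothetical ParDo-based write transform for the beer_data table."""

    def __init__(self, host, user, password, database):
        self.host = host
        self.user = user
        self.password = password
        self.database = database

    def setup(self):
        # Open one connection per DoFn instance when the worker starts.
        self.connection = pymysql.connect(host=self.host, user=self.user,
                                          password=self.password,
                                          database=self.database)

    def process(self, element):
        # element is a list of fields: [sr, abv, name, style, ounces, id]
        sql = ('INSERT INTO beer_data (sr, abv, name, style, ounces, id) '
               'VALUES (%s, %s, %s, %s, %s, %s)')
        with self.connection.cursor() as cursor:
            cursor.execute(sql, element)
        self.connection.commit()

    def teardown(self):
        # Close the connection when the worker shuts down.
        self.connection.close()


options = PipelineOptions(
    runner='DataflowRunner',        # use 'DirectRunner' for local testing
    project='my-project',           # placeholder project id
    region='us-central1',           # placeholder region
    temp_location='gs://mybucket/tmp')

with beam.Pipeline(options=options) as p:
    (p
     | 'ReadFromGCS' >> beam.io.ReadFromText('gs://mybucket/beers.csv',
                                             skip_header_lines=1)
     | 'ParseCSV' >> beam.Map(lambda line: line.split(','))
     | 'WriteToCloudSQL' >> beam.ParDo(
         WriteToCloudSQL(host='<CLOUD_SQL_IP>', user='root',
                         password='<PASSWORD>', database='mysqldb')))

In a production pipeline you would typically connect through the Cloud SQL proxy or a private IP and batch the inserts instead of issuing one INSERT per element, but the one-row-per-element version keeps the ParDo logic easy to follow.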
Conclusion
With this example we have shown how to create a specific ParDo function to connect Dataflow with Cloud SQL, given that a pipeline I/O transform is not available for it as it is for other common data storage types.