Distributed Machine Learning Model Training with Spark (PySpark)
Apache Spark was designed to provide a simple API for distributed data processing in general-purpose programming languages. It reduces tasks that would otherwise require thousands of lines of code to dozens.
What is PySpark?
PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs but also provides the PySpark shell for interactively analyzing your data in a distributed environment. PySpark supports most of Spark's features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.
The goal of this challenge is to predict whether a user will churn after their subscription expires — specifically, whether they make a new service subscription transaction within 30 days after the current membership expiration date.
Creating the PySpark Job
Data aggregation
We already have daily user logs describing the listening behaviors of a user within a month. We aggregate this data so that we have a single record with the sum of all fields for each user.
Features
- num_25 — songs played less than 25% of the song length
- num_50 — songs played between 25%–50%
- num_75 — songs played between 50%–75%
- num_985 — songs played between 75%–98.5%
- num_100 — songs played over 98.5%
- num_unq — unique songs played
- total_secs — total seconds played
We also add a feature for the total number of all songs played in a month, then build a simple LogisticRegression model, evaluate it, and track the accuracy.
Run PySpark locally on Minikube
Prerequisites
- Docker
- Minikube
Set up the local cluster
We create a Minikube cluster, mount our repo, and build a custom PySpark image that includes NumPy:
make all

Run PySpark on Google Cloud Dataproc

Dataproc is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and 30+ open source tools and frameworks.
Dataproc cluster types and how to set it up
Dataproc has three cluster types: Standard, Single-Node, and High Availability. We use the Standard type, which consists of 1 master and N worker nodes.
Creating the Dataproc cluster is straightforward: first enable the Dataproc API, then create the cluster on Compute Engine VMs with the default options. When creating the cluster, make sure to include the Anaconda component, as our job requires NumPy.
Submit PySpark Job
First, create a GCS bucket (replace <bucket-name> with your bucket name), then upload the GitHub repo including the dataset. Submit the job by selecting PySpark as the job type and the gsutil URI of job.py as the main Python file.
You can also run Spark jobs using the built-in SparkSubmitOperator in Airflow. Install requirements and start Airflow:
pip install -r requirements_airflow.txt
AIRFLOW_HOME=$(pwd) airflow standalone

To connect to a local Spark cluster, create a connection with id spark, type spark, and hostname local[*]. Optionally, set load_examples = False in airflow.cfg. Then access the Airflow UI:
# URL: http://localhost:8080
# username: admin
# password: see terminal output or standalone_admin_password.txt

Conclusions
There is no magic out-of-the-box solution for running PySpark jobs. This article demonstrates what is involved in running a data processing pipeline, from a local Minikube cluster to a managed cloud service like Dataproc.
For any questions, feel free to reach out to us at hello@datamax.ai.
DataMax Team