Essential Pyspark Utilities for Data Scientists

Nareshkumar Jayavelu
7 min read · Mar 29, 2021


In this article, we will go through the Python library pyspark-easy and how it can be used in your PySpark development.

Introduction

Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.

PySpark

PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment.

pyspark-easy

pyspark-easy is a Python library for getting quick insights into a Spark DataFrame. It also includes functions and methods aimed at helping data scientists with their model development by eliminating repetitive tasks.

Install pyspark-easy

pip install pyspark-easy

Hide Spark Console Progress

Hide the Spark console progress bar so that progress updates do not break up your outputs. This is controlled by the following configuration property:

spark.ui.showConsoleProgress = false

It can be added to the Spark session as follows.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("myApp") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()

I. Quick DataFrame Insights

Explore your Spark DataFrame with just two lines of code using ‘pyspark-easy’, which produces the following stats:

  1. Number of observations/rows.
  2. Number of columns.
  3. Number and percentage of duplicate rows, if any.
  4. Number of columns by data type, and the list of those columns.
  5. Summary stats on numeric columns: count, mean, stddev, min, max, and the 25th, 50th, and 75th percentiles. This is an optional argument; it displays stats for only five columns per row so the results are easier to read.
  6. Number of columns with missing values.
  7. List of columns with missing values, along with the number and percentage of missing rows.

The dataset can be downloaded from Kaggle; however, it has been modified for this article.

Let’s see how pyspark-easy works its magic:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-easy').getOrCreate()
df = spark.read.csv('bank.csv', header=True, inferSchema=True)

# import the library and pass the DataFrame
from pyspark_easy import pyspark_easy
py = pyspark_easy(df)
py.summary()

Results:

+-----------------------------------+----------+
| The number of observations/rows : | 22312689 |
+-----------------------------------+----------+
| the number of variables/columns : | 10       |
+-----------------------------------+----------+
| No of Duplicate rows :            | 0        |
+-----------------------------------+----------+
| % of Duplicate rows :             | 0        |
+-----------------------------------+----------+

The number of string columns are : 6


The string columns are :
+-------------------+--------+
| Columns           | Types  |
+===================+========+
| business_dt       | string |
+-------------------+--------+
| branchid          | string |
+-------------------+--------+
| customerno        | string |
+-------------------+--------+
| phone             | string |
+-------------------+--------+
| customername      | string |
+-------------------+--------+
| customerid        | string |
+-------------------+--------+



The number of numerical columns are : 3


The numerical columns are :
+--------------------+--------+
| Columns            | Types  |
+====================+========+
| id                 | bigint |
+--------------------+--------+
| amount             | bigint |
+--------------------+--------+
| average_bal        | int    |
+--------------------+--------+



The number of date/time columns are : 1


The date/time columns are :
+-----------+-----------+
| Columns   | Types     |
+===========+===========+
| updatedon | timestamp |
+-----------+-----------+


Number of columns with missing values are : 3


The columns with missing values are :


+-------------------+-----------------------+---------------------+
| Columns           | No. of missing values | % of missing values |
+===================+=======================+=====================+
| customername      | 64029                 | 0.290               |
+-------------------+-----------------------+---------------------+
| customerid        | 11                    | 0                   |
+-------------------+-----------------------+---------------------+
| id                | 11                    | 0                   |
+-------------------+-----------------------+---------------------+

By default, summary stats are turned off. Turning them on with summary_stats='Y' produces the stats below in addition to the results above.

py.summary(summary_stats='Y')

+-------+------------------+------------------+------------------+
|summary| age| balance| day|
+-------+------------------+------------------+------------------+
| count| 11162| 11162| 11162|
| mean|41.231947679627304|1528.5385235620856|15.658036194230425|
| stddev|11.913369192215518| 3225.413325946149| 8.420739541006462|
| min| 18| -6847| 1|
| 25%| 32| 122| 8|
| 50%| 39| 550| 15|
| 75%| 49| 1708| 22|
| max| 95| 81204| 31|
+-------+------------------+------------------+------------------+
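
For context, the duplicate-row and missing-value counts that the summary reports can also be computed directly with plain PySpark. A minimal sketch follows (this is not how pyspark-easy implements it, just standard DataFrame operations on the df loaded above):

from pyspark.sql import functions as F

# Duplicate rows: total rows minus distinct rows
total_rows = df.count()
duplicate_rows = total_rows - df.dropDuplicates().count()
print(f"Duplicate rows: {duplicate_rows} ({100.0 * duplicate_rows / total_rows:.3f}%)")

# Missing values per column, gathered in a single pass
missing = df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns
]).collect()[0].asDict()
for col_name, n_missing in missing.items():
    if n_missing > 0:
        print(f"{col_name}: {n_missing} missing ({100.0 * n_missing / total_rows:.3f}%)")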

II. Model Evaluation

Evaluate the model with just two lines of code. The ‘pyspark_model_eval’ utility below generates almost all the necessary metrics and plots. It supports binary and multi-class classification models.

from pyspark_easy import pyspark_model_eval
model_res=pyspark_model_eval(model,predicted_df)
model_res.results()

Parameters:

model - Trained Model
predicted_df - DataFrame containing label and predictions.

We can train the binary classification model using ‘bank.csv’ and evaluate the results using pyspark-easy.

from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('bank.csv', header=True, inferSchema=True)
cols = df.columns

# Preprocessing: index and one-hot encode the categorical columns
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + 'classVec'])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages += [label_stringIdx]

numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [c + 'classVec' for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]

# Build the pipeline and transform the data
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)

# Train a logistic regression model and score the test set
train, test = df.randomSplit([0.8, 0.2], seed=2021)
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)
model = lr.fit(train)
predictions = model.transform(test)

Now we pass the model and the DataFrame containing predictions into ‘pyspark_model_eval’.

from pyspark_easy import pyspark_model_eval
model_res=pyspark_model_eval(model,predictions)
model_res.results()

Results:

The Train vs Test evaluation Metrics are :
+--------------------+--------+--------+
| Metrics            | Train  | Test   |
+====================+========+========+
| AreaUnderROC       | 88.430 | 79.980 |
+--------------------+--------+--------+
| Accuracy           | 80.070 | 80.110 |
+--------------------+--------+--------+
| Weighted Precision | 80.110 | 80.630 |
+--------------------+--------+--------+
| Weighted Recall    | 80.070 | 80.110 |
+--------------------+--------+--------+
| Weighted F1 Score  | 80.010 | 79.990 |
+--------------------+--------+--------+
Note: the weighted metrics are calculated per label and then averaged.
The binary classification metrics for test data are:
+-----------+-----------+
| Metrics   | Test Data |
+===========+===========+
| Precision | 84.560    |
+-----------+-----------+
| Recall    | 72.790    |
+-----------+-----------+
| F1 Score  | 78.240    |
+-----------+-----------+
The classification report follows :
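
pyspark-easy renders the classification report and plots at this point. If you want to cross-check the headline numbers with plain Spark, the built-in evaluators cover the same metrics; below is a minimal sketch using pyspark.ml.evaluation (this is not the library's internals, just a manual alternative run on the test predictions):

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Area under ROC on the test predictions
auc = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC').evaluate(predictions)

# Accuracy and weighted precision/recall/F1
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: 'accuracy'})
precision = evaluator.evaluate(predictions, {evaluator.metricName: 'weightedPrecision'})
recall = evaluator.evaluate(predictions, {evaluator.metricName: 'weightedRecall'})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: 'f1'})

print(f"AUC: {auc:.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, "
      f"Recall: {recall:.4f}, F1: {f1:.4f}")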

III. Search for a Column Name

Search for columns containing a specified substring across a schema, a list of schemas, or an entire database (Hive/Impala). This does not require direct access to the metastore.

from pyspark_easy import column_search
column_search(spark_session, substring, *schemas)

Parameters:

spark_session — Spark session object.
substring — Finds all columns that contain this string; it behaves like a LIKE operator.
*schemas — Optional argument. When left blank, the entire database is searched. To search specific schemas, pass the schema names as a list, e.g. [‘schema1’, ‘schema2’]. These also behave like LIKE patterns.

The line below searches the entire database:

column_search(spark,'avg')

Results:

The columns found in the schema ‘schema1’ under the table ‘table1’ are {‘account_bal_avg’}
The columns found in the schema ‘schema2’ under the table ‘table2’ are {‘customer_avg_balance, last_month_avg_deposit’}
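
For comparison, a rough manual equivalent can be put together by walking the Spark catalog. The helper below is hypothetical: it is not the library's actual implementation, and it only does a simple substring match rather than a full LIKE pattern.

def find_columns(spark, substring, schemas=None):
    """Print tables whose column names contain `substring`."""
    if schemas is None:
        schemas = [db.name for db in spark.catalog.listDatabases()]
    for schema in schemas:
        for tbl in spark.catalog.listTables(schema):
            if tbl.isTemporary:
                continue  # skip temp views, which have no schema
            matches = [c.name for c in spark.catalog.listColumns(tbl.name, schema)
                       if substring in c.name]
            if matches:
                print(f"Columns found in table {schema}.{tbl.name}: {matches}")

find_columns(spark, 'avg')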

IV. Dates Generator

One of the most common requirements in any Spark development project is feature engineering. It is a skill every data scientist should master, especially when it comes to date window features. pyspark-easy helps create rolling-window, date-based features that take past values into account, in just a few lines of code.

To generate time series dates by month:

from pyspark_easy import dates_generator
dates = dates_generator(date, column, backward, *forward)

Parameters:

Date — date string.
Column — string; the column name.
Backward — the third argument; the number of months in the past from Date.
*Forward — optional; defaults to 0. The number of months in the future from Date.

dates_generator('2021-03-01', 'customer_average_balance', 12, 0)

Results:

[['2021-02-01', '2021-02-28', 'customer_average_balance_b1'],
['2021-01-01', '2021-01-31', 'customer_average_balance_b2'],
['2020-12-01', '2020-12-31', 'customer_average_balance_b3'],
['2020-11-01', '2020-11-30', 'customer_average_balance_b4'],
['2020-10-01', '2020-10-31', 'customer_average_balance_b5'],
['2020-09-01', '2020-09-30', 'customer_average_balance_b6'],
['2020-08-01', '2020-08-31', 'customer_average_balance_b7'],
['2020-07-01', '2020-07-31', 'customer_average_balance_b8'],
['2020-06-01', '2020-06-30', 'customer_average_balance_b9'],
['2020-05-01', '2020-05-31', 'customer_average_balance_b10'],
['2020-04-01', '2020-04-30', 'customer_average_balance_b11'],
['2020-03-01', '2020-03-31', 'customer_average_balance_b12']]

It creates a nested list. Each sub-list contains three elements: the start date of the month, the end date of the month, and the column name with a suffix such as ‘_b1’, where ‘b’ indicates backward.
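
Each entry can then drive a windowed aggregation when building features. The sketch below is hypothetical: txn_df, its txn_date and balance columns, and the join logic are illustrative assumptions, not part of pyspark-easy.

from pyspark.sql import functions as F

# txn_df is assumed to contain: customerno, txn_date, balance
dates = dates_generator('2021-03-01', 'customer_average_balance', 12, 0)

features = None
for start_date, end_date, feature_name in dates:
    # Average balance for each customer within this month window
    monthly = (txn_df
               .filter(F.col('txn_date').between(start_date, end_date))
               .groupBy('customerno')
               .agg(F.avg('balance').alias(feature_name)))
    features = monthly if features is None else features.join(monthly, 'customerno', 'outer')

# features now holds one rolling-window column per month (customer_average_balance_b1 ... _b12)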

Thanks for reading! I am developing more utilities to make data scientists’ lives easier, and I am going to write more articles in the future too. This is my GitHub: https://github.com/naresh-datanut/pyspark-easy. Life is more colorful when helping others.
