# Imports and setup
from pyspark.sql import SparkSession
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, StringIndexer)
from pyspark.ml import Pipeline
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier, NaiveBayes)
from pyspark.sql.functions import (col, explode, array, lit)
from pyspark.ml.evaluation import (BinaryClassificationEvaluator, MulticlassClassificationEvaluator)
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

# Entry point for all DataFrame work in this notebook; getOrCreate() returns
# an already-running session when there is one instead of starting another.
spark = SparkSession.builder.appName('HeartDiseaseClassification').getOrCreate()
# Schema of the table
df.printSchema()
root
|-- HeartDisease: string (nullable = true)
|-- BMI: double (nullable = true)
|-- Smoking: string (nullable = true)
|-- AlcoholDrinking: string (nullable = true)
|-- Stroke: string (nullable = true)
|-- PhysicalHealth: double (nullable = true)
|-- MentalHealth: double (nullable = true)
|-- DiffWalking: string (nullable = true)
|-- Sex: string (nullable = true)
|-- AgeCategory: string (nullable = true)
|-- Race: string (nullable = true)
|-- Diabetic: string (nullable = true)
|-- PhysicalActivity: string (nullable = true)
|-- GenHealth: string (nullable = true)
|-- SleepTime: double (nullable = true)
|-- Asthma: string (nullable = true)
|-- KidneyDisease: string (nullable = true)
|-- SkinCancer: string (nullable = true)
# Summary statistics of the numerical variables
df.select(numerical_cols).describe().show()
+-------+-----------------+------------------+-----------------+-----------------+
|summary| BMI| PhysicalHealth| MentalHealth| SleepTime|
+-------+-----------------+------------------+-----------------+-----------------+
| count| 319795| 319795| 319795| 319795|
| mean|28.32539852092807|3.3717100017198516|3.898366140808956|7.097074688472302|
| stddev|6.356100200470741| 7.95085018257137|7.955235218943606|1.436007060964281|
| min| 12.02| 0.0| 0.0| 1.0|
| max| 94.85| 30.0| 30.0| 24.0|
+-------+-----------------+------------------+-----------------+-----------------+
# Split the training data by class
major_df = train_df.filter(col(label) == 'No')
minor_df = train_df.filter(col(label) == 'Yes')

# Ratio of majority-class to minority-class observations, truncated to an integer
r = int(major_df.count() / minor_df.count())

# Duplicate the minority rows: explode() emits one copy of each row per element
# of the r-element helper array, and the helper column is dropped afterwards.
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in range(r)]))).drop('dummy')

# Combine the oversampled minority rows with the original majority rows
combined_train_df = major_df.unionAll(oversampled_df)

# Bar chart of the class counts; the x axis uses the same column the data is
# grouped by, so the plot stays correct whatever `label` is set to.
combined_train_df.groupBy(label).count().toPandas().plot.bar(x=label, rot=0, title='Number of Observations in Train subset after Oversampling')
Out[164]: <AxesSubplot:title={'center':'Number of Observations in Train subset after Oversampling'}, xlabel='HeartDisease'>
# Indexers for categorical columns (the loop variable is deliberately not
# called `col`, which would shadow pyspark.sql.functions.col)
indexers = [StringIndexer(inputCol=name, outputCol=name + '_indexed') for name in categorical_cols]

# Encoders for categorical columns, consuming the indexed columns produced above
encoders = [OneHotEncoder(inputCol=name + '_indexed', outputCol=name + '_encoded') for name in categorical_cols]

# Indexer for the classification label
label_indexer = StringIndexer(inputCol=label, outputCol=label + '_indexed')
# Score each model's predictions with the shared evaluator.
acc_lr = acc_evaluator.evaluate(pred_lr)
acc_rfc = acc_evaluator.evaluate(pred_rfc)
acc_nb = acc_evaluator.evaluate(pred_nb)

# Report each score as a percentage with two decimals.
print(f'Logistic Regression accuracy: {acc_lr * 100:.2f}%')
print(f'Random Forest accuracy: {acc_rfc * 100:.2f}%')
print(f'Naive Bayes accuracy: {acc_nb * 100:.2f}%')
Logistic Regression accuracy: 76.02%
Random Forest accuracy: 74.45%
Naive Bayes accuracy: 81.55%
def confusion_matrix(pred_df):
    """Return the confusion matrix of a predictions DataFrame as an array.

    Args:
        pred_df: DataFrame containing a 'prediction' column and the indexed
            label column, named ``label + '_indexed'`` (``label`` is read
            from module scope).

    Returns:
        The result of ``MulticlassMetrics.confusionMatrix().toArray()`` for
        the (prediction, label) pairs in ``pred_df``.
    """
    label_col = label + '_indexed'
    # Keep only the (prediction, label) pair, with the label cast to float.
    # The original referenced an undefined `preds_and_labels` here and
    # re-selected the same two columns; both are dropped.
    preds_labels = (
        pred_df.select(['prediction', label_col])
        .withColumn(label_col, F.col(label_col).cast(FloatType()))
        .orderBy('prediction')
    )
    # MulticlassMetrics consumes an RDD of (prediction, label) tuples.
    metrics = MulticlassMetrics(preds_labels.rdd.map(tuple))
    return metrics.confusionMatrix().toArray()
def confusion_matrix_plot(conf_mat, ax, title='Confusion Matrix'):
    """Draw a 2x2 confusion matrix as an annotated heatmap on ``ax``.

    Each cell is annotated with its name, its count and its share of the
    total. Cell names are paired with the flattened (row-major) matrix.

    Args:
        conf_mat: array exposing ``flatten()``; reshaped annotations assume
            exactly four cells.
        ax: matplotlib axes to draw on.
        title: heading placed above the heatmap.

    Returns:
        The axes returned by ``sns.heatmap``.
    """
    cell_names = ('True Negative', 'False Positive', 'False Negative', 'True Positive')
    counts = conf_mat.flatten()
    shares = counts / np.sum(conf_mat)

    # One three-line annotation per cell: name, count, percentage of total.
    annotations = np.asarray([
        "\n\n".join((cell, f"{count:0.0f}", f"{share:.2%}"))
        for cell, count, share in zip(cell_names, counts, shares)
    ]).reshape(2, 2)

    ax = sns.heatmap(conf_mat, annot=annotations, fmt='', cmap='Blues', cbar=False, ax=ax)
    ax.set_title(title + '\n')
    ax.set_xlabel('\nPredicted Labels')
    ax.set_ylabel('Real Labels')
    for axis in (ax.xaxis, ax.yaxis):
        axis.set_ticklabels(['No', 'Yes'])
    return ax
# One row of three panels, one confusion-matrix heatmap per model.
fig, (ax1, ax2, ax3) = plt.subplots(nrows=1, ncols=3, figsize=(20, 5))
ax1 = confusion_matrix_plot(conf_lr, ax=ax1, title='Logistic Regression - Confusion Matrix')
ax2 = confusion_matrix_plot(conf_rfc, ax=ax2, title='Random Forest Classifier - Confusion Matrix')
ax3 = confusion_matrix_plot(conf_nb, ax=ax3, title='Naive Bayes - Confusion Matrix')
plt.show()
# Report each model's sensitivity as a percentage rounded to two decimals.
print('Logistic Regression sensitivity: ' + str((sensitivity(conf_lr) * 100).round(2)) + '%')
print('Random Forest sensitivity: ' + str((sensitivity(conf_rfc) * 100).round(2)) + '%')
print('Naive Bayes sensitivity: ' + str((sensitivity(conf_nb) * 100).round(2)) + '%')
Logistic Regression sensitivity: 77.08%
Random Forest sensitivity: 69.15%
Naive Bayes sensitivity: 38.44%
Heart Disease Prediction - A Classification Problem using PySpark
Objective:
Topics and Methods Covered:
Objective of the Analysis: