Now that we know how to create a simple linear regression model, both in bare BigDL and with the ML Pipeline API, let's move on to the slightly more elaborate logistic regression. This example will illustrate how networks with more than one layer are constructed in BigDL, as well as serve as further practice.
Because at the time of writing this guide the RDD API is being deprecated, and bare BigDL requires RDD manipulation, from now on we will use the ML Pipeline API to train and test our models.
For this exercise we will use the well-known Iris dataset, hosted at the UCI Machine Learning Repository. To pull the data directly from the repository you can use sc.addFile() (we can do this because the file is quite small; avoid doing this for large files), and then load it into a DataFrame with spark.read.format():
from pyspark import SparkFiles
from pyspark.sql.types import StructType, StructField, FloatType, StringType
from pyspark import SparkContext
from bigdl.util.common import *
from pyspark.sql import SparkSession
# Create the Spark context (with BigDL's configuration) and the Spark session
sc = SparkContext.getOrCreate(conf=create_spark_conf().setMaster("local[4]").set("spark.driver.memory","10g"))
spark = SparkSession.builder.getOrCreate()
# Set a seed for RNG repeatability
seed = 2019
sc.addFile("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")
# Define the number of features and classes for later use
num_features = 4
num_classes = 3
# Declare the DataFrame schema
iris_schema = StructType([
    StructField('sepal_length', FloatType(), True),
    StructField('sepal_width', FloatType(), True),
    StructField('petal_length', FloatType(), True),
    StructField('petal_width', FloatType(), True),
    StructField('class', StringType(), True)])
# The timestampFormat option works around a CSV parsing error in some Spark versions
iris_raw = spark.read.format("csv")\
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")\
    .schema(iris_schema)\
    .load(SparkFiles.get("iris.data"))
print('Number of rows: {}'.format(iris_raw.count()))
iris_raw.show(5)
iris_raw.printSchema()
Because the file we pulled from the UCI repository did not include the column headers, we had to specify them manually: we created the DataFrame schema (following the dataset description provided by UCI) and passed it to the reader with .schema().
You can see that all labels in the class column are preceded by the prefix Iris-. This prefix does not add any kind of information, so although it is not strictly necessary, let's get rid of it to reduce some clutter:
import pyspark.sql.functions as f
iris_df = iris_raw.withColumn('class', f.split(f.col('class'), '-')[1])
iris_df.show(5)
Do not forget to start the BigDL engine before running any BigDL code:
from bigdl.util.common import init_engine
init_engine()
Strings in a DataFrame column can be split with the split(column, separator) function, passing the corresponding column and separator. split returns an array with all the tokens it finds.
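For instance, this quick illustrative query shows the token arrays that split produces before we index into them:
# Illustration: split('Iris-setosa', '-') yields ['Iris', 'setosa'], so [1] keeps the species
iris_raw.select(f.split(f.col('class'), '-').alias('tokens')).show(3, truncate=False)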
Let's take a quick look at what the data looks like:
import seaborn as sns
sns.pairplot(iris_df.toPandas(), hue="class");
Out of the three possible classes, you can see that setosa stands out, as it is particularly well separated from the other two, which appear more entangled in most scatter plots. We therefore expect our classifier to do quite a good job at distinguishing setosa, and an OK one for versicolor and virginica.
We will take advantage of the fact that we read the data directly into DataFrame format: we will make all the necessary data transformations using Spark ML, and only take out the DataFrame's internal RDD when necessary.
The first step is to convert all categorical values to numerical ones. Classifiers in BigDL only take float values, both for their labels and their features, so we need to turn the strings in the class column into floats. The second step is to assemble all feature columns into a vector, as we already saw in the previous notebook:
Important: BigDL uses 1-indexed values for classification labels (due to its Torch heritage). This means that for a binary classification problem it expects labels to take values 1.0 and 2.0, rather than 0.0 and 1.0. Providing the wrong labels will cause the optimizer to crash when trying to train the model.
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
# String Indexer
indexer = StringIndexer(inputCol='class', outputCol='label')
# Vector Assembler
assembler = VectorAssembler(inputCols=iris_df.columns[:num_features], outputCol='features')
# Define a SparkML pipeline for the preprocessing:
iris_prepped = Pipeline(stages=[indexer, assembler])\
    .fit(iris_df)\
    .transform(iris_df)\
    .withColumn('label', f.col('label') + 1)
iris_prepped.show(5)
iris_prepped.printSchema()
We have used a Spark ML pipeline to build all preprocessing steps into a single instruction.
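As an optional sanity check, we can confirm that the labels now run from 1.0 to 3.0, as BigDL expects:
# Optional check: after the +1 shift, labels should be 1.0, 2.0 and 3.0
iris_prepped.select('label').distinct().orderBy('label').show()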
Let's now define our logistic regression model using the BigDL modelling framework. In order to build a logistic regression we need two layers: a linear layer and a LogSoftMax activation layer. As we saw in the previous notebook, BigDL offers two different APIs for defining models, the Sequential API and the Functional API. For illustration purposes we will define the model in each of them:
With more than one layer we need a container to put them in. The simplest flavour is the Sequential container, which simply stacks the layers in the order they are added to it:
from bigdl.nn.layer import Sequential, Linear, LogSoftMax
lr_seq = Sequential()
lr_seq.add(Linear(num_features, num_classes))
lr_seq.add(LogSoftMax());
print(lr_seq)
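As a quick optional sanity check, you can push a single random observation through the untrained model with forward(), which accepts a NumPy array; this is just a sketch to verify the wiring, not part of the training workflow:
import numpy as np
# Forward one random observation; the output is a vector of
# num_classes log-probabilities produced by the LogSoftMax layer
print(lr_seq.forward(np.random.rand(num_features)))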
In the Functional API, models are defined as if layers were the directed edges of a graph connecting its inputs and outputs:
from bigdl.nn.layer import Model
lin = Linear(num_features, num_classes)()
lsm = LogSoftMax()(lin)
lr_fun = Model(lin, lsm)
for l in lr_fun.layers:
print(l)
To connect two layers, simply pass the first layer as a parameter when calling the second one.
As we already saw in the Linear Regression notebook, BigDL models require their input to be represented as an RDD of Sample:
Important: Remember that the BigDL team instructs you to use only from_ndarray to build Samples out of NumPy arrays in Python.
from bigdl.util.common import Sample
import numpy as np
iris_rdd = iris_prepped.select(['features', 'label'])\
.rdd.map(lambda x : Sample.from_ndarray(np.array(x[0]), np.array(x[1])))
iris_rdd.take(5)
As usual, we split the data into training and testing datasets:
train_test_rate = 0.7
train_rdd, test_rdd = iris_rdd.randomSplit([train_test_rate, 1-train_test_rate], seed)
print("Train observations: {}".format(train_rdd.count()))
print("Test observations: {}".format(test_rdd.count()))
Once the layout of the network has been defined, we need to set up the Optimizer that will fit the network to the training data, as well as the training hyperparameters. For classification, the Negative Log Likelihood (NLL) criterion is the loss of choice:
from bigdl.optim.optimizer import *
from bigdl.nn.criterion import *
batch_size = 16
epochs = 20
learning_rate = 0.05
optimizer = Optimizer(
    model=lr_seq,
    training_rdd=train_rdd,
    criterion=ClassNLLCriterion(),
    end_trigger=MaxEpoch(epochs),
    optim_method=SGD(learningrate=learning_rate),
    batch_size=batch_size)
Because we also want to visualize the training process, let's also define a validator and logging. Loss() defaults to ClassNLLCriterion, so we do not need to specify it in this case:
# Define a validator for the Optimizer
optimizer.set_validation(
    batch_size=batch_size,
    val_rdd=test_rdd,
    trigger=EveryEpoch(),
    val_method=[Loss()]
)
# Define the Logs and pass them to the Optimizer
import datetime as dt
log_dir = '/tmp/bigdl_summaries'
app_name = 'iris-logreg-' + dt.datetime.now().strftime("%Y%m%d-%H%M%S")
# Create the train and validation summaries
train_summary = TrainSummary(log_dir=log_dir, app_name=app_name)
val_summary = ValidationSummary(log_dir=log_dir, app_name=app_name)
# Pass them to the optimizer
optimizer.set_train_summary(train_summary)
optimizer.set_val_summary(val_summary)
print("Logs saved to: {}/{}".format(log_dir, app_name))
And finally, train the logistic regression model:
optimizer.optimize();
In order to validate the model, we can take a look at the training and validation logs:
import matplotlib.pyplot as plt
loss_train = np.array(train_summary.read_scalar("Loss"))
loss_test = np.array(val_summary.read_scalar("Loss"))
plt.figure(figsize = (12,12))
plt.subplot(2,1,1)
plt.plot(loss_train[:,0],loss_train[:,1],label='Training loss')
plt.xlim(0,loss_train.shape[0]+10)
plt.grid(True)
plt.title("Training loss")
plt.subplot(2,1,2)
plt.plot(loss_test[:,0],loss_test[:,1],label='Test loss')
plt.xlim(0,loss_train.shape[0]+10)
plt.title("Test Loss")
plt.grid(True)
And also evaluate the trained model against the test data:
test_results = lr_seq.evaluate(test_rdd, batch_size, [Loss()])
print(test_results[0])
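Loss() is not the only metric we can pass to evaluate(); BigDL also provides a Top1Accuracy validation method (imported above with bigdl.optim.optimizer), which reports classification accuracy directly:
# Evaluate classification accuracy on the test set
print(lr_seq.evaluate(test_rdd, batch_size, [Top1Accuracy()])[0])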
As usual, you can access the model coefficients by calling get_weights() on the model:
print("Coefficients:\n{}".format(lr_seq.get_weights()[0]))
print("Intercept:\n{}".format(lr_seq.get_weights()[1]))
A particularly effective way to visualize classifier performance is a confusion matrix. For that purpose we will use the utilities provided by sklearn:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
import pandas as pd
# Remember that labels are 1-indexed in BigDL:
y_pred = np.array(lr_seq.predict_class(test_rdd).collect())-1
y_label = np.array([s.label.to_ndarray()[0] - 1 for s in test_rdd.collect()])
acc = accuracy_score(y_label, y_pred)
print("The prediction accuracy is %.2f%%"%(acc*100))
cm = confusion_matrix(y_label, y_pred)
df_cm = pd.DataFrame(cm)
plt.figure(figsize = (10,8))
sns.heatmap(df_cm, annot=True,fmt='d');
We already transformed the data into the appropriate DataFrame format in the preprocessing section, so we only need to split it into training and validation sets:
train_df, test_df = iris_prepped.randomSplit([train_test_rate, 1-train_test_rate], seed)
print("Train observations: {}".format(train_df.count()))
print("Test observations: {}".format(test_df.count()))
Let's now fit the same logistic regression through the ML Pipeline API. For classification tasks, instead of DLEstimator we need to use DLClassifier to wrap the BigDL model. Recall that the DLEstimator and DLClassifier constructors do not take values for the batch size, stopping trigger or learning rate, so we need to specify them afterwards. We will use the Functional model we already created:
from bigdl.dlframes.dl_classifier import DLClassifier
lr_ml = DLClassifier(lr_fun, ClassNLLCriterion(), feature_size=[num_features])\
    .setBatchSize(batch_size)\
    .setMaxEpoch(epochs)\
    .setLearningRate(learning_rate)
To train the model, simply call fit() passing it the training data:
lr_model = lr_ml.fit(train_df)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
pred_test = lr_model.transform(test_df)\
    .select('label', 'prediction')\
    .withColumn('label', f.col('label') - 1)\
    .withColumn('prediction', f.col('prediction') - 1)
acc = evaluator.evaluate(pred_test, {evaluator.metricName: "accuracy"})
print("The prediction accuracy is %.2f%%"%(acc*100))
# Labels and predictions were shifted back to 0-indexed above (BigDL uses 1-indexed labels):
cm = confusion_matrix(pred_test.select('label').collect(), pred_test.select('prediction').collect())
df_cm = pd.DataFrame(cm)
plt.figure(figsize = (10,8))
sns.heatmap(df_cm, annot=True,fmt='d');
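If you also want per-class precision and recall, here is a quick optional sketch using sklearn's classification_report on the same collected columns:
from sklearn.metrics import classification_report
# Collect predictions and labels locally (the test set is small)
y_true = [r['label'] for r in pred_test.select('label').collect()]
y_hat = [r['prediction'] for r in pred_test.select('prediction').collect()]
print(classification_report(y_true, y_hat))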
Copyright © Barcelona Supercomputing Center, 2019-2020 - All Rights Reserved - AI in DataCenters