# This code comes with the Hands-On for the Introduction to Spark
# - some examples are adaptations from Advanced Analytics with Spark (Sandy Ryza et al.)
# - some examples are adaptations from the Databricks API examples
#
# AI and Predictive Analytics in Datacenter Environments
# http://dcai.bsc.es

# Install numpy, if not installed before:
# sudo apt-get install python-pip
# pip install --upgrade pip
# pip install numpy

## SPARK ML EXAMPLES

# Vectors
from pyspark.ml.linalg import Vectors

# Sparse vector of size 4 with non-zero entries at indices 0 and 3
vs = Vectors.sparse(4, [(0, 1.0), (3, -2.0)])
# Dense vector with the same values
vd = Vectors.dense([1.0, 0.0, 0.0, -2.0])

# Linear Regression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator

dataset = spark.read.format("libsvm").load("/home/vagrant/spark/data/mllib/sample_linear_regression_data.txt")

# Split the data into training (60%) and test (40%) sets
splits = dataset.randomSplit([0.6, 0.4], seed=123)
splits
training = splits[0]
test = splits[1]

# Inspect the training data
training.take(5)
training.limit(5).show()
a = training.select("features").collect()
a[0]
a[0][0]

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(training)
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

# Evaluate on the test set
predictions = lrModel.transform(test)
predictions.show()
predictionAndLabels = predictions.select("prediction", "label")
evaluator = RegressionEvaluator(metricName="mse")
print("Test set MSE: %f" % evaluator.evaluate(predictionAndLabels))

# Save the model and load it back
lrModel.write().overwrite().save("LR_Model")
sameLrModel = LinearRegressionModel.load("LR_Model")

# Support Vector Machines
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LinearSVCModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dataset = spark.read.format("libsvm").load("/home/vagrant/spark/data/mllib/sample_libsvm_data.txt")
splits = dataset.randomSplit([0.6, 0.4], seed=123)
training = splits[0]
test = splits[1]

lsvc = LinearSVC(maxIter=10, regParam=0.1)
lsvcModel = lsvc.fit(training)
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))

# Evaluate on the test set
predictions = lsvcModel.transform(test)
predictions.show()
predictionAndLabels = predictions.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy: %f" % evaluator.evaluate(predictionAndLabels))

# Save the model and load it back
lsvcModel.write().overwrite().save("SVM_SGD_Model")
sameLsvcModel = LinearSVCModel.load("SVM_SGD_Model")

# Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

dataset = spark.read.format("libsvm").load("/home/vagrant/spark/data/mllib/sample_kmeans_data.txt")

# Train a k-means model with k = 2
kmeans = KMeans().setK(2).setSeed(1)
modelKMeans = kmeans.fit(dataset)
predictions = modelKMeans.transform(dataset)

# Evaluate clustering quality with the silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

centers = modelKMeans.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
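
# Inspecting the cluster assignments (a small optional sketch, not part of the
# original hands-on): transform() adds a "prediction" column holding the
# cluster index of each point, which can be grouped to count cluster sizes.
predictions.select("features", "prediction").show(5)
predictions.groupBy("prediction").count().show()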

## VectorAssembler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

df = spark.read.csv("/home/vagrant/hus/ss13husa.csv", header=True, mode="DROPMALFORMED", inferSchema=True)

# Take a small slice and combine three numeric columns into a single feature vector
slice1 = df.select("SERIALNO", "PUMA", "DIVISION").limit(10)
assembler = VectorAssembler(inputCols=["SERIALNO", "PUMA", "DIVISION"], outputCol="features")
output = assembler.transform(slice1)
output.select("features").show()

# Rename a column to "label" to get the (features, label) shape expected by ML estimators
output.select("features", "PUMA").withColumnRenamed("PUMA", "label").show()
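
# Putting the pieces together (a sketch, not part of the original hands-on;
# it uses "PUMA" merely as a stand-in numeric label, and the names labeled,
# lrHus, and lrHusModel are hypothetical): the assembled (features, label)
# DataFrame has exactly the input shape the estimators above expect, so it
# can be fed to, e.g., the LinearRegression estimator imported earlier.
labeled = output.select("features", "PUMA").withColumnRenamed("PUMA", "label")
lrHus = LinearRegression(maxIter=5, regParam=0.3)
lrHusModel = lrHus.fit(labeled)
lrHusModel.transform(labeled).select("features", "label", "prediction").show()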