Saturday, August 4, 2018

Assignment on graph processing using GraphX in Apache Spark

  1. Generate an undirected graph RDD from the given graph data.
  2. Compute the listed vertex-based similarity measures for all the pairs of nodes in the label data file. These similarity measures are computed between two nodes using the neighborhood and/or node information of both nodes:
       - Common neighbors
       - Jaccard coefficient
       - Adamic/Adar
       - Preferential Attachment
  3. Bonus Question: Link Prediction Model. Use the measures generated from the graph and the labels from the labeled data to predict the possibility of new link formation. Please use the following steps:
       1. Create a dataset by combining the measures (as features) and the class labels from the labeled data.
       2. Use a decision-tree-based algorithm in SparkML to train the prediction model.
       3. Split the dataset to generate the training data and the testing data.
       4. Use the training data to build the model and the testing data to evaluate it.
       5. Present the model performance metrics: Accuracy, Recall, and Precision.
  1. Please follow the program submission instructions (same as for the previous assignments).
  2. You must use Spark and GraphX to generate the measures and SparkML for the bonus question.
  3. More explanation on above graph measures here (
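
For reference, the four measures named above are commonly defined as follows. This is a sketch using the standard textbook definitions, not text taken from the assignment's linked page. The symbol \Gamma(x), the set of neighbors of node x, and the abbreviations CN, JC, AA and PA are notation introduced here for illustration.

```latex
\begin{align*}
\mathrm{CN}(x, y) &= |\Gamma(x) \cap \Gamma(y)| \\
\mathrm{JC}(x, y) &= \frac{|\Gamma(x) \cap \Gamma(y)|}{|\Gamma(x) \cup \Gamma(y)|} \\
\mathrm{AA}(x, y) &= \sum_{z \in \Gamma(x) \cap \Gamma(y)} \frac{1}{\log |\Gamma(z)|} \\
\mathrm{PA}(x, y) &= |\Gamma(x)| \cdot |\Gamma(y)|
\end{align*}
```

All four depend only on the neighbor sets of the two nodes, which is why a single neighbor lookup table is enough to compute them.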
Graph data:
Use this link ( dl=0) to download the graph data. The file is formatted as shown below:

Label data:
Click here (h ?dl=0) to download the label data.

package com.assign.graphEdge

// Imports for building the graph
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.numericRDDToDoubleRDDFunctions
// Imports for common neighbors and Adamic/Adar
import ml.sparkling.graph.operators.OperatorsDSL._
import ml.sparkling.graph.operators.measures.edge.{AdamicAdar, CommonNeighbours}
// Imports for Spark ML
import org.apache.spark._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

object GraphPlot {

def main(args: Array[String]): Unit = {

val jobName = "siteRequest"
val conf = new SparkConf().setAppName(jobName).setMaster("local[*]").set("spark.executor.memory","5g")
val sc =  SparkContext.getOrCreate(conf)

//Question 1
val RDData = sc.textFile("/home/deola/workspace/siteRequest/data/graph_1965_1969.csv").cache()

// Split each CSV row into its fields
val PreprocessedRDD = RDData.map(line => line.split(",")).cache()

The first step is to build the graph from the graph dataset, which requires a set of vertices and a set of edges. First we take the two node IDs from each row and combine them into a single distinct list of vertices. We then use the first three values of each row (the two node IDs and the third field, which becomes the edge attribute) to create the edges.

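// Vertex ID = node label without its first character; vertex attribute = the raw node label
val Node1Vertex = PreprocessedRDD.map(line => (line(0).drop(1).toLong, line(0))).distinct()
val Node2Vertex = PreprocessedRDD.map(line => (line(1).drop(1).toLong, line(1))).distinct()
val completeVertex = Node1Vertex.union(Node2Vertex).distinct()
// One edge per row: source, destination, and the third field as the edge attribute
val EdgesRDD = PreprocessedRDD.map(line => Edge(line(0).drop(1).toLong, line(1).drop(1).toLong, line(2))).distinct()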

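val graph = Graph(completeVertex, EdgesRDD).cache()
// Neighbor IDs of every vertex, ignoring edge direction (the graph is treated as undirected)
val neigh = graph.collectNeighborIds(EdgeDirection.Either)
// Broadcast the neighbor table so every partition can look up neighbors locally
val broadcastVar = sc.broadcast(neigh.collect())
// A sample of the graph's edges and vertices
val graph_result = graph.edges.take(20)
val output3 = graph.vertices.take(20)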
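
To make the neighbor table concrete, here is a minimal sketch in plain Scala (no Spark) of what an undirected neighbor lookup looks like for a handful of edges. The object name `AdjacencySketch`, the function `neighbors`, and the sample edges are hypothetical and only for illustration. It uses sets, so repeated neighbors collapse, whereas the arrays returned by GraphX may contain repeats.

```scala
object AdjacencySketch {
  // Build an undirected neighbor table from (src, dst) pairs.
  // Each edge adds dst to src's neighbor set and src to dst's neighbor set.
  def neighbors(edges: Seq[(Long, Long)]): Map[Long, Set[Long]] =
    edges
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample edges
    val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 4L))
    neighbors(edges).toSeq.sortBy(_._1).foreach { case (v, ns) =>
      println(s"$v -> ${ns.toSeq.sorted.mkString(",")}")
    }
  }
}
```

Emitting every edge in both directions before grouping is what makes the table undirected, which mirrors the intent of `EdgeDirection.Either` above.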

//Question 2
val RDDataLabel = sc.textFile("/home/deola/workspace/siteRequest/data/labeled_1965_1969_1970_1974.csv").cache();
val PreprocessedRDDLabelRaw = RDDataLabel.map(line => line.split(",")).cache();
val labelDataToRdd = sc.broadcast(PreprocessedRDDLabelRaw.collect())

//val output3 = broadcastVar.take(2)
//val output4 = PreprocessedRDDLabelRaw.take(2)

    val r_rdd = PreprocessedRDDLabelRaw.mapPartitions(rows => {

      // Neighbor lookup table: vertex ID -> array of neighbor IDs
      val nvalues = broadcastVar.value.toMap

For each labeled pair of nodes, the loop below looks up the neighbors of both nodes in the graph and uses the two neighbor lists to compute the similarity measures.

      rows.map(row => {
                val n1 = row(0).drop(1).toLong
                val n2 = row(1).drop(1).toLong

                val n1_neigh = nvalues(n1)
                val n2_neigh = nvalues(n2)

                // Number of common neighbors
                val common_neig = n1_neigh.intersect(n2_neigh).length
                // Preferential attachment: product of the two neighborhood sizes
                val pre_attch = n1_neigh.length * n2_neigh.length
                // Jaccard coefficient: |intersection| / |union|.
                // union on arrays keeps duplicates, so distinct is needed to get the set union.
                val x = n1_neigh.intersect(n2_neigh).size / n1_neigh.union(n2_neigh).distinct.size.toDouble
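
As a self-contained illustration of the four measures, including Adamic/Adar, the sketch below computes them on plain Scala sets rather than on the broadcast arrays. It assumes the standard definitions of the measures. The object `SimilaritySketch` and its function names are hypothetical, and this is not the post's own implementation.

```scala
object SimilaritySketch {
  type Neighbors = Map[Long, Set[Long]]

  // Neighbor set of v, or the empty set if v is not in the table
  private def nbrs(nb: Neighbors, v: Long): Set[Long] =
    nb.getOrElse(v, Set.empty[Long])

  // Common neighbors: size of the intersection
  def commonNeighbors(nb: Neighbors, a: Long, b: Long): Int =
    (nbrs(nb, a) intersect nbrs(nb, b)).size

  // Jaccard coefficient: |intersection| / |union|, defined as 0 for an empty union
  def jaccard(nb: Neighbors, a: Long, b: Long): Double = {
    val unionSize = (nbrs(nb, a) union nbrs(nb, b)).size
    if (unionSize == 0) 0.0
    else (nbrs(nb, a) intersect nbrs(nb, b)).size.toDouble / unionSize
  }

  // Preferential attachment: product of the two degrees
  def preferentialAttachment(nb: Neighbors, a: Long, b: Long): Long =
    nbrs(nb, a).size.toLong * nbrs(nb, b).size

  // Adamic/Adar: sum of 1 / log(degree) over the common neighbors.
  // toSeq keeps equal terms from being merged, as they would be in a Set.
  def adamicAdar(nb: Neighbors, a: Long, b: Long): Double =
    (nbrs(nb, a) intersect nbrs(nb, b)).toSeq.map { z =>
      val deg = nbrs(nb, z).size
      if (deg > 1) 1.0 / math.log(deg.toDouble) else 0.0
    }.sum
}
```

Returning 0 for an empty union and skipping common neighbors of degree 1 or less are choices made here to avoid dividing by zero. Other conventions are possible.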




//Question 3
 //val r_rdd1 = sc.parallelize(r_rdd)
  val r_rdd1= r_rdd.take(100)
  val r_rdd2= sc.parallelize(r_rdd1)

This section of the code splits each row into a feature vector and a label (the class to predict). Each row is converted into a LabeledPoint, where the label is the class derived from the row and the vector holds the remaining computed values used for prediction.

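  val parsedData = r_rdd2.map { x =>
    // Features: the first four fields of each tuple
    val parts1 = Array(x._1, x._2, x._3, x._4)
    // Label: 1 if the fifth field exceeds 0.05, otherwise 0
    val parts2 = if (x._5 > 0.05) 1 else 0

    LabeledPoint(parts2.toDouble, Vectors.dense(parts1.map(_.toDouble)))
  }
  parsedData.take(100).foreach(println)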

  val Array(trainingDataRDD, testDataRDD) = parsedData.randomSplit(Array(0.7, 0.3))
      //val numIterations = 100
     // val stepSize = 0.00000001
     // val model = NaiveBayes.train(trainingDataRDD, numIterations )
//val logisticregression = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
    //  use logistic regression to train the model
//val model =  
    //val model = NaiveBayes.train(trainingDataRDD, lambda = 1.0)
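    // Binary classification: link / no link
    val numClasses = 2
    // An empty map means all features are treated as continuous
    val categoricalFeaturesInfo = Map[Int, Int]()
    val impurity = "gini"
    val maxDepth = 5
    val maxBins = 32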

Using a decision tree, we train the model on 70% of the data and make predictions on the remaining 30% (the test data).

    val model = DecisionTree.trainClassifier(trainingDataRDD, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)
    // Pair each prediction on the test set with its true label
    val predictions = testDataRDD.map(p => (model.predict(p.features), p.label))
     //val bb1 = predictions.take(5)
     //val bb1r = trainingDataRDD.take(5)
      //val bb1y = testDataRDD.take(5)
   val accuracy = 100.0 * predictions.filter(x => x._1 == x._2).count() / (testDataRDD.count())
    val positively_Predicted = predictions.filter(x => x._1 == x._2).count()
    val total_test_dataset = testDataRDD.count()
    val total_training_dataset = trainingDataRDD.count()

This section prints the accuracy and the number of correctly classified test rows.

    println("Total correctly predicted = " + positively_Predicted)
    println("Total test data = " + total_test_dataset)
    println("Total train data = " + total_training_dataset)
    println("Accuracy = " + accuracy)

    //val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
    //val lrmodel =
    //val data = MLUtils.loadLibSVMFile(sc, "/home/deola/workspace/siteRequest/data/graph_1965_1969.csv")

     // Get evaluation metrics.
    //val metrics = new BinaryClassificationMetrics(predictions)
    //val precision_value = metrics.precisionByThreshold()
    //val recall_value = metrics.recallByThreshold()
    //val auROC = metrics.areaUnderROC()

    //println("Area under ROC = " + auROC)
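    // MulticlassMetrics expects (prediction, label) pairs
    val metrics = new MulticlassMetrics(predictions.map(x => (x._1, x._2)))
    // Precision and recall are reported for the positive class (label 1.0)
    val precision_1 = metrics.precision(1.0)
    val accuracy_1 = metrics.accuracy
    val recall_1 = metrics.recall(1.0)
    println(s"Precision = $precision_1, Accuracy = $accuracy_1, Recall = $recall_1")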
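
To show what the three reported numbers mean, here is a minimal plain-Scala sketch that derives accuracy, precision and recall from (prediction, label) pairs by counting the confusion-matrix cells. The names `BinaryMetricsSketch`, `Scores` and `evaluate` are hypothetical. Treating 1.0 as the positive class is an assumption that matches the 0/1 labels built earlier.

```scala
object BinaryMetricsSketch {
  final case class Scores(accuracy: Double, precision: Double, recall: Double)

  // pairs holds (prediction, label); 1.0 is assumed to be the positive class
  def evaluate(pairs: Seq[(Double, Double)]): Scores = {
    val tp = pairs.count { case (p, l) => p == 1.0 && l == 1.0 } // true positives
    val fp = pairs.count { case (p, l) => p == 1.0 && l == 0.0 } // false positives
    val fn = pairs.count { case (p, l) => p == 0.0 && l == 1.0 } // false negatives
    val tn = pairs.count { case (p, l) => p == 0.0 && l == 0.0 } // true negatives

    // Return 0.0 instead of dividing by zero
    def ratio(num: Int, den: Int): Double =
      if (den == 0) 0.0 else num.toDouble / den

    Scores(
      accuracy = ratio(tp + tn, pairs.size),
      precision = ratio(tp, tp + fp),
      recall = ratio(tp, tp + fn)
    )
  }
}
```

Precision asks how many of the predicted links were real, while recall asks how many of the real links were found. The two can differ widely when the classes are imbalanced, so accuracy alone can be misleading.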

Precision and recall at different threshold values:

// BinaryClassificationMetrics expects (score, label) pairs; here the scores are the 0/1 predictions
val metrics2 = new BinaryClassificationMetrics(predictions)

// Precision by threshold
val precision = metrics2.precisionByThreshold
precision.foreach { case (t, p) =>
  println(s"Threshold: $t, Precision: $p")
}

// Recall by threshold
val recall = metrics2.recallByThreshold
recall.foreach { case (t, r) =>
  println(s"Threshold: $t, Recall: $r")
}


