Random forest regression with categorical inputs on PySpark

aor9mmx1 · posted 5 months ago in Spark

I have been trying to build a simple random forest regression model on PySpark. I have decent machine-learning experience in R, but ML on PySpark feels completely different to me, especially when it comes to handling categorical variables, string indexing, and one-hot encoding (I was able to run an RF regression by following examples when there are only numeric variables). While there are plenty of examples available for handling categorical variables, such as this and this, I have had no success with any of them, since most were beyond my understanding (probably because of my unfamiliarity with Python ML).
Here is my attempt (the input file is here):

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema = "true").load('filename.csv')
train.cache()
train.dtypes

The output is:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]


Next I select the variables I am interested in:

IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))
train.cache()


The output is:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]


My dependent variable is ConversionPayOut, previously a string type, now cast to double.
Here is where my confusion begins: based on this post, I understand that I have to convert my categorical string-type variables into one-hot encoded vectors. Here is my attempt at that:
First the StringIndexing:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train) for column in list(set(train.columns)-set(['Carrier','ConversionPayOut','Fraud'])) ]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()


The output of the StringIndexing:

+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|

Next, I think, I have to do the one-hot encoding of the string indexes:

from pyspark.ml.feature import OneHotEncoder, StringIndexer
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()


The output after one-hot encoding looks like this:

+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device|       Browser|     OS|  ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|     TH|   20.0|          A|   Lava|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         7.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[7],[1.0])|
|     BR|  217.0|          A|     LG|        chrome|Android|        26.2680574|  0.0|              0.0|          2.0|          0.0|     0.0|         5.0|        (1,[0],[1.0])|    (9,[2],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[5],[1.0])|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|        (1,[0],[1.0])|    (9,[1],[1.0])|    (5,[0],[1.0])|(1,[0],[1.0])|  (15,[0],[1.0])|
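
If I understand correctly, the new columns such as (9,[1],[1.0]) are sparse vectors in (size, [indices], [values]) form. A quick check of that reading, using the Country_index_Vec value from my output above:

from pyspark.ml.linalg import SparseVector

# (9,[1],[1.0]) is a length-9 vector with a single 1.0 at position 1
v = SparseVector(9, [1], [1.0])
print(v.toArray())  # [0. 1. 0. 0. 0. 0. 0. 0. 0.]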


I am clueless about how to proceed from here. In fact, I do not know which Spark ML packages require us to do this one-hot encoding and which ones do not.
It would be a great learning for all the newbies to PySpark if the StackOverflow community could clarify how to move forward.


xxslljrj1#

To run a random forest on the preprocessed data, you can proceed with the code below.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# use VectorAssembler to combine all the feature columns into a single vector column
assemblerInputs = ["Carrier","Fraud","Country_index_Vec","TrafficType_index_Vec","Device_index_Vec","Browser_index_Vec","OS_index_Vec"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
pipeline = Pipeline(stages=[assembler])
df = pipeline.fit(train_OHE).transform(train_OHE)
df = df.withColumn("label", train_OHE.ConversionPayOut)

# randomly split data into training and test datasets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=111)

# train a RandomForest model; use the regressor, since ConversionPayOut is continuous
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
rf_model = rf.fit(train_data)

# make predictions on the test data
predictions = rf_model.transform(test_data)
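
To get a quick quality check on the fit, the predictions can be scored with a RegressionEvaluator; a minimal sketch, continuing from the predictions DataFrame above:

from pyspark.ml.evaluation import RegressionEvaluator

# compare the predicted ConversionPayOut against the actual label column
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" % rmse)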

Hope this helps!
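
As a side note, the string indexers, one-hot encoders, assembler, and the regressor can also be chained into a single Pipeline, so the exact same transformations can be reapplied to new data. A sketch, assuming the column names from the question:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

cat_cols = ["Country", "TrafficType", "Device", "Browser", "OS"]
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in cat_cols]
encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_vec") for c in cat_cols]
assembler = VectorAssembler(
    inputCols=["Carrier", "Fraud"] + [c + "_vec" for c in cat_cols],
    outputCol="features")
rf = RandomForestRegressor(labelCol="ConversionPayOut", featuresCol="features")

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])
model = pipeline.fit(train)           # 'train' is the cleaned DataFrame from the question
predictions = model.transform(train)  # or, better, a held-out test split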


mrzz3bfm2#

Here is an end-to-end example (the data file is shared at https://drive.google.com/open?id=1z4YKyqIrLmWY1wNeqGrKVdTGfckqikDt):

package com.nik.spark.ml.examples.regression.randomForest

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.RandomForestClassifier

object RandomForestDemo {

  def main(args: Array[String]) {
    // Optional: Use the following code below to set the Error reporting
    import org.apache.log4j._
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Spark Session
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Use Spark to read in the adult (census income) csv file.
    val data = spark.read.option("header", "true").option("inferSchema", "true").format("csv").load("adult-training.csv")

    // Print the Schema of the DataFrame
    data.printSchema()

    ///////////////////////
    /// Display Data /////
    /////////////////////
    val colnames = data.columns
    val firstrow = data.head(1)(0)
    println("\n")
    println("Example Data Row")
    for (ind <- Range(1, colnames.length)) {
      println(colnames(ind))
      println(firstrow(ind))
      println("\n")
    }

    ////////////////////////////////////////////////////
    //// Setting Up DataFrame for Machine Learning ////
    //////////////////////////////////////////////////
    import spark.implicits._
    // Grab only the columns we want
    val logregdataall = data.select($"income", $"workclass", $"fnlwgt", $"education", $"education-num", $"marital-status", $"occupation", $"relationship", $"race", $"sex", $"capital-gain", $"capital-loss", $"hours-per-week", $"native-country")
    val logregdata = logregdataall.na.drop()

    // A few things we need to do before Spark can accept the data!
    // Convert categorical columns into a binary vector using one hot encoder
    // We need to deal with the Categorical columns

    // Import VectorAssembler and Vectors
    import org.apache.spark.ml.feature.{ VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder }
    import org.apache.spark.ml.linalg.Vectors

    // Deal with Categorical Columns
    // Transform string type columns to string indexer 
    val workclassIndexer = new StringIndexer().setInputCol("workclass").setOutputCol("workclassIndex")
    val educationIndexer = new StringIndexer().setInputCol("education").setOutputCol("educationIndex")
    val maritalStatusIndexer = new StringIndexer().setInputCol("marital-status").setOutputCol("maritalStatusIndex")
    val occupationIndexer = new StringIndexer().setInputCol("occupation").setOutputCol("occupationIndex")
    val relationshipIndexer = new StringIndexer().setInputCol("relationship").setOutputCol("relationshipIndex")
    val raceIndexer = new StringIndexer().setInputCol("race").setOutputCol("raceIndex")
    val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sexIndex")
    val nativeCountryIndexer = new StringIndexer().setInputCol("native-country").setOutputCol("nativeCountryIndex")
    val incomeIndexer = new StringIndexer().setInputCol("income").setOutputCol("incomeIndex")

    // One-hot encode the indexed categorical columns
    val workclassEncoder = new OneHotEncoder().setInputCol("workclassIndex").setOutputCol("workclassVec")
    val educationEncoder = new OneHotEncoder().setInputCol("educationIndex").setOutputCol("educationVec")
    val maritalStatusEncoder = new OneHotEncoder().setInputCol("maritalStatusIndex").setOutputCol("maritalVec")
    val occupationEncoder = new OneHotEncoder().setInputCol("occupationIndex").setOutputCol("occupationVec")
    val relationshipEncoder = new OneHotEncoder().setInputCol("relationshipIndex").setOutputCol("relationshipVec")
    val raceEncoder = new OneHotEncoder().setInputCol("raceIndex").setOutputCol("raceVec")
    val sexEncoder = new OneHotEncoder().setInputCol("sexIndex").setOutputCol("sexVec")
    val nativeCountryEncoder = new OneHotEncoder().setInputCol("nativeCountryIndex").setOutputCol("nativeCountryVec")
    val incomeEncoder = new StringIndexer().setInputCol("incomeIndex").setOutputCol("label") // re-index the target into the "label" column

    // Assemble everything together into ("label", "features") format.
    // The assembler must use the one-hot encoded vector columns, not the raw
    // string columns (VectorAssembler does not accept strings), and the label
    // ("income") must not be part of the features.
    val assembler = new VectorAssembler()
      .setInputCols(Array("workclassVec", "fnlwgt", "educationVec", "education-num", "maritalVec", "occupationVec", "relationshipVec", "raceVec", "sexVec", "capital-gain", "capital-loss", "hours-per-week", "nativeCountryVec"))
      .setOutputCol("features")
    ////////////////////////////
    /// Split the Data ////////
    //////////////////////////
    val Array(training, test) = logregdata.randomSplit(Array(0.7, 0.3), seed = 12345)

    ///////////////////////////////
    // Set Up the Pipeline ///////
    /////////////////////////////
    import org.apache.spark.ml.Pipeline

    val rf = new RandomForestClassifier().setNumTrees(10)

    // All indexers and encoders must run before the assembler and the model.
    val pipeline = new Pipeline().setStages(Array(workclassIndexer, educationIndexer, maritalStatusIndexer, occupationIndexer, relationshipIndexer, raceIndexer, sexIndexer, nativeCountryIndexer, incomeIndexer, workclassEncoder, educationEncoder, maritalStatusEncoder, occupationEncoder, relationshipEncoder, raceEncoder, sexEncoder, nativeCountryEncoder, incomeEncoder, assembler, rf))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)
    // Get Results on Test Set
    val results = model.transform(test)

    ////////////////////////////////////
    //// MODEL EVALUATION /////////////
    //////////////////////////////////
    println("schema")
    println(results.select($"label").distinct().foreach { x => println(x) })

    // For Metrics and Evaluation
    import org.apache.spark.mllib.evaluation.MulticlassMetrics

    // Need to convert to RDD to use this
    val predictionAndLabels = results.select($"prediction", $"label").as[(Double, Double)].rdd

    // Instantiate metrics object
    val metrics = new MulticlassMetrics(predictionAndLabels)

    // Confusion matrix
    println("Confusion matrix:")
    println(metrics.confusionMatrix)
    println(metrics.accuracy)
  }
}

