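I have been trying to build a simple random forest regression model in PySpark. I have decent machine learning experience in R. However, ML in PySpark seems completely different to me, especially when it comes to handling categorical variables, string indexing, and one-hot encoding (when there are only numeric variables, I was able to run RF regression just by following examples). There are many examples available for handling categorical variables, such as this and this, but I have had no success with any of them, as most went over my head (probably because I am unfamiliar with Python ML).
Here is my attempt (the input file is here):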
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema = "true").load('filename.csv')
train.cache()
train.dtypes
The output is:
DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]
Next, I select the variables I am interested in:
IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))
train.cache()
The output is:
DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]
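My dependent variable is ConversionPayOut, which was previously a string type and has now been cast to double.
This is where my confusion starts: based on this post, I understood that I have to convert my categorical string-type variables into one-hot encoded vectors. Here is my attempt.
First, the StringIndexing: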
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
# Index every string column; Carrier, ConversionPayOut and Fraud are already numeric.
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train) for column in list(set(train.columns)-set(['Carrier','ConversionPayOut','Fraud'])) ]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()
Output of the StringIndexing:
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0|
| TH| 20.0| A|Generic| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0|
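To make the `_index` columns above easier to read, here is a minimal pure-Python sketch of the default StringIndexer ordering, in which the most frequent label receives index 0.0. The `string_index` helper and the `devices` list are hypothetical and only for illustration; the alphabetical tie-break is an assumption of this sketch, and Spark is not used here at all.

```python
from collections import Counter


def string_index(values):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(values)
    # Descending frequency; ties broken alphabetically (an assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Made-up sample values, not taken from the input file.
devices = ["Generic", "Generic", "Generic", "LG", "LG", "Lava"]
mapping = string_index(devices)
indexed = [mapping[d] for d in devices]
print(mapping)
print(indexed)
```

This is why a common value such as `Generic` or `chrome` shows up as 0.0 in the table, while rarer values get larger indices.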
Next, I think I have to do the one-hot encoding of the string indexes:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()
The output after one-hot encoding looks like this:
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[7],[1.0])|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0| (1,[0],[1.0])| (9,[2],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[5],[1.0])|
| TH| 20.0| A|Generic| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[0],[1.0])|
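The `_Vec` columns above are printed in sparse-vector notation, `(size, [indices], [values])`. The pure-Python sketch below shows how an index turns into such a triple, assuming the encoder's default `dropLast=True` (the code above does not override it), so a column with n categories yields a vector of length n - 1. The helper names `one_hot_sparse` and `to_dense` are hypothetical, and the figure of 10 countries is inferred from the vector size of 9 rather than stated in the question.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values) for a one-hot encoded category index."""
    size = num_categories - 1 if drop_last else num_categories
    position = int(index)
    if position < size:
        return (size, [position], [1.0])
    # With drop_last, the last category is represented by an all-zero vector.
    return (size, [], [])


def to_dense(sparse):
    """Expand a (size, indices, values) triple into a plain list of floats."""
    size, indices, values = sparse
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


country_vec = one_hot_sparse(1.0, num_categories=10)
print(country_vec)
print(to_dense(country_vec))
```

Read this way, `(9,[1],[1.0])` for `TH` is a length-9 vector with a single 1.0 at position 1.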
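I don't know how to proceed from here. In fact, I don't know which Spark machine learning packages require this one-hot encoding and which do not.
It would be a great learning experience for all PySpark newcomers if the StackOverflow community could clarify how to move forward.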
2 Answers
xxslljrj1#
To run a random forest on the preprocessed data, you can continue with the code below.
Hope this helps!
mrzz3bfm2#
Here is a comprehensive example (the data file is shared at https://drive.google.com/open?id=1z4YKyqIrLmWY1wNeqGrKVdTGfckqikDt):