pyspark学习之

您所在的位置:网站首页 流水线csdn pyspark学习之

pyspark学习之

2024-06-02 09:40:38| 来源: 网络整理| 查看: 265

目录 一、流水线Pipeline概念二、流水线工作流程2.1 训练过程2.2 测试过程 三、Estimator, Transformer, Param实例四、Pipeline实例

一、流水线Pipeline概念

       spark的流水线受 scikit-learn项目的启发,是对流水线式工作的一种高度抽象,通常可以包含多个机器学习流程,如:源数据ETL、数据预处理工作、指标提取、模型训练、模型验证、预测新数据等多个步骤。包含以下几个步骤:        1) DataFrame:spark.ml使用的数据格式是DataFrame,该格式可以容纳多种数据类型。DataFrame的列可以用来存储:文本,特征向量、真是标签、预测值等。(如果用过pandas,那么也可以将其类比于pandas的DataFrame)        2) Transformer: Transformer是一个抽象的概念,包含了 若干特征转换和学得模型。Transformer实现了transform()方法,该方法可以将一个DataFrame转换为另一个DataFrame。比如在一个DataFrame后面增加几个列,就属于Transformer;再比如通过一个模型为一个DataFrame增加标签,那么这个模型也属于Transformer。        3) Estimator:Estimator是一个学习算法或者算法拟合或者训练数据的一个抽象概念(有点绕,翻译的不好)。可以这么理解:由于Estimator实现了fit()方法,所以只要输入是DataFrame,然后产生一个模型,那么就可以将其理解为Estimator。比如说逻辑回归算法拟合某个DataFrame的数据,就会生成一个 LogisticRegressionModel,所以一个逻辑回归算法就是Estimator。Estimator同时也是Transformer。        4)Pipeline:将多个Transfomer和Estimator连接在一起构成一个机器学习工作流。        5) Parameter:所有Transformers和 Estimators共享用于指定参数的公共API。

二、流水线工作流程

       一个Pipeline由若干个阶段( stage)组成,每个阶段要么是Transformer要么是Estimator。上一个阶段的输出是下一个阶段的输入,按顺序执行各给阶段,输入的DataFrame每经过一个阶段就会被转换一次:        1)Transformer阶段通过 transform()执行DataFrame的转换;        2)Estimator阶段先通过fit()方法转换为Transformer,转换后的这个Transformer再执行transform()方法将输入的DataFrame进行转换

2.1 训练过程

在这里插入图片描述        上面这张图是一个处理文本的Pipeline训练示例:        1)图中上面那一行表示有三个阶段:Tokenizer(Transformer),HashingTF(Transformer),Logistic Regression(Estimator),两个Transformer,一个Estimator。        2)图中下面那一行表示Pipeline中数据的流向,其中每一个圆柱体表示一个DataFrame。当pipeline调用fit()方法时,从初始的DataFrame开始进行操作:               a)Tokenizer.transform()将原始文本(raw text)转换为单词(words),在原始的DataFrame中新增单词列;               b)HashingTF.transform()将单词转为特征向量(feature vectors),将这个特征向量新增到上一步产生的DataFrame中               c)由于Logistic Regression 是一个Estimator,所以调用先LogisticRegression.fit() 生成一个 LogisticRegressionModel,然后这个 LogisticRegressionModel再调用transform()方法来对输入的DataFrame进行转换。

2.2 测试过程

       训练的过程做了简要介绍,接下来再对测试过程进行讲解。

在这里插入图片描述        在训练阶段,构建的流水线Pipeline调用fit()后生成一个 PipelineModel,这个 PipelineModel是一个Transformer。当要使用测试数据进行测试时,让训练阶段生成的PipelineModel调用transform()方法。上图是利用这个PipelineModel进行测试的过程,可以看到测试过程和预测过程是一样的流程,拥有相同的阶段数量,只是将测试阶段LogisticRegression这个Estimator变成了Transformer。

三、Estimator, Transformer, Param实例 # 1.导包,利用逻辑回归来演示 from pyspark.ml.linalg import Vectors from pyspark.ml.classification import LogisticRegression # 2.构造训练数据,列表形式,其数据为元组:(label, features) training = spark.createDataFrame( [(1.0, Vectors.dense([0.0, 1.1, 0.1])), (0.0, Vectors.dense([2.0, 1.0, -1.0])), (0.0, Vectors.dense([2.0, 1.3, 1.0])), (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"]) # 3.创建LogisticRegression实例(迭代10次,正则化参数为0.01),该实例是一个Estimator lr = LogisticRegression(maxIter=10,regParam=0.01) # 查看该实例信息,会打印出具体信息,内容太多,就不在这里展示了 print("logisticRegression parameters:\n" + lr.explainParams() +"\n") # 4.训练模型:调用fit()方法,生成一个LogisticRegressionModel model1 = lr.fit(training) # 通过模型的extractParamMap()方法可以查看模型的详细信息 print("Model 1 was fit using parameters: ") print(model1.extractParamMap()) # 5.自定义设置参数 # 5.1 通过python字典可以指定模型参数,指定的参数会覆盖原有参数,下面是三种指定参数的方式: paramMap={lr.maxIter:20} paramMap[lr.maxIter]=30 paramMap.update({lr.regParam:0.1,lr.threshold:0.55}) # 5.2 修改参数名称,参数的名称也是可以修改的 paramMap2 = {lr.probabilityCol: "myProbability"} # 5.3 将2次修改进行合并 paramMapCombined = paramMap.copy() paramMapCombined.update(paramMap2) # 5.4 将自定义的参数作用于模型的生成 model2 = lr.fit(training,paramCombined) print("Model 2 was fit using parameters: ") print(model2.extractParamMap()) # 6.测试:训练阶段生成的模型调用transform() 方法来对测试数据进行预测 # 6.1 生成测试数据 test = spark.createDataFrame([ (1.0, Vectors.dense([-1.0, 1.5, 1.3])), (0.0, Vectors.dense([3.0, 2.0, -0.1])), (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"]) # 6.2测试 prediction = model2.transform(test) # 7. 查看测试结果,注意:在第5.2步中,将lr.probabilityCol重命名为myProbability, # 所以如果要查看该部分信息则需要将名称进行相应的更换 result = prediction.select("features", "label", "myProbability", "prediction").collect() for row in result: print("features=%s, label=%s -> prob=%s, prediction=%s" % (row.features, row.label, row.myProbability, row.prediction)) # 输出入下: features=[-1.0,1.5,1.3], label=1.0 -> prob=[0.0570730417103402,0.9429269582896599], prediction=1.0 features=[3.0,2.0,-0.1], label=0.0 -> prob=[0.9238522311704104,0.07614776882958958], prediction=0.0 features=[0.0,2.2,-1.5], label=1.0 -> prob=[0.10972776114779453,0.8902722388522054], prediction=1.0 四、Pipeline实例

       第三步演示了单独进行各项操作的过程,在这里演示管道化处理。

# 1.导包 from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF,Tokenizer # 2.构造训练数据,列表形式,其数据为元组: (id, text, label) training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0)],["id", "text", "label"]) # 3. 创建管道,包含三个阶段:tokenizer, hashingTF, and lr # 3.1 在初始DataFrame中新增words列,值为text列的数据转换为单词 tokenizer = Tokenizer(inputCol="text",outputCol="words") # 3.2 将上一步得到的DataFrame的输出列(words列)转为特征向量后作为新增的列,列名为features hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol='features') # 3.3 创建逻辑回归实例 lr = LogisticRegression(maxIter=10,regParam=0.001) # 3.4 创建管道:将上述各个阶段依次结合 pipeline = Pipeline(stages=[tokenizer,hashingTF,lr]) # 4.训练模型:调用流水线的fit()方法,生成一个pipelinemodel,该模型是一个Transformer model = pipeline.fit(training) # 5.测试 # 5.1 生成测试数据 test = spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop") ], ["id", "text"]) # 5.2 测试:用训练得到的模型调用transform()方法来预测 prediction = model.transform(test) # 6.查看预测情况 selected = prediction.select("id", "text", "probability", "prediction") for row in selected.collect(): rid, text, prob, prediction = row print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction)) # 结果如下: (4, spark i j k) --> prob=[0.15964077387874748,0.8403592261212525], prediction=1.000000 (5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000 (6, spark hadoop spark) --> prob=[0.06926633132976032,0.9307336686702398], prediction=1.000000 (7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000


【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭