
Five Ways to Convert an RDD to a DataFrame in Spark SQL


My previous company's stack was mostly Spark Core; since changing jobs, my work has been mostly Spark SQL and DataFrames.

Having recently spent a lot of time with Spark DataFrames, I put together this DataFrame quick-start for readers who already have Spark experience.

Method 1

Define a case class and use it as the RDD's element type, import spark.implicits._, then call the RDD's toDF method to get a DataFrame.

The case class wraps each record in a named structure so the data can be treated like a table; see the Spark documentation: http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds

Spark uses reflection to infer the schema of an RDD that holds a specific type (here, the case class); you can then work with the data through the DataFrame API or SQL.

/**
 * Method 1:
 * Define a case class as the RDD's element type, import spark.implicits._,
 * then call toDF on the RDD to get a DataFrame.
 */
case class Boy(name: String, age: Int, score: Int)

def method1(): Unit = {
  val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
  val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
  val boyRdd: RDD[Boy] = rdd.map(line => {
    val fields: Array[String] = line.split(" ")
    Boy(fields(0), fields(1).toInt, fields(2).toInt)
  })
  import spark.implicits._
  val dataFrame: DataFrame = boyRdd.toDF()
  dataFrame.createTempView("v_boy")
  dataFrame.show()
  spark.stop()
}
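Note that this snippet registers the temp view v_boy but never queries it. For completeness, here is a minimal sketch of the SQL route mentioned above (the query itself is my addition, not from the original; drop it in before spark.stop()):

// Query the registered view with SQL instead of the DataFrame API
val top: DataFrame = spark.sql("SELECT name, score FROM v_boy WHERE age >= 20 ORDER BY score DESC")
top.show()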

Method 2

Use SparkSession's createDataFrame method. It is overloaded; the overload used here takes two arguments: a rowRDD whose element type is Row, and a schema of type StructType.

/**
 * Method 2:
 * Use SparkSession's createDataFrame method (the overload that takes two
 * arguments: a rowRDD of element type Row, and a schema of type StructType).
 */
def method2(): Unit = {
  val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
  val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
  val rowRdd: RDD[Row] = rdd.map(line => {
    val fields: Array[String] = line.split(" ")
    Row(fields(0), fields(1).toInt, fields(2).toInt)
  })
  val schema = StructType(
    List(
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("score", IntegerType)
    )
  )
  val dataFrame: DataFrame = spark.createDataFrame(rowRdd, schema)
  dataFrame.show()
  spark.stop()
}
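A side note, not from the original post: if the StructField list feels verbose, newer Spark versions can build the same schema from a DDL string via StructType.fromDDL (the exact version it appeared in is an assumption on my part; check your Spark release):

// Equivalent schema built from a DDL string, reusing rowRdd and spark from method2
import org.apache.spark.sql.types.StructType

val ddlSchema: StructType = StructType.fromDDL("name STRING, age INT, score INT")
val ddlFrame: DataFrame = spark.createDataFrame(rowRdd, ddlSchema)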

Method 3

Store tuples in the RDD; after import spark.implicits._ you can call toDF directly to get a DataFrame.

/**
 * Method 3:
 * Store tuples in the RDD; after import spark.implicits._, call toDF
 * directly to get a DataFrame.
 */
def method3(): Unit = {
  val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
  val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
  val tupleRdd: RDD[(String, Int, Int)] = rdd.map(line => {
    val fields: Array[String] = line.split(" ")
    (fields(0), fields(1).toInt, fields(2).toInt)
  })
  import spark.implicits._
  val dataFrame: DataFrame = tupleRdd.toDF("name", "age", "score")
  dataFrame.show()
  spark.stop()
}
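Worth knowing: the column names passed to toDF are optional. Calling toDF() with no arguments on a tuple RDD falls back to the default names _1, _2, _3, which can be renamed afterwards; a small sketch of my own:

import spark.implicits._
val df: DataFrame = tupleRdd.toDF() // default columns: _1, _2, _3
val named: DataFrame = df
  .withColumnRenamed("_1", "name")
  .withColumnRenamed("_2", "age")
  .withColumnRenamed("_3", "score")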

Method 4: create a DataFrame from a Java bean class

Define a JavaBean class and store instances of it in an RDD, then call SparkSession's createDataFrame with the RDD and the bean's Class object as the two arguments.

/**
 * Method 4:
 * Define a JavaBean class and store instances of it in an RDD, then call
 * SparkSession's createDataFrame with the RDD and the bean's Class object.
 */
def method4(): Unit = {
  val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
  val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
  val beanRdd: RDD[Man] = rdd.map(line => {
    val fields: Array[String] = line.split(" ")
    new Man(fields(0), fields(1).toInt, fields(2).toInt)
  })
  val dataFrame: DataFrame = spark.createDataFrame(beanRdd, classOf[Man])
  dataFrame.show()
  spark.stop()
}
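One caveat with this method, visible in the output further down: a DataFrame built from a Java bean orders its columns alphabetically by property name (age, name, score), not by constructor order. If a specific order matters, a select after the fact restores it; a minimal sketch (my addition, not in the original):

// Restore the intended column order after bean-based schema inference
val ordered: DataFrame = dataFrame.select("name", "age", "score")
ordered.show()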

This relies on the Java bean class Man:

/**
 * @author yyz
 * @class Man
 * @date 2021/01/24 17:16
 **/
public class Man {
    private String name;
    private int age;
    private int score;

    public Man(String name, int age, int score) {
        this.name = name;
        this.age = age;
        this.score = score;
    }

    public Man() {
    }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }

    public void setAge(int age) { this.age = age; }

    public int getScore() { return score; }

    public void setScore(int score) { this.score = score; }
}

Complete code and output for Methods 1 through 4

TestCreateDataFrame.scala

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * @class TestCreateDataFrame
 * @author yyz
 * @date 2021/01/24 17:01
 * Four ways to create a DataFrame in Spark SQL
 * https://blog.csdn.net/m0_47444428/article/details/107393748
 */
object TestCreateDataFrame {

  Logger.getLogger("org").setLevel(Level.OFF)

  /**
   * Method 1:
   * Define a case class as the RDD's element type, import spark.implicits._,
   * then call toDF on the RDD to get a DataFrame.
   */
  case class Boy(name: String, age: Int, score: Int)

  def method1(): Unit = {
    val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
    val boyRdd: RDD[Boy] = rdd.map(line => {
      val fields: Array[String] = line.split(" ")
      Boy(fields(0), fields(1).toInt, fields(2).toInt)
    })
    import spark.implicits._
    val dataFrame: DataFrame = boyRdd.toDF()
    dataFrame.createTempView("v_boy")
    dataFrame.show()
    spark.stop()
  }

  /**
   * Method 2:
   * Use SparkSession's createDataFrame method (the overload that takes two
   * arguments: a rowRDD of element type Row, and a schema of type StructType).
   */
  def method2(): Unit = {
    val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
    val rowRdd: RDD[Row] = rdd.map(line => {
      val fields: Array[String] = line.split(" ")
      Row(fields(0), fields(1).toInt, fields(2).toInt)
    })
    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("score", IntegerType)
      )
    )
    val dataFrame: DataFrame = spark.createDataFrame(rowRdd, schema)
    dataFrame.show()
    spark.stop()
  }

  /**
   * Method 3:
   * Store tuples in the RDD; after import spark.implicits._, call toDF
   * directly to get a DataFrame.
   */
  def method3(): Unit = {
    val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
    val tupleRdd: RDD[(String, Int, Int)] = rdd.map(line => {
      val fields: Array[String] = line.split(" ")
      (fields(0), fields(1).toInt, fields(2).toInt)
    })
    import spark.implicits._
    val dataFrame: DataFrame = tupleRdd.toDF("name", "age", "score")
    dataFrame.show()
    spark.stop()
  }

  /**
   * Method 4:
   * Define a JavaBean class and store instances of it in an RDD, then call
   * SparkSession's createDataFrame with the RDD and the bean's Class object.
   */
  def method4(): Unit = {
    val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]").getOrCreate()
    val rdd: RDD[String] = spark.sparkContext.parallelize(List("xiaohong 19 90", "xiaozhang 20 90", "huahua 21 99"))
    val beanRdd: RDD[Man] = rdd.map(line => {
      val fields: Array[String] = line.split(" ")
      new Man(fields(0), fields(1).toInt, fields(2).toInt)
    })
    val dataFrame: DataFrame = spark.createDataFrame(beanRdd, classOf[Man])
    dataFrame.show()
    spark.stop()
  }

  def main(args: Array[String]): Unit = {
    println(" method1()")
    method1()
    println(" method2()")
    method2()
    println(" method3()")
    method3()
    println(" method4()")
    method4()
  }
}

Output:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
 method1()
+---------+---+-----+
|     name|age|score|
+---------+---+-----+
| xiaohong| 19|   90|
|xiaozhang| 20|   90|
|   huahua| 21|   99|
+---------+---+-----+

 method2()
+---------+---+-----+
|     name|age|score|
+---------+---+-----+
| xiaohong| 19|   90|
|xiaozhang| 20|   90|
|   huahua| 21|   99|
+---------+---+-----+

 method3()
+---------+---+-----+
|     name|age|score|
+---------+---+-----+
| xiaohong| 19|   90|
|xiaozhang| 20|   90|
|   huahua| 21|   99|
+---------+---+-----+

 method4()
+---+---------+-----+
|age|     name|score|
+---+---------+-----+
| 19| xiaohong|   90|
| 20|xiaozhang|   90|
| 21|   huahua|   99|
+---+---------+-----+

Process finished with exit code 0

Method 5: create a DataFrame from a Scala bean class

1. Problem analysis

Note: a "plain class" here means a Scala class that is not a case class. Under the hood, Spark treats such a class as a standard Java bean. A plain Scala class, however, does not have Java-style getters and setters for its fields, so the conversion fails. The @BeanProperty annotation fixes this.

2. What @BeanProperty does

package scala.beans

/** When attached to a field, this annotation adds a setter and a getter
 * method following the Java Bean convention. For example:
 * {{{
 * @BeanProperty
 * var status = ""
 * }}}
 * adds the following methods to the class:
 * {{{
 * def setStatus(s: String) { this.status = s }
 * def getStatus: String = this.status
 * }}}
 * For fields of type `Boolean`, if you need a getter named `isStatus`,
 * use the `scala.beans.BooleanBeanProperty` annotation instead.
 */
@scala.annotation.meta.field
class BeanProperty extends scala.annotation.StaticAnnotation

3. Driver code: CreateScalaBeanClass.scala

package scala.beans

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @class CreateScalaBeanClass
 * @author yyz
 * @date 2021/01/24 17:27
 */
object CreateScalaBeanClass {

  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    println("this.getClass.getName: ", this.getClass.getName)
    val spark: SparkSession = SparkSession.builder().appName(this.getClass.getName).master("local[*]").getOrCreate()
    val rdd: RDD[String] = spark.sparkContext.textFile("file:///D:/scala_project/src/main/scala/scala/beans/stu.txt")
    println(rdd.count())
    val data: RDD[StuScala] = rdd.map(line => {
      val arr = line.split(",")
      StuScala(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
    })
    val frame: DataFrame = spark.createDataFrame(data, classOf[StuScala])
    frame.printSchema()
    frame.show(100)
  }
}

StuScala.scala

package scala.beans

import scala.beans.BeanProperty

/**
 * @class StuScala
 * @author yyz
 * @date 2021/01/24 17:26
 */
class StuScala(
    @BeanProperty val id: Int,
    @BeanProperty val name: String,
    @BeanProperty val age: Int,
    @BeanProperty val city: String,
    @BeanProperty val score: Double)

object StuScala {
  def apply(id: Int, name: String, age: Int, city: String, score: Double): StuScala =
    new StuScala(id, name, age, city, score)
}
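Because the constructor parameters are vals, @BeanProperty generates only getters (getId, getName, and so on), no setters; getters are all createDataFrame needs here to read the fields. A quick sanity check, as a sketch of my own:

val s = StuScala(1, "tom", 20, "bj", 99.5)
println(s.getName) // prints "tom" (getter generated by @BeanProperty)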

Test data stu.txt (note that the driver parses the fields in order as id, name, age, city, score):

31,bj,1001,tom,123
42,sh,1002,jack,124
23,bj,1003,lily,118
34,hz,1004,lucy,119

Output:

(this.getClass.getName: ,scala.beans.CreateScalaBeanClass$)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
4
root
 |-- age: integer (nullable = false)
 |-- city: string (nullable = true)
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- score: double (nullable = false)

+----+----+---+----+-----+
| age|city| id|name|score|
+----+----+---+----+-----+
|1001| tom| 31|  bj|123.0|
|1002|jack| 42|  sh|124.0|
|1003|lily| 23|  bj|118.0|
|1004|lucy| 34|  hz|119.0|
+----+----+---+----+-----+

Process finished with exit code 0

References:
https://blog.csdn.net/m0_47444428/article/details/107393748
https://blog.csdn.net/silentwolfyh/article/details/103836925

