package com.immoc.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}
/**
 * Interoperation between DataFrame and RDD.
 */
object DataFrameRDDApp {

  /** Location of the comma-separated `id,name,age` text file read by both conversion examples. */
  private val InfoPath = "file:///usr/local/app/spark/examples/src/main/resources/info.txt"

  /** Master URL applied only when `spark.master` has not already been configured. */
  private val DefaultMaster = "spark://master:7077"

  /**
   * Entry point: builds the session, runs the programmatic-schema example and
   * always stops the session afterwards.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    // setIfMissing keeps an externally configured master instead of overriding
    // it with the hard-coded URL; the URL remains the fallback.
    val conf = new org.apache.spark.SparkConf().setIfMissing("spark.master", DefaultMaster)
    val spark = SparkSession.builder().config(conf).appName("DataFrameRDDApp").getOrCreate()
    try {
      //inferReflection(spark)
      program(spark)
    } finally {
      // Release the session even when the example fails.
      spark.stop()
    }
  }

  /**
   * Splits one input line on commas and converts the first and third fields to `Int`.
   * Fields beyond the third are ignored.
   *
   * @param line a single non-blank line of the input file
   * @return the `(id, name, age)` triple parsed from the line
   * @throws IllegalArgumentException if the line has fewer than three fields
   * @throws NumberFormatException    if the id or age field is not an integer
   */
  private def parseInfo(line: String): (Int, String, Int) = {
    val fields = line.split(",")
    require(fields.length >= 3, s"Expected at least 3 comma-separated fields but got: '$line'")
    // Numeric fields are trimmed because toInt rejects surrounding whitespace.
    (fields(0).trim.toInt, fields(1), fields(2).trim.toInt)
  }

  /**
   * RDD ===> DataFrame with a programmatically specified schema: builds an
   * `RDD[Row]`, applies an explicit `StructType`, then queries the result
   * through both the DataFrame API and SQL.
   *
   * @param spark the active session
   */
  private def program(spark: SparkSession): Unit = {
    val rowRDD = spark.sparkContext
      .textFile(InfoPath)
      .filter(_.trim.nonEmpty) // skip blank lines, which cannot be parsed
      .map { line =>
        val (id, name, age) = parseInfo(line)
        Row(id, name, age)
      }

    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))

    val infoDF = spark.createDataFrame(rowRDD, schema)
    infoDF.printSchema()
    infoDF.show()

    // Query through the DataFrame API
    infoDF.filter(infoDF.col("age") > 30).show()

    // Query through SQL
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  /**
   * RDD ===> DataFrame via reflection: maps each line to an [[Info]] and
   * converts the RDD with `toDF()`.
   *
   * @param spark the active session
   */
  private def inferReflection(spark: SparkSession): Unit = {
    // The implicit conversions are required for toDF()
    import spark.implicits._

    val infoDF = spark.sparkContext
      .textFile(InfoPath)
      .filter(_.trim.nonEmpty) // skip blank lines, which cannot be parsed
      .map { line =>
        val (id, name, age) = parseInfo(line)
        Info(id, name, age)
      }
      .toDF()
    infoDF.show()
  }

  /** One record of the input file, used for reflection-based schema inference. */
  final case class Info(id: Int, name: String, age: Int)
}