Appearance
Pyspark
Python数据处理中使用的方法大部分都是Pandas DataFrame,PySpark中也有该API,所以转换过程中节省了大量工作量。
在Pyspark脚本中需要先引入Pyspark库
from pyspark.sql import SparkSessionfrom pyspark.sql import SparkSession创建SparkSession对象
在PySpark中,需要使用SparkSession对象来初始化Spark应用程序
spark = SparkSession.builder.appName("test").getOrCreate()spark = SparkSession.builder.appName("test").getOrCreate()将Pandas转换为SparkDataFrame
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
# 创建pandas DataFrame
df = pd.DataFrame([["zhangsan",25], ["lisi", 24]], columns = ["name", "age"])
# 将pandas.DataFrame 转换成 Spark.DataFrame
spark_df = spark.createDataFrame(df)
spark_df.show()import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
# 创建pandas DataFrame
df = pd.DataFrame([["zhangsan",25], ["lisi", 24]], columns = ["name", "age"])
# 将pandas.DataFrame 转换成 Spark.DataFrame
spark_df = spark.createDataFrame(df)
spark_df.show()PySpark读取csv文件
data = spark.read
.option("inferSchema","true") # 自动推导数据类型
.option("header","true") # 读取列名
.csv("/user/root/data.csv") # 文件路径(如果是读取HDFS上的数据文件,则.csv改为.load('hdfs://ip:port/input/data.csv')data = spark.read
.option("inferSchema","true") # 自动推导数据类型
.option("header","true") # 读取列名
.csv("/user/root/data.csv") # 文件路径(如果是读取HDFS上的数据文件,则.csv改为.load('hdfs://ip:port/input/data.csv')SparkSQL
SparkDataFrame还可以创建虚拟数据表,使用SQL语法操作数据
data.createOrReplaceTempView("table")
spark.sql("select *from table").show()data.createOrReplaceTempView("table")
spark.sql("select *from table").show()文章推荐: