Skip to content
本页导航

Pyspark

Python数据处理中使用的方法大部分都是Pandas DataFrame,PySpark中也有该API,所以转换过程中节省了大量工作量。

在Pyspark脚本中需要先引入Pyspark库
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
创建SparkSession对象

在PySpark中,需要使用SparkSession对象来初始化Spark应用程序

spark = SparkSession.builder.appName("test").getOrCreate()
spark = SparkSession.builder.appName("test").getOrCreate()
将Pandas转换为SparkDataFrame
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

# 创建pandas DataFrame
df = pd.DataFrame([["zhangsan",25], ["lisi", 24]], columns = ["name", "age"])

# 将pandas.DataFrame 转换成 Spark.DataFrame
spark_df = spark.createDataFrame(df)

spark_df.show()
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

# 创建pandas DataFrame
df = pd.DataFrame([["zhangsan",25], ["lisi", 24]], columns = ["name", "age"])

# 将pandas.DataFrame 转换成 Spark.DataFrame
spark_df = spark.createDataFrame(df)

spark_df.show()
PySpark读取csv文件
data = spark.read
    .option("inferSchema","true")	# 自动推导数据类型
    .option("header","true")		# 读取列名
    .csv("/user/root/data.csv")	# 文件路径(如果是读取HDFS上的数据文件,则.csv改为.load('hdfs://ip:port/input/data.csv')
data = spark.read
    .option("inferSchema","true")	# 自动推导数据类型
    .option("header","true")		# 读取列名
    .csv("/user/root/data.csv")	# 文件路径(如果是读取HDFS上的数据文件,则.csv改为.load('hdfs://ip:port/input/data.csv')
SparkSQL

SparkDataFrame还可以创建虚拟数据表,使用SQL语法操作数据

data.createOrReplaceTempView("table")
spark.sql("select *from table").show()
data.createOrReplaceTempView("table")
spark.sql("select *from table").show()

文章推荐:

spark篇(二)——Spark DataFrame - 简书 (jianshu.com)

大数据之Spark(5)- SparkSql - 简书 (jianshu.com)

lhiro