pySpark-DataFrame

Spark DataFrame

DataFrame和RDD互操作的两种方法:

  • 1.反射模式:Row() 前提:事先需要知道字段、字段类型
  • 2.编程模式:StructType() 适用于事先不知道字段、字段类型的场景
  • 选型建议:两种都可用时优先选择第一种(反射模式)
  • 实例:
(完整代码如下)
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
'''
dataframe和rdd的互操作
'''

#反射推断模式
def reflect(text):
parts = text.map(lambda x: x.split(','))
emp = parts.map(
lambda x: Row(empno=x[0], ename=x[1], job=x[2], mgr=x[3], hiredate=x[4], sal=x[5], com=x[6], deptno=x[7]))
# 将schema注册成一张表
schemaEmp = spark.createDataFrame(emp)
schemaEmp.createOrReplaceTempView('emp')
schemaEmp.printSchema()
spark.sql('select * from emp where sal >=2000').show()

#编程模式
def structType(text):
parts = text.map(lambda x: x.split(','))
emp = parts.map(lambda e: (e[0],e[1],e[2],e[3],e[4],e[5],e[6],e[7]))
schamestring = 'empno,ename,job,mgr,hiredate,sal,com,deptno'
fields = [StructField(field_name,StringType(),True) for field_name in schamestring.split(',')]
schame = StructType(fields)
schameEmp = spark.createDataFrame(emp, schame)
schameEmp.printSchema()
print(schameEmp.count())
print(schameEmp.show(2))

if __name__ == '__main__':
spark = SparkSession.builder.appName('dataframe').master('local[2]').getOrCreate()
read_text = spark.sparkContext.textFile('D:\myProject\Spark\PySpark\data\emp.txt')

#reflect(read_text)
structType(read_text)
spark.stop()