pySpark-SQL

Getting Started with Spark SQL

1. SQLContext

  • Example (Spark 1.x):
from pyspark.sql import SQLContext
from pyspark import SparkContext

'''
Using SQLContext
In Spark 1.x, the official way to obtain a SQLContext is:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
'''

if __name__ == '__main__':
    # Create a SparkContext to pass into the SQLContext constructor
    sc = SparkContext(master='local[2]', appName='sqlcontext')
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)
    # Read a JSON file with the SQLContext (sample data ships in the examples
    # directory of the Spark distribution); this returns a DataFrame
    read_json = sqlContext.read.json(r'D:\apps\spark-2.3.2-bin-hadoop2.7\examples\src\main\resources\people.json')
    # Print the schema
    read_json.printSchema()
    read_json.show()
    print(read_json.take(2))
    sc.stop()

#result
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]
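
Since the SQLContext is also the entry point for running SQL, a natural next step is to register the DataFrame as a temporary table and query it with SQL. A minimal sketch, assuming the same people.json as above (the table name people is chosen here just for illustration):

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == '__main__':
    sc = SparkContext(master='local[2]', appName='sqlcontext_sql')
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)
    people = sqlContext.read.json(r'D:\apps\spark-2.3.2-bin-hadoop2.7\examples\src\main\resources\people.json')
    # Register the DataFrame as a temporary table (Spark 1.x style API, still available in 2.x)
    people.registerTempTable("people")
    # Run SQL through the SQLContext against the registered table
    sqlContext.sql("select name, age from people where age is not null").show()
    sc.stop()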
  • Example (Spark 2.x):
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master('local[2]')\
        .appName('sqlcontext').getOrCreate()
    read_json = spark.read.json(r'D:\apps\spark-2.3.2-bin-hadoop2.7\examples\src\main\resources\people.json')
    read_json.printSchema()
    read_json.show()
    print(read_json.count())
    spark.stop()

#result
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

3
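
In Spark 2.x the SQLContext is subsumed by SparkSession, and spark.sql plays the role that sqlContext.sql did. A minimal sketch, assuming the same people.json (the view name people is illustrative):

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master('local[2]')\
        .appName('tempview').getOrCreate()
    people = spark.read.json(r'D:\apps\spark-2.3.2-bin-hadoop2.7\examples\src\main\resources\people.json')
    # createOrReplaceTempView is the Spark 2.x replacement for registerTempTable
    people.createOrReplaceTempView("people")
    spark.sql("select name from people where age > 20").show()
    spark.stop()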

2. HiveContext

  • Example:
from pyspark.sql import HiveContext
from pyspark import SparkContext
'''
Using HiveContext
'''
if __name__ == '__main__':
    sc = SparkContext(appName='hiveContext')
    sc.setLogLevel("WARN")
    sqlContext = HiveContext(sc)

    # Read an existing Hive table as a DataFrame
    data_frame = sqlContext.table("data_m.tmp_lxm_20181119_bj_jyy_staypoint_tmp_new")
    # print(data_frame.count())
    # print(data_frame.take(5))
    data_frame.printSchema()
    result = sqlContext.sql('''
        select t3.day_id, t3.grid_id, t3.number, count(distinct t3.mdn)
        from
            (select t2.day_id, t2.grid_id, t2.mdn, t2.duration,
                    case when t2.duration <= 10 then '<=10'
                         when t2.duration > 10 and t2.duration <= 30 then '10-30'
                         when t2.duration > 30 and t2.duration <= 60 then '30-60'
                         when t2.duration > 60 and t2.duration <= 120 then '60-120'
                         when t2.duration > 120 then '>120'
                    end number
             from
                 (select t1.mdn, t1.grid_id, t1.day_id, t1.duration
                  from
                      (select mdn, grid_id, day_id, duration,
                              row_number() over(partition by mdn, day_id order by cast(duration as bigint) desc) num
                       from data_m.tmp_lxm_20181119_bj_jyy_staypoint_tmp_new
                       where day_id='20180212') t1
                  where t1.num = 1) t2) t3
        where t3.number='<=10'
        group by t3.day_id, t3.grid_id, t3.number
    ''')

    print(result.take(50))
    sc.stop()
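
In Spark 2.x, HiveContext is deprecated in favour of a SparkSession built with enableHiveSupport(), which talks to the same Hive metastore. A minimal sketch, assuming the cluster's Hive configuration is available to Spark and the same table exists:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # enableHiveSupport() gives the session access to the Hive metastore and HiveQL
    spark = SparkSession\
        .builder\
        .appName('hiveSession')\
        .enableHiveSupport()\
        .getOrCreate()
    df = spark.table("data_m.tmp_lxm_20181119_bj_jyy_staypoint_tmp_new")
    df.printSchema()
    spark.sql("select day_id, count(*) from data_m.tmp_lxm_20181119_bj_jyy_staypoint_tmp_new group by day_id").show()
    spark.stop()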

3. SparkSession

  • Example:
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master('local[2]')\
        .appName('sqlcontext').getOrCreate()
    read_json = spark.read.json(r'D:\apps\spark-2.3.2-bin-hadoop2.7\examples\src\main\resources\people.json')
    read_json.printSchema()
    read_json.show()
    print(read_json.count())

    spark.stop()

#result
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

3
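
Besides spark.sql, the DataFrame returned by a SparkSession also supports column-level operations directly. A minimal sketch, assuming the same people.json:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .master('local[2]')\
        .appName('dataframe_ops').getOrCreate()
    people = spark.read.json(r'D:\apps\spark-2.3.2-bin-hadoop2.7\examples\src\main\resources\people.json')
    # select / filter / groupBy are the DataFrame equivalents of SQL projection, where and group by
    people.select('name', 'age').filter(people.age > 20).show()
    people.groupBy('age').agg(F.count('name').alias('cnt')).show()
    spark.stop()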

4. Spark-submit

spark-submit \
--master local[2] \
/home/data_m/data/lxm/pyspark/hiveContext.py
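
For a cluster run, the same script can be submitted to YARN instead of local mode; a sketch, with placeholder resource values to adjust per cluster:

spark-submit \
  --master yarn \
  --deploy-mode client \
  --num-executors 4 \
  --executor-memory 2g \
  --executor-cores 2 \
  /home/data_m/data/lxm/pyspark/hiveContext.py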

5. Using thriftserver/beeline

  • 1. Start the thriftserver (listens on port 10000 by default); see the sketch below
  • 2. Start beeline
    • beeline -u jdbc:hive2://localhost:10000 -n username
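
A minimal sketch of both steps, assuming a standard Spark distribution (start-thriftserver.sh lives in Spark's sbin directory; the port can be overridden if 10000 is taken):

# 1. Start the Thrift JDBC/ODBC server (default port 10000)
./sbin/start-thriftserver.sh \
  --master local[2] \
  --hiveconf hive.server2.thrift.port=10000

# 2. Connect with beeline (replace username with your own user)
beeline -u jdbc:hive2://localhost:10000 -n username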