[python] 使用 Spark與 Hive進行 ETL

之前實作 ETL系統是透過 Python + MongoDB/MySQL完成,對於少量的資料綽綽有餘,但如果想處理大量資料,又想要借用 Spark MLlib機器學習套件的話,那麼就使用 PySpark + Hive來達成任務吧。能使用熟悉的Python與 SQL語法,無痛轉移。

Spark與 Hive環境設定

這裡透過 Google Cloud Dataproc 架設,Cluster跑起來之後全部套件都有囉,包含 Hadoop 2.7.3,Spark 2.0.2與 Hive 2.1.1 ,而為了要使用 PySpark,記得加上環境變數。

$ exportSPARK_HOME=/usr/lib/spark
$ exportPYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$PYTHONPATH

使用 PySpark來進行 ETL

ETL 的流程就如上圖三步驟。我們想要抓取的資料是行政院農委會提供的 農村地方美食小吃特色料理 ,內容是農村特色料理的所在位置與店家資訊,反正內容是什麼不重要,重要的是希望程式能直接透過 HTTP Request 抓下來,經過轉換後丟進 Hive資料庫。

由於 Spark 2.0版本加入了 Spark session,使用上變得簡潔,不過想要與 Hive溝通,記得要加上 enableHiveSupport() 才能存取資料喔。另外,Dataproc上的 Python 版本為 2.7,處理中文會比較麻煩,如果要換成 Python3,記得 Cluster環境下只改 Master是不行的,必須要在裝環境的時候,將 所有機器上的 PySpark預設改掉。詳細作法可以參考 這篇 。這裡仍然使用 Python2.7,詳細的程式碼與說明如下:

# -*- coding: utf-8 -*-
frompyspark.sqlimportSparkSession
frompyspark.sqlimportHiveContext
importrequests
importjson
 
class SparkHiveExample:
 
    def__init__(self):
        ## initialize spark session
        self.spark = SparkSession.builder.appName("Spark Hive example").enableHiveSupport().getOrCreate()
 
    defrun(self):
        ## download with opendata API
        url = "http://data.coa.gov.tw/Service/OpenData/ODwsv/ODwsvTravelFood.aspx?"
        data = requests.get(url)
 
        ## convert from JSON to dataframe
        df = self.spark.createDataFrame(data.json())
 
        ## display schema
        df.printSchema()
 
        ## creates a temporary view using the DataFrame
        df.createOrReplaceTempView("travelfood")
 
        ## save into Hive
        self.spark.sql("DROP TABLE IF EXISTS travelfood_hive")
        df.write.saveAsTable("travelfood_hive")
 
        ## use SQL
        sqlDF = self.spark.sql("SELECT * FROM travelfood_hive WHERE City == '屏東縣'")
        sqlDF.select("Name", "City", "Town", "Coordinate").show()
 
if __name__ == "__main__":
    EXAMPLE = SparkHiveExample()
    EXAMPLE.run()

從 Requests取得的 JSON格式資料,使用 createDataFrame 就會變成 Spark的 DataFrame格式,也因為這個資料集的 Key只有一個層級,轉換不用特別處理,你能透過 printSchema 看見所有的欄位名稱。

使用 createOrReplaceTempView 會產生一個暫時的 View,就能透過 SQL語法存取資料了,但注意資料還沒有送進 Hive喔,只是方便你處理而已,必須用 DataFrame的函式 saveAsTable才算完成。剩下的 SQL操作,看看就懂了吧!附上執行結果:

Youtube上的教學影片

如果使用的語言是 Scala,請參考這部影片的教學 (Spark 1.6)

參考資料

傑瑞窩在這稿源:傑瑞窩在這 (源链) | 关于 | 阅读提示

本站遵循[CC BY-NC-SA 4.0]。如您有版权、意见投诉等问题,请通过eMail联系我们处理。
酷辣虫 » 后端存储 » [python] 使用 Spark與 Hive進行 ETL

喜欢 (0)or分享给?

专业 x 专注 x 聚合 x 分享 CC BY-NC-SA 4.0

使用声明 | 英豪名录