PySpark でテーブル データを読み書きする方法

Pyspark Deteburu Detawo Dumi Shukisuru Fang Fa



データがテーブル形式でロードされると、PySpark でのデータ処理が高速になります。これでSQl式を使うと処理が早くなります。したがって、処理のために送信する前に PySpark DataFrame/RDD をテーブルに変換する方が良い方法です。今日は、組み込み関数を使用してテーブル データを PySpark DataFrame に読み取り、PySpark DataFrame をテーブルに書き込み、既存のテーブルに新しい DataFrame を挿入する方法を説明します。さあ行こう!

Pyspark.sql.DataFrameWriter.saveAsTable()

まず、write.saveAsTable() 関数を使用して既存の PySpark DataFrame をテーブルに書き込む方法を見ていきます。 DataFrame をテーブルに書き込むには、テーブル名と、mode、partionBy などのその他のオプションのパラメーターを受け取ります。これは寄木細工のファイルとして保存されます。

構文:







dataframe_obj.write.saveAsTable(パス/テーブル名,モード,パーティション別,…)
  1. Table_name は、dataframe_obj から作成されるテーブルの名前です。
  2. mode パラメータを使用してテーブルのデータを追加/上書きできます。
  3. PartitionBy は単一または複数の列を取得し、これらの指定された列の値に基づいてパーティションを作成します。

例 1:

5 行 4 列の PySpark DataFrame を作成します。このデータフレームを「Agri_Table1」という名前のテーブルに書き込みます。



pysparkをインポートする

pyspark.sql から SparkSession をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# 5 行 5 列のデータを農業化する

アグリ=[{ '土壌の種類' : '黒' 「灌漑_利用可能性」 : 'いいえ' 「エーカー」 : 2500 「土壌の状態」 : 'ドライ'
'国' : 'アメリカ合衆国' }、

{ '土壌の種類' : '黒' 「灌漑_利用可能性」 : 'はい' 「エーカー」 : 3500 「土壌の状態」 : '濡れた'
'国' : 'インド' }、

{ '土壌の種類' : '赤' 「灌漑_利用可能性」 : 'はい' 「エーカー」 : 210 「土壌の状態」 : 'ドライ'
'国' : 'イギリス' }、

{ '土壌の種類' : '他の' 「灌漑_利用可能性」 : 'いいえ' 「エーカー」 : 1000 「土壌の状態」 : '濡れた'
'国' : 'アメリカ合衆国' }、

{ '土壌の種類' : '砂' 「灌漑_利用可能性」 : 'いいえ' 「エーカー」 : 500 「土壌の状態」 : 'ドライ'
'国' : 'インド' }]



# 上記のデータからデータフレームを作成します

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# 上記の DataFrame をテーブルに書き込みます。

agri_df.coalesce( 1 ).write.saveAsTable( 「アグリ_テーブル1」 )

出力:







以前の PySpark データを使用して 1 つの寄木細工ファイルが作成されていることがわかります。



例 2:

前の DataFrame を検討し、「 Country 」列の値に基づいてレコードを分割することにより、「Agri_Table2」をテーブルに書き込みます。

# 上記の DataFrame を、partitionBy パラメータを使用してテーブルに書き込みます

agri_df.write.saveAsTable( 「アグリ_テーブル2」 ,パーティション別=[ '国' ])

出力:

「国」列には、「インド」、「英国」、「米国」という 3 つの一意の値があります。したがって、3 つのパーティションが作成されます。各パーティションには寄木細工のファイルが保持されます。

Pyspark.sql.DataFrameReader.table()

spark.read.table() 関数を使用してテーブルを PySpark DataFrame にロードしましょう。パラメータはパス/テーブル名を 1 つだけ取ります。テーブルを PySpark DataFrame に直接ロードし、PySpark DataFrame に適用されるすべての SQL 関数も、このロードされた DataFrame に適用できます。

構文:

spark_app.read.table(path/'Table_name')

このシナリオでは、PySpark DataFrame から作成された前のテーブルを使用します。前のシナリオのコード スニペットを環境に実装する必要があることを確認してください。

例:

「Agri_Table1」テーブルを「loaded_data」という名前のデータフレームにロードします。

ロードされたデータ = linuxhint_spark_app.read.table( 「農業_テーブル1」 )

ロードされたデータ.show()

出力:

テーブルが PySpark DataFrame にロードされていることがわかります。

SQLクエリの実行

ここで、spark.sql() 関数を使用して、ロードされた DataFrame に対していくつかの SQL クエリを実行します。

# SELECT コマンドを使用して、上の表のすべての列を表示します。

linuxhint_spark_app.sql( 「Agri_Table1 から SELECT *」 )。見せる()

# WHERE句

linuxhint_spark_app.sql( 「SELECT * from Agri_Table1 WHERE Soil_status='Dry'」 )。見せる()

linuxhint_spark_app.sql( 「SELECT * from Agri_Table1 WHERE エーカー > 2000」 )。見せる()

出力:

  1. 最初のクエリは、DataFrame のすべての列とレコードを表示します。
  2. 2 番目のクエリは、「Soil_status」列に基づいてレコードを表示します。 「Dry」要素を持つレコードは 3 つだけです。
  3. 最後のクエリは、2000 を超える「エーカー」を持つ 2 つのレコードを返します。

Pyspark.sql.DataFrameWriter.insertInto()

insertInto() 関数を使用すると、DataFrame を既存のテーブルに追加できます。この関数を selectExpr() とともに使用して列名を定義し、それをテーブルに挿入できます。この関数は、パラメータとして tableName も受け取ります。

構文:

DataFrame_obj.write.insertInto('Table_name')

このシナリオでは、PySpark DataFrame から作成された前のテーブルを使用します。前のシナリオのコード スニペットを環境に実装する必要があることを確認してください。

例:

2 つのレコードを含む新しい DataFrame を作成し、「Agri_Table1」テーブルに挿入します。

pysparkをインポートする

pyspark.sql から SparkSession をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# 2 行のデータを農業化する

アグリ=[{ '土壌の種類' : '砂' 「灌漑_利用可能性」 : 'いいえ' 「エーカー」 : 2500 「土壌の状態」 : 'ドライ'
'国' : 'アメリカ合衆国' }、

{ '土壌の種類' : '砂' 「灌漑_利用可能性」 : 'いいえ' 「エーカー」 : 1200 「土壌の状態」 : '濡れた'
'国' : '日本' }]

# 上記のデータからデータフレームを作成します

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 「エーカー」 '国' 「灌漑_利用可能性」 「土壌の種類」
「土壌の状態」 ).write.insertInto( 「アグリ_テーブル1」 )

# 最終的なAgri_Table1を表示する

linuxhint_spark_app.sql( 「Agri_Table1 から SELECT *」 )。見せる()

出力:

現在、DataFrame に存在する行の合計数は 7 です。

結論

これで、write.saveAsTable() 関数を使用して PySpark DataFrame をテーブルに書き込む方法が理解できました。テーブル名とその他のオプションのパラメーターを受け取ります。次に、spark.read.table() 関数を使用して、このテーブルを PySpark DataFrame にロードしました。パラメータはパス/テーブル名を 1 つだけ取ります。新しい DataFrame を既存のテーブルに追加する場合は、insertInto() 関数を使用します。