Pyspark.sql.DataFrameWriter.saveAsTable()
まず、write.saveAsTable() 関数を使用して既存の PySpark DataFrame をテーブルに書き込む方法を見ていきます。 DataFrame をテーブルに書き込むには、テーブル名と、mode、partionBy などのその他のオプションのパラメーターを受け取ります。これは寄木細工のファイルとして保存されます。
構文:
dataframe_obj.write.saveAsTable(パス/テーブル名,モード,パーティション別,…)
- Table_name は、dataframe_obj から作成されるテーブルの名前です。
- mode パラメータを使用してテーブルのデータを追加/上書きできます。
- PartitionBy は単一または複数の列を取得し、これらの指定された列の値に基づいてパーティションを作成します。
例 1:
5 行 4 列の PySpark DataFrame を作成します。このデータフレームを「Agri_Table1」という名前のテーブルに書き込みます。
pysparkをインポートする
pyspark.sql から SparkSession をインポート
linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()
# 5 行 5 列のデータを農業化する
アグリ=[{ '土壌の種類' : '黒' 、 「灌漑_利用可能性」 : 'いいえ' 、 「エーカー」 : 2500 、 「土壌の状態」 : 'ドライ' 、
'国' : 'アメリカ合衆国' }、
{ '土壌の種類' : '黒' 、 「灌漑_利用可能性」 : 'はい' 、 「エーカー」 : 3500 、 「土壌の状態」 : '濡れた' 、
'国' : 'インド' }、
{ '土壌の種類' : '赤' 、 「灌漑_利用可能性」 : 'はい' 、 「エーカー」 : 210 、 「土壌の状態」 : 'ドライ' 、
'国' : 'イギリス' }、
{ '土壌の種類' : '他の' 、 「灌漑_利用可能性」 : 'いいえ' 、 「エーカー」 : 1000 、 「土壌の状態」 : '濡れた' 、
'国' : 'アメリカ合衆国' }、
{ '土壌の種類' : '砂' 、 「灌漑_利用可能性」 : 'いいえ' 、 「エーカー」 : 500 、 「土壌の状態」 : 'ドライ' 、
'国' : 'インド' }]
# 上記のデータからデータフレームを作成します
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# 上記の DataFrame をテーブルに書き込みます。
agri_df.coalesce( 1 ).write.saveAsTable( 「アグリ_テーブル1」 )
出力:
以前の PySpark データを使用して 1 つの寄木細工ファイルが作成されていることがわかります。
例 2:
前の DataFrame を検討し、「 Country 」列の値に基づいてレコードを分割することにより、「Agri_Table2」をテーブルに書き込みます。
# 上記の DataFrame を、partitionBy パラメータを使用してテーブルに書き込みますagri_df.write.saveAsTable( 「アグリ_テーブル2」 ,パーティション別=[ '国' ])
出力:
「国」列には、「インド」、「英国」、「米国」という 3 つの一意の値があります。したがって、3 つのパーティションが作成されます。各パーティションには寄木細工のファイルが保持されます。
Pyspark.sql.DataFrameReader.table()
spark.read.table() 関数を使用してテーブルを PySpark DataFrame にロードしましょう。パラメータはパス/テーブル名を 1 つだけ取ります。テーブルを PySpark DataFrame に直接ロードし、PySpark DataFrame に適用されるすべての SQL 関数も、このロードされた DataFrame に適用できます。
構文:
spark_app.read.table(path/'Table_name')このシナリオでは、PySpark DataFrame から作成された前のテーブルを使用します。前のシナリオのコード スニペットを環境に実装する必要があることを確認してください。
例:
「Agri_Table1」テーブルを「loaded_data」という名前のデータフレームにロードします。
ロードされたデータ = linuxhint_spark_app.read.table( 「農業_テーブル1」 )ロードされたデータ.show()
出力:
テーブルが PySpark DataFrame にロードされていることがわかります。
SQLクエリの実行
ここで、spark.sql() 関数を使用して、ロードされた DataFrame に対していくつかの SQL クエリを実行します。
# SELECT コマンドを使用して、上の表のすべての列を表示します。linuxhint_spark_app.sql( 「Agri_Table1 から SELECT *」 )。見せる()
# WHERE句
linuxhint_spark_app.sql( 「SELECT * from Agri_Table1 WHERE Soil_status='Dry'」 )。見せる()
linuxhint_spark_app.sql( 「SELECT * from Agri_Table1 WHERE エーカー > 2000」 )。見せる()
出力:
- 最初のクエリは、DataFrame のすべての列とレコードを表示します。
- 2 番目のクエリは、「Soil_status」列に基づいてレコードを表示します。 「Dry」要素を持つレコードは 3 つだけです。
- 最後のクエリは、2000 を超える「エーカー」を持つ 2 つのレコードを返します。
Pyspark.sql.DataFrameWriter.insertInto()
insertInto() 関数を使用すると、DataFrame を既存のテーブルに追加できます。この関数を selectExpr() とともに使用して列名を定義し、それをテーブルに挿入できます。この関数は、パラメータとして tableName も受け取ります。
構文:
DataFrame_obj.write.insertInto('Table_name')このシナリオでは、PySpark DataFrame から作成された前のテーブルを使用します。前のシナリオのコード スニペットを環境に実装する必要があることを確認してください。
例:
2 つのレコードを含む新しい DataFrame を作成し、「Agri_Table1」テーブルに挿入します。
pysparkをインポートするpyspark.sql から SparkSession をインポート
linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()
# 2 行のデータを農業化する
アグリ=[{ '土壌の種類' : '砂' 、 「灌漑_利用可能性」 : 'いいえ' 、 「エーカー」 : 2500 、 「土壌の状態」 : 'ドライ' 、
'国' : 'アメリカ合衆国' }、
{ '土壌の種類' : '砂' 、 「灌漑_利用可能性」 : 'いいえ' 、 「エーカー」 : 1200 、 「土壌の状態」 : '濡れた' 、
'国' : '日本' }]
# 上記のデータからデータフレームを作成します
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 「エーカー」 、 '国' 、 「灌漑_利用可能性」 、 「土壌の種類」 、
「土壌の状態」 ).write.insertInto( 「アグリ_テーブル1」 )
# 最終的なAgri_Table1を表示する
linuxhint_spark_app.sql( 「Agri_Table1 から SELECT *」 )。見せる()
出力:
現在、DataFrame に存在する行の合計数は 7 です。
結論
これで、write.saveAsTable() 関数を使用して PySpark DataFrame をテーブルに書き込む方法が理解できました。テーブル名とその他のオプションのパラメーターを受け取ります。次に、spark.read.table() 関数を使用して、このテーブルを PySpark DataFrame にロードしました。パラメータはパス/テーブル名を 1 つだけ取ります。新しい DataFrame を既存のテーブルに追加する場合は、insertInto() 関数を使用します。