内容のトピック:
- PySpark DataFrame を Pandas DataFrame に変換して CSV に変換する
- To_Csv() メソッドを使用した PySpark Pandas データフレームから CSV への変換
- PySpark Pandas データフレームを NumPy 配列に変換して CSV に変換する
- Write.Csv() メソッドを使用した PySpark データフレームから CSV への変換
PySpark DataFrame とモジュールのインストールについて知りたい場合は、これを参照してください。 記事 。
PySpark DataFrame を Pandas DataFrame に変換して CSV に変換する
to_csv() は、Pandas DataFrame を CSV に変換する Pandas モジュールで使用できるメソッドです。まず、PySpark DataFrame を Pandas DataFrame に変換する必要があります。これを行うには、toPandas() メソッドが使用されます。 to_csv() の構文とそのパラメーターを見てみましょう。
構文:
pandas_dataframe_obj.to_csv(パス/ 「ファイル名.csv」 、 ヘッダ 、インデックス、列、モード...)
- CSV ファイルのファイル名を指定する必要があります。ダウンロードした CSV を PC 上の特定の場所に保存する場合は、ファイル名とともにパスを指定することもできます。
- ヘッダーが「True」に設定されている場合、列が含まれます。列が必要ない場合は、ヘッダーを「False」に設定します。
- インデックスが「True」に設定されている場合、インデックスが指定されます。インデックスが必要ない場合は、インデックスを「False」に設定します。
- Columns パラメーターは、CSV ファイルに抽出される特定の列を指定できる列名のリストを受け取ります。
- mode パラメーターを使用して、CSV にレコードを追加できます。追加 - これを行うには「a」が使用されます。
例 1: ヘッダーとインデックスのパラメーターを使用する場合
3 行 4 列の「skills_df」PySpark データフレームを作成します。この DataFrame を CSV に変換するには、まず Pandas DataFrame に変換します。
pysparkをインポートする
pyspark.sql から SparkSession をインポート
linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()
# 3 行 4 列のスキル データ
スキル =[{ 「ID」 : 123 、 '人' : 'ハニー' 、 'スキル' : 'ペインティング' 、 '賞' : 25000 }、
{ 「ID」 : 112 、 '人' : 「ムーニ」 、 'スキル' : 'ダンス' 、 '賞' : 2000年 }、
{ 「ID」 : 153 、 '人' : 「トゥラシ」 、 'スキル' : '読む' 、 '賞' : 1200 }
】
# 上記データからスキルデータフレームを作成
skill_df = linuxhint_spark_app.createDataFrame(スキル)
スキル_df.show()
# skill_df を pandas DataFrame に変換する
pandas_skills_df= skill_df.toPandas()
print(pandas_skills_df)
# この DataFrame をヘッダーとインデックス付きの CSV に変換します
pandas_skills_df.to_csv( 「pandas_skills1.csv」 、 ヘッダ =True、インデックス=True)
出力:
PySpark DataFrame が Pandas DataFrame に変換されていることがわかります。列名とインデックスを含む CSV に変換されるかどうかを確認してみましょう。
例 2: データを CSV に追加する
1 つのレコードを持つ PySpark DataFrame をもう 1 つ作成し、これを最初の例の一部として作成された CSV に追加します。モードパラメータとともにヘッダーを「False」に設定する必要があることを確認してください。それ以外の場合、列名も行として追加されます。
pysparkをインポートするpyspark.sql から SparkSession をインポート
linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()
スキル =[{ 「ID」 : 90 、 '人' : 「バーガブ」 、 'スキル' : '読む' 、 '賞' : 12000 }
】
# 上記データからスキルデータフレームを作成
skill_df = linuxhint_spark_app.createDataFrame(スキル)
# skill_df を pandas DataFrame に変換する
pandas_skills_df= skill_df.toPandas()
# このデータフレームを pandas_skills1.csv ファイルに追加します
pandas_skills_df.to_csv( 「pandas_skills1.csv」 、モード= 「あ」 、 ヘッダ =偽)
CSV出力:
CSV ファイルに新しい行が追加されたことがわかります。
例 3: Columns パラメーターを使用した場合
同じDataFrameを「人物」と「賞品」の2列のCSVに変換してみましょう。
pysparkをインポートするpyspark.sql から SparkSession をインポート
linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()
# 3 行 4 列のスキル データ
スキル =[{ 「ID」 : 123 、 '人' : 'ハニー' 、 'スキル' : 'ペインティング' 、 '賞' : 25000 }、
{ 「ID」 : 112 、 '人' : 「ムーニ」 、 'スキル' : 'ダンス' 、 '賞' : 2000年 }、
{ 「ID」 : 153 、 '人' : 「トゥラシ」 、 'スキル' : '読む' 、 '賞' : 1200 }
】
# 上記データからスキルデータフレームを作成
skill_df = linuxhint_spark_app.createDataFrame(スキル)
# skill_df を pandas DataFrame に変換する
pandas_skills_df= skill_df.toPandas()
# この DataFrame を特定の列を含む CSV に変換します
pandas_skills_df.to_csv( 「pandas_skills2.csv」 、列=[ '人' 、 '賞' ])
CSV出力:
CSV ファイルには「人物」列と「賞品」列のみが存在していることがわかります。
To_Csv() メソッドを使用した PySpark Pandas データフレームから CSV への変換
to_csv() は、Pandas DataFrame を CSV に変換する Pandas モジュールで使用できるメソッドです。まず、PySpark DataFrame を Pandas DataFrame に変換する必要があります。これを行うには、toPandas() メソッドが使用されます。 to_csv() の構文とそのパラメーターを見てみましょう。
構文:
pyspark_pandas_dataframe_obj.to_csv(パス/ 「ファイル名.csv」 、 ヘッダ 、インデックス、列、...)- CSV ファイルのファイル名を指定する必要があります。ダウンロードした CSV を PC 上の特定の場所に保存する場合は、ファイル名とともにパスを指定することもできます。
- ヘッダーが「True」に設定されている場合、列が含まれます。列が必要ない場合は、ヘッダーを「False」に設定します。
- インデックスが「True」に設定されている場合、インデックスが指定されます。インデックスが必要ない場合は、インデックスを「False」に設定します。
- columns パラメーターは列名のリストを受け取り、どの特定の列を CSV ファイルに抽出するかを指定できます。
例 1: Columns パラメーターを使用する場合
3 列の PySpark Pandas DataFrame を作成し、「person」列と「prize」列を含む to_csv() を使用して CSV に変換します。
pysparkからパンダをインポートpyspark_pandas_dataframe=pandas.DataFrame({ 「ID」 :[ 90 、 78 、 90 、 57 ]、 '人' :[ 'ハニー' 、 「ムーニ」 、 '彼自身' 、 「ラダ」 ]、 '賞' :[ 1 、 2 、 3 、 4 ]})
print(pyspark_pandas_dataframe)
# この DataFrame を特定の列を含む CSV に変換します
pyspark_pandas_dataframe.to_csv( 「pyspark_pandas1」 、列=[ '人' 、 '賞' ])
出力:
PySpark Pandas DataFrame が 2 つのパーティションを含む CSV に変換されていることがわかります。各パーティションには 2 つのレコードが保持されます。また、CSV内の列は「人物」と「賞品」のみです。
パーティションファイル1:
パーティションファイル2:
例 2: ヘッダー パラメーターを使用する場合
以前の DataFrame を使用し、ヘッダー パラメーターを「True」に設定して指定します。
pysparkからパンダをインポートpyspark_pandas_dataframe=pandas.DataFrame({ 「ID」 :[ 90 、 78 、 90 、 57 ]、 '人' :[ 'ハニー' 、 「ムーニ」 、 '彼自身' 、 「ラダ」 ]、 '賞' :[ 1 、 2 、 3 、 4 ]})
# このDataFrameをヘッダー付きのcsvに変換します。
pyspark_pandas_dataframe.to_csv( 「pyspark_pandas2」 、 ヘッダ =本当)
CSV出力:
PySpark Pandas DataFrame が 2 つのパーティションを含む CSV に変換されていることがわかります。各パーティションには、列名を持つ 2 つのレコードが保持されます。
パーティションファイル1:
パーティションファイル2:
PySpark Pandas データフレームを NumPy 配列に変換して CSV に変換する
Numpy 配列に変換することで、PySpark Pandas DataFrame を CSV に変換するオプションがあります。 to_numpy() は、PySpark Pandas データフレームを NumPy 配列に変換する PySpark Pandas モジュールで使用できるメソッドです。
構文:
pyspark_pandas_dataframe_obj.to_numpy()パラメータは必要ありません。
Tofile() メソッドの使用
NumPy 配列に変換した後、tofile() メソッドを使用して NumPy を CSV に変換できます。ここでは、CSV ファイルの列方向に新しいセルに各レコードを保存します。
構文:
array_obj.to_numpy(ファイル名/パス,sep=’ ’)CSV のファイル名またはパスと区切り文字を受け取ります。
例:
3 列と 4 レコードを持つ PySpark Pandas DataFrame を作成し、最初に NumPy 配列に変換して CSV に変換します。
pysparkからパンダをインポートpyspark_pandas_dataframe=pandas.DataFrame({ 「ID」 :[ 90 、 78 、 90 、 57 ]、 '人' :[ 'ハニー' 、 「ムーニ」 、 '彼自身' 、 「ラダ」 ]、 '賞' :[ 1 、 2 、 3 、 4 ]})
# 上記の DataFrame を numpy 配列に変換します
変換済み = pyspark_pandas_dataframe.to_numpy()
印刷(変換済み)
# tofile() を使用する
変換済み.tofile( 「変換1.csv」 、9月 = 「、」 )
出力:
[[ 90 'ハニー' 1 ][ 78 「ムーニ」 2 ]
[ 90 '彼自身' 3 ]
[ 57 「ラダ」 4 ]]
PySpark Pandas DataFrame が NumPy 配列 (12 値) に変換されていることがわかります。 CSV データが表示される場合は、各セルの値が新しい列に保存されます。
Write.Csv() メソッドを使用した PySpark データフレームから CSV への変換
write.csv() メソッドは、CSV ファイルを保存する必要があるファイル名/パスをパラメータとして受け取ります。
構文:
dataframe_object.coalesce( 1 ).write.csv( 'ファイル名' )実際には、CSV はパーティション (複数) として保存されます。これを取り除くために、分割されたすべての CSV ファイルを 1 つに結合します。このシナリオでは、coalesce() 関数を使用します。これで、PySpark DataFrame のすべての行を含む CSV ファイルが 1 つだけ表示されます。
例:
4 つの列を持つ 4 つのレコードを持つ PySpark データフレームを考えてみましょう。このDataFrameを「market_details」というファイル名でCSVに書き込みます。
pysparkをインポートするpyspark.sql から SparkSession をインポート
linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()
# 4 行 4 列の市場データ
市場 =[{ 'm_id' : 「mz-001」 、 「m_name」 : 「ABC」 、 「m_city」 : 「デリー」 、 'm_state' : 「デリー」 }、
{ 'm_id' : 「mz-002」 、 「m_name」 : 「XYZ」 、 「m_city」 : 「パトナ」 、 'm_state' : 「幸運」 }、
{ 'm_id' : 「mz-003」 、 「m_name」 : 「PQR」 、 「m_city」 : 「フロリダ」 、 'm_state' : '一' }、
{ 'm_id' : 「mz-004」 、 「m_name」 : 「ABC」 、 「m_city」 : 「デリー」 、 'm_state' : 「幸運」 }
]
# 上記のデータから市場データフレームを作成します
マーケット_df = linuxhint_spark_app.createDataFrame(マーケット)
# 実際の市場データ
マーケット_df.show()
#write.csv()
マーケット_df.coalesce( 1 ).write.csv( 「マーケットの詳細」 )
出力:
ファイルを確認してみましょう:
最後のファイルを開いてレコードを確認します。
結論
さまざまなパラメーターを考慮して、PySpark DataFrame を CSV に変換する 4 つの異なるシナリオを例とともに学習しました。 PySpark DataFrame を使用する場合、この DataFrame を CSV に変換するには 2 つのオプションがあります。1 つは write() メソッドを使用する方法、もう 1 つは Pandas DataFrame に変換して to_csv() メソッドを使用する方法です。 PySpark Pandas DataFrame を使用している場合は、NumPy 配列に変換することで to_csv() および tofile() を利用することもできます。