PySpark データフレームを JSON に変換する

Pyspark Detafuremuwo Json Ni Bian Huansuru



JSON を使用した構造化データの送信が可能であり、メモリ消費も少ないです。 PySpark RDD や PySpark DataFrame と比較すると、JSON はメモリ消費量が少なく、JSON で可能なシリアル化も可能です。 pyspark.sql.DataFrameWriter.json() メソッドを使用して、PySpark DataFrame を JSON に変換できます。それとは別に、DataFrame を JSON に変換する方法が他に 2 つあります。

内容のトピック:

すべての例で単純な PySpark DataFrame を検討し、前述の関数を使用してそれを JSON に変換してみましょう。







必要なモジュール:

PySpark ライブラリがまだインストールされていない場合は、環境にインストールします。次のコマンドを参照してインストールできます。



pip インストール pyspark

To_json() と ToPandas() を使用した PySpark DataFrame から JSON への変換

to_json() メソッドは、Pandas DataFrame を JSON に変換する Pandas モジュールで使用できます。 PySpark DataFrame を Pandas DataFrame に変換する場合、このメソッドを利用できます。 PySpark DataFrame を Pandas DataFrame に変換するには、toPandas() メソッドが使用されます。 to_json() の構文とそのパラメーターを見てみましょう。



構文:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient は、変換された JSON を目的の形式で表示するために使用されます。 「レコード」、「テーブル」、「値」、「列」、「インデックス」、「分割」を使用します。
  2. インデックスは、変換された JSON 文字列にインデックスを含めたり削除したりするために使用されます。 「True」に設定するとインデックスが表示されます。そうしないと、方向が「分割」または「テーブル」の場合、インデックスは表示されません。

例 1: 「レコード」として配置する

3 行 4 列の「skills_df」PySpark DataFrame を作成します。 orientパラメータに「レコード」を指定して、このDataFrameをJSONに変換します。

pysparkをインポートする

パンダをインポートする

pyspark.sql から SparkSession をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# 3 行 4 列のスキル データ

スキル =[{ 「ID」 : 123 '人' : 'ハニー' 'スキル' : 'ペインティング' '賞' : 25000 }、

{ 「ID」 : 112 '人' : 「ムーニ」 'スキル' : 'ダンス' '賞' : 2000年 }、

{ 「ID」 : 153 '人' : 「トゥラシ」 'スキル' : '読む' '賞' : 1200 }



# 上記データからスキルデータフレームを作成

skill_df = linuxhint_spark_app.createDataFrame(スキル)

# 実技データ

スキル_df.show()

# orient を「レコード」として to_json() を使用して JSON に変換します

json_skills_data = skill_df.toPandas().to_json(orient= '記録' )

print(json_skills_data)

出力:



+---+------+-----+--------+

| ID|人物|賞品|スキル|

+---+------+-----+--------+

| 123 |ハニー| 25000 |絵|

| 112 |モウニ| 2000年 |ダンス|

| 153 |トゥラシ| 1200 |読書|

+---+------+-----+--------+

[{ 「ID」 : 123 '人' : 'ハニー' '賞' : 25000 'スキル' : 'ペインティング' },{ 「ID」 : 112 '人' : 「ムーニ」 '賞' : 2000年 'スキル' : 'ダンス' },{ 「ID」 : 153 '人' : 「トゥラシ」 '賞' : 1200 'スキル' : '読む' }]

PySpark DataFrame が値の辞書を含む JSON 配列に変換されていることがわかります。ここで、キーは列名を表し、値は PySpark DataFrame の行/セルの値を表します。

例 2: 「分割」として方向を設定する

「分割」方向によって返される JSON 形式には、列のリスト、インデックスのリスト、データのリストを含む列名が含まれます。以下は「分割」方向の形式です。

# 方向を「split」として to_json() を使用して JSON に変換します

json_skills_data = skill_df.toPandas().to_json(orient= 'スプリット' )

print(json_skills_data)

出力:

{ 「コラム」 :[ 「ID」 '人' '賞' 'スキル' ]、 '索引' :[ 0 1 2 ]、 'データ' :[[ 123 'ハニー' 25000 'ペインティング' ]、[ 112 「ムーニ」 2000年 'ダンス' ]、[ 153 「トゥラシ」 1200 '読む' ]]}

例 3: 「インデックス」として方向付けする

ここで、PySpark DataFrame の各行は、キーを列名とする辞書の形式でリタイアされます。辞書ごとにインデックス位置をキーとして指定します。

# orient を「index」として to_json() を使用して JSON に変換します

json_skills_data = skill_df.toPandas().to_json(orient= '索引' )

print(json_skills_data)

出力:

{ 「0」 :{ 「ID」 : 123 '人' : 'ハニー' '賞' : 25000 'スキル' : 'ペインティング' }、 「1」 :{ 「ID」 : 112 '人' : 「ムーニ」 '賞' : 2000年 'スキル' : 'ダンス' }、 「2」 :{ 「ID」 : 153 '人' : 「トゥラシ」 '賞' : 1200 'スキル' : '読む' }}

例 4: 「列」として方向を設定する

列は各レコードのキーです。各列には、インデックス番号を持つ列値を取得するディクショナリが保持されます。

# 方向を「列」として to_json() を使用して JSON に変換します

json_skills_data = skill_df.toPandas().to_json(orient= 「列」 )

print(json_skills_data)

出力:

{ 「ID」 :{ 「0」 : 123 「1」 : 112 「2」 : 153 }、 '人' :{ 「0」 : 'ハニー' 「1」 : 「ムーニ」 「2」 : 「トゥラシ」 }、 '賞' :{ 「0」 : 25000 「1」 : 2000年 「2」 : 1200 }、 'スキル' :{ 「0」 : 'ペインティング' 「1」 : 'ダンス' 「2」 : '読む' }}

例 5: 「値」としての方向付け

JSON の値のみが必要な場合は、「値」の方向を選択できます。リスト内の各行を表示します。最後に、すべてのリストがリストに保存されます。この JSON はネストされたリスト型です。

# orient を「値」として to_json() を使用して JSON に変換します

json_skills_data = skill_df.toPandas().to_json(orient= 「価値観」 )

print(json_skills_data)

出力:

[[ 123 'ハニー' 25000 'ペインティング' ]、[ 112 「ムーニ」 2000年 'ダンス' ]、[ 153 「トゥラシ」 1200 '読む' ]]

例 6: 「テーブル」として配置する

「テーブル」方向は、列のデータ型、主キーとしてのインデックス、および Pandas のバージョンとともにフィールド名を含むスキーマを含む JSON を返します。値が設定された列名は「データ」として表示されます。

# orient を「table」として to_json() を使用して JSON に変換します

json_skills_data = skill_df.toPandas().to_json(orient= 'テーブル' )

print(json_skills_data)

出力:

{ 「スキーマ」 :{ '田畑' :[{ '名前' : '索引' 'タイプ' : '整数' },{ '名前' : 「ID」 'タイプ' : '整数' },{ '名前' : '人' 'タイプ' : '弦' },{ '名前' : '賞' 'タイプ' : '整数' },{ '名前' : 'スキル' 'タイプ' : '弦' }]、 「主キー」 :[ '索引' ]、 「パンダのバージョン」 : 「1.4.0」 }、 'データ' :[{ '索引' : 0 「ID」 : 123 '人' : 'ハニー' '賞' : 25000 'スキル' : 'ペインティング' },{ '索引' : 1 「ID」 : 112 '人' : 「ムーニ」 '賞' : 2000年 'スキル' : 'ダンス' },{ '索引' : 2 「ID」 : 153 '人' : 「トゥラシ」 '賞' : 1200 'スキル' : '読む' }]}

例 7: インデックスパラメータを使用する場合

まず、index パラメータを「True」に設定して渡します。各列値について、インデックス位置が辞書内のキーとして返されることがわかります。

2 番目の出力では、インデックスが「False」に設定されているため、インデックス位置なしで列名 (「columns」) とレコード (「data」) のみが返されます。

# to_json() を使用し、index=True で JSON に変換します

json_skills_data = skill_df.toPandas().to_json(index=True)

print(json_skills_data, \n )

# to_json() を使用し、index=False で JSON に変換します

json_skills_data= skill_df.toPandas().to_json(index=False,orient= 'スプリット' )

print(json_skills_data)

出力:

{ 「ID」 :{ 「0」 : 123 「1」 : 112 「2」 : 153 }、 '人' :{ 「0」 : 'ハニー' 「1」 : 「ムーニ」 「2」 : 「トゥラシ」 }、 '賞' :{ 「0」 : 25000 「1」 : 2000年 「2」 : 1200 }、 'スキル' :{ 「0」 : 'ペインティング' 「1」 : 'ダンス' 「2」 : '読む' }}

{ 「コラム」 :[ 「ID」 '人' '賞' 'スキル' ]、 'データ' :[[ 123 'ハニー' 25000 'ペインティング' ]、[ 112 「ムーニ」 2000年 'ダンス' ]、[ 153 「トゥラシ」 1200 '読む' ]]

ToJSON() を使用した PySpark DataFrame から JSON への変換

toJSON() メソッドは、PySpark DataFrame を JSON オブジェクトに変換するために使用されます。基本的に、リストで囲まれた JSON 文字列を返します。の [‘{列:値,…}’,…. ] は、この関数によって返される形式です。ここでは、PySpark DataFrame の各行が、列名をキーとした辞書として返されます。

構文:

dataframe_object.toJSON()

インデックス、列ラベル、データ型などのパラメータを渡すことができる場合があります。

例:

5 行 4 列の「skills_df」PySpark DataFrame を作成します。 toJSON() メソッドを使用して、この DataFrame を JSON に変換します。

pysparkをインポートする

pyspark.sql から SparkSession をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# 5 行 4 列のスキル データ

スキル =[{ 「ID」 : 123 '人' : 'ハニー' 'スキル' : 'ペインティング' '賞' : 25000 }、

{ 「ID」 : 112 '人' : 「ムーニ」 'スキル' : 「音楽・ダンス」 '賞' : 2000年 }、

{ 「ID」 : 153 '人' : 「トゥラシ」 'スキル' : '読む' '賞' : 1200 }、

{ 「ID」 : 173 '人' : 「ラン」 'スキル' : '音楽' '賞' : 2000年 }、

{ 「ID」 : 43 '人' : 「カマラ」 'スキル' : '読む' '賞' : 10000 }



# 上記データからスキルデータフレームを作成

skill_df = linuxhint_spark_app.createDataFrame(スキル)

# 実技データ

スキル_df.show()

# JSON配列に変換

json_skills_data = skill_df.toJSON().collect()

print(json_skills_data)

出力:

+---+------+-----+----------+

| ID|人物|賞品|スキル|

+---+------+-----+----------+

| 123 |ハニー| 25000 |絵画|

| 112 |モウニ| 2000年 |音楽/ダンス|

| 153 |トゥラシ| 1200 |読書|

| 173 |らん| 2000年 |音楽|

| 43 |カマラ| 10000 |読書|

+---+------+-----+----------+

[ '{'id':123,'人':'ハニー','賞品':25000,'スキル':'絵を描く'}' '{'id':112,'人':'ムーニ','賞':2000,'特技':'音楽/ダンス'}' '{'id':153,'person':'トゥラシ','賞品':1200,'スキル':'読書'}' '{'id':173,'人':'蘭','賞':2000,'特技':'音楽'}' '{'id':43,'person':'カマラ','賞品':10000,'スキル':'読書'}'

PySpark DataFrame には 5 行があります。これら 5 行はすべて、カンマで区切られた文字列の辞書として返されます。

Write.json() を使用した PySpark DataFrame から JSON への変換

write.json() メソッドは PySpark で使用でき、PySpark DataFrame を JSON ファイルに書き込み/保存します。ファイル名/パスをパラメータとして受け取ります。基本的に、JSON を複数のファイル (分割されたファイル) で返します。これらすべてを 1 つのファイルにマージするには、coalesce() メソッドを使用します。

構文:

dataframe_object.coalesce( 1 ).write.json('ファイル名')
  1. 追加モード – dataframe_object.write.mode(‘append’).json(‘file_name’)
  2. 上書きモード – dataframe_object.write.mode('overwrite').json('file_name')

既存の JSON を追加/上書きすることが可能です。 write.mode() を使用すると、この関数に「append」を渡してデータを追加したり、「overwrite」を渡して既存の JSON データを上書きしたりできます。

例 1:

3 行 4 列の「skills_df」PySpark DataFrame を作成します。この DataFrame を JSON に書き込みます。

pysparkをインポートする

パンダをインポートする

pyspark.sql から SparkSession をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# 3 行 4 列のスキル データ

スキル =[{ 「ID」 : 123 '人' : 'ハニー' 'スキル' : 'ペインティング' '賞' : 25000 }、

{ 「ID」 : 112 '人' : 「ムーニ」 'スキル' : 'ダンス' '賞' : 2000年 }、

{ 「ID」 : 153 '人' : 「トゥラシ」 'スキル' : '読む' '賞' : 1200 }



# 上記データからスキルデータフレームを作成

skill_df = linuxhint_spark_app.createDataFrame(スキル)

# write.json()

スキル_df.coalesce( 1 ).write.json( 「スキルデータ」 )

JSON ファイル:

skill_data フォルダーにパーティション分割された JSON データが含まれていることがわかります。

JSON ファイルを開いてみましょう。 PySpark DataFrame のすべての行が JSON に変換されていることがわかります。

PySpark DataFrame には 5 行があります。これら 5 行はすべて、カンマで区切られた文字列の辞書として返されます。

例 2:

1 行の「skills2_df」PySpark DataFrame を作成します。モードを「追加」に指定して、前の JSON ファイルに 1 行を追加します。

pysparkをインポートする

パンダをインポートする

pyspark.sql から SparkSession をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

スキル2 =[{ 「ID」 : 78 '人' : 「メアリー」 'スキル' : 'ライディング' '賞' : 8960 }



# 上記データからスキルデータフレームを作成

skill2_df = linuxhint_spark_app.createDataFrame(skills2)

# 追加モードで write.json() します。

skill2_df.write.mode( 「追加」 ).json( 「スキルデータ」 )

JSON ファイル:

パーティション分割された JSON ファイルが表示されます。最初のファイルは最初の DataFrame レコードを保持し、2 番目のファイルは 2 番目の DataFrame レコードを保持します。

結論

PySpark DataFrame を JSON に変換するには、3 つの異なる方法があります。まず、さまざまなパラメーターを考慮して、PySpark DataFrame を Pandas DataFrame に変換することで JSON に変換する to_json() メソッドについて、さまざまな例を示して説明しました。次に、toJSON() メソッドを利用しました。最後に、write.json() 関数を使用して PySpark DataFrame を JSON に書き込む方法を学びました。この機能により追加、上書きが可能です。