PySpark Read.Parquet()

Pyspark Read Parquet



PySpark では、write.parquet() 関数は DataFrame を寄木細工ファイルに書き込み、read.parquet() 関数は寄木細工ファイルを PySpark DataFrame またはその他の DataSource に読み取ります。 Apache Spark で列を迅速かつ効率的に処理するには、データを圧縮する必要があります。データ圧縮によりメモリが節約され、すべての列がフラット レベルに変換されます。つまり、フラットカラムレベルのストレージが存在することになります。これらを格納するファイルは PARQUET ファイルと呼ばれます。

このガイドでは、pyspark.sql.DataFrameReader クラスで使用できる read.parquet() 関数を使用して、寄木細工ファイルを PySpark DataFrame/SQL に読み取り/ロードすることに主に焦点を当てます。

内容のトピック:







寄木細工ファイルを入手する



Parquet ファイルを PySpark データフレームに読み取る



Parquet ファイルを PySpark SQL に読み取る





Pyspark.sql.DataFrameReader.parquet()

この関数は、寄木細工ファイルを読み取り、PySpark DataFrame にロードするために使用されます。寄木細工ファイルのパス/ファイル名を受け取ります。これは汎用関数であるため、単純に read.parquet() 関数を使用できます。

構文:



read.parquet() の構文を見てみましょう。

spark_app.read.parquet(ファイル名.parquet/パス)

まず、pip コマンドを使用して PySpark モジュールをインストールします。

pip インストール pyspark

寄木細工ファイルを入手する

寄木細工ファイルを読み取るには、そのデータから寄木細工ファイルが生成されるデータが必要です。このパートでは、PySpark DataFrame から寄木細工ファイルを生成する方法を見ていきます。

5 つのレコードを持つ PySpark DataFrame を作成し、これを「industry_parquet」寄木細工ファイルに書き込んでみましょう。

pysparkをインポートする

pyspark.sql から SparkSession,Row をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# 業界の詳細を保存するデータフレームを作成します

Industry_df = linuxhint_spark_app.createDataFrame([Row(Type= '農業' ,面積= 'アメリカ合衆国'
評価= '熱い' ,従業員の合計= 100 )、

行(タイプ= '農業' ,面積= 'インド' 、評価= '熱い' ,従業員の合計= 200 )、

行(タイプ= '発達' ,面積= 'アメリカ合衆国' 、評価= '暖かい' ,従業員の合計= 100 )、

行(タイプ= '教育' ,面積= 'アメリカ合衆国' 、評価= 'いいね' ,従業員の合計= 400 )、

行(タイプ= '教育' ,面積= 'アメリカ合衆国' 、評価= '暖かい' ,従業員の合計= 二十 )

])

# 実際のデータフレーム

Industry_df.show()

# Industry_df を寄木細工ファイルに書き込みます

Industry_df.coalesce( 1 ).write.parquet( 「産業寄木細工」 )

出力:

これは 5 つのレコードを保持する DataFrame です。

以前の DataFrame に対して寄木細工ファイルが作成されます。ここでは、拡張子を付けたファイル名は「part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet」です。このファイルはチュートリアル全体で使用します。

Parquet ファイルを PySpark データフレームに読み取る

寄木細工のファイルがあります。 read.parquet() 関数を使用してこのファイルを読み取り、PySpark DataFrame にロードしましょう。

pysparkをインポートする

pyspark.sql から SparkSession,Row をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# parquet ファイルを dataframe_from_parquet オブジェクトに読み込みます。

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 「part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet」 )

# dataframe_from_parquet-DataFrame を表示します

dataframe_from_parquet.show()

出力:

寄木細工ファイルから作成された show() メソッドを使用して DataFrame を表示します。

Parquet ファイルを使用した SQL クエリ

DataFrame にロードした後、SQL テーブルを作成し、DataFrame に存在するデータを表示できるようになります。 TEMPORARY VIEW を作成し、SQL コマンドを使用して寄木細工ファイルから作成された DataFrame からレコードを返す必要があります。

例 1:

「Sectors」という名前の一時ビューを作成し、SELECT コマンドを使用して DataFrame 内のレコードを表示します。これを参照できます チュートリアル Spark – SQL で VIEW を作成する方法を説明しています。

pysparkをインポートする

pyspark.sql から SparkSession,Row をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# parquet ファイルを dataframe_from_parquet オブジェクトに読み込みます。

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 「part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet」 )

# 上記の寄木細工ファイルから「Sectors」という名前のビューを作成します

dataframe_from_parquet.createOrReplaceTempView( 「セクター」 )

# セクターのすべてのレコードを表示するクエリ

linuxhint_spark_app.sql( 「セクターから * を選択」 )。見せる()

出力:

例 2:

前の VIEW を使用して、SQL クエリを作成します。

  1. 「インド」に属するセクターのすべてのレコードを表示します。
  2. 従業員数が 100 人を超えるセクターのすべてのレコードを表示します。
# 「インド」に属するセクターのすべてのレコードを表示するクエリ。

linuxhint_spark_app.sql( 「エリア='インド'のセクターから * を選択」 )。見せる()

# 従業員数が 100 人を超えるセクターのすべてのレコードを表示するクエリ

linuxhint_spark_app.sql( 「従業員の合計が 100 を超えるセクターから * を選択」 )。見せる()

出力:

地域が「インド」であるレコードは 1 つだけで、従業員が 100 を超えるレコードは 2 つあります。

Parquet ファイルを PySpark SQL に読み取る

まず、CREATE コマンドを使用して VIEW を作成する必要があります。 SQL クエリ内で「path」キーワードを使用すると、寄木細工ファイルを Spark SQL に読み取ることができます。パスの後に、ファイル名/ファイルの場所を指定する必要があります。

構文:

スパーク_app.sql( 'CREATE TEMPORARY VIEW view_name USING parquet OPTIONS (パス ' ファイル名.寄木細工 ')'

例 1:

「Sector2」という名前の一時ビューを作成し、そこに寄木細工ファイルを読み込みます。 sql() 関数を使用して、ビューに存在するすべてのレコードを表示する選択クエリを作成します。

pysparkをインポートする

pyspark.sql から SparkSession,Row をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

# 寄木細工ファイルを Spark-SQL に読み込みます

linuxhint_spark_app.sql( '寄木細工のオプションを使用して一時ビュー セクター 2 を作成 (パス ' パート-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')'

# Sector2 のすべてのレコードを表示するクエリ

linuxhint_spark_app.sql( 「セクター2から*を選択」 )。見せる()

出力:

例 2:

前の VIEW を使用して、「ホット」または「クール」の評価を持つすべてのレコードを表示するクエリを作成します。

# クエリを実行して、Sector2 のすべてのレコードをホットまたはクールの評価で表示します。

linuxhint_spark_app.sql( 'select * from Sector2 where Rating='Hot' OR Rating='Cool'' )。見せる()

出力:

「ホット」または「クール」の評価を持つレコードが 3 件あります。

結論

PySpark では、write.parquet() 関数は DataFrame を寄木細工ファイルに書き込みます。 read.parquet() 関数は、寄木細工ファイルを PySpark DataFrame またはその他の DataSource に読み取ります。寄木細工ファイルを PySpark DataFrame と PySpark テーブルに読み取る方法を学びました。このチュートリアルの一環として、PySpark DataFrame からテーブルを作成し、WHERE 句を使用してデータをフィルターする方法についても説明しました。