PySpark Pandas_Udf()

Pyspark Pandas Udf



PySpark DataFrame の変換は、pandas_udf() 関数を使用して可能です。これは、矢印を使用して PySpark DataFrame に適用されるユーザー定義関数です。 pandas_udf() を使用してベクトル化された操作を実行できます。この関数をデコレータとして渡すことで実装できます。このガイドを詳しく読んで、構文、パラメーター、さまざまな例を理解しましょう。

内容のトピック:

PySpark DataFrame とモジュールのインストールについて知りたい場合は、これを参照してください。 記事 。







Pyspark.sql.functions.pandas_udf()

pandas_udf () は PySpark の sql.functions モジュールで利用でき、「from」キーワードを使用してインポートできます。これは、PySpark DataFrame でベクトル化された操作を実行するために使用されます。この関数は、3 つのパラメーターを渡すことによってデコレーターのように実装されます。その後、矢印を使用して、データをベクトル形式で返すユーザー定義関数を作成できます (これには series/NumPy を使用します)。この関数内で結果を返すことができます。



構造と構文:



まず、この関数の構造と構文を見てみましょう。

@pandas_udf(データ型)
def 関数名(操作) -> 変換形式:
return ステートメント

ここで、 function_name は定義された関数の名前です。データ型は、この関数によって返されるデータ型を指定します。 「return」キーワードを使用して結果を返すことができます。すべての操作は、矢印の割り当てを使用して関数内で実行されます。





Pandas_udf (関数と ReturnType)

  1. 最初のパラメータは、それに渡されるユーザー定義関数です。
  2. 2 番目のパラメーターは、関数から返されるデータ型を指定するために使用されます。

データ:

このガイド全体では、デモンストレーションのために PySpark DataFrame を 1 つだけ使用します。私たちが定義するすべてのユーザー定義関数は、この PySpark DataFrame に適用されます。 PySpark のインストール後、必ず最初にこの DataFrame を環境内に作成してください。



pysparkをインポートする

pyspark.sql から SparkSession をインポート

linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()

pyspark.sql.functionsからpandas_udfをインポート

pyspark.sql.types インポートから *

パンダをパンダとしてインポートします

#野菜の詳細

野菜 =[{ 'タイプ' : '野菜' '名前' : 'トマト​​' '国を特定' : 'アメリカ合衆国' '量' : 800 }、

{ 'タイプ' : 'フルーツ' '名前' : 'バナナ' '国を特定' : '中国' '量' : 二十 }、

{ 'タイプ' : '野菜' '名前' : 'トマト​​' '国を特定' : 'アメリカ合衆国' '量' : 800 }、

{ 'タイプ' : '野菜' '名前' : 'マンゴー' '国を特定' : '日本' '量' : 0 }、

{ 'タイプ' : 'フルーツ' '名前' : 'レモン' '国を特定' : 'インド' '量' : 1700 }、

{ 'タイプ' : '野菜' '名前' : 'トマト​​' '国を特定' : 'アメリカ合衆国' '量' : 1200 }、

{ 'タイプ' : '野菜' '名前' : 'マンゴー' '国を特定' : '日本' '量' : 0 }、

{ 'タイプ' : 'フルーツ' '名前' : 'レモン' '国を特定' : 'インド' '量' : 0 }



# 上記のデータから市場データフレームを作成します

Market_df = linuxhint_spark_app.createDataFrame(野菜)

マーケット_df.show()

出力:

ここでは、4 列 8 行の DataFrame を作成します。ここで、pandas_udf() を使用してユーザー定義関数を作成し、それらをこれらの列に適用します。

異なるデータ型の Pandas_udf()

このシナリオでは、pandas_udf() を使用していくつかのユーザー定義関数を作成し、それらを列に適用し、select() メソッドを使用して結果を表示します。いずれの場合も、ベクトル化された操作を実行するときに pandas.Series を使用します。これは、列の値を 1 次元配列とみなして、操作を列に適用します。デコレーター自体で、関数の戻り値の型を指定します。

例 1: 文字列型の Pandas_udf()

ここでは、文字列型の列の値を大文字と小文字に変換するために、文字列の戻り値の型を持つ 2 つのユーザー定義関数を作成します。最後に、これらの関数を「type」列と「locate_country」列に適用します。

# pandas_udf で type 列を大文字に変換します

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

i.str.upper() を返す

# pandas_udf を使用して、locate_count 列を小文字に変換します

@pandas_udf(StringType())

def country_ lower_case(i: panda.Series) -> panda.Series:

i.str. lower() を返す

# select() を使用して列を表示します

マーケット_df.select( 'タイプ' ,type_大文字( 'タイプ' )、 「場所の国」
国_小文字( 「場所の国」 ))。見せる()

出力:

説明:

StringType() 関数は、pyspark.sql.types モジュールで使用できます。このモジュールは、PySpark DataFrame の作成時にすでにインポートされています。

  1. まず、UDF (ユーザー定義関数) は、str.upper() 関数を使用して文字列を大文字で返します。 str.upper() は、指定された文字列を大文字に変換するシリーズ データ構造で使用できます (関数内の矢印でシリーズに変換しているため)。最後に、この関数は select() メソッド内で指定された「type」列に適用されます。以前は、type 列の文字列はすべて小文字でした。現在は大文字に変更されています。
  2. 次に、UDF は str. lower() 関数を使用して文字列を大文字で返します。 str. lower() は、指定された文字列を小文字に変換する Series データ構造で使用できます。最後に、この関数は select() メソッド内で指定された「type」列に適用されます。以前は、type 列の文字列はすべて大文字でした。現在は小文字に変更されています。

例 2: 整数型の Pandas_udf()

PySpark DataFrame の整数列を Pandas シリーズに変換し、各値に 100 を加算する UDF を作成してみましょう。 select() メソッド内のこの関数に「数量」列を渡します。

# 100を追加します

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

i+ を返す 100

# 数量列を上記の関数に渡して表示します。

マーケット_df.select( '量' ,add_100( '量' ))。見せる()

出力:

説明:

UDF 内で、すべての値を反復処理し、それらを Series に変換します。その後、系列内の各値に 100 を加算します。最後に、この関数に「数量」列を渡すと、すべての値に 100 が追加されることがわかります。

Groupby() と Agg() を使用したさまざまなデータ型の Pandas_udf()

UDF を集計列に渡す例を見てみましょう。ここでは、最初に groupby() 関数を使用して列値がグループ化され、agg() 関数を使用して集計が行われます。この集計関数内で UDF を渡します。

構文:

pyspark_dataframe_object.groupby( 「グループ化列」 ).agg(UDF
(pyspark_dataframe_object[ '桁' ]))

ここでは、グループ化列の値が最初にグループ化されます。次に、UDF に関してグループ化されたデータごとに集計が行われます。

例 1: Pandas_udf() と Aggregate Mean()

ここでは、戻り値の型が float のユーザー定義関数を作成します。関数内では、mean() 関数を使用して平均を計算します。この UDF は「数量」列に渡され、各タイプの平均数量が取得されます。

# 平均値を返す

@pandas_udf( '浮く' )

def Average_function(i: panda.Series) -> float:

i.mean() を返す

# type 列をグループ化して数量列を関数に渡します。

マーケット_df.groupby( 'タイプ' ).agg(average_function(market_df[ '量' ]))。見せる()

出力:

「type」列の要素に基づいてグループ化しています。 「果物」と「野菜」の 2 つのグループが形成されます。各グループの平均が計算されて返されます。

例 2: Pandas_udf() と集計 Max() および Min()

ここでは、整数 (int) 戻り値の型を持つ 2 つのユーザー定義関数を作成します。最初の UDF は最小値を返し、2 番目の UDF は最大値を返します。

# 最小値を返す pandas_udf

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

i.min()を返す

# 最大値を返す pandas_udf

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

i.max()を返す

#locate_country をグループ化して、数量列を min_pandas_udf に渡します。

マーケット_df.groupby( 「場所の国」 ).agg(min_(market_df[ '量' ]))。見せる()

#locate_country をグループ化して、数量列を max_pandas_udf に渡します。

マーケット_df.groupby( 「場所の国」 ).agg(max_(market_df[ '量' ]))。見せる()

出力:

最小値と最大値を返すには、UDF の戻り値の型で min() 関数と max() 関数を利用します。次に、「locate_country」列のデータをグループ化します。 4つのグループ(「CHINA」、「INDIA」、「JAPAN」、「USA」)が形成されます。各グループについて、最大数量を返します。同様に、最小数量を返します。

結論

基本的に、pandas_udf () は、PySpark DataFrame でベクトル化された操作を実行するために使用されます。 pandas_udf() を作成して PySpark DataFrame に適用する方法を説明しました。理解を深めるために、すべてのデータ型 (文字列、浮動小数点、整数) を考慮してさまざまな例について説明しました。 agg() 関数を通じて、pandas_udf() を groupby() とともに使用することが可能です。