内容のトピック:
PySpark DataFrame とモジュールのインストールについて知りたい場合は、これを参照してください。 記事 。
Pyspark.sql.functions.pandas_udf()
pandas_udf () は PySpark の sql.functions モジュールで利用でき、「from」キーワードを使用してインポートできます。これは、PySpark DataFrame でベクトル化された操作を実行するために使用されます。この関数は、3 つのパラメーターを渡すことによってデコレーターのように実装されます。その後、矢印を使用して、データをベクトル形式で返すユーザー定義関数を作成できます (これには series/NumPy を使用します)。この関数内で結果を返すことができます。
構造と構文:
まず、この関数の構造と構文を見てみましょう。
@pandas_udf(データ型)def 関数名(操作) -> 変換形式:
return ステートメント
ここで、 function_name は定義された関数の名前です。データ型は、この関数によって返されるデータ型を指定します。 「return」キーワードを使用して結果を返すことができます。すべての操作は、矢印の割り当てを使用して関数内で実行されます。
Pandas_udf (関数と ReturnType)
- 最初のパラメータは、それに渡されるユーザー定義関数です。
- 2 番目のパラメーターは、関数から返されるデータ型を指定するために使用されます。
データ:
このガイド全体では、デモンストレーションのために PySpark DataFrame を 1 つだけ使用します。私たちが定義するすべてのユーザー定義関数は、この PySpark DataFrame に適用されます。 PySpark のインストール後、必ず最初にこの DataFrame を環境内に作成してください。
pysparkをインポートする
pyspark.sql から SparkSession をインポート
linuxhint_spark_app = SparkSession.builder.appName( 「Linux のヒント」 ).getOrCreate()
pyspark.sql.functionsからpandas_udfをインポート
pyspark.sql.types インポートから *
パンダをパンダとしてインポートします
#野菜の詳細
野菜 =[{ 'タイプ' : '野菜' 、 '名前' : 'トマト' 、 '国を特定' : 'アメリカ合衆国' 、 '量' : 800 }、
{ 'タイプ' : 'フルーツ' 、 '名前' : 'バナナ' 、 '国を特定' : '中国' 、 '量' : 二十 }、
{ 'タイプ' : '野菜' 、 '名前' : 'トマト' 、 '国を特定' : 'アメリカ合衆国' 、 '量' : 800 }、
{ 'タイプ' : '野菜' 、 '名前' : 'マンゴー' 、 '国を特定' : '日本' 、 '量' : 0 }、
{ 'タイプ' : 'フルーツ' 、 '名前' : 'レモン' 、 '国を特定' : 'インド' 、 '量' : 1700 }、
{ 'タイプ' : '野菜' 、 '名前' : 'トマト' 、 '国を特定' : 'アメリカ合衆国' 、 '量' : 1200 }、
{ 'タイプ' : '野菜' 、 '名前' : 'マンゴー' 、 '国を特定' : '日本' 、 '量' : 0 }、
{ 'タイプ' : 'フルーツ' 、 '名前' : 'レモン' 、 '国を特定' : 'インド' 、 '量' : 0 }
】
# 上記のデータから市場データフレームを作成します
Market_df = linuxhint_spark_app.createDataFrame(野菜)
マーケット_df.show()
出力:
ここでは、4 列 8 行の DataFrame を作成します。ここで、pandas_udf() を使用してユーザー定義関数を作成し、それらをこれらの列に適用します。
異なるデータ型の Pandas_udf()
このシナリオでは、pandas_udf() を使用していくつかのユーザー定義関数を作成し、それらを列に適用し、select() メソッドを使用して結果を表示します。いずれの場合も、ベクトル化された操作を実行するときに pandas.Series を使用します。これは、列の値を 1 次元配列とみなして、操作を列に適用します。デコレーター自体で、関数の戻り値の型を指定します。
例 1: 文字列型の Pandas_udf()
ここでは、文字列型の列の値を大文字と小文字に変換するために、文字列の戻り値の型を持つ 2 つのユーザー定義関数を作成します。最後に、これらの関数を「type」列と「locate_country」列に適用します。
# pandas_udf で type 列を大文字に変換します@pandas_udf(StringType())
def type_upper_case(i: panda.Series) -> panda.Series:
i.str.upper() を返す
# pandas_udf を使用して、locate_count 列を小文字に変換します
@pandas_udf(StringType())
def country_ lower_case(i: panda.Series) -> panda.Series:
i.str. lower() を返す
# select() を使用して列を表示します
マーケット_df.select( 'タイプ' ,type_大文字( 'タイプ' )、 「場所の国」 、
国_小文字( 「場所の国」 ))。見せる()
出力:
説明:
StringType() 関数は、pyspark.sql.types モジュールで使用できます。このモジュールは、PySpark DataFrame の作成時にすでにインポートされています。
- まず、UDF (ユーザー定義関数) は、str.upper() 関数を使用して文字列を大文字で返します。 str.upper() は、指定された文字列を大文字に変換するシリーズ データ構造で使用できます (関数内の矢印でシリーズに変換しているため)。最後に、この関数は select() メソッド内で指定された「type」列に適用されます。以前は、type 列の文字列はすべて小文字でした。現在は大文字に変更されています。
- 次に、UDF は str. lower() 関数を使用して文字列を大文字で返します。 str. lower() は、指定された文字列を小文字に変換する Series データ構造で使用できます。最後に、この関数は select() メソッド内で指定された「type」列に適用されます。以前は、type 列の文字列はすべて大文字でした。現在は小文字に変更されています。
例 2: 整数型の Pandas_udf()
PySpark DataFrame の整数列を Pandas シリーズに変換し、各値に 100 を加算する UDF を作成してみましょう。 select() メソッド内のこの関数に「数量」列を渡します。
# 100を追加します@pandas_udf(IntegerType())
def add_100(i: panda.Series) -> panda.Series:
i+ を返す 100
# 数量列を上記の関数に渡して表示します。
マーケット_df.select( '量' ,add_100( '量' ))。見せる()
出力:
説明:
UDF 内で、すべての値を反復処理し、それらを Series に変換します。その後、系列内の各値に 100 を加算します。最後に、この関数に「数量」列を渡すと、すべての値に 100 が追加されることがわかります。
Groupby() と Agg() を使用したさまざまなデータ型の Pandas_udf()
UDF を集計列に渡す例を見てみましょう。ここでは、最初に groupby() 関数を使用して列値がグループ化され、agg() 関数を使用して集計が行われます。この集計関数内で UDF を渡します。
構文:
pyspark_dataframe_object.groupby( 「グループ化列」 ).agg(UDF(pyspark_dataframe_object[ '桁' ]))
ここでは、グループ化列の値が最初にグループ化されます。次に、UDF に関してグループ化されたデータごとに集計が行われます。
例 1: Pandas_udf() と Aggregate Mean()
ここでは、戻り値の型が float のユーザー定義関数を作成します。関数内では、mean() 関数を使用して平均を計算します。この UDF は「数量」列に渡され、各タイプの平均数量が取得されます。
# 平均値を返す@pandas_udf( '浮く' )
def Average_function(i: panda.Series) -> float:
i.mean() を返す
# type 列をグループ化して数量列を関数に渡します。
マーケット_df.groupby( 'タイプ' ).agg(average_function(market_df[ '量' ]))。見せる()
出力:
「type」列の要素に基づいてグループ化しています。 「果物」と「野菜」の 2 つのグループが形成されます。各グループの平均が計算されて返されます。
例 2: Pandas_udf() と集計 Max() および Min()
ここでは、整数 (int) 戻り値の型を持つ 2 つのユーザー定義関数を作成します。最初の UDF は最小値を返し、2 番目の UDF は最大値を返します。
# 最小値を返す pandas_udf@pandas_udf( 'int' )
def min_(i: panda.Series) -> int:
i.min()を返す
# 最大値を返す pandas_udf
@pandas_udf( 'int' )
def max_(i: panda.Series) -> int:
i.max()を返す
#locate_country をグループ化して、数量列を min_pandas_udf に渡します。
マーケット_df.groupby( 「場所の国」 ).agg(min_(market_df[ '量' ]))。見せる()
#locate_country をグループ化して、数量列を max_pandas_udf に渡します。
マーケット_df.groupby( 「場所の国」 ).agg(max_(market_df[ '量' ]))。見せる()
出力:
最小値と最大値を返すには、UDF の戻り値の型で min() 関数と max() 関数を利用します。次に、「locate_country」列のデータをグループ化します。 4つのグループ(「CHINA」、「INDIA」、「JAPAN」、「USA」)が形成されます。各グループについて、最大数量を返します。同様に、最小数量を返します。
結論
基本的に、pandas_udf () は、PySpark DataFrame でベクトル化された操作を実行するために使用されます。 pandas_udf() を作成して PySpark DataFrame に適用する方法を説明しました。理解を深めるために、すべてのデータ型 (文字列、浮動小数点、整数) を考慮してさまざまな例について説明しました。 agg() 関数を通じて、pandas_udf() を groupby() とともに使用することが可能です。