PySpark Pandas_Udf()

Pyspark Pandas Udf



Ang pagbabago sa PySpark DataFrame ay posible gamit ang pandas_udf() function. Ito ay isang function na tinukoy ng gumagamit na inilapat sa PySpark DataFrame na may arrow. Magagawa natin ang mga vectorized na operasyon gamit ang pandas_udf(). Maaari itong ipatupad sa pamamagitan ng pagpasa sa function na ito bilang isang dekorador. Sumisid tayo sa gabay na ito para malaman ang syntax, mga parameter, at iba't ibang halimbawa.

Paksa ng Nilalaman:

Kung gusto mong malaman ang tungkol sa PySpark DataFrame at pag-install ng module, dumaan dito artikulo .







Pyspark.sql.functions.pandas_udf()

Ang pandas_udf () ay magagamit sa sql.functions module sa PySpark na maaaring i-import gamit ang 'mula sa' keyword. Ito ay ginagamit upang isagawa ang vectorized na mga operasyon sa aming PySpark DataFrame. Ang function na ito ay ipinatupad tulad ng isang dekorador sa pamamagitan ng pagpasa ng tatlong mga parameter. Pagkatapos nito, maaari kaming lumikha ng function na tinukoy ng gumagamit na nagbabalik ng data sa format na vector (tulad ng paggamit namin ng serye/NumPy para dito) gamit ang isang arrow. Sa loob ng function na ito, naibabalik namin ang resulta.



Istraktura at Syntax:



Una, tingnan natin ang istraktura at syntax ng function na ito:

@pandas_udf(datatype)
def function_name(operasyon) -> convert_format:
balik pahayag

Dito, ang function_name ay ang pangalan ng aming tinukoy na function. Tinutukoy ng uri ng data ang uri ng data na ibinalik ng function na ito. Maaari naming ibalik ang resulta gamit ang keyword na 'ibalik'. Ang lahat ng mga operasyon ay ginagawa sa loob ng function na may pagtatalaga ng arrow.





Pandas_udf (Function and ReturnType)

  1. Ang unang parameter ay ang function na tinukoy ng user na ipinapasa dito.
  2. Ang pangalawang parameter ay ginagamit upang tukuyin ang uri ng data ng pagbabalik mula sa function.

Data:

Sa buong gabay na ito, isang PySpark DataFrame lang ang ginagamit namin para sa pagpapakita. Ang lahat ng mga function na tinukoy ng user na aming tinukoy ay inilalapat sa PySpark DataFrame na ito. Tiyaking gagawin mo muna itong DataFrame sa iyong kapaligiran pagkatapos ng pag-install ng PySpark.



import pyspark

mula sa pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

mula sa pyspark.sql.functions import pandas_udf

mula sa pyspark.sql.types import *

import panda bilang panda

# mga detalye ng gulay

gulay =[{ 'type' : 'gulay' , 'pangalan' : 'kamatis' , 'locate_country' : 'USA' , 'dami' : 800 },

{ 'type' : 'prutas' , 'pangalan' : 'saging' , 'locate_country' : 'CHINA' , 'dami' : dalawampu },

{ 'type' : 'gulay' , 'pangalan' : 'kamatis' , 'locate_country' : 'USA' , 'dami' : 800 },

{ 'type' : 'gulay' , 'pangalan' : 'mango' , 'locate_country' : 'HAPON' , 'dami' : 0 },

{ 'type' : 'prutas' , 'pangalan' : 'lemon' , 'locate_country' : 'INDIA' , 'dami' : 1700 },

{ 'type' : 'gulay' , 'pangalan' : 'kamatis' , 'locate_country' : 'USA' , 'dami' : 1200 },

{ 'type' : 'gulay' , 'pangalan' : 'mango' , 'locate_country' : 'HAPON' , 'dami' : 0 },

{ 'type' : 'prutas' , 'pangalan' : 'lemon' , 'locate_country' : 'INDIA' , 'dami' : 0 }

]

# lumikha ng dataframe ng merkado mula sa data sa itaas

market_df = linuxhint_spark_app.createDataFrame(gulay)

market_df.show()

Output:

Dito, ginagawa namin itong DataFrame na may 4 na column at 8 row. Ngayon, ginagamit namin ang pandas_udf() upang lumikha ng mga function na tinukoy ng user at ilapat ang mga ito sa mga column na ito.

Pandas_udf() na may Iba't ibang Uri ng Data

Sa sitwasyong ito, gumawa kami ng ilang function na tinukoy ng user na may pandas_udf() at inilapat ang mga ito sa mga column at ipinapakita ang mga resulta gamit ang select() na paraan. Sa bawat kaso, ginagamit namin ang pandas.Series habang ginagawa namin ang mga vectorized na operasyon. Isinasaalang-alang nito ang mga value ng column bilang isang one-dimensional na array at inilalapat ang operasyon sa column. Sa mismong dekorador, tinukoy namin ang uri ng pagbabalik ng function.

Halimbawa 1: Pandas_udf() na may String Type

Dito, lumikha kami ng dalawang function na tinukoy ng gumagamit na may uri ng pagbabalik ng string upang i-convert ang mga halaga ng column ng uri ng string sa uppercase at lowercase. Panghuli, inilalapat namin ang mga function na ito sa mga column na 'type' at 'locate_country'.

# I-convert ang uri ng column sa upper case na may pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

ibalik ang i.str.upper()

# I-convert ang locate_country column sa lowercase na may pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

bumalik i.str.lower()

# Ipakita ang mga column gamit ang select()

market_df.select( 'uri' ,type_upper_case( 'uri' ), 'locate_country' ,
country_lower_case( 'locate_country' )).show()

Output:

Paliwanag:

Ang StringType() function ay available sa pyspark.sql.types module. Na-import na namin ang module na ito habang ginagawa ang PySpark DataFrame.

  1. Una, ibinabalik ng UDF (user-defined function) ang mga string sa uppercase gamit ang str.upper() function. Ang str.upper() ay available sa Series Data Structure (habang nagko-convert kami sa serye na may arrow sa loob ng function) na nagko-convert sa ibinigay na string sa uppercase. Sa wakas, ang function na ito ay inilapat sa column na 'uri' na tinukoy sa loob ng select() na paraan. Dati, ang lahat ng mga string sa hanay ng uri ay nasa maliit na titik. Ngayon, binago ang mga ito sa uppercase.
  2. Pangalawa, ibinabalik ng UDF ang mga string sa uppercase gamit ang str.lower()function. Ang str.lower() ay available sa Series Data Structure na nagko-convert sa ibinigay na string sa lowercase. Sa wakas, ang function na ito ay inilapat sa column na 'uri' na tinukoy sa loob ng select() na paraan. Dati, ang lahat ng mga string sa hanay ng uri ay nasa uppercase. Ngayon, binago ang mga ito sa lowercase.

Halimbawa 2: Pandas_udf() na may Uri ng Integer

Gumawa tayo ng UDF na nagko-convert sa column ng PySpark DataFrame integer sa serye ng Pandas at magdagdag ng 100 sa bawat value. Ipasa ang column na “quantity” sa function na ito sa loob ng select() method.

# Magdagdag ng 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

bumalik i+ 100

# Ipasa ang column ng dami sa function at display sa itaas.

market_df.select( 'dami' , add_100( 'dami' )).show()

Output:

Paliwanag:

Sa loob ng UDF, inuulit namin ang lahat ng value at iko-convert ang mga ito sa Serye. Pagkatapos nito, nagdaragdag kami ng 100 sa bawat halaga sa Serye. Sa wakas, ipinapasa namin ang column na 'dami' sa function na ito at makikita namin na 100 ay idinagdag sa lahat ng mga halaga.

Pandas_udf() na may Iba't ibang Uri ng Data Gamit ang Groupby() at Agg()

Tingnan natin ang mga halimbawa para ipasa ang UDF sa mga pinagsama-samang column. Dito, pinagsama-sama muna ang mga value ng column gamit ang groupby() function at ginagawa ang aggregation gamit ang agg() function. Ipinapasa namin ang aming UDF sa loob ng pinagsama-samang function na ito.

Syntax:

pyspark_dataframe_object.groupby( 'grouping_column' ).agg(UDF
(pyspark_dataframe_object[ 'column' ]))

Dito, unang pinagsama-sama ang mga halaga sa column ng pagpapangkat. Pagkatapos, ang pagsasama-sama ay ginagawa sa bawat nakagrupong data na may kinalaman sa aming UDF.

Halimbawa 1: Pandas_udf() na may Aggregate Mean()

Dito, lumikha kami ng function na tinukoy ng gumagamit na may isang return type float. Sa loob ng function, kinakalkula namin ang average gamit ang mean() function. Ang UDF na ito ay ipinapasa sa column na 'dami' upang makuha ang average na dami para sa bawat uri.

# ibalik ang mean/average

@pandas_udf( 'lumutang' )

def average_function(i: panda.Series) -> float:

bumalik i.mean()

# Ipasa ang column ng dami sa function sa pamamagitan ng pagpapangkat ng uri ng column.

market_df.groupby( 'uri' ).agg(average_function(market_df[ 'dami' ])).show()

Output:

Pinagpapangkat namin batay sa mga elemento sa column na 'uri'. Dalawang grupo ang nabuo - 'prutas' at 'gulay'. Para sa bawat pangkat, ang ibig sabihin ay kinakalkula at ibinalik.

Halimbawa 2: Pandas_udf() na may Aggregate Max() at Min()

Dito, lumikha kami ng dalawang function na tinukoy ng gumagamit na may integer (int) na uri ng pagbabalik. Ang unang UDF ay nagbabalik ng pinakamababang halaga at ang pangalawang UDF ay nagbabalik ng pinakamataas na halaga.

# pandas_udf na nagbabalik ng pinakamababang halaga

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

ibalik ang i.min()

# pandas_udf na nagbabalik ng maximum na halaga

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

ibalik ang i.max()

# Ipasa ang column ng dami sa min_ pandas_udf sa pamamagitan ng pagpapangkat ng locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'dami' ])).show()

# Ipasa ang column ng dami sa max_ pandas_udf sa pamamagitan ng pagpapangkat ng locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'dami' ])).show()

Output:

Upang ibalik ang minimum at maximum na mga halaga, ginagamit namin ang min() at max() na mga function sa uri ng pagbabalik ng mga UDF. Ngayon, pinapangkat namin ang data sa column na 'locate_country'. Apat na grupo ang nabuo (“CHINA”, “INDIA”, “JAPAN”, “USA”). Para sa bawat pangkat, ibinabalik namin ang maximum na dami. Katulad nito, ibinabalik namin ang pinakamababang dami.

Konklusyon

Karaniwan, ang pandas_udf () ay ginagamit upang isagawa ang mga vectorized na operasyon sa aming PySpark DataFrame. Nakita namin kung paano lumikha ng pandas_udf() at ilapat ito sa PySpark DataFrame. Para sa mas mahusay na pag-unawa, tinalakay namin ang iba't ibang mga halimbawa sa pamamagitan ng pagsasaalang-alang sa lahat ng mga datatypes (string, float, at integer). Posibleng gamitin ang pandas_udf() kasama ang groupby() sa pamamagitan ng agg() function.