PySpark Read.Parquet()

Pyspark Read Parquet



Sa PySpark, ang write.parquet() function ay nagsusulat ng DataFrame sa parquet file at ang read.parquet() ay nagbabasa ng parquet file sa PySpark DataFrame o anumang iba pang DataSource. Upang maproseso ang mga column sa Apache Spark nang mabilis at mahusay, kailangan nating i-compress ang data. Ang data compression ay nagse-save ng aming memorya at ang lahat ng mga column ay na-convert sa flat level. Nangangahulugan iyon na umiiral ang flat column level storage. Ang file na nag-iimbak ng mga ito ay kilala bilang PARQUET file.

Sa gabay na ito, pangunahing tututukan namin ang pagbabasa/pag-load ng parquet file sa PySpark DataFrame/SQL gamit ang read.parquet() function na available sa pyspark.sql.DataFrameReader class.

Paksa ng Nilalaman:







Kunin ang Parquet File



Basahin ang Parquet File sa PySpark DataFrame



Basahin ang Parquet File sa PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Ginagamit ang function na ito upang basahin ang parquet file at i-load ito sa PySpark DataFrame. Kinukuha nito ang path/file name ng parquet file. Magagamit lang natin ang read.parquet() function dahil ito ang generic na function.

Syntax:



Tingnan natin ang syntax ng read.parquet():

spark_app.read.parquet(file_name.parquet/path)

Una, i-install ang PySpark module gamit ang pip command:

pip install pyspark

Kunin ang Parquet File

Upang magbasa ng parquet file, kailangan mo ang data kung saan nabuo ang parquet file mula sa data na iyon. Sa bahaging ito, makikita natin kung paano bumuo ng parquet file mula sa PySpark DataFrame.

Gumawa tayo ng PySpark DataFrame na may 5 record at isulat ito sa 'industry_parquet' parquet file.

import pyspark

mula sa pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# lumikha ng dataframe na nag-iimbak ng mga detalye ng Industriya

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Agrikultura' ,Lugar= 'USA' ,
Rating= 'Mainit' ,Kabuuang_empleyado= 100 ),

Hilera(Uri= 'Agrikultura' ,Lugar= 'India' , Rating= 'Mainit' ,Kabuuang_empleyado= 200 ),

Hilera(Uri= 'Pag-unlad' ,Lugar= 'USA' , Rating= 'Mainit' ,Kabuuang_empleyado= 100 ),

Hilera(Uri= 'Edukasyon' ,Lugar= 'USA' , Rating= 'Malamig' ,Kabuuang_empleyado= 400 ),

Hilera(Uri= 'Edukasyon' ,Lugar= 'USA' , Rating= 'Mainit' ,Kabuuang_empleyado= dalawampu )

])

# Aktwal na DataFrame

industry_df.show()

# Isulat ang industry_df sa parquet file

industry_df.coalesce( 1 ).write.parquet( 'industriya_parquet' )

Output:

Ito ang DataFrame na mayroong 5 record.

Ang isang parquet file ay nilikha para sa nakaraang DataFrame. Dito, ang aming file name na may extension ay 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. Ginagamit namin ang file na ito sa buong tutorial.

Basahin ang Parquet File sa PySpark DataFrame

Nasa amin ang parquet file. Basahin natin ang file na ito gamit ang read.parquet() function at i-load ito sa PySpark DataFrame.

import pyspark

mula sa pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# Basahin ang parquet file sa dataframe_from_parquet object.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Ipakita ang dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Output:

Ipinapakita namin ang DataFrame gamit ang show() na paraan na ginawa mula sa parquet file.

Mga SQL Query na may Parquet File

Pagkatapos mag-load sa DataFrame, posibleng gumawa ng mga SQL table at ipakita ang data na nasa DataFrame. Kailangan nating lumikha ng PANSAMANG TANAN at gamitin ang mga SQL command upang ibalik ang mga talaan mula sa DataFrame na nilikha mula sa parquet file.

Halimbawa 1:

Gumawa ng pansamantalang view na pinangalanang 'Mga Sektor' at gamitin ang SELECT command upang ipakita ang mga tala sa DataFrame. Maaari kang sumangguni dito pagtuturo na nagpapaliwanag kung paano gumawa ng VIEW sa Spark – SQL.

import pyspark

mula sa pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# Basahin ang parquet file sa dataframe_from_parquet object.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Lumikha ng View mula sa itaas na parquet file na pinangalanang - 'Mga Sektor'

dataframe_from_parquet.createOrReplaceTempView( 'Mga Sektor' )

# Query upang ipakita ang lahat ng mga tala mula sa Mga Sektor

linuxhint_spark_app.sql( 'piliin * mula sa Mga Sektor' ).show()

Output:

Halimbawa 2:

Gamit ang nakaraang VIEW, isulat ang SQL query:

  1. Upang ipakita ang lahat ng mga tala mula sa Mga Sektor na kabilang sa 'India'.
  2. Upang ipakita ang lahat ng mga tala mula sa Mga Sektor na may isang empleyado na higit sa 100.
# Query upang ipakita ang lahat ng mga tala mula sa Mga Sektor na kabilang sa 'India'.

linuxhint_spark_app.sql( 'select * from Sectors where Area='India'' ).show()

# Query upang ipakita ang lahat ng mga tala mula sa Mga Sektor na may empleyadong higit sa 100

linuxhint_spark_app.sql( 'select * from Sectors where Total_employees>100' ).show()

Output:

Mayroon lamang isang talaan na may lugar na 'India' at dalawang talaan na may mga empleyado na higit sa 100.

Basahin ang Parquet File sa PySpark SQL

Una, kailangan nating gumawa ng VIEW gamit ang CREATE command. Gamit ang keyword na 'path' sa loob ng SQL query, mababasa natin ang parquet file sa Spark SQL. Pagkatapos ng landas, kailangan nating tukuyin ang filename/lokasyon ng file.

Syntax:

spark_app.sql( 'GUMAWA NG PANSAMANTALAANG VIEW na view_name GAMIT ang parquet OPTIONS (path ' file_name.parquet ')' )

Halimbawa 1:

Gumawa ng pansamantalang view na pinangalanang 'Sector2' at basahin ang parquet file dito. Gamit ang sql() function, isulat ang piling query para ipakita ang lahat ng record na naroroon sa view.

import pyspark

mula sa pyspark.sql import SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# Basahin ang parquet file sa Spark- SQL

linuxhint_spark_app.sql( 'GUMAWA NG PANSAMANTALA TINGIN Sektor2 GAMITIN ANG MGA OPSYON NG parquet (path ' bahagi-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Query para ipakita ang lahat ng record mula sa Sector2

linuxhint_spark_app.sql( 'piliin * mula sa Sector2' ).show()

Output:

Halimbawa 2:

Gamitin ang nakaraang VIEW at isulat ang query para ipakita ang lahat ng record na may rating na “Hot” o “Cool”.

# Query para ipakita ang lahat ng record mula sa Sector2 na may Rating- Hot or Cool.

linuxhint_spark_app.sql( 'select * from Sector2 where Rating='Hot' OR Rating='Cool'' ).show()

Output:

May tatlong record na may rating na 'Hot' o 'Cool'.

Konklusyon

Sa PySpark, isinusulat ng write.parquet() function ang DataFrame sa parquet file. Binabasa ng function na read.parquet() ang parquet file sa PySpark DataFrame o anumang iba pang DataSource. Natutunan namin kung paano basahin ang parquet file sa PySpark DataFrame at sa PySpark table. Bilang bahagi ng tutorial na ito, tinalakay din namin kung paano gumawa ng mga talahanayan mula sa PySpark DataFrame at i-filter ang data gamit ang WHERE clause.