PySpark Read.Parquet()

Pyspark Read Parquet



В PySpark функцията write.parquet() записва DataFrame във файла на паркета, а read.parquet() чете файла на паркета в PySpark DataFrame или всеки друг източник на данни. За да обработваме колоните в Apache Spark бързо и ефективно, трябва да компресираме данните. Компресирането на данни спестява нашата памет и всички колони се преобразуват в плоско ниво. Това означава, че съществува съхранение на ниво плоска колона. Файлът, който ги съхранява, е известен като PARQUET файл.

В това ръководство ще се фокусираме главно върху четенето/зареждането на файла с паркет в PySpark DataFrame/SQL с помощта на функцията read.parquet(), която е налична в класа pyspark.sql.DataFrameReader.

Тема на съдържанието:







Вземете пилата за паркет



Прочетете файла Parquet в PySpark DataFrame



Прочетете файла Parquet в PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Тази функция се използва за четене на паркетния файл и зареждането му в PySpark DataFrame. Взема пътя/името на файла на паркетния файл. Можем просто да използваме функцията read.parquet(), тъй като това е общата функция.

Синтаксис:



Нека видим синтаксиса на read.parquet():

spark_app.read.parquet(име_на_файл.паркет/път)

Първо инсталирайте модула PySpark с помощта на командата pip:

pip инсталирайте pyspark

Вземете пилата за паркет

За да разчетете файл за паркет, имате нужда от данните, в които се генерира файлът за паркет от тези данни. В тази част ще видим как да генерираме файл с паркет от PySpark DataFrame.

Нека създадем PySpark DataFrame с 5 записа и да го запишем в паркетния файл „industry_parquet“.

импортиране на pyspark

от pyspark.sql импортирайте SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()

# създайте рамка с данни, която съхранява подробности за индустрията

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Селско стопанство' ,Площ= 'САЩ' ,
Рейтинг= 'Горещо' ,Общо_служители= 100 ),

Ред(Тип= 'Селско стопанство' ,Площ= 'Индия' ,Оценка= 'Горещо' ,Общо_служители= 200 ),

Ред(Тип= 'Развитие' ,Площ= 'САЩ' ,Оценка= 'топло' ,Общо_служители= 100 ),

Ред(Тип= 'Образование' ,Площ= 'САЩ' ,Оценка= 'Готино' ,Общо_служители= 400 ),

Ред(Тип= 'Образование' ,Площ= 'САЩ' ,Оценка= 'топло' ,Общо_служители= двадесет )

])

# Действителен DataFrame

industrial_df.show()

# Запишете industrial_df във файла parket

industry_df.coalesce( 1 ).write.parquet( 'индустрия_паркет' )

Изход:

Това е DataFrame, който съдържа 5 записа.

Създава се паркетен файл за предишния DataFrame. Тук името на нашия файл с разширение е „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“. Използваме този файл в целия урок.

Прочетете файла Parquet в PySpark DataFrame

Разполагаме с пила за паркет. Нека прочетем този файл с помощта на функцията read.parquet() и да го заредим в PySpark DataFrame.

импортиране на pyspark

от pyspark.sql импортирайте SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()

# Прочетете файла с паркета в обект dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'част-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Показване на dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Изход:

Ние показваме DataFrame с помощта на метода show(), който е създаден от файла на паркета.

SQL заявки с паркетен файл

След зареждане в DataFrame може да е възможно да създадете SQL таблици и да покажете данните, които присъстват в DataFrame. Трябва да създадем ВРЕМЕНЕН ИЗГЛЕД и да използваме SQL командите, за да върнем записите от DataFrame, който е създаден от файла на паркета.

Пример 1:

Създайте временен изглед с име „Сектори“ и използвайте командата SELECT, за да покажете записите в DataFrame. Можете да се обърнете към това урок който обяснява как да създадете VIEW в Spark – SQL.

импортиране на pyspark

от pyspark.sql импортирайте SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()

# Прочетете файла с паркета в обект dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'част-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Създайте изглед от горния паркетен файл с име - 'Сектори'

dataframe_from_parquet.createOrReplaceTempView( 'Сектори' )

# Заявка за показване на всички записи от секторите

linuxhint_spark_app.sql( 'изберете * от сектори' ).покажи()

Изход:

Пример 2:

Използвайки предишния VIEW, напишете SQL заявката:

  1. За показване на всички записи от секторите, които принадлежат на „Индия“.
  2. За показване на всички записи от секторите със служител, който е над 100.
# Заявка за показване на всички записи от секторите, принадлежащи на 'Индия'.

linuxhint_spark_app.sql( 'изберете * от сектори, където Area='Индия'' ).покажи()

# Заявка за показване на всички записи от секторите със служители над 100

linuxhint_spark_app.sql( 'изберете * от сектори, където Total_employees>100' ).покажи()

Изход:

Има само един запис с област, която е „Индия“ и два записа със служители, които са над 100.

Прочетете файла Parquet в PySpark SQL

Първо, трябва да създадем VIEW с помощта на командата CREATE. Използвайки ключовата дума „path“ в рамките на SQL заявката, можем да прочетем файла с паркет в SQL на Spark. След пътя трябва да посочим името на файла/местоположението на файла.

Синтаксис:

spark_app.sql( „СЪЗДАВАНЕ НА ВРЕМЕНЕН ИЗГЛЕД име на изглед ИЗПОЛЗВАНЕ НА ОПЦИИ за паркет (път ' име_на_файл.паркет ')' )

Пример 1:

Създайте временен изглед с име “Sector2” и прочетете файла с паркета в него. С помощта на функцията sql() напишете заявката за избор, за да покажете всички записи, които присъстват в изгледа.

импортиране на pyspark

от pyspark.sql импортирайте SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()

# Прочетете паркетния файл в Spark-SQL

linuxhint_spark_app.sql( 'СЪЗДАВАНЕ НА ВРЕМЕНЕН ИЗГЛЕД Сектор2 ИЗПОЛЗВАНЕ НА ОПЦИИ за паркет (път ' част-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Заявка за показване на всички записи от Sector2

linuxhint_spark_app.sql( 'изберете * от Сектор2' ).покажи()

Изход:

Пример 2:

Използвайте предишния VIEW и напишете заявката, за да покажете всички записи с оценка „Горещо“ или „Готино“.

# Заявка за показване на всички записи от Sector2 с рейтинг - Hot или Cool.

linuxhint_spark_app.sql( 'изберете * от Сектор2, където Рейтинг='Горещо' ИЛИ ​​Рейтинг='Готино'' ).покажи()

Изход:

Има три записа с рейтинг „Горещо“ или „Готино“.

Заключение

В PySpark функцията write.parquet() записва DataFrame във файла на паркета. Функцията read.parquet() чете файла на паркета в PySpark DataFrame или всеки друг източник на данни. Научихме как да четем файла с паркет в PySpark DataFrame и в таблицата на PySpark. Като част от този урок обсъдихме също как да създаваме таблици от PySpark DataFrame и да филтрираме данните с помощта на клаузата WHERE.