Как да четете и записвате таблични данни в PySpark

Kak Da Cetete I Zapisvate Tablicni Danni V Pyspark



Обработката на данни в PySpark е по-бърза, ако данните се зареждат под формата на таблица. С това, използвайки SQL изразите, обработката ще бъде бърза. Така че преобразуването на PySpark DataFrame/RDD в таблица, преди да бъде изпратено за обработка, е по-добрият подход. Днес ще видим как да четем данните от таблицата в PySpark DataFrame, да записваме PySpark DataFrame в таблицата и да вмъкваме нов DataFrame в съществуващата таблица с помощта на вградените функции. Да тръгваме!

Pyspark.sql.DataFrameWriter.saveAsTable()

Първо ще видим как да напишем съществуващия PySpark DataFrame в таблицата с помощта на функцията write.saveAsTable(). Взема името на таблицата и други незадължителни параметри като режими, partionBy и т.н., за да запише DataFrame в таблицата. Съхранява се като пила за паркет.

Синтаксис:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
  1. Table_name е името на таблицата, която е създадена от dataframe_obj.
  2. Можем да добавим/презапишем данните от таблицата с помощта на параметъра mode.
  3. PartitionBy взема единичните/множеството колони, за да създаде дялове въз основа на стойности в тези предоставени колони.

Пример 1:

Създайте PySpark DataFrame с 5 реда и 4 колони. Запишете тази рамка с данни в таблица с име „Agri_Table1“.



импортиране на pyspark

от pyspark.sql импортирайте SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()

# данни за земеделие с 5 реда и 5 колони

агри =[{ „тип_почва“ : 'черно' , „Наличност_напояване“ : 'Не' , 'акри' : 2500 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'САЩ' },

{ „тип_почва“ : 'черно' , „Наличност_напояване“ : 'да' , 'акри' : 3500 , „състояние на почвата“ : 'мокро' ,
'Държава' : 'Индия' },

{ „тип_почва“ : 'Червен' , „Наличност_напояване“ : 'да' , 'акри' : 210 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'UK' },

{ „тип_почва“ : 'Друго' , „Наличност_напояване“ : 'Не' , 'акри' : 1000 , „състояние на почвата“ : 'мокро' ,
'Държава' : 'САЩ' },

{ „тип_почва“ : 'Пясък' , „Наличност_напояване“ : 'Не' , 'акри' : 500 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'Индия' }]



# създайте рамката с данни от горните данни

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Запишете горния DataFrame в таблицата.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Изход:







Виждаме, че един паркетен файл е създаден с предишните данни на PySpark.



Пример 2:

Разгледайте предишната DataFrame и запишете „Agri_Table2“ в таблицата, като разделите записите въз основа на стойностите в колоната „Държава“.

# Запишете горния DataFrame в таблицата с параметър partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Държава' ])

Изход:

В колоната „Държава“ има три уникални стойности – „Индия“, „Великобритания“ и „САЩ“. Така се създават три дяла. Всяка преграда побира паркетните пили.

Pyspark.sql.DataFrameReader.table()

Нека заредим таблицата в PySpark DataFrame с помощта на функцията spark.read.table(). Необходим е само един параметър, който е името на пътя/таблицата. Той директно зарежда таблицата в PySpark DataFrame и всички SQL функции, които се прилагат към PySpark DataFrame, също могат да бъдат приложени върху тази заредена DataFrame.

Синтаксис:

spark_app.read.table(path/'Table_name')

В този сценарий използваме предишната таблица, която е създадена от PySpark DataFrame. Уверете се, че трябва да внедрите кодовите фрагменти на предишния сценарий във вашата среда.

Пример:

Заредете таблицата „Agri_Table1“ в DataFrame с име „loaded_data“.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Изход:

Виждаме, че таблицата е заредена в PySpark DataFrame.

Изпълнение на SQL заявки

Сега изпълняваме някои SQL заявки върху заредената DataFrame, използвайки функцията spark.sql().

# Използвайте командата SELECT, за да покажете всички колони от горната таблица.

linuxhint_spark_app.sql( „ИЗБЕРЕТЕ * от Agri_Table1“ ).покажи()

# WHERE Клауза

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).покажи()

linuxhint_spark_app.sql( 'ИЗБЕРЕТЕ * от Agri_Table1 WHERE Акра > 2000' ).покажи()

Изход:

  1. Първата заявка показва всички колони и записи от DataFrame.
  2. Втората заявка показва записите въз основа на колоната „Soil_status“. Има само три записа с елемента „Сух“.
  3. Последната заявка връща два записа с „акра“, които са по-големи от 2000.

Pyspark.sql.DataFrameWriter.insertInto()

С помощта на функцията insertInto() можем да добавим DataFrame в съществуващата таблица. Можем да използваме тази функция заедно с selectExpr(), за да дефинираме имената на колоните и след това да я вмъкнем в таблицата. Тази функция също така приема tableName като параметър.

Синтаксис:

DataFrame_obj.write.insertInto('Име_на_таблица')

В този сценарий използваме предишната таблица, която е създадена от PySpark DataFrame. Уверете се, че трябва да внедрите кодовите фрагменти на предишния сценарий във вашата среда.

Пример:

Създайте нова DataFrame с два записа и ги вмъкнете в таблицата „Agri_Table1“.

импортиране на pyspark

от pyspark.sql импортирайте SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()

# данни за земеделие с 2 реда

агри =[{ „тип_почва“ : 'Пясък' , „Наличност_напояване“ : 'Не' , 'акри' : 2500 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'САЩ' },

{ „тип_почва“ : 'Пясък' , „Наличност_напояване“ : 'Не' , 'акри' : 1200 , „състояние на почвата“ : 'мокро' ,
'Държава' : 'Япония' }]

# създайте рамката с данни от горните данни

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'акри' , 'Държава' , „Наличност_напояване“ , 'тип_почва' ,
'Състояние_на_почвата' ).write.insertInto( 'Agri_Table1' )

# Показване на окончателната Agri_Table1

linuxhint_spark_app.sql( „ИЗБЕРЕТЕ * от Agri_Table1“ ).покажи()

Изход:

Сега общият брой редове, които присъстват в DataFrame, е 7.

Заключение

Вече разбирате как да напишете PySpark DataFrame в таблицата с помощта на функцията write.saveAsTable(). Взема името на таблицата и други незадължителни параметри. След това заредихме тази таблица в PySpark DataFrame с помощта на функцията spark.read.table(). Необходим е само един параметър, който е името на пътя/таблицата. Ако искате да добавите новия DataFrame към съществуващата таблица, използвайте функцията insertInto().