Pyspark.sql.DataFrameWriter.saveAsTable()
Първо ще видим как да напишем съществуващия PySpark DataFrame в таблицата с помощта на функцията write.saveAsTable(). Взема името на таблицата и други незадължителни параметри като режими, partionBy и т.н., за да запише DataFrame в таблицата. Съхранява се като пила за паркет.
Синтаксис:
dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,…)
- Table_name е името на таблицата, която е създадена от dataframe_obj.
- Можем да добавим/презапишем данните от таблицата с помощта на параметъра mode.
- PartitionBy взема единичните/множеството колони, за да създаде дялове въз основа на стойности в тези предоставени колони.
Пример 1:
Създайте PySpark DataFrame с 5 реда и 4 колони. Запишете тази рамка с данни в таблица с име „Agri_Table1“.
импортиране на pyspark
от pyspark.sql импортирайте SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()
# данни за земеделие с 5 реда и 5 колони
агри =[{ „тип_почва“ : 'черно' , „Наличност_напояване“ : 'Не' , 'акри' : 2500 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'САЩ' },
{ „тип_почва“ : 'черно' , „Наличност_напояване“ : 'да' , 'акри' : 3500 , „състояние на почвата“ : 'мокро' ,
'Държава' : 'Индия' },
{ „тип_почва“ : 'Червен' , „Наличност_напояване“ : 'да' , 'акри' : 210 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'UK' },
{ „тип_почва“ : 'Друго' , „Наличност_напояване“ : 'Не' , 'акри' : 1000 , „състояние на почвата“ : 'мокро' ,
'Държава' : 'САЩ' },
{ „тип_почва“ : 'Пясък' , „Наличност_напояване“ : 'Не' , 'акри' : 500 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'Индия' }]
# създайте рамката с данни от горните данни
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Запишете горния DataFrame в таблицата.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Изход:
Виждаме, че един паркетен файл е създаден с предишните данни на PySpark.
Пример 2:
Разгледайте предишната DataFrame и запишете „Agri_Table2“ в таблицата, като разделите записите въз основа на стойностите в колоната „Държава“.
# Запишете горния DataFrame в таблицата с параметър partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Държава' ])
Изход:
В колоната „Държава“ има три уникални стойности – „Индия“, „Великобритания“ и „САЩ“. Така се създават три дяла. Всяка преграда побира паркетните пили.
Pyspark.sql.DataFrameReader.table()
Нека заредим таблицата в PySpark DataFrame с помощта на функцията spark.read.table(). Необходим е само един параметър, който е името на пътя/таблицата. Той директно зарежда таблицата в PySpark DataFrame и всички SQL функции, които се прилагат към PySpark DataFrame, също могат да бъдат приложени върху тази заредена DataFrame.
Синтаксис:
spark_app.read.table(path/'Table_name')В този сценарий използваме предишната таблица, която е създадена от PySpark DataFrame. Уверете се, че трябва да внедрите кодовите фрагменти на предишния сценарий във вашата среда.
Пример:
Заредете таблицата „Agri_Table1“ в DataFrame с име „loaded_data“.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Изход:
Виждаме, че таблицата е заредена в PySpark DataFrame.
Изпълнение на SQL заявки
Сега изпълняваме някои SQL заявки върху заредената DataFrame, използвайки функцията spark.sql().
# Използвайте командата SELECT, за да покажете всички колони от горната таблица.linuxhint_spark_app.sql( „ИЗБЕРЕТЕ * от Agri_Table1“ ).покажи()
# WHERE Клауза
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Dry' ' ).покажи()
linuxhint_spark_app.sql( 'ИЗБЕРЕТЕ * от Agri_Table1 WHERE Акра > 2000' ).покажи()
Изход:
- Първата заявка показва всички колони и записи от DataFrame.
- Втората заявка показва записите въз основа на колоната „Soil_status“. Има само три записа с елемента „Сух“.
- Последната заявка връща два записа с „акра“, които са по-големи от 2000.
Pyspark.sql.DataFrameWriter.insertInto()
С помощта на функцията insertInto() можем да добавим DataFrame в съществуващата таблица. Можем да използваме тази функция заедно с selectExpr(), за да дефинираме имената на колоните и след това да я вмъкнем в таблицата. Тази функция също така приема tableName като параметър.
Синтаксис:
DataFrame_obj.write.insertInto('Име_на_таблица')В този сценарий използваме предишната таблица, която е създадена от PySpark DataFrame. Уверете се, че трябва да внедрите кодовите фрагменти на предишния сценарий във вашата среда.
Пример:
Създайте нова DataFrame с два записа и ги вмъкнете в таблицата „Agri_Table1“.
импортиране на pysparkот pyspark.sql импортирайте SparkSession
linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()
# данни за земеделие с 2 реда
агри =[{ „тип_почва“ : 'Пясък' , „Наличност_напояване“ : 'Не' , 'акри' : 2500 , „състояние на почвата“ : 'сухо' ,
'Държава' : 'САЩ' },
{ „тип_почва“ : 'Пясък' , „Наличност_напояване“ : 'Не' , 'акри' : 1200 , „състояние на почвата“ : 'мокро' ,
'Държава' : 'Япония' }]
# създайте рамката с данни от горните данни
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'акри' , 'Държава' , „Наличност_напояване“ , 'тип_почва' ,
'Състояние_на_почвата' ).write.insertInto( 'Agri_Table1' )
# Показване на окончателната Agri_Table1
linuxhint_spark_app.sql( „ИЗБЕРЕТЕ * от Agri_Table1“ ).покажи()
Изход:
Сега общият брой редове, които присъстват в DataFrame, е 7.
Заключение
Вече разбирате как да напишете PySpark DataFrame в таблицата с помощта на функцията write.saveAsTable(). Взема името на таблицата и други незадължителни параметри. След това заредихме тази таблица в PySpark DataFrame с помощта на функцията spark.read.table(). Необходим е само един параметър, който е името на пътя/таблицата. Ако искате да добавите новия DataFrame към съществуващата таблица, използвайте функцията insertInto().