Hỏi đáp về IT
Mã xác nhận Thay đổi một
HNQ Bigdata engineering

Các kiểu tham gia Spark SQL với các ví dụ

Duyệt qua: 244

Spark DataFrame hỗ trợ tất cả SQL cơ bản Tham gia các loại như INNERLEFT OUTERRIGHT OUTERLEFT ANTILEFT SEMICROSSSELFJOIN. Spark SQL Joins là các phép biến đổi rộng hơn dẫn đến việc xáo trộn dữ liệu trên mạng do đó chúng có các vấn đề về hiệu suất rất lớn khi không được thiết kế cẩn thận.

Mặt khác Spark SQL Joins đi kèm với tối ưu hóa nhiều hơn theo mặc định (nhờ DataFrames & Dataset) tuy nhiên vẫn sẽ có một số vấn đề về hiệu suất cần xem xét khi sử dụng.

Trong hướng dẫn này, bạn sẽ học các cú pháp Nối khác nhau và sử dụng các kiểu Nối khác nhau trên hai DataFrames và Datasets bằng cách sử dụng các ví dụ Scala. Vui lòng truy cập Tham gia trên Nhiều DataFrames nếu bạn muốn tham gia nhiều hơn hai DataFrames. 

1. Các kiểu & cú pháp nối SQL

Dưới đây là danh sách tất cả các kiểu và cú pháp kết nối Spark SQL.

1) join(right: Dataset[_]): DataFrame
2) join(right: Dataset[_], usingColumn: String): DataFrame
3) join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
4) join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
5) join(right: Dataset[_], joinExprs: Column): DataFrame
6) join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

Phần còn lại của hướng dẫn giải thích Các kiểu nối bằng cú pháp 6, trong đó nhận các đối số ngay tham gia DataFrame, biểu thức nối và kiểu nối trong Chuỗi.

 

Đối với Cú pháp 4 & 5, bạn có thể sử dụng “JoinType” hoặc “Join String” được xác định trên bảng trên cho đối số chuỗi “joinType”. Khi bạn sử dụng “JoinType”, bạn nên import org.apache.spark.sql.catalyst.plans._vì gói này xác định các đối tượng JoinType.

JOINTYPE THAM GIA CHUỖI THAM GIA SQL TƯƠNG ĐƯƠNG
Inner.sql bên trong THAM GIA INNER
FullOuter.sql bên ngoài, đầy đủ, toàn bộ, đầy đủ THAM GIA NGOÀI TRỜI ĐẦY ĐỦ
LeftOuter.sql left, leftouter, left_outer CHỖ NỐI BÊN TRÁI
RightOuter.sql right, rightouter, right_outer THAM GIA QUYỀN
Cross.sql vượt qua  
LeftAnti.sql anti, leftanti, left_anti  
LeftSemi.sql semi, leftsemi, left_semi  

Tất cả các đối tượng Join được định nghĩa tại lớp joinTypes , Để sử dụng chúng, bạn cần nhập org.apache.spark.sql.catalyst.plans.{LeftOuter,Inner,....}.

Trước khi chúng ta chuyển sang các ví dụ về Spark SQL Join, trước tiên, hãy tạo một empvà dept DataFrame’s . ở đây, cột emp_idlà duy nhất trên emp và dept_idlà duy nhất trên tập dữ liệu dept và emp_dept_id từ emp có tham chiếu đến dept_id trên tập dữ liệu dept.

  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
      (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined",
       "emp_dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

Điều này in "emp" và "ghi nợ" DataFrame vào bảng điều khiển.

Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
2. Tham gia bên trong

Spark Innerjoin là phép nối mặc định và nó chủ yếu được sử dụng, Nó được dùng để nối hai DataFrames / Datasets trên các cột chính và nơi các khóa không khớp với các hàng sẽ bị loại bỏ khỏi cả hai bộ dữ liệu ( empdept).

 
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)

Khi chúng tôi áp dụng tham gia bên trong trên tập dữ liệu của mình, nó giảm “ emp_dept_id” 50 từ “ emp” và “ dept_id” 30 từ depttập dữ liệu “”. Dưới đây là kết quả của biểu thức Join ở trên.

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
3. Tham gia đầy đủ bên ngoài

Outeraka fullfulloutertham gia trả về tất cả các hàng từ cả Spark DataFrame / Datasets, nơi tham gia biểu không phù hợp nó sẽ trả về null trên cột kỷ lục tương ứng.

  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"outer")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"full")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"fullouter")
    .show(false)

Từ “ emp” của tập dữ liệu “ ” của chúng tôi emp_dept_idvới giá trị 50 không có bản ghi trên “ dept” do đó các cột ghi nợ có giá trị null và “ dept_id” 30 không có bản ghi trong “ emp” do đó bạn thấy null trên các cột trống. Dưới đây là kết quả của biểu thức Join ở trên.

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
4. Tham gia bên ngoài bên trái

Spark Leftaka Left Outerjoin trả về tất cả các hàng từ DataFrame / Dataset bên trái bất kể kết quả khớp nào được tìm thấy trên tập dữ liệu bên phải khi biểu thức kết nối không khớp, nó chỉ định null cho bản ghi đó và loại bỏ các bản ghi từ bên phải khi không tìm thấy kết quả khớp.

  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"left")
    .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftouter")
    .show(false)

Từ tập dữ liệu của chúng tôi, “ emp_dept_id” 5o không có bản ghi trên “ dept” tập dữ liệu, do đó, bản ghi này chứa rỗng trên deptcác cột “” (dept_name & dept_id). và “ dept_id” 30 từ depttập dữ liệu “” bị loại khỏi kết quả. Dưới đây là kết quả của biểu thức Join ở trên.

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

 

5. Tham gia bên ngoài bên phải

Spark Rightaka Right Outertham gia đối lập với lefttham gia, ở đây nó trả về tất cả các hàng từ DataFrame / Dataset bên phải bất kể toán học được tìm thấy trên tập dữ liệu bên trái, khi biểu thức tham gia không khớp, nó sẽ chỉ định null cho bản ghi đó và loại bỏ các bản ghi từ bên trái khi không khớp tìm.

  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"right")
   .show(false)
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"rightouter")
   .show(false)

Từ ví dụ của chúng tôi, tập dữ liệu bên phải “ dept_id” 30 không có trong tập dữ liệu bên trái “ emp” do đó, bản ghi này chứa rỗng trên empcác cột “”. và “ emp_dept_id” 50 giảm xuống do không tìm thấy kết quả phù hợp ở bên trái. Dưới đây là kết quả của biểu thức Join ở trên.

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
6. Tham gia bán bên trái

Spark Left Semijoin tương tự như innersự khác biệt của phép nối là phép leftseminối trả về tất cả các cột từ DataFrame / Dataset bên trái và bỏ qua tất cả các cột từ tập dữ liệu bên phải. Nói cách khác, phép nối này trả về các cột từ tập dữ liệu bên trái duy nhất cho các bản ghi khớp trong tập dữ liệu bên phải trên biểu thức nối, các bản ghi không khớp với biểu thức nối sẽ bị bỏ qua khỏi cả tập dữ liệu bên trái và bên phải.

Kết quả tương tự có thể đạt được bằng cách sử dụng lựa chọn trên kết quả của phép nối bên trong, tuy nhiên, việc sử dụng phép nối này sẽ hiệu quả.

  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftsemi")
    .show(false)

Dưới đây là kết quả của biểu thức nối ở trên.

leftsemi join
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
7. Chống tham gia trái

eft Antijoin hoàn toàn ngược lại với Spark leftsemijoin, leftantijoin chỉ trả về các cột từ DataFrame / Dataset bên trái cho các bản ghi không khớp.

  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftanti")
    .show(false)

Sản lượng thấp hơn sản lượng

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
8. Tự tham gia

Spark Joins sẽ không hoàn chỉnh nếu không có self join, Mặc dù không có sẵn loại self-join, nhưng chúng tôi có thể sử dụng bất kỳ loại liên kết nào được giải thích ở trên để tham gia DataFrame với chính nó. ví dụ dưới đây sử dụng innertự tham gia

  empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
      .show(false)

Ở đây, chúng tôi đang tham gia emptập dữ liệu với chính nó để tìm ra cấp trên emp_idvà namecho tất cả nhân viên.

+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
9. Sử dụng biểu thức SQL

Vì Spark SQL hỗ trợ cú pháp SQL gốc, chúng ta cũng có thể viết các thao tác nối sau khi tạo bảng tạm thời trên DataFrame's và sử dụng spark.sql()

  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")
//SQL JOIN
  val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  joinDF.show(false)

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)
10. Mã nguồn | Ví dụ Scala
package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object JoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
      (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)


  println("Inner join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)

  println("Outer join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"outer")
    .show(false)
  println("full join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"full")
    .show(false)
  println("fullouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"fullouter")
    .show(false)

  println("right join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"right")
    .show(false)
  println("rightouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"rightouter")
    .show(false)

  println("left join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"left")
    .show(false)
  println("leftouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftouter")
    .show(false)

  println("leftanti join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftanti")
    .show(false)

  println("leftsemi join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftsemi")
    .show(false)

  println("cross join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"cross")
    .show(false)

  println("Using crossJoin()")
  empDF.crossJoin(deptDF).show(false)

  println("self join")
  empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
      .show(false)

  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  //SQL JOIN
  val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  joinDF.show(false)

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)

}

Các ví dụ được giải thích ở đây có sẵn tại dự án GitHub để tham khảo.

Phần kết luận

Trong hướng dẫn này, bạn đã học Spark SQL Tham gia các loại INNERLEFT OUTERRIGHT OUTERLEFT ANTILEFT SEMICROSSSELFtham gia sử dụng, và các ví dụ với Scala.

Người giới thiệu:
bigdata 2020/11/23 9:37

Để lại dấu chân

Bước trên một chân

Bình luận

copyright © bigdata 2010-2020
Processed in 0 seconds, 0 queries