Hỏi đáp về IT
Mã xác nhận Thay đổi một
Ngô Quang Hải quanghaisoft@yahoo.com Bigdata engineering

Hướng dẫn lập trình -Spark RDD

Duyệt qua: 125
Hướng dẫn lập trình RDD  Tổng quat

Ở cấp độ cao, mọi ứng dụng Spark bao gồm một chương trình điều khiển chạy mainchức năng của người dùng và thực hiện các hoạt động song song khác nhau trên một cụm. Tính trừu tượng chính mà Spark cung cấp là tập dữ liệu phân tán có khả năng phục hồi (RDD), là tập hợp các phần tử được phân vùng trên các nút của cụm có thể hoạt động song song. RDD được tạo bằng cách bắt đầu bằng một tệp trong hệ thống tệp Hadoop (hoặc bất kỳ hệ thống tệp nào khác được Hadoop hỗ trợ) hoặc một bộ sưu tập Scala hiện có trong chương trình trình điều khiển và chuyển đổi nó. Người dùng cũng có thể yêu cầu Spark duy trì một RDD trong bộ nhớ, cho phép nó được sử dụng lại một cách hiệu quả trong các hoạt động song song. Cuối cùng, các RDD tự động phục hồi sau các lỗi của nút.

Sự trừu tượng thứ hai trong Spark là các biến được chia sẻ có thể được sử dụng trong các hoạt động song song. Theo mặc định, khi Spark chạy song song một hàm dưới dạng một tập hợp các tác vụ trên các nút khác nhau, nó sẽ gửi một bản sao của từng biến được sử dụng trong hàm cho mỗi tác vụ. Đôi khi, một biến cần được chia sẻ giữa các tác vụ hoặc giữa các tác vụ và chương trình điều khiển. Spark hỗ trợ hai loại biến chia sẻ: biến quảng bá , có thể được sử dụng để lưu trữ một giá trị trong bộ nhớ trên tất cả các nút và bộ tích lũy , là những biến chỉ được “thêm vào”, chẳng hạn như bộ đếm và tổng.

Hướng dẫn này hiển thị từng tính năng này trong từng ngôn ngữ được hỗ trợ của Spark. Nó là dễ dàng nhất để làm theo cùng với nếu bạn khởi chạy trình bao tương tác của Spark - bin/spark-shellđối với trình bao Scala hoặc bin/pysparkđối với trình bao Python.

Liên kết với Spark
  • Scala
  • Java
  • Python
  • Spark 3.0.0-preview được xây dựng và phân phối để hoạt động với Scala 2.12 theo mặc định. (Spark cũng có thể được xây dựng để hoạt động với các phiên bản khác của Scala.) Để viết ứng dụng trong Scala, bạn sẽ cần sử dụng phiên bản Scala tương thích (ví dụ: 2.12.X).

    Để viết một ứng dụng Spark, bạn cần thêm phần phụ thuộc Maven vào Spark. Spark có sẵn thông qua Maven Central tại:

    groupId = org.apache.spark
    artifactId = spark-core_2.12
    version = 3.0.0-preview
    

    Ngoài ra, nếu bạn muốn truy cập vào một cụm HDFS, bạn cần thêm phần phụ thuộc vào hadoop-clientcho phiên bản HDFS của mình.

    groupId = org.apache.hadoop
    artifactId = hadoop-client
    version = <your-hdfs-version>
    

    Cuối cùng, bạn cần nhập một số lớp Spark vào chương trình của mình. Thêm các dòng sau:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf

    (Trước Spark 1.3.0, bạn cần import org.apache.spark.SparkContext._bật rõ ràng các chuyển đổi ngầm định cần thiết.)

    Khởi tạo Spark
  • Scala
  • Java
  • Python
  • Điều đầu tiên mà chương trình Spark phải làm là tạo một đối tượng SparkContext , đối tượng này cho Spark biết cách truy cập một cụm. Để tạo một, SparkContexttrước tiên bạn cần xây dựng một đối tượng SparkConf chứa thông tin về ứng dụng của bạn.

    Chỉ một SparkContext nên hoạt động trên mỗi JVM. Bạn phải stop()SparkContext đang hoạt động trước khi tạo một cái mới.

    val conf = new SparkConf().setAppName(appName).setMaster(master)
    new SparkContext(conf)

    Các appNametham số là một tên cho ứng dụng của bạn để hiển thị trên giao diện người dùng cluster. masterlà URL cụm Spark, Mesos hoặc YARN hoặc một chuỗi "cục bộ" đặc biệt để chạy ở chế độ cục bộ. Trong thực tế, khi chạy trên một cụm, bạn sẽ không muốn mã hóa cứng mastertrong chương trình mà phải khởi chạy ứng dụng vớispark-submit và nhận nó ở đó. Tuy nhiên, đối với thử nghiệm cục bộ và thử nghiệm đơn vị, bạn có thể vượt qua “cục bộ” để chạy Spark trong quá trình.

    Sử dụng Shell
  • Scala
  • Python
  • Trong Spark shell, một SparkContext nhận biết thông dịch viên đặc biệt đã được tạo cho bạn, trong biến được gọi sc. Tạo SparkContext của riêng bạn sẽ không hoạt động. Bạn có thể đặt cái chủ ngữ cảnh nào kết nối với việc sử dụng --masterđối số và bạn có thể thêm các JAR vào classpath bằng cách chuyển một danh sách được phân tách bằng dấu phẩy vào --jarsđối số. Bạn cũng có thể thêm các phần phụ thuộc (ví dụ: Gói Spark) vào phiên shell của mình bằng cách cung cấp danh sách tọa độ Maven được phân tách bằng dấu phẩy cho --packagesđối số. Bất kỳ kho lưu trữ bổ sung nào có thể tồn tại các phụ thuộc (ví dụ: Sonatype) đều có thể được chuyển cho --repositoriesđối số. Ví dụ: để chạy bin/spark-shelltrên chính xác bốn lõi, hãy sử dụng:

    $ ./bin/spark-shell --master local[4]

    Hoặc, để thêm code.jarvào classpath của nó, hãy sử dụng:

    $ ./bin/spark-shell --master local[4] --jars code.jar

    Để bao gồm một phần phụ thuộc bằng cách sử dụng tọa độ Maven:

    $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

    Để có danh sách đầy đủ các tùy chọn, hãy chạy spark-shell --help. Phía sau hậu trường, spark-shellgọi ra spark-submitkịch bản chung hơn .

    Tập dữ liệu phân tán có khả năng phục hồi (RDD)

    Spark xoay quanh khái niệm về tập dữ liệu phân tán có khả năng phục hồi (RDD), là một tập hợp các phần tử có khả năng chịu lỗi và có thể hoạt động song song. Có hai cách để tạo RDD: song song một tập hợp hiện có trong chương trình trình điều khiển của bạn hoặc tham chiếu tập dữ liệu trong hệ thống lưu trữ bên ngoài, chẳng hạn như hệ thống tệp được chia sẻ, HDFS, HBase hoặc bất kỳ nguồn dữ liệu nào cung cấp Hadoop InputFormat.

    Bộ sưu tập song song
  • Scala
  • Java
  • Python
  • Bộ sưu tập song song được tạo ra bằng cách gọi SparkContextcủa parallelizephương pháp trên một bộ sưu tập hiện có trong chương trình lái xe của bạn (một Scala Seq). Các phần tử của bộ sưu tập được sao chép để tạo thành một tập dữ liệu phân tán có thể hoạt động song song. Ví dụ: đây là cách tạo một tập hợp song song chứa các số từ 1 đến 5:

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)

    Sau khi được tạo, tập dữ liệu phân tán ( distData) có thể được vận hành song song. Ví dụ, chúng ta có thể gọi distData.reduce((a, b) => a + b)để cộng các phần tử của mảng. Chúng tôi mô tả các hoạt động trên tập dữ liệu phân tán sau này.

    Một tham số quan trọng đối với các tập hợp song song là số lượng phân vùng để cắt tập dữ liệu vào. Spark sẽ chạy một tác vụ cho mỗi phân vùng của cụm. Thông thường, bạn muốn 2-4 phân vùng cho mỗi CPU trong cụm của mình. Thông thường, Spark cố gắng đặt số lượng phân vùng tự động dựa trên cụm của bạn. Tuy nhiên, bạn cũng có thể đặt nó theo cách thủ công bằng cách chuyển nó dưới dạng tham số thứ hai cho parallelize(ví dụ sc.parallelize(data, 10)). Lưu ý: một số nơi trong mã sử dụng thuật ngữ lát (một từ đồng nghĩa với phân vùng) để duy trì khả năng tương thích ngược.

    Bộ dữ liệu bên ngoài
  • Scala
  • Java
  • Python
  • Spark có thể tạo tập dữ liệu phân tán từ bất kỳ nguồn lưu trữ nào được Hadoop hỗ trợ, bao gồm hệ thống tệp cục bộ của bạn, HDFS, Cassandra, HBase, Amazon S3 , v.v. Spark hỗ trợ tệp văn bản, SequenceFiles và bất kỳ Hadoop InputFormat nào khác .

    RDDs tập tin văn bản có thể được tạo ra sử dụng SparkContextcủa textFilephương pháp. Phương pháp này có một URI cho tập tin (hoặc một con đường cục bộ trên máy, hoặc một hdfs://s3a://, vv URI) và đọc nó như là một tập hợp các dòng. Đây là một lời gọi ví dụ:

    scala> val distFile = sc.textFile("data.txt")
    distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

    Sau khi được tạo, distFilecó thể được thực hiện bằng các hoạt động của tập dữ liệu. Ví dụ, chúng ta có thể thêm lên các kích thước của tất cả các dòng bằng cách sử dụng mapvà reducehoạt động như sau: distFile.map(s => s.length).reduce((a, b) => a + b).

    Một số lưu ý khi đọc tệp với Spark:

    • Nếu sử dụng một đường dẫn trên hệ thống tệp cục bộ, tệp cũng phải có thể truy cập được tại cùng một đường dẫn trên các nút công nhân. Sao chép tệp cho tất cả công nhân hoặc sử dụng hệ thống tệp chia sẻ được gắn kết trên mạng.

    • Tất cả các phương thức nhập dựa trên tệp của Spark, bao gồm textFile, hỗ trợ chạy trên thư mục, tệp nén và cả ký tự đại diện. Ví dụ, bạn có thể sử dụng textFile("/my/directory")textFile("/my/directory/*.txt")và textFile("/my/directory/*.gz").

    • Các textFilephương pháp cũng có một đối số tùy chọn thứ hai để kiểm soát số lượng các phân vùng của tập tin. Theo mặc định, Spark tạo một phân vùng cho mỗi khối của tệp (các khối là 128MB theo mặc định trong HDFS), nhưng bạn cũng có thể yêu cầu số lượng phân vùng cao hơn bằng cách chuyển một giá trị lớn hơn. Lưu ý rằng bạn không thể có ít phân vùng hơn khối.

    Ngoài các tệp văn bản, API Scala của Spark cũng hỗ trợ một số định dạng dữ liệu khác:

    • SparkContext.wholeTextFilescho phép bạn đọc một thư mục chứa nhiều tệp văn bản nhỏ và trả về mỗi tệp dưới dạng cặp (tên tệp, nội dung). Điều này trái ngược với textFile, sẽ trả về một bản ghi trên mỗi dòng trong mỗi tệp. Phân vùng được xác định bởi vị trí dữ liệu, trong một số trường hợp, có thể dẫn đến quá ít phân vùng. Đối với những trường hợp đó, wholeTextFilescung cấp đối số thứ hai tùy chọn để kiểm soát số lượng phân vùng tối thiểu.

    • Đối với SequenceFiles , hãy sử dụng sequenceFile[K, V]phương thức của SparkContext nơi Kvà Vlà các loại khóa và giá trị trong tệp. Đây phải là các lớp con của giao diện có thể ghi được của Hadoop , chẳng hạn như có thể ghi được và văn bản . Ngoài ra, Spark cho phép bạn chỉ định các kiểu gốc cho một vài Writables phổ biến; ví dụ, sequenceFile[Int, String]sẽ tự động đọc IntWritables và Texts.

    • Đối với các Định dạng đầu vào Hadoop khác, bạn có thể sử dụng SparkContext.hadoopRDDphương thức này, lấy một JobConflớp định dạng đầu vào và tùy ý , lớp khóa và lớp giá trị. Đặt những điều này giống như cách bạn làm cho công việc Hadoop với nguồn đầu vào của bạn. Bạn cũng có thể sử dụng SparkContext.newAPIHadoopRDDcho InputFormats dựa trên API MapReduce “mới” ( org.apache.hadoop.mapreduce).

    • RDD.saveAsObjectFilevà SparkContext.objectFilehỗ trợ lưu RDD ở định dạng đơn giản bao gồm các đối tượng Java được tuần tự hóa. Mặc dù điều này không hiệu quả bằng các định dạng chuyên biệt như Avro, nhưng nó cung cấp một cách dễ dàng để lưu bất kỳ RDD nào.

    Hoạt động RDD

    RDD hỗ trợ hai loại hoạt động: biến đổi , tạo ra một tập dữ liệu mới từ một tập dữ liệu hiện có và các hành động , trả về một giá trị cho chương trình trình điều khiển sau khi chạy một tính toán trên tập dữ liệu. Ví dụ, maplà một phép chuyển đổi chuyển từng phần tử tập dữ liệu qua một hàm và trả về một RDD mới đại diện cho kết quả. Mặt khác, reducelà một hành động tổng hợp tất cả các phần tử của RDD bằng cách sử dụng một số hàm và trả về kết quả cuối cùng cho chương trình điều khiển (mặc dù cũng có một song song reduceByKeytrả về một tập dữ liệu phân tán).

    Tất cả các phép biến đổi trong Spark đều lười biếng , ở chỗ chúng không tính toán ngay kết quả của chúng. Thay vào đó, họ chỉ nhớ các phép biến đổi được áp dụng cho một số tập dữ liệu cơ sở (ví dụ: một tệp). Các phép biến đổi chỉ được tính toán khi một hành động yêu cầu trả về kết quả cho chương trình điều khiển. Thiết kế này giúp Spark chạy hiệu quả hơn. Ví dụ, chúng ta có thể nhận ra rằng một tập dữ liệu được tạo thông qua mapsẽ được sử dụng trong một reducevà chỉ trả về kết quả của reducetrình điều khiển, thay vì tập dữ liệu được ánh xạ lớn hơn.

    Theo mặc định, mỗi RDD đã chuyển đổi có thể được tính toán lại mỗi khi bạn chạy một hành động trên đó. Tuy nhiên, bạn cũng có thể duy trì một RDD trong bộ nhớ bằng cách sử dụng phương thức persist(hoặc cache), trong trường hợp này Spark sẽ giữ các phần tử xung quanh trên cụm để truy cập nhanh hơn nhiều vào lần tiếp theo bạn truy vấn nó. Ngoài ra còn có hỗ trợ cho các RDD lâu dài trên đĩa hoặc được sao chép qua nhiều nút.

    Khái niệm cơ bản
  • Scala
  • Java
  • Python
  • Để minh họa những điều cơ bản về RDD, hãy xem xét chương trình đơn giản dưới đây:

    val lines = sc.textFile("data.txt")
    val lineLengths = lines.map(s => s.length)
    val totalLength = lineLengths.reduce((a, b) => a + b)

    Dòng đầu tiên xác định một RDD cơ sở từ một tệp bên ngoài. Tập dữ liệu này không được tải trong bộ nhớ hoặc không được tác động vào: lineschỉ là một con trỏ đến tệp. Dòng thứ hai xác định lineLengthslà kết quả của một mapphép biến đổi. Một lần nữa, lineLengths được không ngay lập tức tính toán, do sự lười biếng. Cuối cùng, chúng tôi chạy reduce, đó là một hành động. Tại thời điểm này, Spark chia nhỏ tính toán thành các tác vụ để chạy trên các máy riêng biệt và mỗi máy chạy cả phần bản đồ và phần giảm cục bộ, chỉ trả lại câu trả lời cho chương trình trình điều khiển.

    Nếu sau này chúng tôi cũng muốn sử dụng lineLengthslại, chúng tôi có thể thêm:

    lineLengths.persist()

    trước reduce, sẽ lineLengthsđược lưu vào bộ nhớ sau lần đầu tiên nó được tính toán.

    Chuyển các chức năng sang Spark
  • Scala
  • Java
  • Python
  • API của Spark chủ yếu dựa vào việc chuyển các chức năng trong chương trình trình điều khiển để chạy trên cụm. Có hai cách được đề xuất để làm điều này:

    • Cú pháp hàm ẩn danh , có thể được sử dụng cho các đoạn mã ngắn.
    • Các phương thức tĩnh trong một đối tượng singleton toàn cục. Ví dụ: bạn có thể xác định object MyFunctionsvà sau đó chuyển MyFunctions.func1, như sau:
    object MyFunctions {
      def func1(s: String): String = { ... }
    }
    
    myRdd.map(MyFunctions.func1)

    Lưu ý rằng mặc dù cũng có thể chuyển một tham chiếu đến một phương thức trong một cá thể lớp (trái ngược với một đối tượng singleton), điều này yêu cầu gửi đối tượng chứa lớp đó cùng với phương thức. Ví dụ, hãy xem xét:

    class MyClass {
      def func1(s: String): String = { ... }
      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
    }

    Ở đây, nếu chúng ta tạo ra một mới MyClassdụ và gọi doStuffvào nó, các mapbên có tham chiếu đến func1phương pháp đó MyClassdụ , vì vậy toàn bộ đối tượng cần phải được gửi đến các cluster. Nó cũng tương tự như viết rdd.map(x => this.func1(x)).

    Theo cách tương tự, việc truy cập các trường của đối tượng bên ngoài sẽ tham chiếu đến toàn bộ đối tượng:

    class MyClass {
      val field = "Hello"
      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
    }

    tương đương với văn bản rdd.map(x => this.field + x), tham chiếu đến tất cả this. Để tránh vấn đề này, cách đơn giản nhất là sao chép fieldvào một biến cục bộ thay vì truy cập nó từ bên ngoài:

    def doStuff(rdd: RDD[String]): RDD[String] = {
      val field_ = this.field
      rdd.map(x => field_ + x)
    }
    Tìm hiểu về việc đóng cửa 

    Một trong những điều khó hơn về Spark là hiểu phạm vi và vòng đời của các biến và phương thức khi thực thi mã trên một cụm. Các hoạt động RDD sửa đổi các biến bên ngoài phạm vi của chúng có thể là một nguồn thường xuyên gây nhầm lẫn. Trong ví dụ dưới đây, chúng ta sẽ xem xét mã sử dụng foreach()để tăng bộ đếm, nhưng các vấn đề tương tự cũng có thể xảy ra cho các hoạt động khác.

    Thí dụ

    Hãy xem xét tổng phần tử RDD ngây thơ bên dưới, có thể hoạt động khác nhau tùy thuộc vào việc thực thi có diễn ra trong cùng một JVM hay không. Một ví dụ phổ biến về điều này là khi chạy Spark ở localchế độ ( --master = local[n]) so với triển khai ứng dụng Spark cho một cụm (ví dụ: thông qua spark-submit tới YARN):

  • Scala
  • Java
  • Python
  • var counter = 0
    var rdd = sc.parallelize(data)
    
    // Wrong: Don't do this!!
    rdd.foreach(x => counter += x)
    
    println("Counter value: " + counter)
    Chế độ cục bộ so với cụm

    Hành vi của mã trên là không xác định và có thể không hoạt động như dự kiến. Để thực thi các công việc, Spark chia nhỏ quá trình xử lý các hoạt động RDD thành các tác vụ, mỗi tác vụ được thực thi bởi một người thực thi. Trước khi thực hiện, Spark tính toán quá trình đóng của nhiệm vụ . Đóng là các biến và phương thức phải hiển thị cho người thực thi để thực hiện các tính toán của nó trên RDD (trong trường hợp này foreach()). Việc đóng này được tuần tự hóa và gửi đến từng người thực thi.

    Các biến trong bao đóng được gửi đến mỗi trình thực thi bây giờ là bản sao và do đó, khi bộ đếm được tham chiếu trong foreachhàm, nó không còn là bộ đếm trên nút trình điều khiển. Vẫn còn một bộ đếm trong bộ nhớ của nút trình điều khiển nhưng điều này không còn hiển thị với những người thực thi! Những người thực thi chỉ nhìn thấy bản sao từ việc đóng tuần tự. Do đó, giá trị cuối cùng của bộ đếm sẽ vẫn bằng 0 vì tất cả các hoạt động trên bộ đếm tham chiếu đến giá trị trong lần đóng tuần tự.

    Ở chế độ cục bộ, trong một số trường hợp, foreachchức năng sẽ thực sự thực thi trong cùng một JVM với trình điều khiển và sẽ tham chiếu cùng một bộ đếm ban đầu và có thể thực sự cập nhật nó.

    Để đảm bảo hành vi được xác định rõ ràng trong các loại tình huống này, người ta nên sử dụng một Accumulator. Bộ tích lũy trong Spark được sử dụng đặc biệt để cung cấp cơ chế cập nhật biến một cách an toàn khi việc thực thi được phân chia giữa các nút công nhân trong một cụm. Phần Bộ tích lũy của hướng dẫn này thảo luận chi tiết hơn về chúng.

    Nói chung, các bao đóng - các cấu trúc giống như các vòng lặp hoặc các phương thức được xác định cục bộ, không nên được sử dụng để thay đổi một số trạng thái toàn cục. Spark không xác định hoặc đảm bảo hành vi của các đột biến đối với các đối tượng được tham chiếu từ bên ngoài các bao đóng. Một số mã thực hiện điều này có thể hoạt động ở chế độ cục bộ, nhưng đó chỉ là tình cờ và mã như vậy sẽ không hoạt động như mong đợi trong chế độ phân tán. Thay vào đó, hãy sử dụng Bộ tích lũy nếu cần một số tổng hợp toàn cục.

    In các phần tử của RDD

    Một thành ngữ phổ biến khác là cố gắng in ra các phần tử của RDD bằng cách sử dụng rdd.foreach(println)hoặc rdd.map(println). Trên một máy duy nhất, điều này sẽ tạo ra đầu ra mong đợi và in tất cả các phần tử của RDD. Tuy nhiên, trong clusterchế độ, đầu ra để stdoutđược gọi bởi những người thực thi hiện đang viết cho người thực thi stdoutthay vì ghi vào trình điều khiển, vì vậy stdouttrình điều khiển sẽ không hiển thị những điều này! Để in tất cả các yếu tố trên người lái xe, người ta có thể sử dụng collect()phương pháp đầu tiên mang RDD đến nút điều khiển như sau: rdd.collect().foreach(println). Tuy nhiên, điều này có thể làm cho trình điều khiển hết bộ nhớ vì collect()tìm nạp toàn bộ RDD vào một máy duy nhất; nếu bạn chỉ cần in một vài yếu tố của RDD, một cách tiếp cận an toàn hơn là sử dụng take()rdd.take(100).foreach(println).

    Làm việc với các cặp khóa-giá trị
  • Scala
  • Java
  • Python
  • Trong khi hầu hết các hoạt động của Spark hoạt động trên RDD có chứa bất kỳ loại đối tượng nào, một vài hoạt động đặc biệt chỉ khả dụng trên RDD của các cặp khóa-giá trị. Các thao tác phổ biến nhất là các hoạt động “xáo trộn” được phân phối, chẳng hạn như nhóm hoặc tổng hợp các phần tử bằng một khóa.

    Trong Scala, các thao tác này tự động có sẵn trên các RDD chứa các đối tượng Tuple2 (các bộ tích hợp sẵn trong ngôn ngữ, được tạo bằng cách viết đơn giản (a, b)). Các hoạt động cặp khóa-giá trị có sẵn trong lớp PairRDDFunctions , lớp này tự động bao quanh một RDD gồm các bộ giá trị.

    Ví dụ: đoạn mã sau sử dụng reduceByKeythao tác trên các cặp khóa-giá trị để đếm số lần mỗi dòng văn bản xuất hiện trong một tệp:

    val lines = sc.textFile("data.txt")
    val pairs = lines.map(s => (s, 1))
    val counts = pairs.reduceByKey((a, b) => a + b)

    counts.sortByKey()Ví dụ, chúng tôi cũng có thể sử dụng để sắp xếp các cặp theo thứ tự bảng chữ cái, và cuối cùng counts.collect()để đưa chúng trở lại chương trình điều khiển dưới dạng một mảng các đối tượng.

    Lưu ý: khi sử dụng đối tượng tùy chỉnh làm khóa trong các phép toán cặp khóa-giá trị, bạn phải chắc chắn rằng một equals()phương thức tùy chỉnh được đi kèm với một hashCode()phương thức so khớp . Để biết đầy đủ chi tiết, hãy xem hợp đồng được nêu trong tài liệu Object.hashCode () .

    Sự biến đổi

    Bảng sau liệt kê một số phép biến đổi phổ biến được Spark hỗ trợ. Tham khảo tài liệu API RDD ( Scala , Java , Python , R ) và cặp tài liệu hàm RDD ( Scala , Java ) để biết chi tiết.

    Chuyển đổi Ý nghĩa
    bản đồ ( func ) Quay trở lại một tập dữ liệu phân bố mới hình thành bằng cách đi qua từng phần tử của nguồn thông qua một hàm func .
    bộ lọc ( func ) Trả về một tập dữ liệu mới được hình thành bằng cách chọn các phần tử của nguồn mà func trả về true.
    flatMap ( func ) Tương tự như bản đồ, nhưng mỗi mục đầu vào có thể được ánh xạ tới 0 hoặc nhiều mục đầu ra (vì vậy func nên trả về một Seq thay vì một mục duy nhất).
    mapPartitions ( func ) Tương tự như bản đồ, nhưng chạy riêng biệt trên từng phân vùng (khối) của RDD, vì vậy func phải là kiểu Iterator <T> => Iterator <U> khi chạy trên RDD kiểu T.
    mapPartitionsWithIndex ( func ) Tương tự như mapPartitions, nhưng cũng cung cấp func với một giá trị nguyên đại diện cho chỉ mục của phân vùng, vì vậy func phải có kiểu (Int, Iterator <T>) => Iterator <U> khi chạy trên RDD kiểu T.
    mẫu (với Thay thế , phân đoạn , hạt giống ) Lấy mẫu một phần nhỏ của dữ liệu, có hoặc không thay thế, bằng cách sử dụng một hạt tạo số ngẫu nhiên nhất định.
    union ( otherDataset ) Trả về một tập dữ liệu mới có chứa sự kết hợp của các phần tử trong tập dữ liệu nguồn và đối số.
    giao lộ ( otherDataset ) Trả về một RDD mới có chứa giao điểm của các phần tử trong tập dữ liệu nguồn và đối số.
    khác biệt ([ numPartitions ])) Trả về tập dữ liệu mới có chứa các phần tử riêng biệt của tập dữ liệu nguồn.
    groupByKey ([ numPartitions ]) Khi được gọi trên một tập dữ liệu gồm các cặp (K, V), trả về một tập dữ liệu gồm các cặp (K, Iterable <V>).
    Lưu ý: Nếu bạn đang nhóm để thực hiện tổng hợp (chẳng hạn như tổng hoặc trung bình) trên mỗi khóa, thì việc sử dụng reduceByKeyhoặc aggregateByKeysẽ mang lại hiệu suất tốt hơn nhiều.
    Lưu ý: Theo mặc định, mức độ song song trong đầu ra phụ thuộc vào số lượng phân vùng của RDD mẹ. Bạn có thể chuyển một numPartitionsđối số tùy chọn để đặt một số tác vụ khác nhau.
    ReduceByKey ( func , [ numPartitions ]) Khi được gọi trên tập dữ liệu của các cặp (K, V), trả về tập dữ liệu của các cặp (K, V) trong đó các giá trị cho mỗi khóa được tổng hợp bằng cách sử dụng hàm giảm hàm func đã cho , phải thuộc loại (V, V) => V. Giống như trong groupByKey, số lượng tác vụ giảm có thể được cấu hình thông qua đối số thứ hai tùy chọn.
    tổng hợpByKey ( zeroValue ) ( seqOp , combOp , [ numPartitions ]) Khi được gọi trên tập dữ liệu của các cặp (K, V), trả về tập dữ liệu của các cặp (K, U) trong đó các giá trị cho mỗi khóa được tổng hợp bằng cách sử dụng các hàm kết hợp đã cho và giá trị "0" trung tính. Cho phép loại giá trị tổng hợp khác với loại giá trị đầu vào, đồng thời tránh phân bổ không cần thiết. Giống như trong groupByKey, số lượng tác vụ giảm có thể được định cấu hình thông qua đối số thứ hai tùy chọn.
    sortByKey ([ tăng dần ], [ numPartitions ]) Khi được gọi trên tập dữ liệu của các cặp (K, V) trong đó K triển khai Có thứ tự, trả về tập dữ liệu của các cặp (K, V) được sắp xếp theo các khóa theo thứ tự tăng dần hoặc giảm dần, như được chỉ định trong ascendingđối số boolean .
    tham gia ( otherDataset , [ numPartitions ]) Khi được gọi trên tập dữ liệu kiểu (K, V) và (K, W), trả về tập dữ liệu gồm các cặp (K, (V, W)) với tất cả các cặp phần tử cho mỗi khóa. Ngoài tham gia được hỗ trợ thông qua leftOuterJoinrightOuterJoinvà fullOuterJoin.
    cogroup ( otherDataset , [ numPartitions ]) Khi được gọi trên các tập dữ liệu kiểu (K, V) và (K, W), trả về một tập dữ liệu gồm các bộ dữ liệu (K, (Iterable <V>, Iterable <W>)). Thao tác này cũng được gọi là groupWith.
    Cartesian ( otherDataset ) Khi được gọi trên các tập dữ liệu kiểu T và U, trả về một tập dữ liệu gồm (T, U) các cặp (tất cả các cặp phần tử).
    pipe ( lệnh , [envVars] ) Đưa từng phân vùng của RDD thông qua một lệnh shell, ví dụ như tập lệnh Perl hoặc bash. Các phần tử RDD được ghi vào stdin của quy trình và các dòng xuất ra stdout của nó được trả về dưới dạng RDD của các chuỗi.
    liên kết ( numPartitions ) Giảm số lượng phân vùng trong RDD xuống numPartitions. Hữu ích để chạy các hoạt động hiệu quả hơn sau khi lọc bớt một tập dữ liệu lớn.
    phân vùng lại ( numPartitions ) Sắp xếp lại dữ liệu trong RDD một cách ngẫu nhiên để tạo nhiều hoặc ít phân vùng hơn và cân bằng giữa chúng. Điều này luôn luôn xáo trộn tất cả dữ liệu trên mạng.
    repartitionAndSortWithinPartitions (trình phân vùng ) Phân vùng lại RDD theo trình phân vùng đã cho và trong mỗi phân vùng kết quả, sắp xếp các bản ghi theo các khóa của chúng. Điều này hiệu quả hơn việc gọi repartitionvà sau đó sắp xếp trong mỗi phân vùng vì nó có thể đẩy việc phân loại xuống bộ máy xáo trộn.
    Hành động

    Bảng sau liệt kê một số tác vụ phổ biến được Spark hỗ trợ. Tham khảo tài liệu API RDD ( Scala , Java , Python , R )

    và ghép các hàm RDD doc ( Scala , Java ) để biết chi tiết.

    Hoạt động Ý nghĩa
    giảm bớt ( func ) Tổng hợp các phần tử của tập dữ liệu bằng cách sử dụng một hàm func (nhận hai đối số và trả về một). Hàm phải có tính chất giao hoán và liên kết để nó có thể được tính toán song song một cách chính xác.
    thu thập () Trả về tất cả các phần tử của tập dữ liệu dưới dạng một mảng tại chương trình điều khiển. Điều này thường hữu ích sau khi bộ lọc hoặc hoạt động khác trả về một tập hợp con đủ nhỏ của dữ liệu.
    count () Trả về số phần tử trong tập dữ liệu.
    đầu tiên () Trả về phần tử đầu tiên của tập dữ liệu (tương tự như lấy (1)).
    lấy ( n ) Trả về một mảng có n phần tử đầu tiên của tập dữ liệu.
    takeSample ( withReplacement , num , [ seed ]) Trả về một mảng với một mẫu ngẫu nhiên gồm num phần tử của tập dữ liệu, có hoặc không thay thế, tùy chọn chỉ định trước một hạt tạo số ngẫu nhiên.
    takeOrdered ( n , [đặt hàng] ) Trả về n phần tử đầu tiên của RDD bằng cách sử dụng thứ tự tự nhiên của chúng hoặc bộ so sánh tùy chỉnh.
    saveAsTextFile ( đường dẫn ) Ghi các phần tử của tập dữ liệu dưới dạng tệp văn bản (hoặc tập hợp tệp văn bản) trong một thư mục nhất định trong hệ thống tệp cục bộ, HDFS hoặc bất kỳ hệ thống tệp nào khác được Hadoop hỗ trợ. Spark sẽ gọi toString trên mỗi phần tử để chuyển nó thành một dòng văn bản trong tệp.
    saveAsSequenceFile ( đường dẫn )
    (Java và Scala)
    Viết các phần tử của tập dữ liệu dưới dạng Hadoop SequenceFile trong một đường dẫn nhất định trong hệ thống tệp cục bộ, HDFS hoặc bất kỳ hệ thống tệp nào khác được Hadoop hỗ trợ. Điều này có sẵn trên RDD của các cặp khóa-giá trị triển khai giao diện Ghi của Hadoop. Trong Scala, nó cũng có sẵn trên các loại có thể chuyển đổi hoàn toàn thành W ghi được (Spark bao gồm các chuyển đổi cho các loại cơ bản như Int, Double, String, v.v.).
    saveAsObjectFile ( đường dẫn )
    (Java và Scala)
    Viết các phần tử của tập dữ liệu ở định dạng đơn giản bằng cách sử dụng tuần tự hóa Java, sau đó có thể được tải bằng cách sử dụng SparkContext.objectFile().
    countByKey () Chỉ có sẵn trên RDD của loại (K, V). Trả về một bản đồ băm của các cặp (K, Int) với số lượng của mỗi khóa.
    foreach ( func ) Chạy một hàm func trên mỗi phần tử của tập dữ liệu. Điều này thường được thực hiện đối với các tác dụng phụ như cập nhật Bộ tích lũy hoặc tương tác với hệ thống lưu trữ bên ngoài.
    Lưu ý : việc sửa đổi các biến không phải Bộ tích lũy bên ngoài foreach()có thể dẫn đến hành vi không xác định. Xem phần Tìm hiểu về việc đóng cửa để biết thêm chi tiết

    Spark RDD API cũng hiển thị các phiên bản không đồng bộ của một số hành động, chẳng hạn như foreachAsyncfor foreach, ngay lập tức trả về a FutureActioncho người gọi thay vì chặn khi hoàn thành hành động. Điều này có thể được sử dụng để quản lý hoặc đợi thực thi không đồng bộ của hành động.

    Thao tác xáo trộn

    Các hoạt động nhất định trong Spark sẽ kích hoạt một sự kiện được gọi là xáo trộn. Sự xáo trộn là cơ chế của Spark để phân phối lại dữ liệu để nó được nhóm khác nhau giữa các phân vùng. Điều này thường liên quan đến việc sao chép dữ liệu giữa các trình thực thi và máy móc, làm cho việc xáo trộn trở thành một hoạt động phức tạp và tốn kém.

    Lý lịch

    Để hiểu điều gì xảy ra trong quá trình xáo trộn, chúng ta có thể xem xét ví dụ về reduceByKeythao tác. Các reduceByKeyhoạt động tạo ra một RDD mới, nơi tất cả các giá trị cho một chìa khóa duy nhất được kết hợp thành một tuple - chìa khóa và kết quả của thực hiện một chức năng giảm chống lại tất cả các giá trị gắn liền với khóa đó. Thách thức là không phải tất cả các giá trị cho một khóa duy nhất nhất thiết phải nằm trên cùng một phân vùng, hoặc thậm chí trên cùng một máy, nhưng chúng phải được đặt cùng vị trí để tính toán kết quả.

    Trong Spark, dữ liệu thường không được phân phối trên các phân vùng để ở nơi cần thiết cho một hoạt động cụ thể. Trong quá trình tính toán, một tác vụ duy nhất sẽ hoạt động trên một phân vùng duy nhất - do đó, để tổ chức tất cả dữ liệu cho một reduceByKeytác vụ giảm duy nhất để thực thi, Spark cần thực hiện một hoạt động tổng thể. Nó phải đọc từ tất cả các phân vùng để tìm tất cả các giá trị cho tất cả các khóa và sau đó tập hợp các giá trị trên các phân vùng để tính toán kết quả cuối cùng cho mỗi khóa - đây được gọi là xáo trộn .

    Mặc dù tập hợp các phần tử trong mỗi phân vùng của dữ liệu mới được xáo trộn sẽ có tính xác định, và thứ tự của bản thân các phân vùng cũng vậy, nhưng thứ tự của các phần tử này thì không. Nếu một người muốn dữ liệu có thứ tự dự đoán sau khi xáo trộn thì có thể sử dụng:

    • mapPartitions để sắp xếp từng phân vùng, ví dụ: .sorted
    • repartitionAndSortWithinPartitions để sắp xếp các phân vùng một cách hiệu quả đồng thời phân vùng lại
    • sortBy để tạo một RDD được đặt hàng toàn cầu

    Các thao tác có thể gây ra xáo trộn bao gồm các thao tác phân vùng lại như repartitionvà coalesce'Các thao tác ByKey (ngoại trừ việc đếm) như groupByKeyvà reduceByKey, vừa nối các thao tác như cogroupvà join.

    Tác động đến hiệu suất

    Các shuffle là một hoạt động tốn kém vì nó liên quan đến đĩa I / O, dữ liệu tuần tự, và mạng I / O. Tổ chức dữ liệu cho shuffle, Spark tạo bộ nhiệm vụ - bản đồ nhiệm vụ tổ chức dữ liệu, và một bộ giảm nhiệm vụ để tổng hợp nó. Danh pháp này đến từ MapReduce và không liên quan trực tiếp đến hoạt động mapvà reducehoạt động của Spark .

    Trong nội bộ, kết quả từ các nhiệm vụ bản đồ riêng lẻ được lưu trong bộ nhớ cho đến khi chúng không thể khớp. Sau đó, chúng được sắp xếp dựa trên phân vùng đích và được ghi vào một tệp duy nhất. Về mặt thu gọn, các tác vụ đọc các khối được sắp xếp có liên quan.

    Một số hoạt động xáo trộn nhất định có thể tiêu tốn một lượng lớn bộ nhớ heap vì chúng sử dụng cấu trúc dữ liệu trong bộ nhớ để tổ chức các bản ghi trước hoặc sau khi chuyển chúng. Cụ thể, reduceByKeyvà aggregateByKeytạo các cấu trúc này ở phía bản đồ, và các 'ByKeyphép toán tạo ra các cấu trúc này ở phía thu gọn. Khi dữ liệu không vừa trong bộ nhớ, Spark sẽ làm tràn các bảng này vào đĩa, làm phát sinh thêm chi phí I / O của đĩa và tăng khả năng thu gom rác.

    Shuffle cũng tạo ra một số lượng lớn các tệp trung gian trên đĩa. Kể từ Spark 1.3, các tệp này được giữ nguyên cho đến khi các RDD tương ứng không còn được sử dụng nữa và được thu gom rác. Điều này được thực hiện để các tệp xáo trộn không cần phải được tạo lại nếu dòng dõi được tính toán lại. Việc thu gom rác chỉ có thể xảy ra sau một khoảng thời gian dài, nếu ứng dụng giữ lại các tham chiếu đến các RDD này hoặc nếu GC không hoạt động thường xuyên. Điều này có nghĩa là các công việc Spark hoạt động lâu dài có thể tiêu tốn một lượng lớn dung lượng ổ đĩa. Thư mục lưu trữ tạm thời được chỉ định bởi spark.local.dirtham số cấu hình khi cấu hình ngữ cảnh Spark.

    Hành vi xáo trộn có thể được điều chỉnh bằng cách điều chỉnh nhiều thông số cấu hình. Xem phần 'Hành vi xáo trộn' trong Hướng dẫn cấu hình Spark .

    Độ bền RDD

    Một trong những khả năng quan trọng nhất trong Spark là duy trì (hoặc lưu vào bộ nhớ đệm ) một tập dữ liệu trong bộ nhớ qua các hoạt động. Khi bạn duy trì một RDD, mỗi nút lưu trữ bất kỳ phân vùng nào của nó mà nó tính toán trong bộ nhớ và sử dụng lại chúng trong các hành động khác trên tập dữ liệu đó (hoặc các tập dữ liệu bắt nguồn từ nó). Điều này cho phép các hành động trong tương lai nhanh hơn nhiều (thường gấp hơn 10 lần). Bộ nhớ đệm là một công cụ chính cho các thuật toán lặp lại và sử dụng tương tác nhanh.

    Bạn có thể đánh dấu một RDD được duy trì bằng cách sử dụng persist()hoặc cache()các phương pháp trên đó. Lần đầu tiên nó được tính toán trong một hành động, nó sẽ được lưu trong bộ nhớ trên các nút. Bộ nhớ cache của Spark có khả năng chịu lỗi - nếu bất kỳ phân vùng nào của RDD bị mất, nó sẽ tự động được tính toán lại bằng cách sử dụng các phép biến đổi đã tạo ra ban đầu.

    Ngoài ra, mỗi RDD lâu dài có thể được lưu trữ bằng cách sử dụng một mức lưu trữ khác nhau , ví dụ: cho phép bạn duy trì tập dữ liệu trên đĩa, duy trì nó trong bộ nhớ nhưng dưới dạng các đối tượng Java được tuần tự hóa (để tiết kiệm dung lượng), sao chép nó qua các nút. Các mức này được thiết lập bằng cách truyền một StorageLevelđối tượng ( Scala , Java , Python ) tới persist(). Các cache()phương pháp là một cách viết tắt cho việc sử dụng các mức lưu trữ mặc định, đó là StorageLevel.MEMORY_ONLY(cửa hàng deserialized đối tượng trong bộ nhớ). Tập hợp đầy đủ các mức lưu trữ là:

    Mức lưu trữ Ý nghĩa
    MEMORY_ONLY Lưu trữ RDD dưới dạng các đối tượng Java được giải hóa trong JVM. Nếu RDD không vừa với bộ nhớ, một số phân vùng sẽ không được lưu vào bộ nhớ đệm và sẽ được tính toán lại nhanh chóng mỗi khi chúng cần. Đây là mức mặc định.
    MEMORY_AND_DISK Lưu trữ RDD dưới dạng các đối tượng Java được giải hóa trong JVM. Nếu RDD không vừa trong bộ nhớ, hãy lưu trữ các phân vùng không vừa trên đĩa và đọc chúng từ đó khi cần.
    MEMORY_ONLY_SER
    (Java và Scala)
    Lưu trữ RDD dưới dạng các đối tượng Java được tuần tự hóa (một mảng byte trên mỗi phân vùng). Điều này nói chung là tiết kiệm không gian hơn các đối tượng được khử trên không, đặc biệt là khi sử dụng bộ tuần tự hóa nhanh , nhưng đòi hỏi nhiều CPU hơn để đọc.
    MEMORY_AND_DISK_SER
    (Java và Scala)
    Tương tự như MEMORY_ONLY_SER, nhưng tràn phân vùng không vừa trong bộ nhớ vào đĩa thay vì tính toán lại chúng một cách nhanh chóng mỗi khi cần.
    DISK_ONLY Chỉ lưu trữ các phân vùng RDD trên đĩa.
    MEMORY_ONLY_2, MEMORY_AND_DISK_2, v.v. Tương tự như các cấp trên, nhưng sao chép từng phân vùng trên hai nút cụm.
    OFF_HEAP (thử nghiệm) Tương tự như MEMORY_ONLY_SER, nhưng lưu trữ dữ liệu trong bộ nhớ off-heap . Điều này yêu cầu bật bộ nhớ off-heap.

    Lưu ý: Trong Python, các đối tượng được lưu trữ sẽ luôn được tuần tự hóa với thư viện Pickle , vì vậy việc bạn chọn mức độ tuần tự hóa không quan trọng. Mức lưu trữ sẵn trong Python bao gồm MEMORY_ONLYMEMORY_ONLY_2MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLY, và DISK_ONLY_2.

    Spark cũng tự động lưu giữ một số dữ liệu trung gian trong các hoạt động trộn (ví dụ reduceByKey), ngay cả khi người dùng không gọi persist. Điều này được thực hiện để tránh tính toán lại toàn bộ dữ liệu đầu vào nếu một nút bị lỗi trong quá trình trộn. Chúng tôi vẫn khuyến nghị người dùng gọi persistRDD kết quả nếu họ định sử dụng lại nó.

    Chọn mức lưu trữ nào?

    Các mức lưu trữ của Spark nhằm mang lại những sự cân bằng khác nhau giữa việc sử dụng bộ nhớ và hiệu quả của CPU. Chúng tôi khuyên bạn nên thực hiện quy trình sau để chọn một:

    • Nếu RDD của bạn vừa vặn với mức lưu trữ mặc định ( MEMORY_ONLY), hãy để chúng theo cách đó. Đây là tùy chọn tiết kiệm CPU nhất, cho phép các hoạt động trên RDD chạy nhanh nhất có thể.

    • Nếu không, hãy thử sử dụng MEMORY_ONLY_SERvà chọn một thư viện tuần tự hóa nhanh để làm cho các đối tượng tiết kiệm không gian hơn nhiều nhưng vẫn truy cập nhanh một cách hợp lý. (Java và Scala)

    • Không tràn ra đĩa trừ khi các hàm tính toán bộ dữ liệu của bạn đắt tiền hoặc chúng lọc một lượng lớn dữ liệu. Nếu không, tính toán lại một phân vùng có thể nhanh như đọc nó từ đĩa.

    • Sử dụng các mức lưu trữ sao chép nếu bạn muốn khôi phục lỗi nhanh chóng (ví dụ: nếu sử dụng Spark để phục vụ các yêu cầu từ ứng dụng web). Tất cả các mức lưu trữ cung cấp khả năng chịu lỗi đầy đủ bằng cách tính toán lại dữ liệu bị mất, nhưng các mức được sao chép cho phép bạn tiếp tục chạy các tác vụ trên RDD mà không cần đợi tính toán lại một phân vùng bị mất.

    Xóa dữ liệu

    Spark tự động giám sát việc sử dụng bộ nhớ cache trên mỗi nút và loại bỏ các phân vùng dữ liệu cũ theo kiểu ít được sử dụng gần đây (LRU). Nếu bạn muốn xóa RDD theo cách thủ công thay vì đợi nó thoát ra khỏi bộ nhớ cache, hãy sử dụng RDD.unpersist()phương pháp này. Lưu ý rằng phương pháp này không chặn theo mặc định. Để chặn cho đến khi tài nguyên được giải phóng, hãy chỉ định blocking=truethời điểm gọi phương thức này.

    Các biến được chia sẻ

    Thông thường, khi một hàm được truyền cho một hoạt động Spark (chẳng hạn như maphoặc reduce) được thực thi trên một nút cụm từ xa, nó sẽ hoạt động trên các bản sao riêng biệt của tất cả các biến được sử dụng trong hàm. Các biến này được sao chép vào từng máy và không có bản cập nhật nào cho các biến trên máy từ xa được truyền trở lại chương trình trình điều khiển. Việc hỗ trợ các biến chia sẻ chung, đọc-ghi trên các nhiệm vụ sẽ không hiệu quả. Tuy nhiên, Spark cung cấp hai loại biến chia sẻ hạn chế cho hai kiểu sử dụng phổ biến: biến quảng bá và biến tích lũy.

    Các biến truyền phát

    Các biến quảng bá cho phép lập trình viên giữ một biến chỉ đọc được lưu vào bộ nhớ cache trên mỗi máy thay vì gửi một bản sao của nó cùng với các tác vụ. Ví dụ, chúng có thể được sử dụng để cung cấp cho mọi nút một bản sao của tập dữ liệu đầu vào lớn một cách hiệu quả. Spark cũng cố gắng phân phối các biến quảng bá bằng cách sử dụng các thuật toán quảng bá hiệu quả để giảm chi phí truyền thông.

    Các hành động Spark được thực hiện thông qua một tập hợp các giai đoạn, được phân tách bằng các hoạt động “xáo trộn” phân tán. Spark tự động truyền phát dữ liệu chung cần thiết cho các tác vụ trong từng giai đoạn. Dữ liệu được truyền phát theo cách này được lưu trong bộ nhớ cache ở dạng tuần tự hóa và giải mã hóa trước khi chạy mỗi tác vụ. Điều này có nghĩa là việc tạo các biến quảng bá một cách rõ ràng chỉ hữu ích khi các tác vụ qua nhiều giai đoạn cần cùng một dữ liệu hoặc khi bộ nhớ đệm dữ liệu ở dạng deserialized là quan trọng.

    Các biến quảng bá được tạo từ một biến vbằng cách gọi SparkContext.broadcast(v). Biến quảng bá là một trình bao bọc xung quanh vvà giá trị của nó có thể được truy cập bằng cách gọi value phương thức. Đoạn mã dưới đây cho thấy điều này:

  • Scala
  • Java
  • Python
  • scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
    
    scala> broadcastVar.value
    res0: Array[Int] = Array(1, 2, 3)

    Sau khi biến quảng bá được tạo, nó sẽ được sử dụng thay cho giá trị vtrong bất kỳ hàm nào chạy trên cụm để vkhông được chuyển đến các nút nhiều hơn một lần. Ngoài ra, đối tượng vkhông nên được sửa đổi sau khi nó được phát sóng để đảm bảo rằng tất cả các nút đều nhận được cùng một giá trị của biến quảng bá (ví dụ: nếu biến được chuyển đến một nút mới sau này).

    Để giải phóng các tài nguyên mà biến quảng bá đã sao chép vào các trình thực thi, hãy gọi .unpersist(). Nếu chương trình phát sóng được sử dụng lại sau đó, chương trình sẽ được phát lại. Để giải phóng vĩnh viễn tất cả các tài nguyên được sử dụng bởi biến quảng bá, hãy gọi .destroy(). Không thể sử dụng biến quảng bá sau đó. Lưu ý rằng các phương pháp này không chặn theo mặc định. Để chặn cho đến khi tài nguyên được giải phóng, hãy chỉ định blocking=truethời điểm gọi chúng.

    Bộ tích lũy

    Bộ tích lũy là các biến chỉ được “thêm vào” thông qua một phép toán kết hợp và giao hoán và do đó có thể được hỗ trợ song song một cách hiệu quả. Chúng có thể được sử dụng để triển khai bộ đếm (như trong MapReduce) hoặc tính tổng. Spark nguyên bản hỗ trợ tích lũy kiểu số và lập trình viên có thể thêm hỗ trợ cho các kiểu mới.

    Là người dùng, bạn có thể tạo bộ tích lũy có tên hoặc không tên. Như được thấy trong hình ảnh bên dưới, một bộ tích lũy được đặt tên (trong trường hợp này counter) sẽ hiển thị trong giao diện người dùng web cho giai đoạn sửa đổi bộ tích lũy đó. Spark hiển thị giá trị cho mỗi bộ tích lũy được sửa đổi bởi một nhiệm vụ trong bảng "Nhiệm vụ".

    Bộ tích lũy trong giao diện người dùng Spark

    Theo dõi bộ tích lũy trong giao diện người dùng có thể hữu ích để hiểu tiến trình của các giai đoạn đang chạy (LƯU Ý: tính năng này chưa được hỗ trợ trong Python).

  • Scala
  • Java
  • Python
  • Bộ tích lũy số có thể được tạo bằng cách gọi SparkContext.longAccumulator()hoặc SparkContext.doubleAccumulator() để tích lũy các giá trị của loại Dài hoặc Đôi, tương ứng. Các tác vụ đang chạy trên một cụm sau đó có thể thêm vào nó bằng addphương pháp. Tuy nhiên, họ không thể đọc được giá trị của nó. Chỉ chương trình trình điều khiển mới có thể đọc giá trị của bộ tích lũy bằng valuephương pháp của nó .

    Đoạn mã dưới đây cho thấy một bộ tích lũy đang được sử dụng để thêm các phần tử của một mảng:

    scala> val accum = sc.longAccumulator("My Accumulator")
    accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
    
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    ...
    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    
    scala> accum.value
    res2: Long = 10

    Trong khi mã này sử dụng hỗ trợ tích hợp cho bộ tích lũy loại Long, các lập trình viên cũng có thể tạo loại riêng của họ bằng cách phân lớp AccumulatorV2 . Lớp trừu tượng AccumulatorV2 có một số phương thức mà người ta phải ghi đè: resetđể đặt lại bộ tích lũy về 0, addđể thêm giá trị khác vào bộ tích lũy, mergeđể hợp nhất bộ tích lũy cùng loại khác vào bộ tích lũy này. Các phương thức khác phải được ghi đè có trong tài liệu API . Ví dụ: giả sử chúng ta có một MyVectorlớp đại diện cho các vectơ toán học, chúng ta có thể viết:

    class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
    
      private val myVector: MyVector = MyVector.createZeroVector
    
      def reset(): Unit = {
        myVector.reset()
      }
    
      def add(v: MyVector): Unit = {
        myVector.add(v)
      }
      ...
    }
    
    // Then, create an Accumulator of this type:
    val myVectorAcc = new VectorAccumulatorV2
    // Then, register it into spark context:
    sc.register(myVectorAcc, "MyVectorAcc1")

    Lưu ý rằng, khi người lập trình xác định kiểu AccumulatorV2 của riêng họ, kiểu kết quả có thể khác với kiểu của các phần tử được thêm vào.

    Đối với các bản cập nhật bộ tích lũy chỉ được thực hiện bên trong các tác vụ , Spark đảm bảo rằng bản cập nhật của mỗi tác vụ đối với bộ tích lũy sẽ chỉ được áp dụng một lần, tức là các tác vụ được khởi động lại sẽ không cập nhật giá trị. Trong các phép chuyển đổi, người dùng nên biết rằng bản cập nhật của mỗi tác vụ có thể được áp dụng nhiều lần nếu các tác vụ hoặc giai đoạn công việc được thực thi lại.

    Tích lũy không thay đổi mô hình đánh giá lười biếng của Spark. Nếu chúng đang được cập nhật trong một hoạt động trên RDD, giá trị của chúng chỉ được cập nhật khi RDD được tính như một phần của một hành động. Do đó, các bản cập nhật của bộ tích lũy không được đảm bảo sẽ được thực thi khi được thực hiện trong một chuyển đổi lười biếng như map(). Đoạn mã dưới đây minh họa thuộc tính này:

  • Scala
  • Java
  • Python
  • val accum = sc.longAccumulator
    data.map { x => accum.add(x); x }
    // Here, accum is still 0 because no actions have caused the map operation to be computed.
    Triển khai cho một cụm

    Các hướng dẫn nộp hồ sơ mô tả làm thế nào để nộp hồ sơ vào một cluster. Nói tóm lại, khi bạn đóng gói ứng dụng của mình thành JAR (đối với Java / Scala) hoặc một tập hợp .pyhoặc .ziptệp (đối với Python), bin/spark-submittập lệnh cho phép bạn gửi nó tới bất kỳ trình quản lý cụm được hỗ trợ nào.

    Khởi chạy việc làm Spark từ Java / Scala

    Các org.apache.spark.launcher gói cung cấp các lớp để tung ra công ăn việc làm Spark như tiến trình con sử dụng một đơn giản Java API.

    Kiểm tra đơn vị

    Spark thân thiện với kiểm thử đơn vị với bất kỳ khung kiểm thử đơn vị phổ biến nào. Chỉ cần tạo một SparkContexttrong thử nghiệm của bạn với URL chính được đặt thành local, chạy các hoạt động của bạn và sau đó gọi SparkContext.stop()để chia nhỏ nó. Đảm bảo rằng bạn dừng ngữ cảnh trong một finallykhối hoặc tearDownphương pháp của khung thử nghiệm , vì Spark không hỗ trợ hai ngữ cảnh chạy đồng thời trong cùng một chương trình.

    Đi đâu từ đây

    Bạn có thể xem một số chương trình Spark mẫu trên trang web Spark. Ngoài ra, Spark bao gồm một số mẫu trong examplesthư mục ( Scala , Java , Python , R ). Bạn có thể chạy các ví dụ Java và Scala bằng cách chuyển tên lớp cho bin/run-exampletập lệnh của Spark ; ví dụ:

    ./bin/run-example SparkPi
    

    Đối với các ví dụ Python, hãy sử dụng spark-submitthay thế:

    ./bin/spark-submit examples/src/main/python/pi.py
    

    Đối với các ví dụ R, hãy sử dụng spark-submitthay thế:

    ./bin/spark-submit examples/src/main/r/dataframe.R
    

    Để được trợ giúp về cách tối ưu hóa chương trình của bạn, hướng dẫn cấu hình và điều chỉnh cung cấp thông tin về các phương pháp hay nhất. Chúng đặc biệt quan trọng để đảm bảo rằng dữ liệu của bạn được lưu trữ trong bộ nhớ ở định dạng hiệu quả. Để được trợ giúp về việc triển khai, tổng quan về chế độ cụm mô tả các thành phần liên quan đến hoạt động phân tán và các trình quản lý cụm được hỗ trợ.

    Cuối cùng, tài liệu API đầy đủ có sẵn trong Scala , Java , Python và R .

    bigdata 2021/2/3 8:50

    Để lại dấu chân

    Bước trên một chân

    Bình luận

    copyright © bigdata 2010-2020
    Processed in 0 seconds, 0 queries