Trong phần kết của loạt bài này, hãy tìm hiểu cách điều chỉnh tài nguyên, tính song song và biểu diễn dữ liệu ảnh hưởng đến hiệu suất công việc Spark.
nguồn https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2/
Trong bài đăng này, chúng ta sẽ kết thúc những gì chúng ta đã bắt đầu trong “Cách điều chỉnh công việc Apache Spark của bạn (Phần 1)” . Tôi sẽ cố gắng đề cập đến mọi thứ mà bạn có thể quan tâm để biết về cách làm cho một chương trình Spark chạy nhanh. Đặc biệt, bạn sẽ tìm hiểu về điều chỉnh tài nguyên hoặc cấu hình Spark để tận dụng mọi thứ mà cụm cung cấp. Sau đó, chúng ta sẽ chuyển sang điều chỉnh song song, thông số khó nhất cũng như quan trọng nhất trong hiệu suất công việc. Cuối cùng, bạn sẽ tìm hiểu về cách đại diện cho chính dữ liệu, ở dạng trên đĩa mà Spark sẽ đọc (cảnh báo spoiler: sử dụng Apache Avro hoặc Apache Parquet) cũng như định dạng trong bộ nhớ mà nó cần khi được lưu vào bộ nhớ đệm hoặc di chuyển qua hệ thống.
Điều chỉnh phân bổ tài nguyên
Danh sách người dùng Spark là một loạt các câu hỏi liên quan đến "Tôi có một cụm 500 nút, nhưng khi tôi chạy ứng dụng của mình, tôi chỉ thấy hai tác vụ đang thực thi cùng một lúc. HALP. ” Với số lượng tham số kiểm soát việc sử dụng tài nguyên của Spark, những câu hỏi này không phải là không công bằng, nhưng trong phần này, bạn sẽ học cách vắt từng chút nước cuối cùng ra khỏi cụm của bạn. Các đề xuất và cấu hình ở đây khác nhau một chút giữa các trình quản lý cụm của Spark (YARN, Mesos và Spark Standalone), nhưng chúng tôi sẽ chỉ tập trung vào YARN mà Cloudera đề xuất cho tất cả người dùng.
Để biết một số thông tin cơ bản về cách chạy Spark trên YARN, hãy xem bài đăng của tôi về chủ đề này .
Hai tài nguyên chính mà Spark (và YARN) nghĩ đến là CPU và bộ nhớ . Tất nhiên, I / O đĩa và mạng cũng đóng một vai trò quan trọng trong hiệu suất của Spark, nhưng cả Spark và YARN hiện đều không làm gì để chủ động quản lý chúng.
Mọi trình thực thi Spark trong một ứng dụng có cùng số lõi cố định và cùng kích thước heap cố định. Số lượng lõi có thể được chỉ định bằng cờ khi gọi spark-submit, spark-shell và pyspark từ dòng lệnh hoặc bằng cách đặt thuộc tính trong tệp hoặc trên một đối tượng. Tương tự, kích thước đống có thể được kiểm soát bằng cờ hoặc thuộc tính. Các tài sản kiểm soát số nhiệm vụ đồng thời một chấp hành viên có thể chạy. có nghĩa là mỗi người thực thi có thể chạy tối đa năm tác vụ cùng một lúc. Thuộc tính bộ nhớ ảnh hưởng đến lượng dữ liệu mà Spark có thể lưu vào bộ đệm, cũng như kích thước tối đa của cấu trúc dữ liệu xáo trộn được sử dụng để nhóm, tổng hợp và kết hợp.--executor-coresspark.executor.coresspark-defaults.confSparkConf--executor-memoryspark.executor.memorycores--executor-cores 5
Các dòng lệnh cờ hoặc sở hữu cấu hình kiểm soát số lượng Chấp hành viên yêu cầu. Bắt đầu từ CDH 5.4 / Spark 1.3, bạn sẽ có thể tránh thiết lập thuộc tính này bằng cách bật phân bổ động với thuộc tính. Phân bổ động cho phép ứng dụng Spark yêu cầu người thực thi khi có tồn đọng các nhiệm vụ đang chờ xử lý và giải phóng người thực thi khi nhàn rỗi.--num-executorsspark.executor.instancesspark.dynamicAllocation.enabled
Điều quan trọng là phải nghĩ về cách các tài nguyên mà Spark yêu cầu sẽ phù hợp với những gì YARN có sẵn. Các thuộc tính YARN có liên quan là:
Yêu cầu năm lõi thực thi sẽ dẫn đến yêu cầu YARN cho năm lõi ảo. Bộ nhớ được yêu cầu từ YARN phức tạp hơn một chút vì một số lý do:
Phần sau (không chia tỷ lệ với giá trị mặc định) hiển thị thứ bậc của các thuộc tính bộ nhớ trong Spark và YARN:
Và nếu điều đó vẫn chưa đủ để suy nghĩ, một vài mối quan tâm cuối cùng khi định cỡ các trình điều hành Spark:
Để hy vọng làm cho tất cả những điều này trở nên cụ thể hơn một chút, đây là một ví dụ hiệu quả về việc định cấu hình ứng dụng Spark để sử dụng càng nhiều cụm càng tốt: Hãy tưởng tượng một cụm có sáu nút chạy NodeManagers, mỗi nút được trang bị 16 lõi và bộ nhớ 64GB. Dung lượng NodeManager và có lẽ phải được đặt thành 63 * 1024 = 64512 (megabyte) và 15 tương ứng. Chúng tôi tránh phân bổ 100% tài nguyên cho các vùng chứa YARN vì nút cần một số tài nguyên để chạy các daemon OS và Hadoop. Trong trường hợp này, chúng tôi để lại một gigabyte và một lõi cho các quy trình hệ thống này. Cloudera Manager trợ giúp bằng cách tính toán các thuộc tính này và tự động định cấu hình các thuộc tính YARN này.yarn.nodemanager.resource.memory-mbyarn.nodemanager.resource.cpu-vcores
Xung lực đầu tiên có thể sẽ được sử dụng . Tuy nhiên, đây là cách tiếp cận sai vì:--num-executors 6 --executor-cores 15 --executor-memory 63G
Một lựa chọn tốt hơn sẽ được sử dụng . Tại sao?--num-executors 17 --executor-cores 5 --executor-memory 19G
Điều chỉnh song song
Spark, như bạn đã hình dung tại thời điểm này, là một công cụ xử lý song song. Điều có thể ít rõ ràng hơn là Spark không phải là một công cụ xử lý song song “ma thuật” và bị hạn chế về khả năng tìm ra lượng song song tối ưu. Mỗi giai đoạn Spark đều có một số tác vụ, mỗi tác vụ xử lý dữ liệu một cách tuần tự. Trong việc điều chỉnh công việc Spark, con số này có lẽ là thông số quan trọng nhất trong việc xác định hiệu suất.
Con số này được xác định như thế nào? Cách Spark nhóm các RDD thành các giai đoạn được mô tả trong bài trước . (Xin nhắc lại nhanh, các phép biến đổi giống như repartitionvà reduceByKeytạo ra các ranh giới của vùng.) Số lượng nhiệm vụ trong một màn cũng giống như số lượng phân vùng trong RDD cuối cùng trong vùng. Số lượng phân vùng trong RDD giống với số lượng phân vùng trong RDD mà nó phụ thuộc vào, với một vài ngoại lệ: coalescephép biến đổi cho phép tạo ra một RDD có ít phân vùng hơn RDD mẹ của nó, unionphép biến đổi tạo ra một RDD với tổng về số lượng phân vùng cha mẹ của nó và cartesiantạo RDD với sản phẩm của họ.
Những gì về RDD không có cha mẹ? RDD được tạo ra bởi textFilehoặc hadoopFilecó các phân vùng của chúng được xác định bởi MapReduce InputFormat bên dưới được sử dụng. Thông thường sẽ có một phân vùng cho mỗi khối HDFS đang được đọc. Các phân vùng cho RDD được tạo ra bằng cách song song hóa đến từ tham số do người dùng cung cấp hoặc nếu không có tham số nào được đưa ra.spark.default.parallelism
Để xác định số lượng phân vùng trong RDD, bạn luôn có thể gọi .rdd.partitions().size()
Mối quan tâm chính là số lượng nhiệm vụ sẽ quá ít. Nếu có ít tác vụ hơn các khe có sẵn để chạy chúng, giai đoạn này sẽ không tận dụng được tất cả CPU có sẵn.
Một số lượng nhỏ nhiệm vụ cũng có nghĩa là áp lực bộ nhớ nhiều hơn được đặt lên bất kỳ hoạt động tổng hợp nào xảy ra trong mỗi tác vụ. Bất kỳ join, cogrouphoặc hoạt động liên quan đến việc giữ các đối tượng trong bản đồ băm hoặc bộ đệm trong bộ nhớ để nhóm hoặc sắp xếp. , và sử dụng các cấu trúc dữ liệu này trong các tác vụ cho các giai đoạn nằm ở phía tìm nạp của các lần xáo trộn mà chúng kích hoạt. và sử dụng cấu trúc dữ liệu trong các tác vụ cho các giai đoạn ở cả hai phía của xáo trộn mà chúng kích hoạt.*ByKeyjoincogroupgroupByKeyreduceByKeyaggregateByKey
Khi các bản ghi dành cho các hoạt động tổng hợp này không dễ dàng phù hợp với bộ nhớ, một số tình huống lộn xộn có thể xảy ra sau đó. Đầu tiên, việc giữ nhiều bản ghi trong các cấu trúc dữ liệu này sẽ gây áp lực lên việc thu thập rác, có thể dẫn đến việc tạm dừng trên dòng. Thứ hai, khi các bản ghi không vừa trong bộ nhớ, Spark sẽ tràn chúng ra đĩa, gây ra hiện tượng I / O và sắp xếp đĩa. Chi phí này trong quá trình xáo trộn lớn có lẽ là nguyên nhân số một dẫn đến tình trạng ngừng việc mà tôi từng thấy tại các khách hàng của Cloudera.
Vậy làm thế nào để bạn tăng số lượng phân vùng? Nếu giai đoạn được đề cập đang đọc từ Hadoop, các tùy chọn của bạn là:
Nếu giai đoạn nhận được đầu vào từ một giai đoạn khác, thì sự chuyển đổi đã kích hoạt ranh giới giai đoạn sẽ chấp nhận một numPartitionsđối số, chẳng hạn như
val rdd2 = rdd1 . ReduceByKey ( _ + _ , numPartitions = X )
“X” phải là gì? Cách đơn giản nhất để điều chỉnh số lượng phân vùng là thử nghiệm: Nhìn vào số lượng phân vùng trong RDD mẹ và sau đó tiếp tục nhân nó với 1,5 cho đến khi hiệu suất ngừng cải thiện.
Ngoài ra còn có một cách tính X nguyên tắc hơn, nhưng rất khó áp dụng tiên nghiệm vì một số đại lượng rất khó tính. Tôi đưa nó vào đây không phải vì nó được khuyến khích sử dụng hàng ngày mà vì nó giúp hiểu được những gì đang xảy ra. Mục tiêu chính là chạy đủ tác vụ để dữ liệu dành cho từng tác vụ phù hợp với bộ nhớ có sẵn cho tác vụ đó.
Bộ nhớ khả dụng cho mỗi tác vụ là ( * * ) / . Phần bộ nhớ và phần an toàn mặc định lần lượt là 0,2 và 0,8.spark.executor.memoryspark.shuffle.memoryFractionspark.shuffle.safetyFractionspark.executor.cores
Khó xác định kích thước trong bộ nhớ của tổng dữ liệu xáo trộn. Phương pháp phỏng đoán gần nhất là tìm tỷ lệ giữa số liệu Xáo trộn Tràn (Bộ nhớ) và Tràn Tràn (Đĩa) cho một giai đoạn đã chạy. Sau đó nhân tổng số bài viết xáo trộn với số này. Tuy nhiên, điều này có thể hơi phức tạp nếu giai đoạn này đang giảm:
Sau đó làm tròn một chút vì quá nhiều phân vùng thường tốt hơn quá ít phân vùng.
Trên thực tế, khi nghi ngờ, hầu như luôn luôn tốt hơn nếu bạn thực hiện một số lượng lớn nhiệm vụ hơn (và do đó phân vùng). Lời khuyên này trái ngược với các khuyến nghị cho MapReduce, yêu cầu bạn phải thận trọng hơn với số lượng nhiệm vụ. Sự khác biệt xuất phát từ thực tế là MapReduce có chi phí khởi động cao cho các tác vụ, trong khi Spark thì không.
Giảm bớt cấu trúc dữ liệu của bạn
Dữ liệu chảy qua Spark dưới dạng bản ghi. Một bản ghi có hai cách biểu diễn: một biểu diễn đối tượng Java được giải mã hóa và một biểu diễn nhị phân tuần tự hóa. Nói chung, Spark sử dụng biểu diễn không hóa cho các bản ghi trong bộ nhớ và biểu diễn tuần tự hóa cho các bản ghi được lưu trữ trên đĩa hoặc được chuyển qua mạng. Có công việc lên kế hoạch để lưu trữ một số dữ liệu ngẫu nhiên trong bộ nhớ dưới dạng tuần tự.
Các tài sản kiểm soát serializer được dùng để chuyển đổi giữa hai đại diện này. Bộ nối tiếp Kryo , là lựa chọn ưu tiên. Rất tiếc, nó không phải là mặc định, vì một số bất ổn trong Kryo trong các phiên bản Spark trước đó và mong muốn không phá vỡ khả năng tương thích, nhưng bộ tuần tự Kryo nên luôn được sử dụngspark.serializerorg.apache.spark.serializer.KryoSerializer
Dấu vết của các bản ghi của bạn trong hai bản đại diện này có tác động lớn đến hiệu suất của Spark. Thật đáng giá khi xem lại các loại dữ liệu đã được chuyển qua và tìm kiếm các vị trí để cắt bớt mỡ.
Các đối tượng được giải phóng không khí bị phồng lên sẽ dẫn đến việc Spark tràn dữ liệu vào đĩa thường xuyên hơn và giảm số lượng các bản ghi không được giải phóng hóa mà Spark có thể lưu vào bộ nhớ cache (ví dụ: ở MEMORYcấp lưu trữ). Hướng dẫn điều chỉnh Spark có một phần tuyệt vời về giảm bớt những thứ này.
Các đối tượng được tuần tự hóa cồng kềnh sẽ dẫn đến I / O trên đĩa và mạng lớn hơn, cũng như giảm số lượng các bản ghi được tuần tự hóa mà Spark có thể lưu vào bộ nhớ cache (ví dụ: ở MEMORY_SERmức lưu trữ.) và chuyển xung quanh bằng cách sử dụng API.SparkConf#registerKryoClasses
Định dạng dữ liệu
Bất cứ khi nào bạn có khả năng đưa ra quyết định về cách dữ liệu được lưu trữ trên đĩa, hãy sử dụng định dạng nhị phân có thể mở rộng như Avro, Parquet, Thrift hoặc Protobuf. Chọn một trong các định dạng này và bám vào nó. Để rõ ràng, khi người ta nói về việc sử dụng Avro, Thrift hoặc Protobuf trên Hadoop, chúng có nghĩa là mỗi bản ghi là một cấu trúc Avro / Thrift / Protobuf được lưu trữ trong một tệp trình tự . JSON chỉ là không có giá trị nó.
Mỗi khi bạn cân nhắc việc lưu trữ nhiều dữ liệu trong JSON, hãy nghĩ về những cuộc xung đột sẽ bắt đầu ở Trung Đông, những con sông xinh đẹp sẽ được xây dựng ở Canada hoặc bụi phóng xạ từ các nhà máy hạt nhân sẽ được xây dựng ở trung tâm Hoa Kỳ để cung cấp năng lượng cho các chu kỳ CPU đã sử dụng để phân tích cú pháp các tệp của bạn lặp đi lặp lại. Ngoài ra, hãy cố gắng học hỏi các kỹ năng của mọi người để có thể thuyết phục đồng nghiệp và cấp trên của mình làm điều này.
Sandy Ryza là Nhà khoa học dữ liệu tại Cloudera, người cam kết Apache Spark và là thành viên Apache Hadoop PMC. Anh ấy là đồng tác giả của cuốn sách O'Reilly Media, Phân tích nâng cao với Spark .
Bước trên một chân
|