Computer >> Máy Tính >  >> Lập trình >> Ruby

Học bằng cách xây dựng, một Hệ thống xử lý nền trong Ruby

Trong bài đăng hôm nay, chúng ta sẽ triển khai một hệ thống xử lý nền ngây thơ cho vui! Chúng ta có thể tìm hiểu một số điều trong quá trình xem xét nội bộ của các hệ thống xử lý nền phổ biến như Sidekiq. Sản phẩm của trò vui này hoàn toàn không được dùng cho mục đích sản xuất.

Hãy tưởng tượng chúng ta có một nhiệm vụ trong ứng dụng của mình là tải một hoặc nhiều trang web và trích xuất tiêu đề của chúng. Vì chúng tôi không có bất kỳ ảnh hưởng nào đến hiệu suất của các trang web này, nên chúng tôi muốn thực hiện tác vụ bên ngoài chuỗi chính của chúng tôi (hoặc yêu cầu hiện tại — nếu chúng tôi đang xây dựng một ứng dụng web), nhưng trong nền.

Đóng gói một tác vụ

Trước khi bắt đầu xử lý nền, chúng ta hãy xây dựng một đối tượng dịch vụ để thực hiện tác vụ trong tầm tay. Chúng tôi sẽ sử dụng OpenURI và Nokogiri để trích xuất nội dung của thẻ tiêu đề.

require 'open-uri'
require 'nokogiri'
 
class TitleExtractorService
  def call(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

Việc gọi dịch vụ sẽ in ra tiêu đề của URL đã cho.

TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

Điều này hoạt động như mong đợi, nhưng hãy xem liệu chúng ta có thể cải thiện một chút cú pháp để làm cho nó trông giống và giống các hệ thống xử lý nền khác một chút hay không. Bằng cách tạo Magique::Worker mô-đun, chúng ta có thể thêm một số cú pháp vào đối tượng dịch vụ.

module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end
 
    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end
 
    def perform(*)
      raise NotImplementedError
    end
  end
end

Mô-đun thêm một perform phương thức cho phiên bản worker và một perform_now cho lớp worker để làm cho lời gọi tốt hơn một chút.

Hãy đưa mô-đun vào đối tượng dịch vụ của chúng tôi. Trong khi chúng ta đang ở đó, hãy cũng đổi tên nó thành TitleExtractorWorker và thay đổi call phương thức để perform .

class TitleExtractorWorker
  include Magique::Worker
 
  def perform(url)
    document = Nokogiri::HTML(open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue
    puts "Unable to find a title for #{url}"
  end
end

Lời gọi vẫn có cùng kết quả, nhưng rõ ràng hơn một chút về những gì đang xảy ra.

TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir

Triển khai xử lý không đồng bộ

Bây giờ chúng ta đã làm việc trích xuất tiêu đề, chúng ta có thể lấy tất cả các tiêu đề từ các bài báo Ruby Magic trước đây. Để làm điều này, giả sử chúng ta có RUBYMAGIC không đổi với danh sách tất cả các URL của các bài viết trước đây.

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end
 
# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

Chúng tôi nhận được tiêu đề của các bài báo trước đây, nhưng phải mất một lúc để trích xuất tất cả. Đó là bởi vì chúng tôi đợi cho đến khi từng yêu cầu được hoàn thành trước khi chuyển sang yêu cầu tiếp theo.

Hãy cải thiện điều đó bằng cách giới thiệu perform_async phương pháp cho mô-đun công nhân của chúng tôi. Để tăng tốc độ, nó tạo một chuỗi mới cho mỗi URL.

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

Sau khi thay đổi lời gọi thành TitleExtractorWorker.perform_async(url) , chúng tôi nhận được tất cả các danh hiệu gần như cùng một lúc. Tuy nhiên, điều này cũng có nghĩa là chúng tôi sẽ mở hơn 20 kết nối đến blog Ruby Magic cùng một lúc. ( Xin lỗi vì đã làm xáo trộn blog của bạn! 😅)

Nếu bạn đang theo dõi cùng với việc triển khai của riêng mình và thử nghiệm điều này bên ngoài quy trình chạy lâu dài (như máy chủ web), đừng quên thêm một cái gì đó như loop { sleep 1 } vào cuối tập lệnh của bạn để đảm bảo quá trình không kết thúc ngay lập tức.

Sắp xếp công việc

Với cách tiếp cận tạo một chuỗi mới cho mọi lời gọi, cuối cùng chúng tôi sẽ đạt đến giới hạn tài nguyên (cả về phía chúng tôi và trên các trang web mà chúng tôi đang truy cập). Vì chúng ta muốn trở thành những công dân tử tế, hãy thay đổi cách triển khai thành một thứ gì đó không đồng bộ nhưng không giống như một cuộc tấn công từ chối dịch vụ.

Một cách phổ biến để giải quyết vấn đề này là sử dụng mô hình nhà sản xuất / người tiêu dùng. Một hoặc nhiều nhà sản xuất đẩy nhiệm vụ vào hàng đợi trong khi một hoặc nhiều người tiêu dùng nhận nhiệm vụ từ hàng đợi và xử lý chúng.

Hàng đợi về cơ bản là một danh sách các phần tử. Về lý thuyết, một mảng đơn giản sẽ thực hiện công việc. Tuy nhiên, vì chúng tôi đang giải quyết vấn đề đồng thời, chúng tôi cần đảm bảo rằng chỉ một nhà sản xuất hoặc người tiêu dùng có thể truy cập vào hàng đợi tại một thời điểm. Nếu chúng ta không cẩn thận về điều này, mọi thứ sẽ kết thúc trong hỗn loạn — giống như hai người cố gắng chui qua một cánh cửa cùng một lúc.

Vấn đề này được gọi là vấn đề của người sản xuất-người tiêu dùng và có nhiều giải pháp cho nó. May mắn thay, đó là một vấn đề rất phổ biến và Ruby vận chuyển với một Queue thích hợp triển khai mà chúng tôi có thể sử dụng mà không phải lo lắng về đồng bộ hóa chuỗi.

Để sử dụng nó, hãy đảm bảo rằng cả nhà sản xuất và người tiêu dùng đều có thể truy cập vào hàng đợi. Chúng tôi thực hiện việc này bằng cách thêm một phương thức lớp vào Magique của chúng tôi mô-đun và gán một phiên bản của Queue với nó.

module Magique
  def self.backend
    @backend
  end
 
  def self.backend=(backend)
    @backend = backend
  end
end
 
Magique.backend = Queue.new

Tiếp theo, chúng tôi thay đổi perform_async triển khai để đẩy một nhiệm vụ vào hàng đợi thay vì tạo luồng mới của riêng nó. Một tác vụ được biểu diễn dưới dạng một hàm băm bao gồm một tham chiếu đến lớp worker cũng như các đối số được truyền đến perform_async phương pháp.

module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end

Với điều đó, chúng tôi đã hoàn thành mọi việc với phía nhà sản xuất. Tiếp theo, hãy xem xét khía cạnh người tiêu dùng.

Mỗi người tiêu dùng là một luồng riêng biệt nhận nhiệm vụ từ hàng đợi và thực hiện chúng. Thay vì dừng lại sau một tác vụ, như luồng, người tiêu dùng sau đó nhận một tác vụ khác từ hàng đợi và thực hiện nó, v.v. Đây là cách triển khai cơ bản của người tiêu dùng có tên là Magique::Processor . Mỗi bộ xử lý tạo ra một luồng mới lặp lại vô hạn. Đối với mỗi lần lặp lại, nó cố gắng lấy một tác vụ mới từ hàng đợi, tạo một phiên bản mới của lớp worker và gọi perform của nó với các đối số đã cho.

module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end
 
    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end
 
      thread.name = name
    end
  end
end

Ngoài vòng lặp xử lý, chúng tôi thêm một phương thức tiện lợi được gọi là Magique::Processor.start . Điều này cho phép chúng tôi quay nhiều bộ xử lý cùng một lúc. Mặc dù việc đặt tên cho chuỗi không thực sự cần thiết, nhưng nó sẽ cho phép chúng tôi xem liệu mọi thứ có thực sự hoạt động như mong đợi hay không.

Hãy điều chỉnh đầu ra của TitleExtractorWorker của chúng tôi bao gồm tên của chuỗi hiện tại.

puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"

Để kiểm tra thiết lập xử lý nền của chúng tôi, trước tiên, chúng tôi cần thiết lập một bộ vi xử lý trước khi xếp hàng đợi các tác vụ của mình.

Magique.backend = Queue.new
Magique::Processor.start(5)
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end
 
# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...

Khi điều này được chạy, chúng tôi vẫn nhận được tiêu đề của tất cả các bài báo. Mặc dù không nhanh bằng việc chỉ sử dụng một chuỗi riêng cho mọi tác vụ, nhưng nó vẫn nhanh hơn so với việc triển khai ban đầu không có xử lý nền. Nhờ tên bộ xử lý được thêm vào, chúng tôi cũng có thể xác nhận rằng tất cả bộ xử lý đang hoạt động thông qua hàng đợi. Bằng cách điều chỉnh số lượng bộ xử lý đồng thời, có thể tìm thấy sự cân bằng giữa tốc độ xử lý và các giới hạn tài nguyên hiện có.

Mở rộng sang nhiều quy trình và máy móc

Cho đến nay, việc triển khai hệ thống xử lý nền hiện tại của chúng tôi hoạt động đủ tốt. Tuy nhiên, nó vẫn bị giới hạn trong cùng một quy trình. Các tác vụ ngốn tài nguyên sẽ vẫn ảnh hưởng đến hiệu suất của toàn bộ quy trình. Bước cuối cùng, hãy xem xét việc phân phối khối lượng công việc trên nhiều quy trình và thậm chí có thể nhiều máy.

Hàng đợi là kết nối duy nhất giữa người sản xuất và người tiêu dùng. Hiện tại, nó đang sử dụng triển khai trong bộ nhớ. Hãy lấy thêm cảm hứng từ Sidekiq và triển khai hàng đợi bằng Redis.

Redis có hỗ trợ các danh sách cho phép chúng tôi đẩy và tìm nạp các tác vụ từ đó. Ngoài ra, đá quý Redis Ruby là một chuỗi an toàn và các lệnh Redis để sửa đổi danh sách là nguyên tử. Các thuộc tính này giúp bạn có thể sử dụng nó cho hệ thống xử lý nền không đồng bộ của chúng tôi mà không gặp phải sự cố đồng bộ hóa.

Hãy tạo một hàng đợi được hỗ trợ bởi Redis để triển khai pushshift giống như Queue chúng tôi đã sử dụng trước đây.

require 'json'
require 'redis'
 
module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end
 
      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end
 
      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end
    end
  end
end

Vì Redis không biết gì về các đối tượng Ruby, chúng tôi phải tuần tự hóa các tác vụ của mình thành JSON trước khi lưu trữ chúng trong cơ sở dữ liệu bằng cách sử dụng lpush lệnh thêm một phần tử vào đầu danh sách.

Để tìm nạp một nhiệm vụ từ hàng đợi, chúng tôi đang sử dụng brpop lệnh lấy phần tử cuối cùng từ danh sách. Nếu danh sách trống, nó sẽ chặn cho đến khi có phần tử mới. Đây là một cách hay để tạm dừng bộ xử lý của chúng tôi khi không có tác vụ nào. Cuối cùng, sau khi nhận được một nhiệm vụ từ Redis, chúng ta phải tìm kiếm lớp Ruby thực dựa trên tên của worker bằng cách sử dụng Object.const_get .

Bước cuối cùng, hãy chia mọi thứ thành nhiều quy trình. Về phía nhà sản xuất, điều duy nhất chúng tôi phải làm là thay đổi phần phụ trợ thành hàng đợi Redis mới được triển khai của chúng tôi.

# ...
 
Magique.backend = Magique::Backend::Redis.new
 
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

Về khía cạnh người tiêu dùng, chúng ta có thể rút ra một vài dòng như sau:

# ...
 
Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)
 
loop { sleep 1 }

Khi được thực hiện, quy trình của người tiêu dùng sẽ đợi tác phẩm mới đến trong hàng đợi. Khi chúng tôi bắt đầu quy trình sản xuất đẩy các tác vụ vào hàng đợi, chúng tôi có thể thấy rằng chúng được xử lý ngay lập tức.

Tận hưởng có trách nhiệm và không sử dụng thứ này trong sản xuất

Mặc dù chúng tôi giữ nó khác xa với thiết lập thế giới thực mà bạn sẽ sử dụng trong sản xuất (vì vậy đừng!), Chúng tôi đã thực hiện một vài bước trong việc xây dựng bộ xử lý nền. Chúng tôi bắt đầu bằng cách thực hiện một quy trình chạy dưới dạng dịch vụ nền. Sau đó, chúng tôi làm cho nó không đồng bộ và sử dụng Queue để giải quyết vấn đề người sản xuất - người tiêu dùng. Sau đó, chúng tôi mở rộng quy trình sang nhiều quy trình hoặc máy bằng Redis thay vì triển khai trong bộ nhớ.

Như đã đề cập trước đây, đây là một triển khai đơn giản của một hệ thống xử lý nền. Còn rất nhiều điều thiếu sót và không được xử lý dứt điểm. Chúng bao gồm (nhưng không giới hạn ở) xử lý lỗi, nhiều hàng đợi, lập lịch, tổng hợp kết nối và xử lý tín hiệu.

Tuy nhiên, chúng tôi rất vui khi viết điều này và hy vọng bạn sẽ thích một cái nhìn sâu về hệ thống xử lý nền. Có lẽ bạn thậm chí đã lấy đi một hoặc hai thứ.