Computer >> Máy Tính >  >> Lập trình >> Ruby

Concurrency Deep Dive:Vòng lặp sự kiện

Chào mừng bạn đến với bài viết Ruby Magic cuối cùng trong loạt bài của chúng tôi về đồng tiền. Trong các phiên bản trước, chúng tôi đã triển khai một máy chủ trò chuyện bằng cách sử dụng nhiều quy trình và nhiều chuỗi. Lần này chúng ta sẽ làm điều tương tự bằng cách sử dụng vòng lặp sự kiện.

Tóm tắt lại

Chúng tôi sẽ sử dụng cùng một máy khách và cùng một thiết lập máy chủ mà chúng tôi đã sử dụng trong các bài viết trước. Mục đích của chúng tôi là xây dựng một hệ thống trò chuyện giống như sau:

Vui lòng xem các bài viết trước để biết thêm chi tiết về thiết lập cơ bản. Mã nguồn đầy đủ được sử dụng trong các ví dụ trong bài viết này có sẵn trên GitHub, vì vậy bạn có thể tự mình thử nghiệm.

Máy chủ trò chuyện sử dụng vòng lặp sự kiện

Sử dụng vòng lặp sự kiện cho máy chủ trò chuyện của chúng tôi yêu cầu bạn phải có một mô hình tinh thần khác với việc sử dụng các chuỗi hoặc quy trình. Trong cách tiếp cận cổ điển, một luồng hoặc quy trình chịu trách nhiệm xử lý một kết nối duy nhất. Sử dụng vòng lặp sự kiện, bạn có một luồng duy nhất trong một quy trình xử lý nhiều kết nối. Hãy xem cách này hoạt động như thế nào bằng cách chia nhỏ nó.

Vòng lặp sự kiện

Ví dụ, một vòng lặp sự kiện được EventMachine hoặc NodeJS sử dụng hoạt động như sau. Chúng tôi bắt đầu với việc thông báo cho hệ điều hành mà chúng tôi quan tâm đến các sự kiện nhất định. Ví dụ, khi kết nối với ổ cắm được mở. Chúng tôi thực hiện việc này bằng cách gọi một hàm đăng ký mối quan tâm trên một số đối tượng IO, chẳng hạn như kết nối hoặc ổ cắm.

Khi có điều gì đó xảy ra trên đối tượng IO này, hệ điều hành sẽ gửi một sự kiện đến chương trình của chúng ta. Chúng tôi đặt những sự kiện này vào một hàng đợi. Vòng lặp sự kiện tiếp tục đưa các sự kiện ra khỏi danh sách và xử lý từng sự kiện một.

Theo một nghĩa nào đó, một vòng lặp sự kiện không thực sự đồng thời. Nó hoạt động tuần tự trong các lô rất nhỏ để mô phỏng hiệu ứng.

Để đăng ký sự quan tâm và yêu cầu hệ điều hành chuyển các sự kiện IO cho chúng tôi, chúng tôi phải viết một phần mở rộng C, vì không có API nào cho phần mở rộng đó trong thư viện chuẩn Ruby. Tìm hiểu sâu về điều đó nằm ngoài phạm vi của bài viết này, vì vậy chúng tôi sẽ sử dụng IO.select thay vào đó để tạo ra các sự kiện. IO.select lấy một mảng IO đối tượng cần giám sát. Nó đợi cho đến khi một hoặc nhiều đối tượng từ mảng sẵn sàng để đọc hoặc ghi và nó trả về một mảng chỉ với những IO đó đối tượng.

Mã quản lý mọi thứ liên quan đến kết nối được triển khai dưới dạng Fiber :chúng tôi sẽ gọi mã này là "trình xử lý" từ bây giờ. Một Fiber là một khối mã có thể bị tạm dừng và tiếp tục. Máy ảo Ruby không tự động thực hiện việc này, vì vậy chúng tôi phải tiếp tục và chuyển nhượng theo cách thủ công. Chúng tôi sẽ sử dụng đầu vào từ IO.select để thông báo cho người xử lý của chúng tôi khi kết nối của họ sẵn sàng để đọc hoặc ghi.

Giống như trong các ví dụ về luồng và đa quy trình từ các bài viết trước, chúng ta cần một số bộ nhớ để theo dõi các khách hàng và các tin nhắn được gửi. Chúng tôi không cần Mutex thời gian này. Vòng lặp sự kiện của chúng tôi đang chạy trong một chuỗi duy nhất, vì vậy không có nguy cơ các đối tượng bị biến đổi cùng một lúc bởi các chuỗi khác nhau.

client_handlers = {}
messages = []

Trình xử lý ứng dụng khách được triển khai trong Fiber sau . Khi ổ cắm có thể được đọc từ hoặc ghi vào, một sự kiện được kích hoạt mà Fiber trả lời. Khi trạng thái là :readable nó đọc một dòng từ ổ cắm và đẩy dòng này vào các thông điệp messages mảng. Khi trạng thái là :writable nó viết bất kỳ tin nhắn nào đã nhận được từ các máy khách khác kể từ lần viết cuối cùng cho máy khách. Sau khi xử lý một sự kiện, nó gọi Fiber.yield , vì vậy nó sẽ tạm dừng và chờ sự kiện tiếp theo.

def create_client_handler(nickname, socket)
  Fiber.new do
    last_write = Time.now
    loop do
      state = Fiber.yield
 
      if state == :readable
        # Read a message from the socket
        incoming = read_line_from(socket)
        # All good, add it to the list to write
        $messages.push(
          :time => Time.now,
          :nickname => nickname,
          :text => incoming
        )
      elsif state == :writable
        # Write messages to the socket
        get_messages_to_send(last_write, nickname, $messages).each do |message|
          socket.puts "#{message[:nickname]}: #{message[:text]}"
        end
        last_write = Time.now
      end
    end
  end
end

Vậy làm cách nào để chúng tôi kích hoạt Fiber để đọc hoặc ghi vào đúng thời điểm khi Socket sẵn sàng? Chúng tôi sử dụng một vòng lặp sự kiện có bốn bước:

loop do
  # Step 1: Accept incoming connections
  accept_incoming_connections
 
  # Step 2: Get connections that are ready for reading or writing
  get_ready_connections
 
  # Step 3: Read from readable connections
  read_from_readable_connections
 
  # Step 4: Write to writable connections
  write_to_writable_connections
end

Chú ý rằng không có ma thuật nào ở đây cả. Đây là một vòng lặp Ruby bình thường.

Bước 1:Chấp nhận các kết nối đến

Xem liệu chúng tôi có bất kỳ kết nối mới nào không. Chúng tôi sử dụng accept_nonblock , sẽ không đợi máy khách kết nối. Thay vào đó, nó sẽ phát sinh lỗi nếu không có ứng dụng khách mới và nếu lỗi đó xảy ra, chúng tôi bắt lỗi và chuyển sang bước tiếp theo. Nếu có ứng dụng khách mới, chúng tôi tạo trình xử lý cho ứng dụng đó và đặt trình xử lý đó trên các ứng dụng khách messages cửa hàng. Chúng tôi sẽ sử dụng đối tượng socket làm khóa của Hash đó để chúng tôi có thể tìm thấy trình xử lý ứng dụng khách sau này.

begin
  socket = server.accept_nonblock
  nickname = socket.gets.chomp
  $client_handlers[socket] = create_client_handler(nickname, socket)
  puts "Accepted connection from #{nickname}"
rescue IO::WaitReadable, Errno::EINTR
  # No new incoming connections at the moment
end

Bước 2:Nhận các kết nối sẵn sàng để đọc hoặc ghi

Tiếp theo, chúng tôi yêu cầu hệ điều hành thông báo cho chúng tôi khi kết nối đã sẵn sàng. Chúng tôi chuyển các khóa của client_handlers lưu trữ để đọc, ghi và xử lý lỗi. Các khóa này là đối tượng socket mà chúng tôi đã chấp nhận ở bước 1. Chúng tôi đợi 10 mili giây để điều này xảy ra.

readable, writable = IO.select(
  $client_handlers.keys,
  $client_handlers.keys,
  $client_handlers.keys,
  0.01
)

Bước 3:Đọc từ các kết nối có thể đọc được

Nếu bất kỳ kết nối nào của chúng tôi có thể đọc được, chúng tôi sẽ kích hoạt trình xử lý ứng dụng khách và tiếp tục chúng với một readable tiểu bang. Chúng tôi có thể tra cứu các trình xử lý ứng dụng khách này vì Socket đối tượng được trả về bởi IO.select được sử dụng làm khóa của kho trình xử lý.

if readable
  readable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]
 
    client.resume(:readable)
  end
end

Bước 4:Ghi vào các kết nối có thể ghi

Nếu bất kỳ kết nối nào của chúng tôi có thể ghi được, chúng tôi sẽ kích hoạt trình xử lý ứng dụng khách và tiếp tục chúng bằng writable trạng thái.

if writable
  writable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]
    next unless client
 
    client.resume(:writable)
  end
end

Bằng cách sử dụng bốn bước này trong một vòng lặp tạo trình xử lý và gọi readablewritable trên các trình xử lý này vào đúng thời điểm, chúng tôi đã tạo một máy chủ trò chuyện có sự kiện đầy đủ chức năng. Có rất ít chi phí cho mỗi kết nối và chúng tôi có thể mở rộng quy mô này lên đến một số lượng lớn khách hàng đồng thời.

Cách tiếp cận này hoạt động rất hiệu quả miễn là chúng ta giữ cho số lượng công việc trên mỗi tích tắc của vòng lặp nhỏ. Điều này đặc biệt quan trọng đối với công việc liên quan đến tính toán, vì một vòng lặp sự kiện chạy trong một luồng duy nhất và do đó chỉ có thể sử dụng một CPU duy nhất. Trong hệ thống sản xuất thường có nhiều quy trình chạy một vòng lặp sự kiện để giải quyết hạn chế này.

Kết luận

Sau tất cả những điều này, bạn có thể hỏi, tôi nên sử dụng phương pháp nào trong ba phương pháp này?

  • Đối với hầu hết các ứng dụng, phân luồng có ý nghĩa. Đây là cách tiếp cận đơn giản nhất để làm việc.
  • Nếu bạn chạy các ứng dụng đồng thời cao với các luồng chạy dài, các vòng lặp sự kiện cho phép bạn mở rộng quy mô.
  • Nếu bạn mong đợi các quy trình của mình gặp sự cố, hãy sử dụng nhiều quy trình cũ tốt, vì đó là cách tiếp cận mạnh mẽ nhất.

Điều này kết thúc loạt bài của chúng tôi về đồng thời. Nếu bạn muốn có một bản tóm tắt đầy đủ, hãy kiểm tra bài viết đồng thời làm chủ ban đầu cũng như các bài viết chi tiết về cách sử dụng nhiều quy trình và nhiều chuỗi.