Computer >> Máy Tính >  >> Lập trình >> Ruby

Fibers và Enumerator trong Ruby - Biến các khối từ trong ra ngoài

Ruby có nhiều cách khác nhau để thực hiện lặp — vòng lặp, khối và liệt kê. Hầu hết các lập trình viên Ruby đều ít nhất quen thuộc với các vòng lặp và khối nhưng EnumeratorFiber thường ở trong bóng tối. Trong ấn bản này của Ruby Magic, tác giả khách mời Julik đã chiếu sáng EnumerableFiber để giải thích các liệt kê kiểm soát luồng và chuyển các khối từ trong ra ngoài.

Tạm dừng các khối và lặp lại theo chuỗi

Chúng tôi đã thảo luận về Enumerator trong một ấn bản trước của Ruby Magic, nơi chúng tôi đã mô tả cách trả về một Enumerator từ #each của riêng bạn và nó có thể được sử dụng để làm gì. Một trường hợp sử dụng thậm chí còn rộng hơn cho EnumeratorFiber là họ có thể "tạm ngưng một khối" giữa chuyến bay. Không chỉ khối được cung cấp cho #each hoặc toàn bộ cuộc gọi tới #each , nhưng bất kỳ khối nào!

Đây là một cấu trúc rất mạnh mẽ, có thể được sử dụng để triển khai các miếng chêm cho các phương thức hoạt động bằng cách sử dụng các khối làm cầu nối với những người gọi mong đợi các cuộc gọi tuần tự thay vì lấy một khối. Ví dụ:hãy tưởng tượng chúng ta muốn mở một xử lý cơ sở dữ liệu và đọc từng mục mà chúng ta đã truy xuất:

db.with_each_row_of_result(sql_stmt) do |row|
  yield row
end

API khối rất tuyệt vì nó có khả năng thực hiện tất cả các loại dọn dẹp cho chúng tôi khi khối bị chấm dứt. Tuy nhiên, một số người tiêu dùng có thể muốn làm việc với cơ sở dữ liệu theo cách này:

@cursor = cursor
 
# later:
row = @cursor.next_row
send_row_to_event_stream(row)

Trong thực tế, điều đó có nghĩa là chúng tôi muốn "tạm dừng" việc thực thi khối "chỉ bây giờ" và tiếp tục sau trong khối. Do đó, người gọi tiếp quản quyền kiểm soát luồng thay vì nằm trong tay của callee (phương thức thực hiện khối).

Trình lặp chuỗi

Một trong những cách sử dụng phổ biến nhất của mẫu này là liên kết nhiều trình lặp lại với nhau. Khi chúng tôi làm như vậy, các phương thức chúng tôi đã sử dụng để lặp lại (như #each ), trả về một đối tượng Enumerator thay vào đó, chúng ta có thể sử dụng đối tượng này để "lấy" các giá trị mà khối gửi cho chúng ta bằng cách sử dụng yield tuyên bố:

range = 1..8
each_enum = range.each # => <Enumerator...>

Sau đó, các điều tra viên có thể được xâu chuỗi cho phép chúng tôi thực hiện các hoạt động như "bất kỳ lần lặp nào nhưng với chỉ mục". Trong ví dụ này, chúng tôi đang gọi #map trên một dải ô để nhận được một Enumerable sự vật. Sau đó, chúng tôi chuỗi #with_index để lặp qua phạm vi với một chỉ mục:

(1..3).map.with_index {|element_n, index| [element_n, index] }
#=> [[1, 0], [2, 1], [3, 2]]

Điều này có thể rất hữu ích, đặc biệt nếu hệ thống của bạn sử dụng các sự kiện. Ruby cung cấp một phương thức tích hợp để gói bất kỳ phương thức nào với trình tạo Enumerator, cho phép chúng ta thực hiện chính xác điều này. Hãy tưởng tượng chúng ta muốn "kéo" từng hàng một từ with_each_row_of_result của mình , thay vì phương pháp mang lại chúng cho chúng tôi.

@cursor = db.to_enum(:with_each_row_of_result, sql_stmt)
schedule_for_later do
  begin
    row = @cursor.next
    send_row_to_event_stream(row)
  rescue StopIteration # the block has ended and the cursor is empty, the cleanup has taken place
  end
end

Nếu chúng tôi tự thực hiện điều này, thì đây là cách nó có thể xảy ra:

cursor = Enumerator.new do |yielder|
  db.with_each_row_of_result(sql_stmt) do |row|
    yielder.yield row
  end
end

Chuyển các khối từ trong ra ngoài

Rails cho phép chúng ta chỉ định cơ quan phản hồi cũng là một Enumerator. Nó sẽ gọi next trên Enumerator, chúng tôi chỉ định làm phần thân phản hồi và mong đợi giá trị trả về là một chuỗi — giá trị này sẽ được ghi vào phản hồi Rack. Ví dụ:chúng tôi có thể trả về một cuộc gọi đến #each phương thức của Phạm vi dưới dạng nội dung phản hồi Rails:

class MyController < ApplicationController
  def index
    response.body = ('a'..'z').each
  end
end

Đây là điều tôi gọi là chuyển khối từ trong ra ngoài. Về bản chất, nó là một trình trợ giúp luồng điều khiển cho phép chúng ta "đóng băng thời gian" trong một khối (hoặc một vòng lặp, cũng là một khối trong Ruby) giữa chuyến bay.

Tuy nhiên, ĐTV có một đặc tính hạn chế khiến chúng hơi kém hữu ích. Hãy tưởng tượng chúng ta muốn làm điều gì đó như thế này:

File.open('output.tmp', 'wb') do |f|
  # Yield file for writing, continuously
  loop { yield(f) }
end

Hãy bọc nó bằng một điều tra viên và viết vào đó

writer_enum = File.to_enum(:open, 'output.tmp', 'wb')
file = en.next
file << data
file << more_data

Mọi thứ hoạt động tuyệt vời. Tuy nhiên, có một trở ngại - làm thế nào để chúng tôi nói với điều tra viên rằng chúng tôi đã viết xong, để nó có thể "hoàn thành" khối, đóng tệp và thoát ra? Thao tác này sẽ thực hiện một số bước quan trọng — ví dụ:dọn dẹp tài nguyên (tệp sẽ bị đóng), cũng như đảm bảo tất cả các ghi trong bộ đệm đều được chuyển vào đĩa. Chúng tôi có quyền truy cập vào File đối tượng, và chúng tôi có thể tự đóng, nhưng chúng tôi muốn điều tra viên quản lý việc đóng cho chúng tôi; chúng tôi phải để điều tra viên tiến hành vượt qua khối.

Một trở ngại khác là đôi khi chúng ta muốn chuyển các đối số về những gì đang xảy ra trong khối bị treo. Hãy tưởng tượng chúng ta có một phương thức chấp nhận khối với ngữ nghĩa sau:

write_file_through_encryptor(file_name) do |writable|
  writable << "Some data"
  writable << "Some more data"
  writable << "Even more data"
end

nhưng trong mã gọi điện của chúng tôi, chúng tôi muốn sử dụng nó như thế này:

writable = write_file_through_encryptor(file_name)
writable << "Some data"
# ...later on
writable << "Some more data"
writable.finish

Tốt nhất, chúng ta nên gói lời gọi phương thức của mình vào một số cấu trúc có thể cho phép chúng ta thực hiện thủ thuật sau:

write_file_through_encryptor(file_name) do |writable|
  loop do
    yield_and_wait_for_next_call(writable)
    # Then we somehow break out of this loop to let the block complete
  end
end

Điều gì sẽ xảy ra nếu chúng ta kết thúc các bài viết của mình như thế này?

deferred_writable = write_file_through_encryptor(file_name)
deferred_writable.next("Some data")
deferred_writable.next("Some more data")
deferred_writable.next("Even more data")
deferred_writable.next(:terminate)

Trong trường hợp này, chúng tôi sẽ sử dụng :terminate như một giá trị ma thuật sẽ cho phương thức của chúng ta biết rằng nó có thể kết thúc khối và trả về. Cái này là nơi Enumerator sẽ không thực sự giúp ích cho chúng tôi vì chúng tôi không thể chuyển bất kỳ đối số nào đến Enumerator#next . Nếu có thể, chúng tôi sẽ làm được:

deferred_writable = write_file_through_encryptor(file_name)
deferred_writable.next("Some data")
...
deferred_writable.next(:terminate)

Nhập các sợi của Ruby

Đây chính xác là những gì Fibers cho phép. A Fiber cho phép bạn chấp nhận các đối số trên mỗi lần thử lại , vì vậy chúng tôi có thể triển khai trình bao bọc của mình như sau:

deferred_writable = Fiber.new do |data_to_write_or_termination|
  write_file_through_encryptor(filename) do |f|
     # Here we enter the block context of the fiber, reentry will be to the start of this block
    loop do
      # When we call Fiber.yield our fiber will be suspended—we won't reach the
      # "data_to_write_or_termination = " assignment before our fiber gets resumed
      data_to_write_or_termination = Fiber.yield
    end
  end
end

Đây là cách nó hoạt động:Khi bạn lần đầu tiên gọi .resume trên deferred_writable của bạn , nó đi vào sợi quang và đi đến toàn bộ Fiber.yield đầu tiên hoặc đến cuối khối Fiber ngoài cùng, tùy điều kiện nào đến trước. Khi bạn gọi Fiber.yield , nó cho phép bạn kiểm soát lại. Nhớ Điều tra viên? Khối sẽ bị tạm ngừng và vào lần tiếp theo bạn gọi .resume , đối số cho resume trở thành data_to_write mới .

deferred_writes = Fiber.new do |data_to_write|
  loop do
    $stderr.puts "Received #{data_to_write} to work with"
    data_to_write = Fiber.yield
  end
end
# => #<Fiber:0x007f9f531783e8>
deferred_writes.resume("Hello") #=> Received Hello to work with
deferred_writes.resume("Goodbye") #=> Received Goodbye to work with
 

Vì vậy, trong Fiber, luồng mã được bắt đầu trong cuộc gọi đầu tiên tới Fiber#resume , bị tạm ngưng ở lần gọi đầu tiên tới Fiber.yield và sau đó tiếp tục trong các cuộc gọi tiếp theo tới Fiber#resume , với giá trị trả về là Fiber.yield là đối số để resume . Mã tiếp tục chạy từ điểm mà Fiber.yield được gọi lần cuối.

Đây là một chút khó hiểu của Fibers ở chỗ các đối số ban đầu cho sợi sẽ được chuyển cho bạn dưới dạng các đối số khối, chứ không phải thông qua giá trị trả về của Fiber.yield .

Với ý nghĩ đó, chúng tôi biết điều đó bằng cách chuyển một đối số đặc biệt vào resume , chúng ta có thể quyết định trong Fiber xem chúng ta có nên dừng lại hay không. Hãy thử điều đó:

deferred_writes = Fiber.new do |data_to_write|
  loop do
    $stderr.puts "Received #{data_to_write} to work with"
    break if data_to_write == :terminate # Break out of the loop, or...
    write_to_output(data_to_write)       # ...write to the output
    data_to_write = Fiber.yield          # suspend ourselves and wait for the next `resume`
  end
  # We end up here if we break out of the loop above. There is no Fiber.yield
  # statement anywhere, so the Fiber will terminate and become "dead".
end
 
deferred_writes.resume("Hello") #=> Received Hello to work with
deferred_writes.resume("Goodbye") #=> Received Goodbye to work with
deferred_writes.resume(:terminate)
deferred_writes.resume("Some more data after close") # FiberError: dead fiber called

Có một số tình huống mà các phương tiện này có thể rất hữu ích. Vì Sợi có chứa một khối mã bị treo có thể được tiếp tục lại theo cách thủ công, nên Sợi có thể được sử dụng để triển khai các lò phản ứng sự kiện và để xử lý các hoạt động đồng thời trong một luồng duy nhất. Chúng có trọng lượng nhẹ, vì vậy bạn có thể triển khai một máy chủ sử dụng Fibers bằng cách gán một ứng dụng khách cho một Fiber và chuyển đổi giữa các đối tượng Fiber này nếu cần.

client_fiber = Fiber.new do |socket|
   loop do
     received_from_client = socket.read_nonblock(10)
     sent_to_client = socket.write_nonblock("OK")
     Fiber.yield # Return control back to the caller and wait for it to call 'resume' on us
   end
end
 
client_fibers << client_fiber
 
# and then in your main webserver loop
client_fibers.each do |client_fiber|
  client_fiber.resume # Receive data from the client if any, and send it an OK
end

Ruby có một thư viện tiêu chuẩn bổ sung được gọi là fiber cho phép bạn chuyển quyền kiểm soát một cách rõ ràng từ sợi quang này sang sợi quang khác, có thể là một tiện ích bổ sung cho những mục đích sử dụng này.

Kiểm soát tốc độ phát thải dữ liệu

Một công dụng tuyệt vời khác cho các sợi và bộ điều tra có thể nảy sinh khi bạn muốn kiểm soát tốc độ khối Ruby phát ra dữ liệu. Ví dụ:trong zip_tricks, chúng tôi hỗ trợ việc sử dụng khối sau làm cách sử dụng thư viện chính:

ZipTricks::Streamer.open(output_io) do |z|
  z.write_deflated_file("big.csv") do |destination|
   columns.each do |col|
     destination << column
   end
  end
end

Do đó, chúng tôi cho phép kiểm soát "đẩy" đối với phần mã tạo ra tệp lưu trữ ZIP và không thể kiểm soát lượng dữ liệu mà nó xuất ra và tần suất xuất ra. Nếu chúng tôi muốn viết ZIP của mình ở dạng khối, chẳng hạn, 5 MB — đây sẽ là một giới hạn đối với bộ nhớ đối tượng AWS S3 — chúng tôi sẽ phải tạo một output_io tùy chỉnh đối tượng nào đó sẽ "từ chối" chấp nhận << gọi phương thức khi phân đoạn cần được tách thành một phần nhiều phần S3. Tuy nhiên, chúng ta có thể đảo ngược điều khiển và làm cho nó "kéo". Chúng tôi sẽ vẫn sử dụng cùng một khối để ghi tệp CSV lớn của mình, nhưng chúng tôi sẽ tiếp tục và tạm dừng nó dựa trên kết quả mà nó cung cấp. Do đó, chúng tôi có thể sử dụng những điều sau:

output_enum = ZipTricks::Streamer.output_enum do |z|
  z.write_deflated_file("big.csv") do |destination|
   columns.each do |col|
     destination << column
   end
  end
end
 
# At this point nothing has been generated or written yet
enum = output_enum.each # Create an Enumerator
bin_str = enum.next # Let the block generate some binary data and then suspend it
output.write(bin_str) # Our block is suspended and waiting for the next invocation of `next`

Điều này cho phép chúng tôi kiểm soát tốc độ mà trình tạo tệp ZIP của chúng tôi phát ra dữ liệu.

Do đó, Enumerator và Fiber là một cơ chế kiểm soát dòng chảy để biến các khối "push" thành các đối tượng "pull" chấp nhận các lệnh gọi phương thức.

Chỉ có một cạm bẫy với Fibers và Enumerator — nếu bạn có một cái gì đó như ensure trong khối của bạn hoặc một việc gì đó cần được thực hiện sau khi khối hoàn thành, giờ người gọi có gọi cho bạn đủ số lần hay không. Theo một cách nào đó, nó có thể so sánh với những ràng buộc bạn có khi sử dụng Promises trong JavaScript.

Kết luận

Điều này kết thúc cái nhìn của chúng ta về các liệt kê được điều khiển bằng luồng trong Ruby. Trên đường đi, Julik đã làm sáng tỏ những điểm giống và khác nhau giữa EnumerableFiber và đi sâu vào các ví dụ trong đó người gọi xác định luồng dữ liệu. Chúng tôi cũng đã tìm hiểu về Fiber Phép thuật bổ sung để cho phép chuyển các đối số trên mỗi khối được thử lại. Chúc bạn kiểm soát luồng vui vẻ!

Để có được liều lượng ma thuật ổn định, hãy đăng ký Ruby Magic và chúng tôi sẽ gửi ấn bản hàng tháng đến thẳng hộp thư đến của bạn.