Computer >> Máy Tính >  >> Lập trình >> Ruby

Mở Hộp công cụ Ruby Concurrency

Đồng thời và song song quan trọng hơn bao giờ hết đối với các nhà phát triển Ruby. Chúng có thể làm cho các ứng dụng của chúng ta nhanh hơn, sử dụng phần cứng phát huy hết khả năng của chúng. Trong bài viết này, chúng ta sẽ khám phá các công cụ hiện có sẵn cho mọi Rubyist và cũng như những gì Ruby hứa sẽ sớm cung cấp trong bộ phận này.

Không phải ai cũng sử dụng đồng thời trực tiếp, nhưng tất cả chúng ta đều sử dụng gián tiếp thông qua các công cụ như Sidekiq. Hiểu về đồng thời Ruby sẽ không chỉ giúp bạn xây dựng các giải pháp của riêng mình; nó sẽ giúp bạn hiểu và khắc phục sự cố hiện có.

Nhưng trước tiên, hãy lùi lại một chút và nhìn vào bức tranh toàn cảnh.

Đồng thời so với Song song

Những thuật ngữ này được sử dụng một cách lỏng lẻo, nhưng chúng có ý nghĩa riêng biệt.

  • Đồng tiền: Nghệ thuật thực hiện nhiều nhiệm vụ, một công việc tại một thời điểm. Bằng cách chuyển đổi giữa chúng một cách nhanh chóng, nó có thể xuất hiện với người dùng như thể chúng xảy ra đồng thời.
  • Song song: Thực hiện nhiều nhiệm vụ cùng một lúc. Thay vì xuất hiện đồng thời, chúng xuất hiện đồng thời.

Đồng thời thường được sử dụng cho các ứng dụng nặng IO. Ví dụ:một ứng dụng web có thể thường xuyên tương tác với cơ sở dữ liệu hoặc thực hiện nhiều yêu cầu mạng. Bằng cách sử dụng đồng thời, chúng tôi có thể giữ cho ứng dụng của mình luôn phản hồi, ngay cả khi chúng tôi chờ cơ sở dữ liệu phản hồi cho truy vấn của chúng tôi.

Điều này có thể xảy ra bởi vì Ruby VM cho phép các luồng khác chạy trong khi một luồng đang đợi trong quá trình IO. Ngay cả khi một chương trình phải thực hiện hàng chục yêu cầu, nếu chúng tôi sử dụng đồng thời, các yêu cầu sẽ được thực hiện hầu như cùng một lúc.

Mặt khác, tính song song hiện không được Ruby hỗ trợ.

Tại sao Không có Song song trong Ruby?

Ngày nay, không có cách nào đạt được tính song song trong một quy trình Ruby duy nhất bằng cách sử dụng cài đặt Ruby mặc định (thường được gọi là MRI hoặc CRuby). Ruby VM thực thi một khóa (GVM hoặc Global VM Lock) ngăn nhiều luồng chạy mã Ruby cùng một lúc. Khóa này tồn tại để bảo vệ trạng thái bên trong của máy ảo và ngăn các tình huống có thể dẫn đến sự cố máy ảo. Đây không phải là một nơi tuyệt vời để tham gia, nhưng tất cả hy vọng sẽ không bị mất:Ruby 3 sắp ra mắt và nó hứa hẹn sẽ giải quyết được nhược điểm này bằng cách giới thiệu một khái niệm có tên mã là Guild (được giải thích trong phần cuối của bài viết này).

Chủ đề

Các luồng là workhorse đồng thời của Ruby. Để hiểu rõ hơn về cách sử dụng chúng và những cạm bẫy cần lưu ý, chúng tôi sẽ đưa ra một ví dụ. Chúng tôi sẽ xây dựng một chương trình nhỏ sử dụng API và lưu trữ kết quả của nó trong kho dữ liệu bằng cách sử dụng đồng thời.

Trước khi xây dựng ứng dụng khách API, chúng tôi cần có một API. Dưới đây là việc triển khai một API nhỏ chấp nhận một số và phản hồi dưới dạng văn bản thuần túy nếu số được cung cấp là số chẵn lẻ. Nếu cú ​​pháp có vẻ lạ với bạn, đừng lo lắng. Điều này không liên quan gì đến đồng thời. Nó chỉ là một công cụ mà chúng tôi sẽ sử dụng.

app =
  Proc.new do |env|
    sleep 0.05
    qs = env['QUERY_STRING']
    number = Integer(qs.match(/number=(\d+)/)[1])
    [
      '200',
      { 'Content-Type' => 'text/plain' },
      [number.even? ? 'even' : 'odd']
    ]
  end

run app

Để chạy ứng dụng web này, bạn cần cài đặt rack gem, sau đó thực thi rackup config.ru .

Chúng tôi cũng cần một kho dữ liệu giả. Đây là một lớp mô phỏng cơ sở dữ liệu khóa-giá trị:

class Datastore
  # ... accessors and initialization omitted ...
  def read(key)
    data[key]
  end

  def write(key, value)
    data[key] = value
  end
end

Bây giờ, chúng ta hãy đi qua việc triển khai giải pháp đồng thời của chúng tôi. Chúng tôi có một phương pháp, run , tìm nạp đồng thời 1.000 bản ghi và lưu trữ chúng trong kho dữ liệu của chúng tôi.

class ThreadPoweredIntegration
  # ... accessors and initialization ...
  def run
    threads = []
    (1..1000).each_slice(250) do |subset|
      threads << Thread.new do
        subset.each do |number|
          uri = 'https://localhost:9292/' \
            "even_or_odd?number=#{number}"
          status, body = AdHocHTTP.new(uri).blocking_get
          handle_response(status, body)
        rescue Errno::ETIMEDOUT
          retry # Try again if the server times out.
        end
      end
    end
    threads.each(&:join)
  end
  # ...
end

Chúng tôi tạo bốn luồng, mỗi luồng xử lý 250 bản ghi. Chúng tôi sử dụng chiến lược này để không lấn át API của bên thứ ba hoặc hệ thống của chính chúng tôi.

Bằng cách yêu cầu được thực hiện đồng thời bằng cách sử dụng nhiều luồng, tổng quá trình thực thi sẽ mất một phần thời gian so với thực hiện tuần tự. Mặc dù mỗi luồng có những khoảnh khắc không hoạt động trong tất cả các bước cần thiết để thiết lập và giao tiếp thông qua một yêu cầu HTTP, Ruby VM cho phép một luồng khác bắt đầu chạy. Đây là lý do tại sao việc triển khai này nhanh hơn nhiều so với cách thực hiện tuần tự.

AdHocHTTP lớp là một ứng dụng khách HTTP đơn giản được triển khai đặc biệt cho bài viết này để cho phép chúng tôi chỉ tập trung vào sự khác biệt giữa mã được cung cấp bởi các luồng và mã được cung cấp bởi các sợi. Nó nằm ngoài phạm vi của bài viết này để thảo luận về việc triển khai nó, nhưng bạn có thể xem nó tại đây nếu bạn tò mò.

Cuối cùng, chúng tôi xử lý phản hồi của máy chủ vào cuối vòng lặp bên trong. Đây là cách thực hiện phương pháp handle_response ngoại hình:

# ... inside the ThreadPoweredIntegration class ...

attr_reader :ds

def initialize
  @ds = Datastore.new(even: 0, odd: 0)
end

# ...

def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  curr_count = ds.read(key)
  ds.write(key, curr_count + 1)
end

Phương pháp này có vẻ ổn, phải không? Hãy chạy nó và xem những gì kết thúc tại kho dữ liệu của chúng tôi:

{ even: 497, odd: 489 }

Điều này khá lạ, vì tôi chắc chắn rằng từ 1 đến 1000 có 500 số chẵn và 500 số lẻ. Trong phần tiếp theo, hãy hiểu điều gì đang xảy ra và khám phá ngắn gọn một trong những cách giải quyết lỗi này.

Chủ đề và cuộc đua dữ liệu:Ác quỷ là trong chi tiết

Việc sử dụng các luồng cho phép các chương trình nặng IO của chúng tôi chạy nhanh hơn nhiều, nhưng chúng cũng khó thực hiện đúng. Lỗi trong kết quả của chúng tôi ở trên là do điều kiện chủng tộc trong handle_response phương pháp. Điều kiện đua xảy ra khi hai luồng thao tác cùng một dữ liệu.

Vì chúng tôi đang hoạt động trên một tài nguyên được chia sẻ (ds đối tượng kho dữ liệu), chúng ta phải đặc biệt cẩn thận với các hoạt động phi nguyên tử. Lưu ý rằng lần đầu tiên chúng ta đọc từ kho dữ liệu và - trong một câu lệnh thứ hai - chúng ta ghi vào đó số đếm tăng lên 1. Đây là vấn đề vì luồng của chúng ta có thể ngừng chạy sau khi đọc nhưng trước khi ghi. Sau đó, nếu một chuỗi khác chạy và tăng giá trị của khóa mà chúng tôi quan tâm, chúng tôi sẽ ghi số lỗi thời khi chuỗi ban đầu tiếp tục.

Một cách để giảm thiểu những nguy hiểm khi sử dụng các luồng là sử dụng các phần trừu tượng cấp cao hơn để cấu trúc một triển khai đồng thời. Kiểm tra đá quý ruby ​​đồng thời để biết các mẫu khác nhau để sử dụng và một chương trình cung cấp luồng an toàn hơn.

Có nhiều cách để khắc phục một cuộc chạy đua dữ liệu. Một giải pháp đơn giản là sử dụng mutex. Cơ chế đồng bộ hóa này thực thi quyền truy cập một lần vào một đoạn mã nhất định. Đây là cách triển khai trước đây của chúng tôi đã được khắc phục bằng cách sử dụng mutex:

# ... inside ThreadPoweredIntegration class ...
def initialize
  # ...
  @semaphore = Mutex.new
end
# ...
def handle_response(status, body)
  return if status != '200'
  key = body.to_sym
  semaphore.synchronize do
    curr_count = ds.read(key)
    ds.write(key, curr_count + 1)
  end
end

Nếu bạn dự định sử dụng các luồng bên trong ứng dụng Rails, hướng dẫn chính thức Luồng và thực thi mã trong Rails là phải đọc. Không tuân theo các nguyên tắc này có thể dẫn đến những hậu quả rất khó chịu, chẳng hạn như rò rỉ các kết nối cơ sở dữ liệu.

Sau khi chạy triển khai đã sửa, chúng tôi nhận được kết quả mong đợi:

{ even: 500, odd: 500 }

Thay vì sử dụng mutex, chúng ta cũng có thể loại bỏ các cuộc đua dữ liệu bằng cách loại bỏ hoàn toàn các chuỗi và tìm đến một công cụ đồng thời khác có sẵn trong Ruby. Trong phần tiếp theo, chúng ta sẽ xem xét Fiber như một cơ chế để cải thiện hiệu suất của các ứng dụng nặng IO.

Fiber:Công cụ mảnh mai cho đồng tiền

Ruby Fibers cho phép bạn đạt được sự đồng thời hợp tác trong một chuỗi duy nhất. Điều này có nghĩa là các sợi không được ưu tiên trước và bản thân chương trình phải thực hiện việc lập lịch. Bởi vì lập trình viên kiểm soát khi sợi bắt đầu và dừng lại, việc tránh các điều kiện đua sẽ dễ dàng hơn nhiều.

Không giống như sợi chỉ, sợi không cho chúng ta hiệu suất tốt hơn khi IO xảy ra. May mắn thay, Ruby cung cấp khả năng đọc và ghi không đồng bộ thông qua lớp IO của nó. Bằng cách sử dụng các phương pháp không đồng bộ này, chúng tôi có thể ngăn các hoạt động IO chặn mã dựa trên sợi quang của chúng tôi.

Cùng một tình huống, Hiện có với Sợi

Chúng ta hãy xem xét cùng một ví dụ, nhưng bây giờ sử dụng các sợi kết hợp với khả năng không đồng bộ của lớp IO của Ruby. Nó nằm ngoài phạm vi của bài viết này để giải thích tất cả các chi tiết của IO không đồng bộ trong Ruby. Tuy nhiên, chúng tôi sẽ đề cập đến các phần cơ bản trong hoạt động của nó và bạn có thể xem qua việc triển khai các phương thức liên quan của AdHocHTTP (cùng một ứng dụng khách xuất hiện trong giải pháp phân luồng mà chúng tôi vừa khám phá) nếu bạn tò mò.

Chúng ta sẽ bắt đầu bằng cách xem xét run phương pháp triển khai sử dụng sợi quang của chúng tôi:

class FiberPoweredIntegration
  # ... accessors and initialization ...
  def run
    (1..1000).each_slice(250) do |subset|
      Fiber.new do
        subset.each do |number|
          uri = 'https://127.0.0.1:9292/' \
            "even_or_odd?number=#{number}"
          client = AdHocHTTP.new(uri)
          socket = client.init_non_blocking_get
          yield_if_waiting(client,
                           socket,
                           :connect_non_blocking_get)
          yield_if_waiting(client,
                           socket,
                           :write_non_blocking_get)
          status, body =
            yield_if_waiting(client,
                             socket,
                             :read_non_blocking_get)
          handle_response(status, body)
        ensure
          client&.close_non_blocking_get
        end
      end.resume
    end

    wait_all_requests
  end
  # ...
end

Trước tiên, chúng tôi tạo một sợi cho mỗi tập con số mà chúng tôi muốn kiểm tra xem là chẵn hay lẻ.

Sau đó, chúng tôi lặp lại các số, gọi yield_if_waiting . Phương pháp này chịu trách nhiệm dừng sợi quang hiện tại và cho phép một sợi quang khác tiếp tục lại.

Cũng lưu ý rằng sau khi tạo một sợi quang, chúng tôi gọi resume . Điều này làm cho sợi quang bắt đầu chạy. Bằng cách gọi resume ngay sau khi tạo, chúng tôi bắt đầu thực hiện các yêu cầu HTTP ngay cả trước khi vòng lặp chính đi từ 1 đến 1000 kết thúc.

Vào cuối run , có một lệnh gọi đến wait_all_requests . Phương pháp này chọn các sợi sẵn sàng chạy và cũng đảm bảo chúng tôi thực hiện tất cả các yêu cầu đã định. Chúng ta sẽ xem xét nó trong phân đoạn cuối cùng của phần này.

Bây giờ, hãy xem yield_if_waiting chi tiết:

# ... inside FiberPoweredIntegration ...
def initialize
  @ds = Datastore.new(even: 0, odd: 0)
  @waiting = { wait_readable: {}, wait_writable: {} }
end
# ...
def yield_if_waiting(client, socket, operation)
  res_or_status = client.send(operation)
  is_waiting =
    [:wait_readable,
     :wait_writable].include?(res_or_status)
  return res_or_status unless is_waiting

  waiting[res_or_status][socket] = Fiber.current
  Fiber.yield
  waiting[res_or_status].delete(socket)
  yield_if_waiting(client, socket, operation)
rescue Errno::ETIMEDOUT
  retry # Try again if the server times out.
end

Trước tiên, chúng tôi cố gắng thực hiện một thao tác (kết nối, đọc hoặc ghi) bằng ứng dụng khách của chúng tôi. Có thể có hai kết quả chính:

  • Thành công: Khi điều đó xảy ra, chúng tôi quay trở lại.
  • Chúng tôi có thể nhận được một biểu tượng: Điều này có nghĩa là chúng tôi phải đợi.

Làm thế nào để một người "đợi"?

  1. Chúng tôi tạo một loại điểm kiểm tra bằng cách thêm ổ cắm của chúng tôi kết hợp với sợi hiện tại vào biến cá thể waiting (là Hash ).
  2. Chúng tôi lưu trữ cặp này bên trong một bộ sưu tập chứa IO đang chờ đọc hoặc ghi (chúng tôi sẽ xem lý do tại sao điều đó lại quan trọng trong giây lát), tùy thuộc vào kết quả mà chúng tôi nhận được từ khách hàng.
  3. Chúng tôi ngừng thực thi sợi hiện tại, cho phép một sợi khác chạy. Sợi quang bị tạm dừng sẽ có cơ hội tiếp tục hoạt động vào một thời điểm nào đó sau khi ổ cắm mạng được liên kết sẵn sàng. Sau đó, hoạt động IO sẽ được thử lại (và lần này sẽ thành công).

Mọi chương trình Ruby đều chạy bên trong một sợi mà bản thân nó là một phần của một luồng (mọi thứ bên trong một tiến trình). Kết quả là, khi chúng tôi tạo một sợi đầu tiên, hãy chạy nó, và sau đó tại một thời điểm nào đó, chúng tôi sẽ tiếp tục thực hiện phần trung tâm của chương trình.

Bây giờ chúng ta đã hiểu cơ chế được sử dụng để mang lại hiệu suất thực thi khi một sợi quang đang chờ IO, hãy cùng khám phá bit cuối cùng cần thiết để hiểu được việc triển khai cấp nguồn bằng sợi quang này.

def wait_all_requests
  while(waiting[:wait_readable].any? ||
        waiting[:wait_writable].any?)

    ready_to_read, ready_to_write =
      IO.select(waiting[:wait_readable].keys,
                waiting[:wait_writable].keys)

    ready_to_read.each do |socket|
      waiting[:wait_readable][socket].resume
    end

    ready_to_write.each do |socket|
      waiting[:wait_writable][socket].resume
    end
  end
end

Ý tưởng chính ở đây là đợi (nói cách khác, lặp lại) cho đến khi tất cả các hoạt động IO đang chờ xử lý hoàn tất.

Để làm điều đó, chúng tôi sử dụng IO.select . Nó chấp nhận hai bộ sưu tập các đối tượng IO đang chờ xử lý:một để đọc và một để ghi. Nó trả về những đối tượng IO đã hoàn thành công việc của chúng. Vì chúng tôi đã liên kết các đối tượng IO này với các sợi chịu trách nhiệm vận hành chúng, nên việc nối lại các sợi đó rất đơn giản.

Chúng tôi tiếp tục lặp lại các bước này cho đến khi tất cả các yêu cầu được kích hoạt và hoàn thành.

Chung kết:Màn trình diễn có thể so sánh được, không cần khóa

handle_response của chúng tôi hoàn toàn giống với phương thức ban đầu được sử dụng trong mã sử dụng các luồng (phiên bản không có mutex). Tuy nhiên, vì tất cả các sợi của chúng ta đều chạy bên trong cùng một sợi, nên chúng ta sẽ không có bất kỳ cuộc đua dữ liệu nào. Khi chúng tôi chạy mã của mình, chúng tôi nhận được kết quả mong đợi:

{ even: 500, odd: 500 }

Bạn có thể không muốn đối phó với tất cả các nghiệp vụ chuyển mạch cáp quang mỗi khi bạn sử dụng IO không đồng bộ. May mắn thay, một số gem trừu tượng hóa tất cả công việc này và làm cho việc sử dụng các sợi trở thành thứ mà nhà phát triển không cần phải nghĩ đến. Kiểm tra dự án không đồng bộ như một khởi đầu tuyệt vời.

Sợi tỏa sáng khi cần phải có khả năng mở rộng cao

Mặc dù chúng ta có thể gặt hái được những lợi ích của việc hầu như loại bỏ rủi ro của các cuộc chạy đua dữ liệu ngay cả trong các kịch bản quy mô nhỏ, nhưng sợi là một công cụ tuyệt vời khi cần khả năng mở rộng cao. Sợi nhẹ hơn nhiều so với sợi. Với cùng một nguồn tài nguyên sẵn có, việc tạo ra các luồng sẽ lấn át một hệ thống sớm hơn nhiều so với các sợi. Để có một khám phá tuyệt vời về chủ đề này, chúng tôi giới thiệu bài thuyết trình Hành trình đến một triệu người bởi Samuel Williams của Ruby Core Team.

Guild - Lập trình song song trong Ruby

Cho đến nay, chúng ta đã thấy hai công cụ hữu ích cho đồng thời trong Ruby. Tuy nhiên, không cái nào trong số chúng có thể cải thiện hiệu suất của các phép tính thuần túy. Đối với điều đó, bạn sẽ cần tính song song thực sự, hiện không tồn tại trong Ruby (ở đây chúng tôi đang xem xét MRI, triển khai mặc định).

Điều này có thể thay đổi trong Ruby 3 với sự xuất hiện của một tính năng mới được gọi là "Guilds". Thông tin chi tiết vẫn còn mơ hồ, nhưng trong các phần sau, chúng ta sẽ xem xét cách tính năng đang tiến hành này hứa hẹn cho phép sử dụng song song trong Ruby.

Cách các Bang hội có thể hoạt động

Một nguồn đau đớn đáng kể khi thực hiện các giải pháp đồng thời / song song là bộ nhớ được chia sẻ. Trong phần về chủ đề, chúng ta đã thấy việc lập phiếu và viết mã dễ dàng như thế nào thoạt nhìn có vẻ vô thưởng vô phạt nhưng thực ra lại chứa những lỗi nhỏ.

Koichi Sasada - thành viên Nhóm Ruby Core đang phụ trách việc phát triển tính năng Guild mới - đang nỗ lực thiết kế một giải pháp khắc phục nguy cơ chia sẻ bộ nhớ giữa nhiều luồng. Trong bài thuyết trình của mình tại RubyConf 2018, anh ấy giải thích rằng khi sử dụng các guild, người ta sẽ không thể chia sẻ các đối tượng có thể thay đổi một cách đơn giản. Ý tưởng chính là ngăn chặn các cuộc đua dữ liệu bằng cách chỉ cho phép các đối tượng bất biến được chia sẻ giữa các bang hội khác nhau.

Các cấu trúc dữ liệu chuyên biệt sẽ được giới thiệu trong Ruby để cho phép một số đo lường bộ nhớ được chia sẻ giữa các bang hội, nhưng chi tiết về cách thức hoạt động chính xác của điều này vẫn chưa được tiết lộ đầy đủ. Cũng sẽ có một API cho phép sao chép hoặc di chuyển các đối tượng giữa các bang hội, cộng với một biện pháp bảo vệ để ngăn đối tượng được tham chiếu sau khi nó được chuyển đến một bang hội khác.

Sử dụng Bang hội để khám phá một tình huống chung

Có nhiều tình huống mà bạn có thể ước mình có thể tăng tốc độ tính toán bằng cách chạy chúng song song. Hãy tưởng tượng rằng chúng ta phải tính giá trị trung bình và giá trị trung bình của cùng một tập dữ liệu.

Ví dụ dưới đây cho thấy cách chúng tôi có thể làm điều này với các bang hội. Hãy nhớ rằng mã này hiện không hoạt động và có thể không bao giờ hoạt động, ngay cả sau khi các bang hội được giải phóng.

# A frozen array of numeric values is an immutable object.
dataset = [88, 43, 37, 85, 84, 38, 13, 84, 17, 87].freeze
# The overhead of using guilds will probably be
# considerable, so it will only make sense to
# parallelize work when a dataset is large / when
# performing lots of operations.

g1 = Guild.new do
  mean = dataset.reduce(:+).fdiv(dataset.length)
  Guild.send_to(:mean, Guild.parent)
end

g2 = Guild.new do
  median = Median.calculate(dataset.sort)
  Guild.send_to(:median, Guild.parent)
end

results = {}
# Every Ruby program will be run inside a main guild;
# therefore, we can also receive messages in the main
# section of our program.
Guild.receive(:mean, :median) do |tag, result|
  results[tag] = result
end

Tổng kết

Tính đồng thời và tính song song không phải là điểm mạnh chính của Ruby, nhưng ngay cả trong lĩnh vực này, ngôn ngữ này cũng cung cấp các công cụ có thể đủ tốt để giải quyết hầu hết các trường hợp sử dụng. Ruby 3 sắp ra mắt và có vẻ như mọi thứ sẽ trở nên tốt hơn đáng kể với sự ra đời của Guild nguyên thủy. Theo tôi, Ruby vẫn là một lựa chọn rất phù hợp trong nhiều tình huống, và cộng đồng của nó rõ ràng đang rất nỗ lực trong việc làm cho ngôn ngữ trở nên tốt hơn nữa. Hãy chú ý lắng nghe những gì sắp tới!