Computer >> Máy Tính >  >> Lập trình >> Ruby

Nhân viên hàng đợi mở rộng quy mô một cách hiệu quả với AppSignal Metrics

Hầu hết các ứng dụng web có thể hưởng lợi từ hàng đợi nền, thường được sử dụng để xử lý các công việc phụ dễ xảy ra lỗi hoặc tốn thời gian. Các công việc nền này có thể khác nhau, từ gửi email, cập nhật bộ nhớ đệm, đến thực hiện logic kinh doanh cốt lõi.

Vì bất kỳ hệ thống xếp hàng nền nào cũng mở rộng quy mô số lượng công việc cần xử lý, nhóm công nhân xử lý các công việc đó cũng cần phải mở rộng quy mô. trong việc duy trì tốc độ xử lý. Ngoài ra, giảm quy mô nhân công trong thời gian thông lượng hàng đợi thấp có thể tiết kiệm đáng kể!

Thật không may, nhiều phụ trợ xếp hàng không được trang bị logic chia tỷ lệ để bật hoặc tắt nhân viên.

Quy tắc ngón tay cái trong hàng đợi

Nếu các công việc được xếp vào hàng đợi với tỷ lệ cao hơn chúng được xử lý bởi nhân viên xếp hàng, thì độ sâu của hàng đợi sẽ tăng lên và thời gian mà mỗi công việc dành cho hàng đợi cũng sẽ tăng lên. hàng đợi) cho mỗi công việc ở mức thấp nhất có thể - từ 0 giây lên đến một số giới hạn có thể chấp nhận được. QROT được thể hiện dưới dạng bất bình đẳng mô tả số lượng máy chủ cần thiết để phục vụ một hàng đợi công việc, nhưng một dạng có thể được viết là:

workers = (number_of_jobs * avg_service_time_per_job) / time_to_finish_queue

Vì vậy, nếu chúng ta muốn tìm ra số lượng công nhân cần thiết để phục vụ hàng đợi của chúng ta trong thời gian mong muốn, chẳng hạn, 30 giây, chúng ta chỉ cần biết số lượng công việc (kích thước của hàng đợi) và thời gian trung bình cần để thực hiện từng công việc. Ví dụ:nếu chúng ta có một hàng đợi 7500 công việc và mỗi công việc mất trung bình 0,3 giây để thực thi, thì chúng ta có thể hoàn thành hàng đợi đó trong 30 giây với 75 công nhân.

Truy cập các chỉ số Hiệu suất

Để ước tính thời gian phục vụ trung bình cho các công việc trong hàng đợi, chúng tôi cần quyền truy cập vào các chỉ số hiệu suất cho từng loại công việc. công việc đã được thực hiện.

Chúng tôi có thể sử dụng API AppSignal GraphQL sắp tới để có được thời lượng trung bình của từng loại công việc trong 24 giờ qua. nhằm mục đích tự lập tài liệu và chúng tôi có thể sử dụng một công cụ như GraphiQL để xem xét bên trong API và tìm ra đối tượng dữ liệu mà nó hiển thị.

Quá trình xây dựng truy vấn GraphQL nằm ngoài phạm vi của bài đăng này, nhưng dưới đây là ví dụ về lớp Ruby kết nối với API AppSignal GraphQL bằng cách sử dụng thư viện máy khách Faraday HTTP phổ biến để truy vấn tổng hợp số liệu cơ bản.

require 'json'
require 'faraday'
 
class AppsignalClient
  BASE_URL = 'https://appsignal.com/'
  DEFAULT_APP_ID = ENV['APPSIGNAL_APP_ID']
  DEFAULT_TOKEN = ENV['APPSIGNAL_API_TOKEN']
  # GraphQL query to fetch the "mean" metric for the selected app.
  METRICS_QUERY = <<~GRAPHQL.freeze
    query($appId: String!, $query: [MetricAggregation!]!, $timeframe: TimeframeEnum!) {
      app(id: $appId) {
        metrics {
          list(timeframe: $timeframe, query: $query) {
            start
            end
            rows {
              fields {
                key
                value
              }
            }
          }
        }
      }
    }
  GRAPHQL
 
  def initialize(app_id: DEFAULT_APP_ID, client_secret: DEFAULT_TOKEN)
    @app_id = app_id
    @client_secret = client_secret
  end
 
  # Fetch the average duration for a job class's perform action
  # Default timeframe is last 24 hours
  def average_job_duration(job_class, timeframe: 'R24H')
    response =
      connection.post(
        'graphql',
        JSON.dump(
          query: METRICS_QUERY,
          variables: {
            appId: @app_id,
            timeframe: timeframe,
            query: [
              name: 'transaction_duration',
              headerType: legacy
tags: [
                { key: 'namespace', value: 'background' },
                { key: 'action', value: "#{job_class.name}#perform" },
              ],
              fields: [{ field: 'MEAN', aggregate: 'AVG' }],
            ],
          }
        )
      )
    data = JSON.parse(response.body, symbolize_names: true)
    rows = data.dig(:data, :app, :metrics, :list, :rows)
    # There may be no metrics in the selected timeframe
    return 0.0 if rows.empty?
 
    rows.first[:fields].first[:value]
  end
 
  private
 
  def connection
    @connection ||= Faraday.new(
      url: BASE_URL,
      params: { token: @client_secret },
      headers: { 'Content-Type' => 'application/json' },
      request: { timeout: 10 }
    ) do |faraday|
      faraday.response :raise_error
      faraday.adapter Faraday.default_adapter
    end
  end
end

Với lớp này, chúng tôi có thể nhận được thời lượng công việc trung bình cho một lớp ActiveJob nhất định, được trả lại cho chúng tôi sau mili giây:

AppsignalClient.new.average_job_duration(MyMailerJob)
# => 233.1

Theo mặc định, điều này yêu cầu thời lượng giao dịch trung bình của công việc trong 24 giờ qua của dữ liệu. Ví dụ:nếu chúng tôi có công việc chạy hàng trăm lần một giờ, chúng tôi có thể muốn thay đổi timeframe của mình đến một giờ (R1H ) để ước tính tốt hơn thời lượng của một công việc như vậy nếu được thực hiện ngay bây giờ.

Lưu ý rằng dữ liệu hiệu suất này tách biệt với dữ liệu sử dụng máy chủ của chúng tôi. Dữ liệu này cho chúng tôi biết thời gian thực sự sẽ mất bao lâu để thực hiện công việc cần thiết cho mỗi công việc. .

Tìm hiểu nội dung hàng đợi

Tiếp theo, chúng ta cần xem xét bên trong hàng đợi của mình để xác định các công việc sẽ được phục vụ. Phần phụ trợ xếp hàng phổ biến của Ruby là Resque, cũng tích hợp độc đáo với ActiveJob. mỗi công việc dựa trên lớp của nó, sử dụng AppsignalClient của chúng tôi lớp từ trên xuống.

require 'resque'
 
class ResqueEstimator
  def initialize(queue: 'default')
    @queue = queue
    @cache = {}
    @appsignal_client = AppsignalClient.new
  end
 
  def enqueued_duration_estimate
    Resque.data_store.everything_in_queue(queue).map do |job|
      estimate_job_duration decode_activejob_args(job)
    end.sum
  end
 
  def estimate_job_duration(job)
    @cache[job['job_class']] ||= @appsignal_client
                                 .average_job_duration job['job_class']
  end
 
  private
 
  # ActiveJob-specific method for parsing job arguments
  # for ActiveJob+Resque integration
  def decode_activejob_args(job)
    decoded_job = job
    decoded_job = Resque.decode(job) if job.is_a? String
    decoded_job['args'].first
  end
end

Sử dụng lớp này đơn giản như sau:

ResqueEstimator.new(queue: 'my_queue').enqueued_duration_estimate
# => 23000 (ms)

Lưu ý rằng chúng tôi sử dụng một bản ghi nhớ đơn giản về thời lượng công việc trong estimate_job_duration của chúng tôi để tránh các lệnh gọi trùng lặp đến API AppSignal. Nhiều khả năng, hàng đợi của chúng ta sẽ chứa nhiều công việc của cùng một lớp và chúng ta có thể giảm chi phí của mình bằng cách chỉ ước tính việc thực thi mỗi lớp một lần.

Sử dụng dữ liệu hiệu suất để mở rộng quy mô

Kết hợp tất cả những điều này lại với nhau, giờ đây chúng tôi có thể sử dụng dữ liệu hiệu suất gần đây của mình để chia tỷ lệ nhân viên trong hàng đợi của chúng tôi lên hoặc xuống dựa trên nội dung của hàng đợi của chúng tôi! Bất kỳ lúc nào, chúng tôi có thể xem xét các công việc trong hàng đợi của mình và nhận được ước tính về số công nhân cần thiết để phục vụ nó trong giới hạn thời gian mong muốn của chúng tôi.

Chúng tôi sẽ cần quyết định giới hạn thời gian xếp hàng mong muốn (lượng thời gian tối đa mà bất kỳ công việc nào phải chờ trong hàng đợi), ví dụ:Chúng tôi cũng sẽ cần chỉ định số lượng công nhân tối thiểu và tối đa. Sẽ rất hữu ích nếu giữ ít nhất một công nhân chạy cho hàng đợi, để xử lý (các) công việc đầu tiên được xếp hàng sau khi hàng đợi trống trong một thời gian. cũng muốn có số lượng công nhân tối đa, để tránh mở rộng quá mức các kết nối cơ sở dữ liệu của chúng tôi và / hoặc chi phí sử dụng máy chủ với quá nhiều công nhân.

Chúng ta có thể tạo một lớp để xử lý logic này cho chúng ta, về cơ bản chỉ là việc chúng ta triển khai Quy tắc xếp hàng của ngón tay cái từ trước.

class ResqueWorkerScaler
  def initialize(queue: 'default', workers_range: 1..100, desired_wait_ms: 300_000)
    @queue = queue
    @workers_range = workers_range
    @desired_wait_ms = desired_wait_ms
    @estimator = ResqueEstimator.new(queue: @queue)
  end
 
  def desired_workers
    total_time_ms = @estimator.enqueued_duration_estimate
    workers_required = [(total_time_ms / desired_wait_ms).ceil, workers_range.last].min
    [workers_required, workers_range.first].max
  end
 
  def scale
    # using platform-specific scaling interface, scale to desired_workers
  end
end

Chúng tôi sẽ muốn mở rộng quy mô nhân viên của mình một cách đều đặn để chúng tôi mở rộng quy mô và thu nhỏ dựa trên nhu cầu. Chúng tôi có thể thực hiện một tác vụ Rake gọi ResqueWorkerScaler của chúng tôi tầng lớp nhân công:

# inside lib/tasks/resque_workers.rake
 
namespace :resque_workers do
  desc 'Scale worker pool based on enqueued jobs'
  task :scale, [:queue] => [:environment] do |_t, args|
    queue = args[:queue] || 'default'
    ResqueWorkerScaler.new(queue: queue).scale
  end
end

Và sau đó, chúng tôi có thể thiết lập một công việc cron để chạy tác vụ Rake này một cách đều đặn:

*/5 * * * * /path/to/our/rake resque_workers:scale
# scale a non-default queue:
*/5 * * * * /path/to/our/rake resque_workers:scale['my_queue']

Lưu ý rằng chúng tôi đặt tác vụ mở rộng quy mô chạy 5 phút một lần. chúng tôi sử dụng Vì vậy, nếu chúng tôi cố gắng mở rộng quy mô nhân viên của mình mỗi phút, chúng tôi có thể sẽ tăng hoặc giảm quy mô lại trước khi những thay đổi mong muốn của chúng tôi có hiệu lực. có thể gọi tác vụ Cào của chúng tôi vào khoảng thời gian hàng giờ. Nhưng nếu kích thước hàng đợi của chúng ta dao động trong một giờ, chúng ta sẽ muốn xem xét hàng đợi của mình ở khoảng thời gian thường xuyên hơn, như 5 phút ở trên.

Các bước tiếp theo

Một hệ thống như vậy trong đó dữ liệu hiệu suất thực tế được sử dụng để mở rộng cơ sở hạ tầng có thể đáp ứng rất tốt nhu cầu và có khả năng phục hồi đối với việc sử dụng khác nhau. quy mô thích hợp hơn nhiều.

Việc triển khai chia tỷ lệ hàng đợi thay thế có thể đo thời gian chờ trung bình cho mỗi công việc thay vì xem xét bên trong toàn bộ hàng đợi, nhưng số liệu đó có thể không đại diện khi nội dung và kích thước hàng đợi thay đổi nhanh chóng. hoặc thời gian thực hiện công việc thay đổi rộng rãi, thì việc xem xét nội quan hàng đợi sẽ nhanh hơn nhiều để phản hồi và chính xác một cách đáng tin cậy.

Nhưng có một số hạn chế cần xem xét trong hệ thống xem xét hàng đợi của chúng tôi. Nếu hàng đợi đủ lớn, việc xem xét từng công việc để ước tính thực hiện sẽ rất chậm. lấy mẫu đại diện ngẫu nhiên của các công việc từ hàng đợi và tính toán thực thi trung bình từ mẫu đó. vài lần.

Hệ thống được nêu ở trên cũng có thể được cải thiện đáng kể với một vài chỉnh sửa. worker để cải thiện độ chính xác của ước tính tổng thời gian phục vụ của chúng tôi. Đối với kiến ​​trúc xử lý nền với nhiều hàng đợi, chúng tôi có thể chỉ định thời gian chờ mong muốn cho mỗi hàng đợi, dựa trên mức độ ưu tiên của hàng đợi và chia tỷ lệ nhân viên một cách thích hợp.

Hệ thống xếp hàng có xu hướng thu thập rất nhiều công việc có tính thay đổi cao trong bất kỳ dự án nào.

Chúc bạn mở rộng quy mô thành công!

Tái bút. Nếu bạn muốn đọc các bài đăng của Ruby Magic ngay khi chúng xuất hiện trên báo chí, hãy đăng ký bản tin Ruby Magic của chúng tôi và không bao giờ bỏ lỡ một bài đăng nào!