Hầu hết các ứng dụng web có thể hưởng lợi từ hàng đợi nền, thường được sử dụng để xử lý các công việc phụ dễ xảy ra lỗi hoặc tốn thời gian. Các công việc nền này có thể khác nhau, từ gửi email, cập nhật bộ nhớ đệm, đến thực hiện logic kinh doanh cốt lõi.
Vì bất kỳ hệ thống xếp hàng nền nào cũng mở rộng quy mô số lượng công việc cần xử lý, nhóm công nhân xử lý các công việc đó cũng cần phải mở rộng quy mô. trong việc duy trì tốc độ xử lý. Ngoài ra, giảm quy mô nhân công trong thời gian thông lượng hàng đợi thấp có thể tiết kiệm đáng kể!
Thật không may, nhiều phụ trợ xếp hàng không được trang bị logic chia tỷ lệ để bật hoặc tắt nhân viên.
Quy tắc ngón tay cái trong hàng đợi
Nếu các công việc được xếp vào hàng đợi với tỷ lệ cao hơn chúng được xử lý bởi nhân viên xếp hàng, thì độ sâu của hàng đợi sẽ tăng lên và thời gian mà mỗi công việc dành cho hàng đợi cũng sẽ tăng lên. hàng đợi) cho mỗi công việc ở mức thấp nhất có thể - từ 0 giây lên đến một số giới hạn có thể chấp nhận được. QROT được thể hiện dưới dạng bất bình đẳng mô tả số lượng máy chủ cần thiết để phục vụ một hàng đợi công việc, nhưng một dạng có thể được viết là:
workers = (number_of_jobs * avg_service_time_per_job) / time_to_finish_queue
Vì vậy, nếu chúng ta muốn tìm ra số lượng công nhân cần thiết để phục vụ hàng đợi của chúng ta trong thời gian mong muốn, chẳng hạn, 30 giây, chúng ta chỉ cần biết số lượng công việc (kích thước của hàng đợi) và thời gian trung bình cần để thực hiện từng công việc. Ví dụ:nếu chúng ta có một hàng đợi 7500 công việc và mỗi công việc mất trung bình 0,3 giây để thực thi, thì chúng ta có thể hoàn thành hàng đợi đó trong 30 giây với 75 công nhân.
Truy cập các chỉ số Hiệu suất
Để ước tính thời gian phục vụ trung bình cho các công việc trong hàng đợi, chúng tôi cần quyền truy cập vào các chỉ số hiệu suất cho từng loại công việc. công việc đã được thực hiện.
Chúng tôi có thể sử dụng API AppSignal GraphQL sắp tới để có được thời lượng trung bình của từng loại công việc trong 24 giờ qua. nhằm mục đích tự lập tài liệu và chúng tôi có thể sử dụng một công cụ như GraphiQL để xem xét bên trong API và tìm ra đối tượng dữ liệu mà nó hiển thị.
Quá trình xây dựng truy vấn GraphQL nằm ngoài phạm vi của bài đăng này, nhưng dưới đây là ví dụ về lớp Ruby kết nối với API AppSignal GraphQL bằng cách sử dụng thư viện máy khách Faraday HTTP phổ biến để truy vấn tổng hợp số liệu cơ bản.
require 'json'
require 'faraday'
class AppsignalClient
BASE_URL = 'https://appsignal.com/'
DEFAULT_APP_ID = ENV['APPSIGNAL_APP_ID']
DEFAULT_TOKEN = ENV['APPSIGNAL_API_TOKEN']
# GraphQL query to fetch the "mean" metric for the selected app.
METRICS_QUERY = <<~GRAPHQL.freeze
query($appId: String!, $query: [MetricAggregation!]!, $timeframe: TimeframeEnum!) {
app(id: $appId) {
metrics {
list(timeframe: $timeframe, query: $query) {
start
end
rows {
fields {
key
value
}
}
}
}
}
}
GRAPHQL
def initialize(app_id: DEFAULT_APP_ID, client_secret: DEFAULT_TOKEN)
@app_id = app_id
@client_secret = client_secret
end
# Fetch the average duration for a job class's perform action
# Default timeframe is last 24 hours
def average_job_duration(job_class, timeframe: 'R24H')
response =
connection.post(
'graphql',
JSON.dump(
query: METRICS_QUERY,
variables: {
appId: @app_id,
timeframe: timeframe,
query: [
name: 'transaction_duration',
headerType: legacy
tags: [
{ key: 'namespace', value: 'background' },
{ key: 'action', value: "#{job_class.name}#perform" },
],
fields: [{ field: 'MEAN', aggregate: 'AVG' }],
],
}
)
)
data = JSON.parse(response.body, symbolize_names: true)
rows = data.dig(:data, :app, :metrics, :list, :rows)
# There may be no metrics in the selected timeframe
return 0.0 if rows.empty?
rows.first[:fields].first[:value]
end
private
def connection
@connection ||= Faraday.new(
url: BASE_URL,
params: { token: @client_secret },
headers: { 'Content-Type' => 'application/json' },
request: { timeout: 10 }
) do |faraday|
faraday.response :raise_error
faraday.adapter Faraday.default_adapter
end
end
end
Với lớp này, chúng tôi có thể nhận được thời lượng công việc trung bình cho một lớp ActiveJob nhất định, được trả lại cho chúng tôi sau mili giây:
AppsignalClient.new.average_job_duration(MyMailerJob)
# => 233.1
Theo mặc định, điều này yêu cầu thời lượng giao dịch trung bình của công việc trong 24 giờ qua của dữ liệu. Ví dụ:nếu chúng tôi có công việc chạy hàng trăm lần một giờ, chúng tôi có thể muốn thay đổi timeframe
của mình đến một giờ (R1H
) để ước tính tốt hơn thời lượng của một công việc như vậy nếu được thực hiện ngay bây giờ.
Lưu ý rằng dữ liệu hiệu suất này tách biệt với dữ liệu sử dụng máy chủ của chúng tôi. Dữ liệu này cho chúng tôi biết thời gian thực sự sẽ mất bao lâu để thực hiện công việc cần thiết cho mỗi công việc. .
Tìm hiểu nội dung hàng đợi
Tiếp theo, chúng ta cần xem xét bên trong hàng đợi của mình để xác định các công việc sẽ được phục vụ. Phần phụ trợ xếp hàng phổ biến của Ruby là Resque, cũng tích hợp độc đáo với ActiveJob. mỗi công việc dựa trên lớp của nó, sử dụng AppsignalClient
của chúng tôi lớp từ trên xuống.
require 'resque'
class ResqueEstimator
def initialize(queue: 'default')
@queue = queue
@cache = {}
@appsignal_client = AppsignalClient.new
end
def enqueued_duration_estimate
Resque.data_store.everything_in_queue(queue).map do |job|
estimate_job_duration decode_activejob_args(job)
end.sum
end
def estimate_job_duration(job)
@cache[job['job_class']] ||= @appsignal_client
.average_job_duration job['job_class']
end
private
# ActiveJob-specific method for parsing job arguments
# for ActiveJob+Resque integration
def decode_activejob_args(job)
decoded_job = job
decoded_job = Resque.decode(job) if job.is_a? String
decoded_job['args'].first
end
end
Sử dụng lớp này đơn giản như sau:
ResqueEstimator.new(queue: 'my_queue').enqueued_duration_estimate
# => 23000 (ms)
Lưu ý rằng chúng tôi sử dụng một bản ghi nhớ đơn giản về thời lượng công việc trong estimate_job_duration
của chúng tôi để tránh các lệnh gọi trùng lặp đến API AppSignal. Nhiều khả năng, hàng đợi của chúng ta sẽ chứa nhiều công việc của cùng một lớp và chúng ta có thể giảm chi phí của mình bằng cách chỉ ước tính việc thực thi mỗi lớp một lần.
Sử dụng dữ liệu hiệu suất để mở rộng quy mô
Kết hợp tất cả những điều này lại với nhau, giờ đây chúng tôi có thể sử dụng dữ liệu hiệu suất gần đây của mình để chia tỷ lệ nhân viên trong hàng đợi của chúng tôi lên hoặc xuống dựa trên nội dung của hàng đợi của chúng tôi! Bất kỳ lúc nào, chúng tôi có thể xem xét các công việc trong hàng đợi của mình và nhận được ước tính về số công nhân cần thiết để phục vụ nó trong giới hạn thời gian mong muốn của chúng tôi.
Chúng tôi sẽ cần quyết định giới hạn thời gian xếp hàng mong muốn (lượng thời gian tối đa mà bất kỳ công việc nào phải chờ trong hàng đợi), ví dụ:Chúng tôi cũng sẽ cần chỉ định số lượng công nhân tối thiểu và tối đa. Sẽ rất hữu ích nếu giữ ít nhất một công nhân chạy cho hàng đợi, để xử lý (các) công việc đầu tiên được xếp hàng sau khi hàng đợi trống trong một thời gian. cũng muốn có số lượng công nhân tối đa, để tránh mở rộng quá mức các kết nối cơ sở dữ liệu của chúng tôi và / hoặc chi phí sử dụng máy chủ với quá nhiều công nhân.
Chúng ta có thể tạo một lớp để xử lý logic này cho chúng ta, về cơ bản chỉ là việc chúng ta triển khai Quy tắc xếp hàng của ngón tay cái từ trước.
class ResqueWorkerScaler
def initialize(queue: 'default', workers_range: 1..100, desired_wait_ms: 300_000)
@queue = queue
@workers_range = workers_range
@desired_wait_ms = desired_wait_ms
@estimator = ResqueEstimator.new(queue: @queue)
end
def desired_workers
total_time_ms = @estimator.enqueued_duration_estimate
workers_required = [(total_time_ms / desired_wait_ms).ceil, workers_range.last].min
[workers_required, workers_range.first].max
end
def scale
# using platform-specific scaling interface, scale to desired_workers
end
end
Chúng tôi sẽ muốn mở rộng quy mô nhân viên của mình một cách đều đặn để chúng tôi mở rộng quy mô và thu nhỏ dựa trên nhu cầu. Chúng tôi có thể thực hiện một tác vụ Rake gọi ResqueWorkerScaler
của chúng tôi tầng lớp nhân công:
# inside lib/tasks/resque_workers.rake
namespace :resque_workers do
desc 'Scale worker pool based on enqueued jobs'
task :scale, [:queue] => [:environment] do |_t, args|
queue = args[:queue] || 'default'
ResqueWorkerScaler.new(queue: queue).scale
end
end
Và sau đó, chúng tôi có thể thiết lập một công việc cron để chạy tác vụ Rake này một cách đều đặn:
*/5 * * * * /path/to/our/rake resque_workers:scale
# scale a non-default queue:
*/5 * * * * /path/to/our/rake resque_workers:scale['my_queue']
Lưu ý rằng chúng tôi đặt tác vụ mở rộng quy mô chạy 5 phút một lần. chúng tôi sử dụng Vì vậy, nếu chúng tôi cố gắng mở rộng quy mô nhân viên của mình mỗi phút, chúng tôi có thể sẽ tăng hoặc giảm quy mô lại trước khi những thay đổi mong muốn của chúng tôi có hiệu lực. có thể gọi tác vụ Cào của chúng tôi vào khoảng thời gian hàng giờ. Nhưng nếu kích thước hàng đợi của chúng ta dao động trong một giờ, chúng ta sẽ muốn xem xét hàng đợi của mình ở khoảng thời gian thường xuyên hơn, như 5 phút ở trên.
Các bước tiếp theo
Một hệ thống như vậy trong đó dữ liệu hiệu suất thực tế được sử dụng để mở rộng cơ sở hạ tầng có thể đáp ứng rất tốt nhu cầu và có khả năng phục hồi đối với việc sử dụng khác nhau. quy mô thích hợp hơn nhiều.
Việc triển khai chia tỷ lệ hàng đợi thay thế có thể đo thời gian chờ trung bình cho mỗi công việc thay vì xem xét bên trong toàn bộ hàng đợi, nhưng số liệu đó có thể không đại diện khi nội dung và kích thước hàng đợi thay đổi nhanh chóng. hoặc thời gian thực hiện công việc thay đổi rộng rãi, thì việc xem xét nội quan hàng đợi sẽ nhanh hơn nhiều để phản hồi và chính xác một cách đáng tin cậy.
Nhưng có một số hạn chế cần xem xét trong hệ thống xem xét hàng đợi của chúng tôi. Nếu hàng đợi đủ lớn, việc xem xét từng công việc để ước tính thực hiện sẽ rất chậm. lấy mẫu đại diện ngẫu nhiên của các công việc từ hàng đợi và tính toán thực thi trung bình từ mẫu đó. vài lần.
Hệ thống được nêu ở trên cũng có thể được cải thiện đáng kể với một vài chỉnh sửa. worker để cải thiện độ chính xác của ước tính tổng thời gian phục vụ của chúng tôi. Đối với kiến trúc xử lý nền với nhiều hàng đợi, chúng tôi có thể chỉ định thời gian chờ mong muốn cho mỗi hàng đợi, dựa trên mức độ ưu tiên của hàng đợi và chia tỷ lệ nhân viên một cách thích hợp.
Hệ thống xếp hàng có xu hướng thu thập rất nhiều công việc có tính thay đổi cao trong bất kỳ dự án nào.
Chúc bạn mở rộng quy mô thành công!
Tái bút. Nếu bạn muốn đọc các bài đăng của Ruby Magic ngay khi chúng xuất hiện trên báo chí, hãy đăng ký bản tin Ruby Magic của chúng tôi và không bao giờ bỏ lỡ một bài đăng nào!