Các công ty muốn phản ứng nhanh với nhu cầu xử lý và chia sẻ lượng lớn dữ liệu trong thời gian thực để có được thông tin chi tiết và tạo ra trải nghiệm khách hàng hấp dẫn hơn. Vì vậy, xử lý dữ liệu truyền thống không còn khả thi trong thế giới ngày nay.
Để đạt được điều đó, bạn cần xử lý nhiều dữ liệu càng nhanh càng tốt và sau đó gửi đến các dịch vụ khác để xử lý nhiều hơn. Nhưng ở giữa tất cả các hành động nhanh chóng này, cần phải thông báo cho người tiêu dùng khi sự kiện xảy ra — và chúng tôi có thể thực hiện điều đó bằng cách sử dụng phát trực tuyến sự kiện.
Đây là repo trong GitHub mà chúng tôi sẽ sử dụng.
Sự kiện
Trước khi nói về phát trực tuyến sự kiện, hãy nói về sự kiện là gì. Một sự kiện xảy ra trong ứng dụng có thể liên quan đến quy trình của người dùng hoặc đơn giản là các hành động ảnh hưởng đến hoạt động kinh doanh.
Các sự kiện đại diện cho một sự thay đổi trạng thái, không phải là câu hỏi về cách sửa đổi ứng dụng. Hãy coi đây là những ví dụ:
- Người dùng đăng nhập vào một dịch vụ
- Một giao dịch thanh toán
- Một nhà văn xuất bản một bài đăng trên blog
Trong hầu hết các trường hợp, một sự kiện sẽ kích hoạt nhiều sự kiện hơn; ví dụ:khi người dùng đăng ký một dịch vụ, ứng dụng sẽ gửi thông báo đến thiết bị của họ, chèn bản ghi vào cơ sở dữ liệu và gửi email chào mừng.
Phát trực tuyến sự kiện
Truyền trực tuyến sự kiện là một mẫu để thu thập dữ liệu trong thời gian thực từ các nguồn sự kiện như cơ sở dữ liệu. Các phần chính của luồng sự kiện như sau:
- Nhà môi giới :Hệ thống phụ trách lưu trữ các sự kiện
- Chủ đề :Một loại sự kiện
- Nhà sản xuất :Gửi các sự kiện cho nhà môi giới về một chủ đề cụ thể
- Người tiêu dùng :Đọc các sự kiện
- Sự kiện :Dữ liệu mà nhà sản xuất muốn truyền đạt cho người tiêu dùng
Không thể tránh khỏi việc nói về xuất bản và đăng ký mô hình kiến trúc (mô hình quán rượu / mô hình phụ) tại thời điểm này; phát trực tuyến sự kiện là một triển khai của mẫu đó nhưng với những thay đổi sau:
- Sự kiện xảy ra thay vì tin nhắn.
- Các sự kiện được sắp xếp theo thứ tự, thường là theo thời gian.
- Người tiêu dùng có thể đọc các sự kiện từ một điểm cụ thể trong chủ đề.
- Các sự kiện có độ bền theo thời gian.
Luồng bắt đầu khi nhà sản xuất xuất bản một sự kiện mới vào một chủ đề (như chúng ta đã thấy trước đây, chủ đề chỉ là phân loại cho một loại sự kiện cụ thể). Sau đó, người tiêu dùng quan tâm đến các sự kiện của một danh mục cụ thể đăng ký chủ đề đó. Cuối cùng, nhà môi giới xác định khách hàng của chủ đề và cung cấp các sự kiện mong muốn.
Ưu điểm của phát trực tuyến sự kiện
-
Tách rời Không có sự phụ thuộc giữa nhà xuất bản và người tiêu dùng vì họ không cần biết nhau. Ngoài ra, các sự kiện không chỉ rõ hành động của họ, vì vậy nhiều người tiêu dùng có thể nhận được cùng một sự kiện và thực hiện các hành động khác nhau.
-
Độ trễ thấp Các sự kiện được tách riêng và cho phép người tiêu dùng sử dụng chúng bất cứ lúc nào; nó có thể xảy ra trong mili giây.
-
Độc lập Như chúng ta biết, nhà xuất bản và người tiêu dùng độc lập, vì vậy các nhóm khác nhau có thể làm việc với họ bằng cách sử dụng các sự kiện giống nhau cho các hành động hoặc mục đích khác.
-
Khả năng chịu lỗi Một số nền tảng phát trực tuyến sự kiện giúp bạn đối phó với những thất bại của người tiêu dùng; ví dụ:người tiêu dùng có thể lưu vị trí của họ và bắt đầu lại từ đó nếu xảy ra lỗi.
-
Xử lý thời gian thực Phản hồi được nhận trong thời gian thực, vì vậy người dùng không cần đợi vài phút hoặc vài giờ để xem phản hồi về các sự kiện của họ.
-
Hiệu suất cao Nền tảng sự kiện có thể xử lý nhiều thông báo do độ trễ thấp — ví dụ:hàng nghìn sự kiện trong một giây.
Nhược điểm của phát trực tuyến sự kiện
-
Giám sát Một số công cụ phát trực tuyến sự kiện không có công cụ giám sát hoàn chỉnh; họ kêu gọi triển khai các công cụ bổ sung, chẳng hạn như Datadog hoặc New Relic.
-
Cấu hình Cấu hình trong một số công cụ có thể gây choáng ngợp ngay cả đối với những người có kinh nghiệm. Có rất nhiều tham số và đôi khi, bạn cần biết sâu về chủ đề để triển khai chúng.
-
Thư viện ứng dụng khách Không dễ dàng triển khai Kafka bằng các ngôn ngữ khác ngoài Java. Đôi khi, các thư viện máy khách không được cập nhật, hiển thị không ổn định hoặc không cung cấp nhiều lựa chọn thay thế để lựa chọn.
Một trong những công cụ phổ biến nhất để phát trực tuyến sự kiện là Apache Kafka . Công cụ này cho phép người dùng gửi, lưu trữ và yêu cầu dữ liệu bất cứ khi nào và bất cứ nơi nào họ cần; hãy nói về nó.
Apache Kafka
"Apache Kafka là nền tảng phát trực tuyến sự kiện phân tán mã nguồn mở được hàng nghìn công ty sử dụng cho các đường ống dữ liệu hiệu suất cao, phân tích phát trực tuyến, tích hợp dữ liệu và các ứng dụng quan trọng."
Được thiết kế đặc biệt để truyền nhật ký thời gian thực, Apache Kafka lý tưởng cho các ứng dụng yêu cầu những điều sau:
- Trao đổi dữ liệu đáng tin cậy giữa các thành phần khác nhau
- Khả năng phân chia khối lượng công việc nhắn tin khi các yêu cầu ứng dụng thay đổi
- Truyền theo thời gian thực để xử lý dữ liệu
Hãy sử dụng Kafka trong ứng dụng Rails!
Sử dụng Kafka với Rails
Đá quý nổi tiếng nhất để sử dụng Kafka trong Ruby được Zendesk gọi là ruby-kafka, và nó thật tuyệt! Tuy nhiên, bạn cần phải thực hiện tất cả việc triển khai theo cách thủ công, đó là lý do tại sao chúng tôi có một số "khuôn khổ" được xây dựng bằng ruby-kafka. Họ cũng giúp chúng tôi với tất cả các bước cấu hình và thực thi.
Karafka là một khuôn khổ được sử dụng để đơn giản hóa việc phát triển các ứng dụng Ruby dựa trên Apache Kafka.
Để làm việc với Kafka, cần phải cài đặt Java. Vì Kafka cũng là một ứng dụng Scala và Java nên việc cài đặt Zookeeper sẽ được yêu cầu.
Trước khi cài đặt, tôi muốn giải thích một chút về Zookeeper. Zookeeper là một dịch vụ tập trung cần thiết cho Kafka; nó sẽ gửi thông báo trong trường hợp có những thay đổi như tạo chủ đề mới, sự cố của nhà môi giới, xóa nhà môi giới, xóa chủ đề, v.v.
Nhiệm vụ chính của nó là quản lý các nhà môi giới Kafka, duy trì một danh sách với siêu dữ liệu tương ứng của họ và tạo điều kiện cho các cơ chế kiểm tra sức khỏe. Ngoài ra, nó giúp chọn nhà môi giới hàng đầu cho các phân vùng khác nhau của các chủ đề.
Yêu cầu
Đối với MacOS:
Bây giờ, hãy cài đặt Java và Zookeeper bằng các lệnh sau:
brew install java
brew install zookeeper
Sau đó, chúng tôi có thể tiếp tục cài đặt Kafka chạy sau:
brew install kafka
Khi chúng tôi đã cài đặt Kafka và Zookeeper, cần phải khởi động các dịch vụ theo cách này:
brew services start zookeeper
brew services start kafka
Đối với Windows và Linux:
Hướng dẫn:
- Cài đặt Java
- Tải xuống Zookeeper
Thiết lập đường ray
Chỉ cần tạo một ứng dụng Rails đơn giản như bình thường:
rails new karafka_example
và thêm đá quý karafka trong Gemfile:
gem 'karafka'
Sau đó, chạy bundle install
để cài đặt viên ngọc được thêm gần đây và đừng quên chạy lệnh sau để nhận tất cả những thứ Karafka:
bundle exec karafka install
Lệnh đó sẽ tạo ra một số tệp thú vị:tệp đầu tiên là karafka.rb
trong thư mục gốc, app/consumers/application_consumer.rb
và app/responders/application_responder.rb
.
Bộ khởi tạo Karafka
karafka.rb
tệp giống như một ứng dụng khởi tạo được tách ra từ cấu hình Rails. Nó cho phép bạn định cấu hình ứng dụng Karafka và vẽ một số tuyến, tương tự về mặt API như các tuyến ứng dụng Rails. Nhưng ở đây, nó dành cho các chủ đề và người tiêu dùng.
Nhà sản xuất
Nhà sản xuất chịu trách nhiệm tạo các sự kiện và chúng tôi có thể thêm chúng vào app/responders
thư mục. Bây giờ, hãy tạo một nhà sản xuất đơn giản cho người dùng:
# app/responders/users_responder.rb
class UsersResponder < ApplicationResponder
topic :users
def respond(event_payload)
respond_to :users, event_payload
end
end
Người tiêu dùng
Người tiêu dùng chịu trách nhiệm đọc tất cả các sự kiện / tin nhắn được gửi từ nhà sản xuất. Đây chỉ là người tiêu dùng ghi lại tin nhắn đã nhận.
# app/consumers/users_consumer.rb
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params}"
end
end
Chúng tôi sử dụng params
để nhận được sự kiện. Nhưng nếu bạn sẽ đọc các sự kiện theo lô và bạn có cấu hình config.batch_fetching
đúng như vậy, bạn nên sử dụng params_batch
.
Kiểm tra
Để chạy dịch vụ Karafka của chúng tôi (dịch vụ sẽ nghe các sự kiện), hãy chuyển đến bảng điều khiển, mở tab mới, chuyển đến dự án Rails và chạy:
bundle exec karafka server
Sự kiện thành công
Bây giờ, hãy mở một tab bảng điều khiển khác, đi tới dự án Rails và nhập vào:
rails c
Ở đó, hãy tạo một sự kiện với người trả lời của chúng tôi:
> UsersResponder.call({ event_name: "user_created", payload: { user_id: 1 } })
Nếu bạn kiểm tra bảng điều khiển Rails, chúng tôi sẽ nhận được thông báo này sau khi sự kiện được tạo:
Successfully appended 1 messages to users/0 on 192.168.1.77:9092 (node_id=0)
=> {"users"=>[["{\"event_name\":\"user_created\",\"payload\":{\"user_id\":1}}", {:topic=>"users"}]]}
Và trong tab dịch vụ Karafka, bạn sẽ thấy một cái gì đó như sau:
New [User] event: #<Karafka::Params::Params:0x00007fa76f0316c8>
Inline processing of topic users with 1 messages took 0 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:1 as processed
[[karafka_example] {}:] Committing offsets: users/0:2
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 28 to 192.168.1.77:9092
Nhưng nếu bạn chỉ muốn tải trọng tin nhắn, bạn có thể thêm params.payload
trong người tiêu dùng của bạn và bạn sẽ có một cái gì đó như thế này:
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "payload"=>{"user_id"=>1}}
Inline processing of topic users with 1 messages took 1 ms
1 message on users topic delegated to UsersConsumer
Sự kiện không thành công
Bạn có thể tạo Mô hình người dùng với một số thuộc tính như email
, first_name
và last_name
chạy lệnh sau:
rails g model User email first_name last_name
Sau đó, bạn có thể chạy quá trình di chuyển với điều này:
rails db:migrate
Bây giờ, hãy thêm một số xác nhận như sau:
class User < ApplicationRecord
validates :email, uniqueness: true
end
Cuối cùng, chúng tôi có thể thay đổi người tiêu dùng:
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
Vì vậy, hãy tạo hai sự kiện với cùng một email:
UsersResponder.call({ event_name: "user_created", user: { user_id: 1, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
UsersResponder.call({ event_name: "user_created", user: { user_id: 2, email: '[email protected]', first_name: 'Bruce', last_name: 'Wayne' } } )
Với điều này, sự kiện đầu tiên được tạo trong cơ sở dữ liệu:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>1, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.1ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (9.6ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "1"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:29:14.827778"], ["updated_at", "2021-03-10 04:29:14.827778"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.0ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 70 ms
1 message on users topic delegated to UsersConsumer
Nhưng cái thứ hai sẽ không thành công, bởi vì chúng tôi có một xác thực cho biết email là duy nhất. Nếu bạn cố gắng thêm một bản ghi khác với một email hiện có, bạn sẽ thấy một cái gì đó như thế này:
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
[[karafka_example] {users: 0}:] [fetch] Received response 2 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 3 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 3 from 192.168.1.77:9092
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Exists? (0.3ms) SELECT 1 AS one FROM "users" WHERE "users"."email" = $1 LIMIT $2 [["email", "[email protected]"], ["LIMIT", 1]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (0.2ms) ROLLBACK
↳ app/consumers/users_consumer.rb:14:in `consume'
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42 -- ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
Bạn có thể thấy lỗi ở dòng cuối cùng ActiveRecord::RecordInvalid: Validation failed: Email has already been taken
. Nhưng điều thú vị ở đây là Kafka sẽ cố gắng xử lý sự kiện, hết lần này đến lần khác. Ngay cả khi bạn khởi động lại máy chủ Karafka, nó sẽ cố gắng xử lý sự kiện cuối cùng. Làm thế nào để Kafka biết bắt đầu từ đâu?
Nếu bạn nhìn thấy bảng điều khiển của mình, sau lỗi, bạn sẽ thấy điều này:
[[karafka_example] {users: 0}:] Exception raised when processing users/0 at offset 42
Nó sẽ cho bạn biết phần bù nào đã được xử lý:trong trường hợp này, phần bù 42. Vì vậy, nếu bạn khởi động lại dịch vụ Karafka, phần bù đó sẽ bắt đầu ở phần bù đó.
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {users: 0}:] Fetching batches
Nó vẫn sẽ không thành công vì chúng tôi có xác thực email trong mô hình Người dùng của mình. Tại thời điểm này, dừng máy chủ Karafka, xóa hoặc nhận xét xác thực đó và khởi động lại máy chủ của bạn; bạn sẽ thấy sự kiện được xử lý thành công như thế nào:
[[karafka_example] {}:] Committing offsets with recommit: users/0:42
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 5 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 5 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 5 from 192.168.1.77:9092
Params deserialization for users topic successful in 0 ms
New [User] event: {"event_name"=>"user_created", "user"=>{"user_id"=>2, "email"=>"[email protected]", "first_name"=>"Bruce", "last_name"=>"Wayne"}}
TRANSACTION (0.2ms) BEGIN
↳ app/consumers/users_consumer.rb:14:in `consume'
User Create (3.8ms) INSERT INTO "users" ("user_id", "email", "first_name", "last_name", "created_at", "updated_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id" [["user_id", "2"], ["email", "[email protected]"], ["first_name", "Bruce"], ["last_name", "Wayne"], ["created_at", "2021-03-10 04:49:37.832452"], ["updated_at", "2021-03-10 04:49:37.832452"]]
↳ app/consumers/users_consumer.rb:14:in `consume'
TRANSACTION (5.5ms) COMMIT
↳ app/consumers/users_consumer.rb:14:in `consume'
Inline processing of topic users with 1 messages took 69 ms
1 message on users topic delegated to UsersConsumer
[[karafka_example] {}:] Marking users/0:43 as processed
Cuối cùng, bạn có thể thấy thông báo này ở dòng cuối cùng:Marking users/0:43 as processed
.
Gọi lại
Đây là một điều thú vị mà Karafka cung cấp:bạn có thể sử dụng lệnh gọi lại trong Người tiêu dùng của mình. Để làm điều đó, bạn chỉ cần nhập mô-đun và sử dụng chúng. Sau đó, mở UserConsumer
của bạn và thêm cái này:
class UsersConsumer < ApplicationConsumer
include Karafka::Consumers::Callbacks
before_poll do
Karafka.logger.info "*** Checking something new for #{topic.name}"
end
after_poll do
Karafka.logger.info '*** We just checked for new messages!'
end
def consume
Karafka.logger.info "New [User] event: #{params.payload}"
User.create!(params.payload['user'])
end
end
Thăm dò ý kiến là phương tiện mà qua đó chúng tôi tìm nạp các bản ghi dựa trên độ lệch phân vùng hiện tại. Vì vậy, các lệnh gọi lại đó before_poll
và after_poll
, giống như tên gọi của chúng, được thực thi tại thời điểm đó. Chúng tôi chỉ ghi nhật ký một tin nhắn và bạn có thể thấy chúng trong máy chủ Karafka của mình — một tin nhắn trước khi tìm nạp và một tin nhắn khác sau đó:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 325 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 326 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 326 from 192.168.1.77:9092
*** We just checked for new messages!
Nhịp tim
Nhịp tim chỉ là cách chúng ta, với tư cách là người tiêu dùng, nói với Kafka rằng chúng ta đang sống; nếu không, Kafka sẽ cho rằng người tiêu dùng đã chết.
Trong Karafka, chúng tôi có một cấu hình mặc định để thực hiện việc này trong một khoảng thời gian; nó là kafka.heartbeat_interval
và mặc định là 10 giây. Bạn có thể thấy nhịp tim này trong máy chủ Karafka của mình.
*** Checking something new for users
[[karafka_example_example] {}:] Sending heartbeat...
[[karafka_example_example] {}:] [heartbeat] Sending heartbeat API request 72 to 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Waiting for response 72 from 192.168.1.77:9092
[[karafka_example_example] {}:] [heartbeat] Received response 72 from 192.168.1.77:9092
*** We just checked for new messages!
Với Sending heartbeat...
, Kafka biết rằng chúng tôi còn sống và chúng tôi là một thành viên hợp lệ trong nhóm người tiêu dùng của nó. Ngoài ra, chúng tôi có thể sử dụng nhiều bản ghi hơn.
Cam kết
Đánh dấu một phần bù là đã tiêu thụ được gọi là cam kết một phần bù. Trong Kafka, chúng tôi ghi lại các cam kết bù trừ bằng cách viết cho một chủ đề nội bộ của Kafka được gọi là chủ đề bù trừ. Một thông báo chỉ được coi là tiêu dùng khi phần bù của nó được cam kết với chủ đề bù trừ.
Karafka có một cấu hình để thực hiện cam kết này tự động mỗi lần; cấu hình là kafka.offset_commit_interval
và giá trị của nó là 10 giây theo mặc định. Với điều này, Karakfa sẽ thực hiện một cam kết bù đắp sau mỗi 10 giây và bạn có thể xem thông báo đó trong máy chủ Karafka của mình:
*** Checking something new for users
[[karafka_example] {}:] No batches to process
[[karafka_example] {users: 0}:] [fetch] Received response 307 from 192.168.1.77:9092
[[karafka_example] {users: 0}:] Fetching batches
[[karafka_example] {users: 0}:] [fetch] Sending fetch API request 308 to 192.168.1.77:9092
[[karafka_example] {users: 0}:] [fetch] Waiting for response 308 from 192.168.1.77:9092
[[karafka_example] {}:] Committing offsets: users/0:44
[[karafka_example] {}:] [offset_commit] Sending offset_commit API request 69 to 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Waiting for response 69 from 192.168.1.77:9092
[[karafka_example] {}:] [offset_commit] Received response 69 from 192.168.1.77:9092
*** We just checked for new messages!
Committing offsets: users/0:44
cho chúng tôi biết điều gì bù đắp được nó đang cam kết; trong trường hợp của tôi, nó nói với Kafka rằng nó có thể cam kết số bù 44 từ chủ đề 0. Bằng cách này, nếu có điều gì đó xảy ra với dịch vụ của chúng tôi, Karafka có thể bắt đầu lại để xử lý các sự kiện từ phần bù đó.
Kết luận
Phát trực tuyến sự kiện giúp chúng tôi nhanh hơn, sử dụng dữ liệu tốt hơn và thiết kế trải nghiệm người dùng tốt hơn. Trên thực tế, nhiều công ty đang sử dụng mô hình này để giao tiếp tất cả các dịch vụ của họ và để có thể phản ứng với các sự kiện khác nhau trong thời gian thực. Như tôi đã đề cập trước đây, có những lựa chọn thay thế khác ngoài Karafka mà bạn có thể sử dụng với Rails. Bạn đã có những điều cơ bản; bây giờ, hãy thử nghiệm với chúng.
Tài liệu tham khảo
- https://kafka.apache.org/
- https://github.com/karafka/karafka
- https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern