Computer >> Máy Tính >  >> Lập trình >> Redis

Timeseries in Redis with Streams

Redis thường được coi là “máy chủ cấu trúc dữ liệu”, cung cấp giao diện mạng cho một số cấu trúc dữ liệu ban đầu đơn giản. Luồng là cấu trúc dữ liệu có mục đích chung mới đầu tiên kể từ khi Redis giới thiệu các tập hợp được sắp xếp cách đây nhiều năm. Hãy xem một trong những cách sử dụng chính cho cấu trúc mới này:lập mô hình dữ liệu chuỗi thời gian.

Luồng:cấu trúc dữ liệu Redis mới

Redis Streams đại diện cho thời gian chỉ nối thêm của các cặp khóa-giá trị.

Bất kỳ số lượng khách hàng nào cũng có thể ghi vào Luồng và mỗi khi họ nhận được một ID tăng dần duy nhất của mục đã được chèn vào thời gian biểu.

Khách hàng đang đọc dữ liệu có thể chặn khi đang nghe dữ liệu mới đến, giữ lại “dấu trang” của tin nhắn cuối cùng đã đọc để xử lý hàng loạt hoặc có thể được tổ chức thành “nhóm khách hàng” phức tạp hơn để chia sẻ khối lượng công việc và nhận thông báo.

Luồng là một chủ đề lớn, vì vậy bài đăng này sẽ chỉ xem xét ngắn gọn cách sử dụng để lập mô hình dữ liệu thời gian trong Redis. Cấu trúc dữ liệu mới làm cho cấu trúc này đơn giản hơn nhiều so với các cách tiếp cận trước đây như sử dụng các loại List hoặc SortedSet để lập mô hình thời gian.

Đối với các ví dụ mã ở đây, chúng tôi sẽ sử dụng ứng dụng khách theioredis cho node.js:

var Redis = require('ioredis');
var redis = new Redis();

Gửi dữ liệu tới Luồng

Luồng chỉ là phần phụ, do đó, thông thường chi tiết duy nhất bạn cần là tên của khóaRedis mà bạn muốn ghi vào và tập hợp các cặp khóa-giá trị của bạn.

Tôi sẽ ghi lại các phép đo từ cảm biến chất lượng không khí của mình, gửi một loạt các cặp khóa-giá trị đến site:pdx chính - cụ thể là tôi đang gửi Chỉ số chất lượng không khí hiện tại và nhiệt độ tính bằng độ C:

redis.xadd('site:pdx', '*',
           'aqi', 37,
           'tempc', 5.1).then(function(id) {
  console.log("id:", id);
});

> 1527974818120-0

Tôi gửi XADD lệnh bất cứ khi nào có một phép đo mới cần được ghi lại. Phản hồi là một ID duy nhất, luôn tăng dần, có thể được sử dụng trong các truy vấn. Phần đầu tiên của ID, 1527974818120 là dấu thời gian được chỉ định bởi máy chủ Redis. Phần thứ hai của ID là một số tăng dần để tránh trường hợp nhiều khách hàng có thể viết cùng một lúc.

Cung cấp * là đối số thứ hai trong lệnh ghi, như được hiển thị ở trên, cho phép Redis biết để lưu trữ dữ liệu với nhãn hiệu của nó

Cũng có thể cung cấp dấu thời gian khi ghi dữ liệu, nhưng nó thường không được ưu tiên:việc để Redis chọn dấu thời gian cho phép nhiều khách hàng ghi dữ liệu vào một Luồng duy nhất đồng thời mà không cần khách hàng phải điều phối về việc lựa chọn và đặt hàng ID - máy chủ Redis chỉ xử lý các chi tiết cho khách hàng.

Đọc đơn giản

Khi một vài giá trị có trong luồng, bạn có thể truy xuất một loạt giá trị bằng cách cung cấp một loạt ID hoặc dấu thời gian. Điều này có thể được sử dụng bởi một ứng dụng hiển thị các kết quả đọc gần đây nhất về chỉ số chất lượng không khí của tôi trên biểu đồ:

redis.xrange('site:pdx',
             '1527974818120-0',
             '+',
             'COUNT', 5).then(function(resp) {

  // resp now holds 5 readings, pass them to the open graph:
  // console.log(resp);
});

> [ [ '1543947167906-0', [ 'aqi', '31', 'tempc', '5.1' ] ],
> [ '1543947168312-0', [ 'aqi', '31', 'tempc', '5.3' ] ],
> [ '1543947168901-0', [ 'aqi', '31', 'tempc', '5.4' ] ],
> [ '1543947170033-0', [ 'aqi', '31', 'tempc', '5.4' ] ],
> [ '1543947171460-0', [ 'aqi', '31', 'tempc', '5.6' ] ] ]

Có thể lấy mẫu phạm vi số từ bất kỳ đâu trong Luồng, cho phép hệ thống truy vấn dữ liệu lịch sử mà không bị phạt về hiệu suất.

Chặn và thăm dò

Truy vấn phạm vi dữ liệu hữu ích cho đồ thị và theo dõi lịch sử, nhưng đôi khi bạn muốn xây dựng một hệ thống có thể phản hồi ngay lập tức với dữ liệu khi nó được đưa vào - Các luồng Redis cũng rất phù hợp cho việc này, sử dụng XREAD :

redis.xread('BLOCK', 10000,
            'STREAMS', 'site:pdx', '$').then(function(resp) {

  // close the windows if aqi > 50
  console.log(resp);
});

Thao tác chặn như thế này sẽ đợi cho đến khi có dữ liệu hoặc cho đến khi hết thời gian chờ (ở đây là 1000 mili giây), vì vậy, để duy trì một cuộc thăm dò liên tục, bạn chặn như trong ví dụ trên cho đến khi có dữ liệu và chỉ cần gọi XREAD đó lệnh lại mỗi lần sau khi nhận được dữ liệu hoặc lệnh hết thời gian chờ.

Để đảm bảo không có dữ liệu nào bị bỏ lỡ giữa các kết nối với luồng của bạn, bạn có thể cung cấp ID cuối cùng đã đọc khỏi luồng của mình khi đọc từ đó và theo cách đó, bạn sẽ chọn chính xác nơi bạn đã dừng lại.

Trong trường hợp trên, tôi chỉ muốn bất kỳ dữ liệu nào được đưa vào luồng nên đã sử dụng mã thông báo đặc biệt ‘$’ để chỉ ra “chỉ dữ liệu mới”; lệnh có vẻ hơi khác với một ID được cung cấp:

redis.xread('BLOCK', 10000,
            'STREAMS', 'site:pdx', '1543947171460-0');

Bạn cũng có thể đọc nhiều luồng khác nhau cùng một lúc, trả về các giá trị từ luồng đầu tiên chấp nhận dữ liệu:

redis.xread('BLOCK', 10000,
            'STREAMS',
              'site:pdx', 'site:global',
              '1543947171460-0', '$');

Trong ví dụ này, lệnh sẽ trả về khi có bất kỳ dữ liệu nào mới hơn ID 1543947171460-0 được viết trên site:pdx hoặc khi bất kỳ dữ liệu mới là writetenon site:global .

Điều phối người tiêu dùng

Khi bạn có nhiều dữ liệu chảy qua một luồng, bạn có thể muốn có nhiều bản sao của người tiêu dùng đang xử lý các tin nhắn đến. Những người tiêu dùng này nên chọn một thông điệp, thực hiện một hành động trên nó và sau đó “thừa nhận” rằng công việc đã được thực hiện. Các luồng Redis cung cấp các nguyên tắc cơ bản cho các hoạt động này - quá nhiều thứ để vào đây nhưng hãy xem tài liệu để biết chi tiết.

Tham chiếu nhanh cho các lệnh stream

Có rất nhiều lệnh Redis mới có sẵn cho các luồng! Đây là tham khảo nhanh, bạn có thể tìm thêm thông tin chi tiết trong tài liệu chính thức.

Các lệnh đơn giản:

  • XADD:thêm một mục (một nhóm các cặp khóa-giá trị) vào một luồng
  • XRANGE và XREVRANGE:chọn phạm vi hoặc lặp lại qua các mục luồng
  • XREAD:tìm nạp các mục gần đây hơn một số ID (tùy chọn chặn)
  • XTRIM:loại bỏ các mục cũ để cắt bớt luồng
  • XDEL:xóa một mục cụ thể khỏi luồng
  • XLEN:đếm các mục trong một luồng
  • XINFO:kiểm tra siêu dữ liệu luồng

Lệnh nhóm người tiêu dùng:

  • XGROUP:tạo, xóa hoặc đặt lại các nhóm người tiêu dùng và xóa các thành viên của họ
  • XREADGROUP:giống như XREAD (ở trên), nhưng nhận tin nhắn bằng cách sử dụng một nhóm khách hàng
  • XÁC NHẬN:kiểm tra các thông báo đã được gửi đến một nhóm người tiêu dùng nhưng không phải của nhóm người tiêu dùng.
  • XACK:xác nhận thông báo cho một nhóm người tiêu dùng, xóa thông báo đó khỏi danh sách đang chờ xử lý
  • XCLAIM:tiếp nhận thông điệp từ một người tiêu dùng đã chết

Các trường hợp sử dụng

Trường hợp sử dụng thường được trích dẫn nhất cho Luồng là khối lượng công việc IoT trong đó các cảm biến đưa dữ liệu vào luồng để người tiêu dùng sử dụng theo nhiều cách (phân tích, lưu trữ vào kho lạnh, hiển thị trên đồ thị). Các luồng có kích thước giới hạn là trường hợp sử dụng tuyệt vời ở đây, cho phép bạn phân bổ luồng có kích thước cố định với vùng nhớ có thể đoán trước được.

Các luồng cũng thích hợp để sử dụng trong các ứng dụng trước đây đã sử dụng cấu trúc dữ liệu OtherRedis. Các ứng dụng xếp hàng như Celery và Sidekiq có thể lợi dụng cho các nhóm người tiêu dùng của Streams để kiểm tra các biên nhận đã đọc trong Redis-native. Có rất nhiều bài đăng trên blog chứng minh các ứng dụng trò chuyện đơn giản sử dụng Redis pubsub có thể được thực hiện mạnh mẽ hơn với Redis Streams, vì pubsub không giữ lại tin nhắn sau khi chúng được xuất bản cho khách hàng.

Bạn đã sẵn sàng dùng thử chưa?

Bạn có thể thử các luồng Redis trên máy chủ RedisGreen mới chỉ với một vài cú nhấp chuột.