Computer >> Máy Tính >  >> Lập trình >> Redis

Đọc và Viết Giao thức Redis trong Go

Trong bài đăng này, tôi phác thảo một cách triển khai đơn giản, dễ hiểu cho hai thành phần của ứng dụng khách Redis trong Go như một cách hiểu cách hoạt động của Redisprotocol và điều gì khiến nó trở nên tuyệt vời.

Nếu bạn đang tìm kiếm một ứng dụng Redis có đầy đủ tính năng, sẵn sàng sản xuất trong Go, hãy xem thư viện redigo của Gary Burd.

Trước khi chúng tôi bắt đầu , hãy chắc chắn rằng bạn đã đọc phần giới thiệu nhẹ nhàng của chúng tôi về giao thức Redis - phần giới thiệu này bao gồm những điều cơ bản về giao thức mà bạn cần hiểu đối với hướng dẫn này.

Người viết lệnh RESP trong Go

Đối với ứng dụng Redis giả định của chúng tôi, chỉ có một loại đối tượng mà chúng tôi cần phải viết:một mảng các chuỗi số lượng lớn để gửi lệnh tới Redis. Đây là cách triển khai đơn giản của trình viết lệnh để RESP:

package redis

import (
  "bufio"
  "io"
  "strconv"     // for converting integers to strings
)

var (
  arrayPrefixSlice      = []byte{'*'}
  bulkStringPrefixSlice = []byte{'$'}
  lineEndingSlice       = []byte{'\r', '\n'}
)

type RESPWriter struct {
  *bufio.Writer
}

func NewRESPWriter(writer io.Writer) *RESPWriter {
  return &RESPWriter{
    Writer: bufio.NewWriter(writer),
  }
}

func (w *RESPWriter) WriteCommand(args ...string) (err error) {
  // Write the array prefix and the number of arguments in the array.
  w.Write(arrayPrefixSlice)
  w.WriteString(strconv.Itoa(len(args)))
  w.Write(lineEndingSlice)

  // Write a bulk string for each argument.
  for _, arg := range args {
    w.Write(bulkStringPrefixSlice)
    w.WriteString(strconv.Itoa(len(arg)))
    w.Write(lineEndingSlice)
    w.WriteString(arg)
    w.Write(lineEndingSlice)
  }

  return w.Flush()
}

Thay vì ghi vào net.Conn đối tượng, RESPWriter ghi vào io.Writer sự vật. Điều này cho phép chúng tôi kiểm tra trình phân tích cú pháp của mình mà không cần kết hợp chặt chẽ với net cây rơm. Chúng tôi chỉ kiểm tra giao thức mạng theo cách chúng tôi làm với bất kỳ io nào khác .

Ví dụ:chúng ta có thể chuyển nó một bytes.Buffer để kiểm tra RESP cuối cùng:

var buf bytes.Buffer
writer := NewRESPWriter(&buf)
writer.WriteCommand("GET", "foo")
buf.Bytes() // *2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n

Trình đọc RESP đơn giản trong Go

Sau khi gửi lệnh tới Redis bằng RESPWriter , khách hàng của chúng tôi sẽ sử dụng RESPReader để đọc từ kết nối TCP cho đến khi nó nhận được Báo cáo phản hồi đầy đủ. Để bắt đầu, chúng tôi sẽ cần một vài gói để xử lý bộ đệm và phân tích cú pháp dữ liệu đến:

package redis

import (
  "bufio"
  "bytes"
  "errors"
  "io"
  "strconv"
)

Và chúng tôi sẽ sử dụng một số biến và hằng số để làm cho mã của chúng tôi dễ đọc hơn một chút:

const (
  SIMPLE_STRING = '+'
  BULK_STRING   = '$'
  INTEGER       = ':'
  ARRAY         = '*'
  ERROR         = '-'
)

var (
  ErrInvalidSyntax = errors.New("resp: invalid syntax")
)

Giống như RESPWriter , RESPReader không quan tâm đến chi tiết triển khai của đối tượng mà nó đang đọc RESP. Tất cả những gì nó cần có khả năng đọc từng byte cho đến khi nó đọc được một đối tượng RESP đầy đủ. Trong trường hợp này, nó cần một io.Reader , được kết thúc bằng bufio.Reader để xử lý bộ đệm của dữ liệu đến.

Đối tượng và trình khởi tạo của chúng tôi rất đơn giản:

type RESPReader struct {
  *bufio.Reader
}

func NewReader(reader io.Reader) *RESPReader {
  return &RESPReader{
    Reader: bufio.NewReaderSize(reader, 32*1024),
  }
}

Kích thước bộ đệm cho bufio.Reader chỉ là phỏng đoán trong quá trình phát triển. Trong ứng dụng khách thực tế, bạn muốn làm cho kích thước của nó có thể định cấu hình và có thể thử nghiệm để tìm kích thước tối ưu. 32KB sẽ hoạt động tốt để phát triển.

RESPReader chỉ có một phương thức:ReadObject() , trả về một lát byte chứa một đối tượng RESP đầy đủ trên mỗi cuộc gọi. Nó sẽ trả lại bất kỳ lỗi nào được khắc phục từ io.Reader và cũng sẽ trả về lỗi khi nó gặp phải cú pháp RESP không hợp lệ.

Bản chất tiền tố của RESP có nghĩa là chúng ta chỉ cần đọc byte đầu tiên để quyết định cách xử lý các byte sau. Tuy nhiên, vì chúng ta sẽ luôn cần đọc ít nhất dòng đầy đủ đầu tiên (tức là cho đến khi \r\n đầu tiên ), chúng ta có thể bắt đầu bằng cách đọc toàn bộ dòng đầu tiên:

func (r *RESPReader) ReadObject() ([]byte, error) {
  line, err := r.readLine()
  if err != nil {
    return nil, err
  }

  switch line[0] {
  case SIMPLE_STRING, INTEGER, ERROR:
    return line, nil
  case BULK_STRING:
    return r.readBulkString(line)
  case ARRAY:
    return r.readArray(line) default:
    return nil, ErrInvalidSyntax
  }
}

Khi dòng mà chúng tôi đọc có một chuỗi, số nguyên hoặc tiền tố lỗi đơn giản, hãy chuyển dòng đầy đủ làm đối tượng RESP đã nhận vì các loại đối tượng đó được lưu giữ hoàn toàn trong một dòng.

Trong readLine() , chúng tôi đã đọc cho đến lần xuất hiện đầu tiên của \n và sau đó kiểm tra để đảm bảo rằng nó đứng trước \r trước khi trả về dòng dưới dạng bytelice:

func (r *RESPReader) readLine() (line []byte, err error) {
  line, err = r.ReadBytes('\n')
  if err != nil {
    return nil, err
  }

  if len(line) > 1 && line[len(line)-2] == '\r' {
    return line, nil
  } else {
    // Line was too short or \n wasn't preceded by \r.
    return nil, ErrInvalidSyntax
  }
}

Trong readBulkString() chúng tôi phân tích cú pháp đặc tả độ dài cho chuỗi số lượng lớn để biết chúng tôi cần đọc bao nhiêu byte. Sau khi thực hiện, chúng tôi đọc số byte đó và \r\n dấu chấm hết dòng:

func (r *RESPReader) readBulkString(line []byte) ([]byte, error) {
  count, err := r.getCount(line)
  if err != nil {
    return nil, err
  }
  if count == -1 {
    return line, nil
  }

  buf := make([]byte, len(line)+count+2)
  copy(buf, line)
  _, err = io.ReadFull(r, buf[len(line):])
  if err != nil {
    return nil, err
  }

  return buf, nil
}

Tôi đã kéo getCount() ra một phương thức riêng vì định nghĩa độ dài cũng được sử dụng cho các mảng:

func (r *RESPReader) getCount(line []byte) (int, error) {
  end := bytes.IndexByte(line, '\r')
  return strconv.Atoi(string(line[1:end]))
}

Để xử lý mảng, chúng ta lấy số phần tử của mảng, rồi gọi ReadObject() đệ quy, thêm các đối tượng kết quả vào bộ đệm RESPbuffer hiện tại của chúng tôi:

func (r *RESPReader) readArray(line []byte) ([]byte, error) {
  // Get number of array elements.
  count, err := r.getCount(line)
  if err != nil {
    return nil, err
  }

  // Read `count` number of RESP objects in the array.
  for i := 0; i < count; i++ {
    buf, err := r.ReadObject()
    if err != nil {
      return nil, err
    }
    line = append(line, buf...)
  }

  return line, nil
}

Kết thúc

Hàng trăm dòng trên là tất cả những gì cần thiết để đọc bất kỳ đối tượng RESP nào từRedis. Tuy nhiên, chúng tôi cần triển khai một số phần còn thiếu trước khi sử dụng thư viện này trong môi trường sản xuất:

  • Khả năng trích xuất các giá trị thực tế từ RESP. RESPReader hiện tại chỉ trả về phản hồi RESP đầy đủ, ví dụ:nó không trả về một chuỗi từ phản hồi chuỗi số lượng lớn. Tuy nhiên, việc thực hiện điều này sẽ rất dễ dàng.
  • RESPReader cần xử lý lỗi cú pháp tốt hơn.

Mã này cũng hoàn toàn không được tối ưu hóa và cần nhiều phân bổ và sao chép hơn. Ví dụ:readArray() phương thức:đối với mỗi đối tượng trong khay, chúng tôi đọc đối tượng đó và sau đó sao chép nó vào bộ đệm cục bộ của chúng tôi.

Nếu bạn quan tâm đến việc tìm hiểu cách triển khai các phần này, tôi khuyên bạn nên xem cách các thư viện phổ biến như thuê hoặc hoàn thiện lại chúng.

Đặc biệt cảm ơn Niel Smith đã giúp chúng tôi tìm ra một số lỗi trong giải mã trong bài đăng này.