Bởi Stephen Sanwo
Để xây dựng một ứng dụng full-stack hoạt động được, có rất nhiều phần cần phải suy nghĩ. Và bạn sẽ cần đưa ra nhiều quyết định quan trọng cho sự thành công của ứng dụng.
Ví dụ:bạn sẽ sử dụng ngôn ngữ nào và bạn sẽ triển khai trên nền tảng nào? Bạn sẽ triển khai một phần mềm được đóng gói trên máy chủ hay sử dụng các chức năng serverless để xử lý phần phụ trợ? Bạn có dự định sử dụng API của bên thứ ba để xử lý các phần phức tạp trong ứng dụng của mình như xác thực hoặc thanh toán không? Bạn lưu trữ dữ liệu ở đâu?
Ngoài tất cả những điều này, bạn cũng cần phải suy nghĩ về giao diện người dùng, thiết kế và khả năng sử dụng ứng dụng của mình, v.v.
Đây là lý do tại sao các ứng dụng lớn phức tạp cần có nhóm phát triển đa chức năng cộng tác để xây dựng ứng dụng.
Một trong những cách tốt nhất để học cách phát triển các ứng dụng full stack là xây dựng các dự án bao gồm quy trình phát triển từ đầu đến cuối. Bạn sẽ trải qua quá trình thiết kế kiến trúc, phát triển dịch vụ API, phát triển giao diện người dùng và cuối cùng là triển khai ứng dụng của mình.
Vì vậy, hướng dẫn này sẽ hướng dẫn bạn qua quá trình xây dựng một chatbot AI để giúp bạn tìm hiểu sâu hơn về các khái niệm này.
Một số chủ đề chúng tôi sẽ đề cập bao gồm:
- Cách xây dựng API bằng Python, FastAPI và WebSockets
- Cách xây dựng hệ thống thời gian thực với Redis
- Cách xây dựng Giao diện người dùng trò chuyện bằng React
Lưu ý quan trọng: Đây là một dự án phát triển phần mềm full stack trung cấp yêu cầu một số kiến thức cơ bản về Python và JavaScript.
Tôi đã cẩn thận chia dự án thành các phần để đảm bảo rằng bạn có thể dễ dàng chọn giai đoạn quan trọng đối với mình trong trường hợp bạn không muốn viết mã toàn bộ ứng dụng.
Bạn có thể tải xuống toàn bộ kho lưu trữ trên Github của tôi tại đây.
Mục lục
Phần 1
- Kiến trúc ứng dụng
- Cách thiết lập môi trường phát triển
Phần 2
- Cách xây dựng máy chủ trò chuyện bằng Python, FastAPI và WebSockets
- Cách thiết lập môi trường Python
- Thiết lập máy chủ FastAPI
- Cách thêm tuyến đường vào API
- Cách tạo mã thông báo phiên trò chuyện bằng UUID
- Cách kiểm tra API bằng Người đưa thư
- Trình quản lý kết nối và ổ cắm web
- Chèn phần phụ thuộc trong FastAPI
Phần 3
- Cách xây dựng Hệ thống thời gian thực bằng Redis
- Hàng đợi tin nhắn phân tán và Redis
- Cách kết nối với cụm Redis bằng Python bằng ứng dụng khách Redis
- Cách làm việc với luồng Redis
- Cách lập mô hình dữ liệu trò chuyện
- Cách làm việc với Redis JSON
- Cách cập nhật phần phụ thuộc của mã thông báo
Phần 4
- Cách thêm thông tin vào Chatbot bằng mô hình AI
- Cách bắt đầu ôm mặt
- Cách tương tác với mô hình ngôn ngữ
- Cách mô phỏng bộ nhớ ngắn hạn cho mô hình AI
- Truyền dữ liệu tiêu dùng và dữ liệu thời gian thực từ hàng đợi tin nhắn
- Cách cập nhật ứng dụng trò chuyện bằng phản hồi AI
- Mã thông báo làm mới
- Cách kiểm tra trò chuyện với nhiều khách hàng trong Postman
Kiến trúc ứng dụng
Việc phác thảo kiến trúc giải pháp sẽ cung cấp cho bạn cái nhìn tổng quan cấp cao về ứng dụng của bạn, các công cụ bạn định sử dụng và cách các thành phần sẽ giao tiếp với nhau.
Tôi đã vẽ một kiến trúc đơn giản bên dưới bằng draw.io:
Kiến trúc chatbot Fullstack
Hãy cùng tìm hiểu chi tiết hơn về các phần khác nhau của kiến trúc:
Giao diện khách hàng/người dùng
Chúng tôi sẽ sử dụng React phiên bản 18 để xây dựng giao diện người dùng. Giao diện người dùng trò chuyện sẽ liên lạc với chương trình phụ trợ thông qua WebSockets.
GPT-J-6B và API suy luận Huggingface
GPT-J-6B là mô hình ngôn ngữ tổng quát được đào tạo với 6 tỷ tham số và thực hiện chặt chẽ với GPT-3 của OpenAI trong một số tác vụ.
Tôi đã chọn sử dụng GPT-J-6B vì đây là mô hình nguồn mở và không yêu cầu mã thông báo trả phí cho các trường hợp sử dụng đơn giản.
Huggingface cũng cung cấp cho chúng tôi API theo yêu cầu để kết nối với mô hình này khá miễn phí. Bạn có thể đọc thêm về GPT-J-6B và API suy luận khuôn mặt ôm.
Làm lại
Khi gửi lời nhắc tới GPT, chúng tôi cần một cách để lưu trữ lời nhắc và dễ dàng truy xuất phản hồi. Chúng tôi sẽ sử dụng Redis JSON để lưu trữ dữ liệu trò chuyện, đồng thời sử dụng Redis Streams để xử lý giao tiếp theo thời gian thực với API suy luận ôm.
Redis là kho lưu trữ khóa-giá trị trong bộ nhớ cho phép tìm nạp và lưu trữ dữ liệu giống JSON cực nhanh. Đối với hướng dẫn này, chúng tôi sẽ sử dụng bộ lưu trữ Redis miễn phí được quản lý do Redis Enterprise cung cấp cho mục đích thử nghiệm.
Ổ cắm web và API trò chuyện
Để gửi tin nhắn giữa máy khách và máy chủ trong thời gian thực, chúng ta cần mở kết nối socket. Điều này là do kết nối HTTP sẽ không đủ để đảm bảo liên lạc hai chiều theo thời gian thực giữa máy khách và máy chủ.
Chúng tôi sẽ sử dụng FastAPI cho máy chủ trò chuyện vì nó cung cấp máy chủ Python nhanh và hiện đại để chúng tôi sử dụng. Hãy xem tài liệu FastAPI) để tìm hiểu thêm về WebSockets.
Cách thiết lập môi trường phát triển
Bạn có thể sử dụng hệ điều hành mong muốn của mình để xây dựng ứng dụng này - Tôi hiện đang sử dụng MacOS và Visual Studio Code. Chỉ cần đảm bảo rằng bạn đã cài đặt Python và NodeJ.
Để thiết lập cấu trúc dự án, hãy tạo thư mục có tênfullstack-ai-chatbot . Sau đó tạo hai thư mục trong dự án có tên client và server . Máy chủ sẽ giữ mã cho phần phụ trợ, trong khi máy khách sẽ giữ mã cho giao diện người dùng.
Tiếp theo trong thư mục dự án, khởi tạo kho lưu trữ Git trong thư mục gốc của dự án bằng lệnh "git init". Sau đó tạo tệp .gitignore bằng cách sử dụng "touch .gitignore":
git init
touch .gitignore
Trong phần tiếp theo, chúng tôi sẽ xây dựng máy chủ web trò chuyện bằng FastAPI và Python.
Cách xây dựng máy chủ trò chuyện bằng Python, FastAPI và WebSockets
Trong phần này, chúng ta sẽ xây dựng máy chủ trò chuyện bằng FastAPI để giao tiếp với người dùng. Chúng tôi sẽ sử dụng WebSockets để đảm bảo liên lạc hai chiều giữa máy khách và máy chủ để có thể gửi phản hồi cho người dùng theo thời gian thực.
Cách thiết lập môi trường Python
Để khởi động máy chủ, chúng ta cần thiết lập môi trường Python. Mở thư mục dự án trong VS Code và mở terminal.
Từ thư mục gốc của dự án, cd vào thư mục máy chủ và chạy python3.8 -m venv env . Điều này sẽ tạo ra một môi trường ảo cho dự án Python của chúng tôi, dự án này sẽ được đặt tên là env . Để kích hoạt môi trường ảo, hãy chạy source env/bin/activate
Tiếp theo, cài đặt một số thư viện trong môi trường Python của bạn.
pip install fastapi uuid uvicorn gunicorn WebSockets python-dotenv aioredis
Tiếp theo, tạo tệp môi trường bằng cách chạy touch .env trong thiết bị đầu cuối. Chúng tôi sẽ xác định các biến ứng dụng và biến bí mật trong .env tập tin.
Thêm biến môi trường ứng dụng của bạn và đặt thành "phát triển" như sau:export APP_ENV=development . Tiếp theo, chúng tôi sẽ thiết lập máy chủ phát triển với máy chủ FastAPI.
Thiết lập máy chủ FastAPI
Tại thư mục gốc của thư mục máy chủ, tạo một tệp mới có tên main.py sau đó dán mã bên dưới cho máy chủ phát triển:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
api = FastAPI()
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
Đầu tiên chúng ta import FastAPI và khởi tạo nó là api . Sau đó chúng ta import load_dotenv từ python-dotenv thư viện và khởi tạo nó để tải các biến từ .env tập tin,
Sau đó, chúng tôi tạo một lộ trình thử nghiệm đơn giản để kiểm tra API. Lộ trình thử nghiệm sẽ trả về một phản hồi JSON đơn giản cho chúng tôi biết API đang trực tuyến.
Cuối cùng, chúng tôi thiết lập máy chủ phát triển bằng cách sử dụng uvicorn.run và đưa ra những lý lẽ cần thiết. API sẽ chạy trên cổng 3500 .
Cuối cùng, chạy máy chủ trong terminal với python main.py . Khi bạn nhìn thấy Application startup complete trong thiết bị đầu cuối, điều hướng đến URL http://localhost:3500/test trên trình duyệt của bạn và bạn sẽ nhận được một trang web như thế này:
Trang kiểm tra API
Cách thêm tuyến đường vào API
Trong phần này, chúng tôi sẽ thêm các tuyến đường vào API của mình. Tạo một thư mục mới có tên src . Đây là thư mục chứa tất cả mã API của chúng tôi.
Tạo thư mục con có tên routes , cd vào thư mục, tạo file mới tên chat.py rồi thêm mã bên dưới:
import os
from fastapi import APIRouter, FastAPI, WebSocket, Request
chat = APIRouter()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(request: Request):
return None
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket = WebSocket):
return None
Chúng tôi đã tạo ba điểm cuối:
/tokensẽ cấp cho người dùng mã thông báo phiên để truy cập vào phiên trò chuyện. Vì ứng dụng trò chuyện sẽ được mở công khai nên chúng tôi không cần lo lắng về việc xác thực mà chỉ đơn giản hóa quy trình – nhưng chúng tôi vẫn cần một cách để xác định từng phiên người dùng duy nhất./refresh_tokensẽ lấy lịch sử phiên cho người dùng nếu mất kết nối, miễn là mã thông báo vẫn hoạt động và chưa hết hạn./chatsẽ mở một WebSocket để gửi tin nhắn giữa máy khách và máy chủ.
Tiếp theo, kết nối tuyến trò chuyện với API chính của chúng tôi. Đầu tiên chúng ta cần import chat from src.chat trong main.py của chúng tôi tập tin. Sau đó, chúng tôi sẽ bao gồm bộ định tuyến bằng cách gọi include_router theo đúng nghĩa đen phương thức trên FastAPI được khởi tạo class và chuyển trò chuyện làm đối số.
Cập nhật api.py của bạn mã như hình dưới đây:
from fastapi import FastAPI, Request
import uvicorn
import os
from dotenv import load_dotenv
from routes.chat import chat
load_dotenv()
api = FastAPI()
api.include_router(chat)
@api.get("/test")
async def root():
return {"msg": "API is Online"}
if __name__ == "__main__":
if os.environ.get('APP_ENV') == "development":
uvicorn.run("main:api", host="0.0.0.0", port=3500,
workers=4, reload=True)
else:
pass
Cách tạo mã thông báo phiên trò chuyện bằng UUID
Để tạo mã thông báo người dùng, chúng tôi sẽ sử dụng uuid4 để tạo các tuyến động cho điểm cuối trò chuyện của chúng tôi. Vì đây là điểm cuối có sẵn công khai nên chúng ta sẽ không cần đi sâu vào chi tiết về JWT và xác thực.
Nếu bạn chưa cài đặt uuid ban đầu, hãy chạy pip install uuid . Tiếp theo trong chat.py, nhập UUID và cập nhật /token định tuyến với mã bên dưới:
from fastapi import APIRouter, FastAPI, WebSocket, Request, BackgroundTasks, HTTPException
import uuid
# @route POST /token
# @desc Route generating chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
token = str(uuid.uuid4())
data = {"name": name, "token": token}
return data
Trong đoạn mã trên, khách hàng cung cấp tên của họ, điều này là bắt buộc. Chúng tôi kiểm tra nhanh để đảm bảo rằng trường tên không trống, sau đó tạo mã thông báo bằng uuid4.
Dữ liệu phiên là một từ điển đơn giản về tên và mã thông báo. Cuối cùng, chúng tôi sẽ cần duy trì dữ liệu phiên này và đặt thời gian chờ, nhưng hiện tại chúng tôi chỉ trả lại dữ liệu đó cho khách hàng.
Cách kiểm tra API bằng Postman
Vì chúng ta sẽ thử nghiệm điểm cuối WebSocket nên chúng ta cần sử dụng một công cụ như Postman cho phép điều này (vì các tài liệu vênh vang mặc định trên FastAPI không hỗ trợ WebSockets).
Trong Postman, tạo bộ sưu tập cho môi trường phát triển của bạn và gửi yêu cầu POST tới localhost:3500/token chỉ định tên làm tham số truy vấn và chuyển cho nó một giá trị. Bạn sẽ nhận được phản hồi như bên dưới:
Người đưa thư tạo mã thông báo
Trình quản lý kết nối và ổ cắm web
Trong thư mục gốc src, tạo một thư mục mới có tên socket và thêm một tệp có tên connection.py . Trong tệp này, chúng tôi sẽ xác định lớp kiểm soát các kết nối đến WebSockets của chúng tôi và tất cả các phương thức trợ giúp để kết nối và ngắt kết nối.
Trong connection.py thêm mã bên dưới:
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
ConnectionManager lớp được khởi tạo với active_connections thuộc tính đó là danh sách các kết nối đang hoạt động.
Sau đó, connect không đồng bộ phương thức sẽ chấp nhận WebSocket và thêm nó vào danh sách các kết nối đang hoạt động, trong khi disconnect phương thức sẽ xóa Websocket từ danh sách các kết nối đang hoạt động.
Cuối cùng là send_personal_message phương thức sẽ nhận một tin nhắn và Websocket chúng tôi muốn gửi tin nhắn đến và gửi tin nhắn không đồng bộ.
WebSockets là một chủ đề rất rộng và chúng tôi chỉ tìm hiểu sơ qua ở đây. Tuy nhiên, điều này là đủ để tạo nhiều kết nối và xử lý các thông báo đến các kết nối đó một cách không đồng bộ.
Bạn có thể đọc thêm về Lập trình ổ cắm và ổ cắm web FastAPI.
Để sử dụng ConnectionManager , nhập và khởi tạo nó trong src.routes.chat.py và cập nhật /chat Tuyến đường WebSocket với mã bên dưới:
from ..socket.connection import ConnectionManager
manager = ConnectionManager()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
Trong websocket_endpoint hàm sử dụng WebSocket, chúng tôi thêm websocket mới vào trình quản lý kết nối và chạy while True vòng lặp, để đảm bảo rằng ổ cắm vẫn mở. Ngoại trừ khi ổ cắm bị ngắt kết nối.
Trong khi kết nối được mở, chúng tôi nhận được bất kỳ tin nhắn nào được gửi bởi khách hàng với websocket.receive_test() và in chúng vào thiết bị đầu cuối ngay bây giờ.
Sau đó, chúng tôi sẽ gửi phản hồi được mã hóa cứng lại cho khách hàng ngay bây giờ. Cuối cùng, tin nhắn nhận được từ khách hàng sẽ được gửi đến Mô hình AI và phản hồi được gửi lại cho khách hàng sẽ là phản hồi từ Mô hình AI.
Trong Postman, chúng ta có thể kiểm tra điểm cuối này bằng cách tạo yêu cầu WebSocket mới và kết nối với điểm cuối WebSocket localhost:3500/chat .
Khi bạn nhấp vào kết nối, ngăn Tin nhắn sẽ hiển thị rằng ứng dụng khách API được kết nối với URL và ổ cắm đang mở.
Để kiểm tra điều này, hãy gửi tin nhắn "Xin chào Bot" đến máy chủ trò chuyện và bạn sẽ nhận được phản hồi kiểm tra ngay lập tức "Phản hồi:Mô phỏng phản hồi từ dịch vụ GPT" như hiển thị bên dưới:
Kiểm tra trò chuyện của người đưa thư
Chèn phụ thuộc vào FastAPI
Để có thể phân biệt giữa hai phiên khách hàng khác nhau và giới hạn các phiên trò chuyện, chúng tôi sẽ sử dụng mã thông báo tính thời gian, được chuyển dưới dạng tham số truy vấn tới kết nối WebSocket.
Trong thư mục socket, tạo một tệp có tên utils.py sau đó thêm mã bên dưới:
from fastapi import WebSocket, status, Query
from typing import Optional
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token
Hàm get_token nhận WebSocket và mã thông báo, sau đó kiểm tra xem mã thông báo đó là Không có hay rỗng.
Trong trường hợp này, hàm sẽ trả về trạng thái vi phạm chính sách và nếu có, hàm sẽ chỉ trả về mã thông báo. Cuối cùng, chúng tôi sẽ mở rộng chức năng này sau bằng cách xác thực mã thông báo bổ sung.
Để sử dụng chức năng này, chúng tôi đưa nó vào /chat tuyến đường. FastAPI cung cấp một lớp Phụ thuộc để dễ dàng chèn các phần phụ thuộc, vì vậy chúng ta không cần phải mày mò với các công cụ trang trí.
Cập nhật /chat định tuyến đến sau:
from ..socket.utils import get_token
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
Bây giờ khi bạn cố gắng kết nối với /chat điểm cuối trong Postman, bạn sẽ gặp lỗi 403. Hiện tại, hãy cung cấp mã thông báo làm tham số truy vấn và cung cấp bất kỳ giá trị nào cho mã thông báo. Sau đó, bạn sẽ có thể kết nối như trước, chỉ bây giờ kết nối cần có mã thông báo.
Kiểm tra trò chuyện của người đưa thư bằng mã thông báo
Chúc mừng bạn đã tiến xa đến mức này! chat.py của bạn tập tin bây giờ trông như thế này:
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends, HTTPException
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
chat = APIRouter()
manager = ConnectionManager()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
data = {"name": name, "token": token}
return data
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chatbot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(data)
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
Trong phần tiếp theo của hướng dẫn này, chúng ta sẽ tập trung vào việc xử lý trạng thái của ứng dụng và truyền dữ liệu giữa máy khách và máy chủ.
Cách xây dựng hệ thống thời gian thực bằng Redis
Ứng dụng của chúng tôi hiện không lưu trữ bất kỳ trạng thái nào và không có cách nào để xác định người dùng hoặc lưu trữ và truy xuất dữ liệu trò chuyện. Chúng tôi cũng đang trả lại phản hồi được mã hóa cứng cho khách hàng trong các phiên trò chuyện.
Trong phần hướng dẫn này, chúng tôi sẽ đề cập đến những nội dung sau:
- Cách kết nối với Cụm Redis bằng Python và thiết lập Máy khách Redis
- Cách lưu trữ và truy xuất dữ liệu bằng Redis JSON
- Cách thiết lập Luồng Redis dưới dạng hàng đợi tin nhắn giữa máy chủ web và môi trường công nhân
Hàng đợi tin nhắn phân tán và Redis
Redis là kho lưu trữ dữ liệu trong bộ nhớ nguồn mở mà bạn có thể sử dụng làm cơ sở dữ liệu, bộ đệm, trình trung chuyển tin nhắn và công cụ phát trực tuyến. Nó hỗ trợ một số cấu trúc dữ liệu và là giải pháp hoàn hảo cho các ứng dụng phân tán có khả năng thời gian thực.
Đám mây doanh nghiệp Redis là dịch vụ đám mây được quản lý hoàn toàn do Redis cung cấp, giúp chúng tôi triển khai các cụm Redis ở quy mô vô hạn mà không phải lo lắng về cơ sở hạ tầng.
Chúng tôi sẽ sử dụng phiên bản Redis Enterprise Cloud miễn phí cho hướng dẫn này. Bạn có thể Bắt đầu sử dụng Redis Cloud miễn phí tại đây và làm theo Hướng dẫn này để thiết lập cơ sở dữ liệu Redis và Redis Insight, một GUI để tương tác với Redis.
Khi bạn đã thiết lập cơ sở dữ liệu Redis của mình, hãy tạo một thư mục mới trong thư mục gốc của dự án (bên ngoài thư mục máy chủ) có tên worker .
Chúng tôi sẽ cách ly môi trường nhân viên của mình khỏi máy chủ web để khi khách hàng gửi tin nhắn đến WebSocket của chúng tôi, máy chủ web không phải xử lý yêu cầu đến dịch vụ bên thứ ba. Ngoài ra, tài nguyên có thể được giải phóng cho những người dùng khác.
Giao tiếp nền với API suy luận được xử lý bởi dịch vụ riêng biệt này thông qua Redis.
Yêu cầu từ tất cả các máy khách được kết nối sẽ được thêm vào hàng đợi tin nhắn (nhà sản xuất), trong khi nhân viên xử lý tin nhắn, gửi yêu cầu đến API suy luận và thêm phản hồi vào hàng đợi phản hồi.
Khi API nhận được phản hồi, nó sẽ gửi lại cho khách hàng.
Trong chuyến đi giữa nhà sản xuất và người tiêu dùng, khách hàng có thể gửi nhiều tin nhắn và những tin nhắn này sẽ được xếp hàng đợi và phản hồi theo thứ tự.
Lý tưởng nhất là chúng ta có thể để chạy nhân viên này trên một máy chủ hoàn toàn khác, trong môi trường riêng của nó, nhưng hiện tại, chúng ta sẽ tạo môi trường Python của riêng nó trên máy cục bộ của mình.
Bạn có thể thắc mắc – tại sao chúng ta cần nhân viên? Hãy tưởng tượng một tình huống trong đó máy chủ web cũng tạo yêu cầu tới dịch vụ của bên thứ ba. Điều này có nghĩa là trong khi chờ phản hồi từ dịch vụ bên thứ ba trong khi kết nối ổ cắm, máy chủ sẽ bị chặn và tài nguyên bị ràng buộc cho đến khi nhận được phản hồi từ API.
Bạn có thể thử điều này bằng cách tạo một giấc ngủ ngẫu nhiên time.sleep(10) trước khi gửi phản hồi được mã hóa cứng và gửi tin nhắn mới. Sau đó thử kết nối bằng một mã thông báo khác trong phiên đưa thư mới.
Bạn sẽ nhận thấy rằng phiên trò chuyện sẽ không kết nối cho đến khi hết thời gian ngủ ngẫu nhiên.
Mặc dù chúng tôi có thể sử dụng các kỹ thuật không đồng bộ và nhóm công nhân trong thiết lập máy chủ tập trung vào sản xuất hơn, nhưng điều đó cũng sẽ không đủ khi số lượng người dùng đồng thời tăng lên.
Cuối cùng, chúng tôi muốn tránh việc ràng buộc tài nguyên máy chủ web bằng cách sử dụng Redis để kết nối giao tiếp giữa API trò chuyện của chúng tôi và API của bên thứ ba.
Tiếp theo, mở một thiết bị đầu cuối mới, cd vào thư mục worker, đồng thời tạo và kích hoạt môi trường ảo Python mới tương tự như những gì chúng ta đã làm trong phần 1.
Tiếp theo, cài đặt các phần phụ thuộc sau:
pip install aiohttp aioredis python-dotenv
Cách kết nối với cụm Redis bằng Python bằng ứng dụng khách Redis
Chúng tôi sẽ sử dụng ứng dụng khách aioredis để kết nối với cơ sở dữ liệu Redis. Chúng tôi cũng sẽ sử dụng thư viện yêu cầu để gửi yêu cầu tới API suy luận Huggingface.
Tạo hai tệp .env , và main.py . Sau đó tạo thư mục có tên src . Ngoài ra, hãy tạo một thư mục có tên redis và thêm một tệp mới có tên config.py .
Trong .env tệp, thêm mã sau – và đảm bảo bạn cập nhật các trường bằng thông tin xác thực được cung cấp trong Cụm Redis của bạn.
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
Trong config.py thêm Lớp Redis bên dưới:
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
Chúng tôi tạo một đối tượng Redis và khởi tạo các tham số cần thiết từ các biến môi trường. Sau đó, chúng tôi tạo một phương thức không đồng bộ create_connection để tạo kết nối Redis và trả về nhóm kết nối thu được từ aioredis phương thức from_url .
Tiếp theo, chúng tôi kiểm tra kết nối Redis trong main.py bằng cách chạy mã bên dưới. Thao tác này sẽ tạo một nhóm kết nối Redis mới, đặt một "khóa" khóa đơn giản và gán một "giá trị" chuỗi cho nó.
from src.redis.config import Redis
import asyncio
async def main():
redis = Redis()
redis = await redis.create_connection()
print(redis)
await redis.set("key", "value")
if __name__ == "__main__":
asyncio.run(main())
Bây giờ hãy mở Redis Insight (nếu bạn đã làm theo hướng dẫn để tải xuống và cài đặt nó) Bạn sẽ thấy một cái gì đó như thế này:
Kiểm tra thông tin chi tiết về Redis
Cách làm việc với luồng Redis
Bây giờ chúng ta đã thiết lập xong môi trường Worker, chúng ta có thể tạo một nhà sản xuất trên máy chủ web và một người tiêu dùng trên Worker.
Đầu tiên, hãy tạo lại lớp Redis của chúng ta trên máy chủ. Trong server.src tạo một thư mục có tên redis và thêm hai tệp, config.py và producer.py .
Trong config.py , hãy thêm mã bên dưới như chúng tôi đã làm cho môi trường Worker:
import os
from dotenv import load_dotenv
import aioredis
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
Trong tệp .env, hãy thêm thông tin đăng nhập Redis:
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
Cuối cùng, ở server.src.redis.producer.py thêm đoạn mã sau:
from .config import Redis
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel):
try:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
except Exception as e:
print(f"Error sending msg to stream => {e}")
Chúng tôi đã tạo một lớp Nhà sản xuất được khởi tạo bằng ứng dụng khách Redis. Chúng tôi sử dụng ứng dụng khách này để thêm dữ liệu vào luồng bằng add_to_stream phương thức lấy dữ liệu và tên kênh Redis.
Lệnh Redis để thêm dữ liệu vào kênh truyền phát là xadd và nó có cả chức năng cấp cao và cấp thấp trong aioredis.
Tiếp theo, để chạy Nhà sản xuất mới tạo của chúng tôi, hãy cập nhật chat.py và WebSocket /chat điểm cuối như dưới đây. Lưu ý tên kênh được cập nhật message_channel .
from ..redis.producer import Producer
from ..redis.config import Redis
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
try:
while True:
data = await websocket.receive_text()
print(data)
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
Tiếp theo, trong Postman, tạo kết nối và gửi số lượng tin nhắn bất kỳ có nội dung Hello . Bạn sẽ in các thông báo luồng tới thiết bị đầu cuối như bên dưới:
Kiểm tra tin nhắn kênh đầu cuối
Trong Redis Insight, bạn sẽ thấy mesage_channel mới được tạo và một hàng đợi có dấu thời gian chứa đầy các tin nhắn được gửi từ máy khách. Hàng đợi được đánh dấu thời gian này rất quan trọng để duy trì thứ tự của các tin nhắn.
Kênh thông tin chi tiết về Redis
Cách lập mô hình dữ liệu trò chuyện
Tiếp theo, chúng ta sẽ tạo một mô hình cho tin nhắn trò chuyện của mình. Hãy nhớ rằng chúng ta đang gửi dữ liệu văn bản qua WebSockets, nhưng dữ liệu trò chuyện của chúng ta cần chứa nhiều thông tin hơn là chỉ văn bản. Chúng tôi cần đánh dấu thời gian khi cuộc trò chuyện được gửi, tạo ID cho từng tin nhắn và thu thập dữ liệu về phiên trò chuyện, sau đó lưu trữ dữ liệu này ở định dạng JSON.
Chúng tôi có thể lưu trữ dữ liệu JSON này trong Redis để không bị mất lịch sử trò chuyện khi mất kết nối vì WebSocket của chúng tôi không lưu trữ trạng thái.
Trong server.src tạo một thư mục mới có tên schema . Sau đó tạo một tệp có tên chat.py trong server.src.schema thêm đoạn mã sau:
from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = uuid.uuid4()
msg: str
timestamp = str(datetime.now())
class Chat(BaseModel):
token: str
messages: List[Message]
name: str
session_start = str(datetime.now())
Chúng tôi đang sử dụng BaseModel của Pydantic lớp để mô hình hóa dữ liệu trò chuyện. Chat lớp sẽ chứa dữ liệu về một phiên Trò chuyện. Nó sẽ lưu trữ mã thông báo, tên người dùng và dấu thời gian được tạo tự động cho thời gian bắt đầu phiên trò chuyện bằng cách sử dụng datetime.now() .
Các tin nhắn được gửi và nhận trong phiên trò chuyện này được lưu trữ bằng Message lớp tạo id trò chuyện nhanh chóng bằng cách sử dụng uuid4 . Dữ liệu duy nhất chúng tôi cần cung cấp khi khởi tạo Message này class là nội dung tin nhắn.
Cách làm việc với Redis JSON
Để sử dụng khả năng lưu trữ lịch sử trò chuyện của Redis JSON, chúng ta cần cài đặt rejson do phòng thí nghiệm Redis cung cấp.
Trong terminal, cd vào server và cài đặt rejson với pip install rejson . Sau đó cập nhật Redis của bạn lớp trong server.src.redis.config.py để bao gồm create_rejson_connection phương pháp:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
Chúng tôi đang thêm create_rejson_connection phương thức kết nối với Redis bằng rejson Client . Điều này cung cấp cho chúng tôi các phương pháp để tạo và thao tác dữ liệu JSON trong Redis, những phương pháp này không có trong aioredis.
Tiếp theo, ở server.src.routes.chat.py chúng tôi có thể cập nhật /token điểm cuối để tạo Chat mới instance và lưu trữ dữ liệu phiên trong Redis JSON như sau:
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create new chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
LƯU Ý:Vì đây là ứng dụng demo nên tôi không muốn lưu trữ dữ liệu trò chuyện trong Redis quá lâu. Vì vậy, tôi đã thêm thời gian chờ 60 phút cho mã thông báo bằng ứng dụng khách aioredis (rejson không triển khai thời gian chờ). Điều này có nghĩa là sau 60 phút, dữ liệu phiên trò chuyện sẽ bị mất.
Điều này là cần thiết vì chúng tôi không xác thực người dùng và chúng tôi muốn kết xuất dữ liệu trò chuyện sau một khoảng thời gian xác định. Bước này là tùy chọn và bạn không cần phải thêm nó vào.
Tiếp theo, trong Postman, khi bạn gửi yêu cầu POST để tạo mã thông báo mới, bạn sẽ nhận được phản hồi có cấu trúc giống như bên dưới. Bạn cũng có thể kiểm tra Redis Insight để xem dữ liệu trò chuyện được lưu trữ cùng với mã thông báo dưới dạng khóa JSON và dữ liệu dưới dạng giá trị.
Đã cập nhật trình tạo mã thông báo
Cách cập nhật phụ thuộc mã thông báo
Bây giờ chúng ta đã tạo và lưu trữ mã thông báo, đây là thời điểm thích hợp để cập nhật get_token phụ thuộc vào /chat của chúng tôi WebSocket. Chúng tôi thực hiện việc này để kiểm tra mã thông báo hợp lệ trước khi bắt đầu phiên trò chuyện.
Trong server.src.socket.utils.py cập nhật get_token để kiểm tra xem mã thông báo có tồn tại trong phiên bản Redis hay không. Nếu đúng như vậy thì chúng tôi trả lại mã thông báo, điều đó có nghĩa là kết nối ổ cắm hợp lệ. Nếu nó không tồn tại, chúng tôi đóng kết nối.
Mã thông báo được tạo bởi /token sẽ ngừng tồn tại sau 60 phút. Vì vậy, chúng tôi có thể có một số logic đơn giản ở giao diện người dùng để chuyển hướng người dùng tạo mã thông báo mới nếu phản hồi lỗi được tạo trong khi cố gắng bắt đầu trò chuyện.
from ..redis.config import Redis
async def get_token(
websocket: WebSocket,
token: Optional[str] = Query(None),
):
if token is None or token == "":
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
redis_client = await redis.create_connection()
isexists = await redis_client.exists(token)
if isexists == 1:
return token
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not authenticated or expired token")
Để kiểm tra sự phụ thuộc, hãy kết nối với phiên trò chuyện bằng mã thông báo ngẫu nhiên mà chúng tôi đang sử dụng và bạn sẽ gặp lỗi 403. (Lưu ý rằng bạn phải xóa mã thông báo theo cách thủ công trong Redis Insight.)
Bây giờ hãy sao chép mã thông báo được tạo khi bạn gửi yêu cầu đăng bài tới /token điểm cuối (hoặc tạo một yêu cầu mới) và dán nó làm giá trị cho tham số truy vấn mã thông báo được yêu cầu bởi /chat WebSocket. Sau đó kết nối. Bạn sẽ nhận được kết nối thành công.
Phiên trò chuyện bằng mã thông báo
Kết hợp tất cả lại với nhau, chat.py của bạn sẽ trông giống như bên dưới.
import os
from fastapi import APIRouter, FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
import uuid
from ..socket.connection import ConnectionManager
from ..socket.utils import get_token
import time
from ..redis.producer import Producer
from ..redis.config import Redis
from ..schema.chat import Chat
from rejson import Path
chat = APIRouter()
manager = ConnectionManager()
redis = Redis()
# @route POST /token
# @desc Route to generate chat token
# @access Public
@chat.post("/token")
async def token_generator(name: str, request: Request):
token = str(uuid.uuid4())
if name == "":
raise HTTPException(status_code=400, detail={
"loc": "name", "msg": "Enter a valid name"})
# Create nee chat session
json_client = redis.create_rejson_connection()
chat_session = Chat(
token=token,
messages=[],
name=name
)
print(chat_session.dict())
# Store chat session in redis JSON with the token as key
json_client.jsonset(str(token), Path.rootPath(), chat_session.dict())
# Set a timeout for redis data
redis_client = await redis.create_connection()
await redis_client.expire(str(token), 3600)
return chat_session.dict()
# @route POST /refresh_token
# @desc Route to refresh token
# @access Public
@chat.post("/refresh_token")
async def refresh_token(request: Request):
return None
# @route Websocket /chat
# @desc Socket for chat bot
# @access Public
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[token] = data
await producer.add_to_stream(stream_data, "message_channel")
await manager.send_personal_message(f"Response: Simulating response from the GPT service", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
Làm tốt lắm khi đạt được điều đó đến nay! Trong phần tiếp theo, chúng ta sẽ tập trung vào việc giao tiếp với mô hình AI và xử lý việc truyền dữ liệu giữa máy khách, máy chủ, nhân viên và API bên ngoài.
Cách thêm trí thông minh vào Chatbot bằng Mô hình AI
Trong phần này, chúng tôi sẽ tập trung vào việc xây dựng trình bao bọc để giao tiếp với mô hình máy biến áp, gửi lời nhắc từ người dùng đến API ở định dạng hội thoại, đồng thời nhận và chuyển đổi phản hồi cho ứng dụng trò chuyện của chúng tôi.
Cách bắt đầu ôm mặt
Chúng tôi sẽ không xây dựng hoặc triển khai bất kỳ mô hình ngôn ngữ nào trên Hugginface. Thay vào đó, chúng tôi sẽ tập trung vào việc sử dụng API suy luận tăng tốc của Huggingface để kết nối với các mô hình được đào tạo trước.
Model chúng tôi sẽ sử dụng là Model GPT-J-6B do EleutherAI cung cấp. Đó là một mô hình ngôn ngữ tổng quát được đào tạo với 6 tỷ tham số.
Huggingface cung cấp cho chúng tôi API giới hạn theo yêu cầu để kết nối với mô hình này khá miễn phí.
Để bắt đầu với Huggingface, hãy tạo một tài khoản miễn phí. Trong cài đặt của bạn, hãy tạo mã thông báo truy cập mới. Với tối đa 30 nghìn mã thông báo, Huggingface cung cấp quyền truy cập miễn phí vào API suy luận.
Bạn có thể theo dõi việc sử dụng API của mình tại đây. Hãy đảm bảo bạn giữ mã thông báo này an toàn và không tiết lộ nó một cách công khai.
Lưu ý:Chúng tôi sẽ sử dụng kết nối HTTP để liên lạc với API vì chúng tôi đang sử dụng tài khoản miễn phí. Nhưng tài khoản PRO Huggingface hỗ trợ phát trực tuyến bằng WebSockets, xem các công việc song song và hàng loạt.
Điều này có thể giúp cải thiện đáng kể thời gian phản hồi giữa mô hình và ứng dụng trò chuyện của chúng tôi và hy vọng tôi sẽ đề cập đến phương pháp này trong bài viết tiếp theo.
Cách tương tác với mô hình ngôn ngữ
Đầu tiên, chúng tôi thêm thông tin xác thực kết nối Huggingface vào tệp .env trong thư mục worker của chúng tôi.
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
Tiếp theo, ở worker.src tạo thư mục có tên model sau đó thêm tệp gptj.py . Sau đó thêm lớp GPT bên dưới:
import os
from dotenv import load_dotenv
import requests
import json
load_dotenv()
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": True,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = input
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
print(json.loads(response.content.decode("utf-8")))
return json.loads(response.content.decode("utf-8"))
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
GPT lớp được khởi tạo với mô hình Huggingface url , xác thực header và payload được xác định trước . Nhưng đầu vào tải trọng là trường động được cung cấp bởi query và được cập nhật trước khi chúng tôi gửi yêu cầu đến điểm cuối Huggingface.
Cuối cùng, chúng tôi kiểm tra điều này bằng cách chạy trực tiếp phương thức truy vấn trên một phiên bản của lớp GPT. Trong terminal, chạy python src/model/gptj.py , và bạn sẽ nhận được phản hồi như thế này (chỉ cần lưu ý rằng phản hồi của bạn chắc chắn sẽ khác với phản hồi này):
[{'generated_text': ' (AI) could solve all the problems on this planet? I am of the opinion that in the short term artificial intelligence is much better than human beings, but in the long and distant future human beings will surpass artificial intelligence.\n\nIn the distant'}]
Tiếp theo, chúng tôi thêm một số điều chỉnh vào đầu vào để tương tác với mô hình mang tính trò chuyện hơn bằng cách thay đổi định dạng của đầu vào.
Cập nhật GPT lớp như vậy:
class GPT:
def __init__(self):
self.url = os.environ.get('MODEL_URL')
self.headers = {
"Authorization": f"Bearer {os.environ.get('HUGGINFACE_INFERENCE_TOKEN')}"}
self.payload = {
"inputs": "",
"parameters": {
"return_full_text": False,
"use_cache": False,
"max_new_tokens": 25
}
}
def query(self, input: str) -> list:
self.payload["inputs"] = f"Human: {input} Bot:"
data = json.dumps(self.payload)
response = requests.request(
"POST", self.url, headers=self.headers, data=data)
data = json.loads(response.content.decode("utf-8"))
text = data[0]['generated_text']
res = str(text.split("Human:")[0]).strip("\n").strip()
return res
if __name__ == "__main__":
GPT().query("Will artificial intelligence help humanity conquer the universe?")
Chúng tôi đã cập nhật dữ liệu đầu vào bằng một chuỗi ký tự f"Human: {input} Bot:" . Đầu vào của con người được đặt trong chuỗi và Bot sẽ đưa ra phản hồi. Định dạng đầu vào này biến GPT-J6B thành mô hình đàm thoại. Những thay đổi khác mà bạn có thể nhận thấy bao gồm
- use_cache:bạn có thể đặt giá trị này là Sai nếu bạn muốn mô hình tạo phản hồi mới khi đầu vào giống nhau. Tôi khuyên bạn nên để giá trị này là True trong quá trình sản xuất để tránh làm cạn kiệt mã thông báo miễn phí của bạn nếu người dùng tiếp tục gửi spam cho bot với cùng một tin nhắn. Việc sử dụng bộ đệm không thực sự tải phản hồi mới từ mô hình.
- return_full_text:là Sai, vì chúng tôi không cần trả lại dữ liệu đầu vào – chúng tôi đã có dữ liệu đó. Khi nhận được phản hồi, chúng tôi sẽ loại bỏ "Bot:" và dấu cách ở đầu/cuối khỏi phản hồi và chỉ trả lại văn bản phản hồi.
Cách mô phỏng bộ nhớ ngắn hạn cho mô hình AI
Đối với mỗi đầu vào mới mà chúng tôi gửi tới mô hình, không có cách nào để mô hình ghi nhớ lịch sử hội thoại. Điều này rất quan trọng nếu chúng ta muốn giữ bối cảnh trong cuộc trò chuyện.
Nhưng hãy nhớ rằng khi số lượng mã thông báo chúng tôi gửi đến mô hình tăng lên, quá trình xử lý sẽ tốn kém hơn và thời gian phản hồi cũng lâu hơn.
Vì vậy, chúng ta sẽ cần tìm cách lấy lại lịch sử ngắn hạn và gửi nó đến mô hình. Chúng tôi cũng cần phải tìm ra một điểm phù hợp - chúng tôi muốn truy xuất và gửi bao nhiêu dữ liệu lịch sử đến mô hình?
Để xử lý lịch sử trò chuyện, chúng tôi cần quay lại cơ sở dữ liệu JSON của mình. Chúng ta sẽ sử dụng token để lấy dữ liệu trò chuyện cuối cùng và sau đó khi chúng tôi nhận được phản hồi, hãy thêm phản hồi vào cơ sở dữ liệu JSON.
Cập nhật worker.src.redis.config.py để bao gồm create_rejson_connection phương pháp. Ngoài ra, hãy cập nhật tệp .env với dữ liệu xác thực và đảm bảo đã cài đặt rejson.
worker.src.redis.config.py của bạn sẽ trông như thế này:
import os
from dotenv import load_dotenv
import aioredis
from rejson import Client
load_dotenv()
class Redis():
def __init__(self):
"""initialize connection """
self.REDIS_URL = os.environ['REDIS_URL']
self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
self.REDIS_USER = os.environ['REDIS_USER']
self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
self.REDIS_HOST = os.environ['REDIS_HOST']
self.REDIS_PORT = os.environ['REDIS_PORT']
async def create_connection(self):
self.connection = aioredis.from_url(
self.connection_url, db=0)
return self.connection
def create_rejson_connection(self):
self.redisJson = Client(host=self.REDIS_HOST,
port=self.REDIS_PORT, decode_responses=True, username=self.REDIS_USER, password=self.REDIS_PASSWORD)
return self.redisJson
Trong khi tệp .env của bạn sẽ trông như thế này:
export REDIS_URL=<REDIS URL PROVIDED IN REDIS CLOUD>
export REDIS_USER=<REDIS USER IN REDIS CLOUD>
export REDIS_PASSWORD=<DATABASE PASSWORD IN REDIS CLOUD>
export REDIS_HOST=<REDIS HOST IN REDIS CLOUD>
export REDIS_PORT=<REDIS PORT IN REDIS CLOUD>
export HUGGINFACE_INFERENCE_TOKEN=<HUGGINGFACE ACCESS TOKEN>
export MODEL_URL=https://api-inference.huggingface.co/models/EleutherAI/gpt-j-6B
Tiếp theo, vào worker.src.redis tạo một tệp mới có tên cache.py và thêm mã bên dưới:
from .config import Redis
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
Bộ đệm được khởi tạo bằng máy khách rejson và phương thức get_chat_history lấy mã thông báo để lấy lịch sử trò chuyện cho mã thông báo đó, từ Redis. Đảm bảo bạn nhập đối tượng Đường dẫn từ rejson.
Tiếp theo, cập nhật worker.main.py với mã bên dưới:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
if __name__ == "__main__":
asyncio.run(main())
Tôi đã mã hóa cứng mã thông báo mẫu được tạo từ các thử nghiệm trước đó trong Postman. Nếu bạn chưa tạo mã thông báo, chỉ cần gửi yêu cầu mới tới /token và sao chép mã thông báo, sau đó chạy python main.py trong thiết bị đầu cuối. Bạn sẽ thấy dữ liệu trong terminal như sau:
{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
Tiếp theo, chúng ta cần thêm add_message_to_cache phương thức tới Cache của chúng tôi lớp thêm thông báo vào Redis cho một mã thông báo cụ thể.
async def add_message_to_cache(self, token: str, message_data: dict):
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
jsonarrappend phương thức do rejson cung cấp sẽ thêm thông báo mới vào mảng thông báo.
Lưu ý để truy cập vào mảng thông báo chúng ta cần cung cấp .messages như một đối số cho Con đường. Nếu dữ liệu tin nhắn của bạn có cấu trúc khác/lồng nhau, chỉ cần cung cấp đường dẫn đến mảng mà bạn muốn nối dữ liệu mới vào.
Để kiểm tra phương pháp này, hãy cập nhật hàm main trong tệp main.py với mã bên dưới:
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
Chúng tôi đang gửi tin nhắn được mã hóa cứng vào bộ đệm và lấy lịch sử trò chuyện từ bộ đệm. Khi bạn chạy python main.py trong thiết bị đầu cuối trong thư mục công nhân, bạn sẽ nhận được một cái gì đó như thế này được in trong thiết bị đầu cuối, với thông báo được thêm vào mảng thông báo.
{'token': '18196e23-763b-4808-ae84-064348a0daff', 'messages': [{'id': '1', 'msg': 'Hello', 'timestamp': '2022-07-16 13:20:01.092109'}], 'name': 'Stephen', 'session_start': '2022-07-16 13:20:01.092109'}
Cuối cùng, chúng ta cần cập nhật chức năng chính để gửi dữ liệu tin nhắn đến mô hình GPT và cập nhật đầu vào bằng 4 cuối cùng tin nhắn được gửi giữa client và model.
Trước tiên hãy cập nhật add_message_to_cache của chúng tôi hoạt động với một "nguồn" đối số mới sẽ cho chúng tôi biết tin nhắn là con người hay bot. Sau đó, chúng tôi có thể sử dụng đối số này để thêm thẻ "Human:" hoặc "Bot:" vào dữ liệu trước khi lưu trữ vào bộ đệm.
Cập nhật add_message_to_cache phương thức trong lớp Cache như sau:
async def add_message_to_cache(self, token: str, source: str, message_data: dict):
if source == "human":
message_data['msg'] = "Human: " + (message_data['msg'])
elif source == "bot":
message_data['msg'] = "Bot: " + (message_data['msg'])
self.json_client.jsonarrappend(
str(token), Path('.messages'), message_data)
Then update the main function in main.py in the worker directory, and run python main.py to see the new results in the Redis database.
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "1",
"msg": "Hello",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
Next, we need to update the main function to add new messages to the cache, read the previous 4 messages from the cache, and then make an API call to the model using the query method. It'll have a payload consisting of a composite string of the last 4 messages.
You can always tune the number of messages in the history you want to extract, but I think 4 messages is a pretty good number for a demo.
In worker.src , create a new folder schema. Then create a new file named chat.py and paste our message schema in chat.py like so:
from datetime import datetime
from pydantic import BaseModel
from typing import List, Optional
import uuid
class Message(BaseModel):
id = str(uuid.uuid4())
msg: str
timestamp = str(datetime.now())
Next, update the main.py file like below:
async def main():
json_client = redis.create_rejson_connection()
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="human", message_data={
"id": "3",
"msg": "I would like to go to the moon to, would you take me?",
"timestamp": "2022-07-16 13:20:01.092109"
})
data = await Cache(json_client).get_chat_history(token="18196e23-763b-4808-ae84-064348a0daff")
print(data)
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await Cache(json_client).add_message_to_cache(token="18196e23-763b-4808-ae84-064348a0daff", source="bot", message_data=msg.dict())
In the code above, we add new message data to the cache. This message will ultimately come from the message queue. Next we get the chat history from the cache, which will now include the most recent data we added.
Note that we are using the same hard-coded token to add to the cache and get from the cache, temporarily just to test this out.
Next, we trim off the cache data and extract only the last 4 items. Then we consolidate the input data by extracting the msg in a list and join it to an empty string.
Finally, we create a new Message instance for the bot response and add the response to the cache specifying the source as "bot"
Next, run python main.py a couple of times, changing the human message and id as desired with each run. You should have a full conversation input and output with the model.
Open Redis Insight and you should have something similar to the below:
Conversational Chat
Stream Consumer and Real-time Data Pull from the Message Queue
Next, we want to create a consumer and update our worker.main.py to connect to the message queue. We want it to pull the token data in real-time, as we are currently hard-coding the tokens and message inputs.
In worker.src.redis create a new file named stream.py . Add a StreamConsumer class with the code below:
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
The StreamConsumer class is initialized with a Redis client. The consume_stream method pulls a new message from the queue from the message channel, using the xread method provided by aioredis.
Next, update the worker.main.py file with a while loop to keep the connection to the message channel alive, like so:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(token)
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
print(msg)
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
This is quite the update, so let's take it step by step:
We use a while True loop so that the worker can be online listening to messages from the queue.
Next, we await new messages from the message_channel by calling our consume_stream method. If we have a message in the queue, we extract the message_id, token, and message. Then we create a new instance of the Message class, add the message to the cache, and then get the last 4 messages. We set it as input to the GPT model query method.
Once we get a response, we then add the response to the cache using the add_message_to_cache method, then delete the message from the queue.
How to Update the Chat Client with the AI Response
So far, we are sending a chat message from the client to the message_channel (which is received by the worker that queries the AI model) to get a response.
Next, we need to send this response to the client. As long as the socket connection is still open, the client should be able to receive the response.
If the connection is closed, the client can always get a response from the chat history using the refresh_token điểm cuối.
In worker.src.redis create a new file named producer.py , and add a Producer class similar to what we had on the chat web server:
class Producer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def add_to_stream(self, data: dict, stream_channel) -> bool:
msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
print(f"Message id {msg_id} added to {stream_channel} stream")
return msg_id
Next, in the main.py file, update the main function to initialize the producer, create a stream data, and send the response to a response_channel using the add_to_stream method:
from src.redis.config import Redis
import asyncio
from src.model.gptj import GPT
from src.redis.cache import Cache
from src.redis.config import Redis
from src.redis.stream import StreamConsumer
import os
from src.schema.chat import Message
from src.redis.producer import Producer
redis = Redis()
async def main():
json_client = redis.create_rejson_connection()
redis_client = await redis.create_connection()
consumer = StreamConsumer(redis_client)
cache = Cache(json_client)
producer = Producer(redis_client)
print("Stream consumer started")
print("Stream waiting for new messages")
while True:
response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
if response:
for stream, messages in response:
# Get message from stream, and extract token, message data and message id
for message in messages:
message_id = message[0]
token = [k.decode('utf-8')
for k, v in message[1].items()][0]
message = [v.decode('utf-8')
for k, v in message[1].items()][0]
# Create a new message instance and add to cache, specifying the source as human
msg = Message(msg=message)
await cache.add_message_to_cache(token=token, source="human", message_data=msg.dict())
# Get chat history from cache
data = await cache.get_chat_history(token=token)
# Clean message input and send to query
message_data = data['messages'][-4:]
input = ["" + i['msg'] for i in message_data]
input = " ".join(input)
res = GPT().query(input=input)
msg = Message(
msg=res
)
stream_data = {}
stream_data[str(token)] = str(msg.dict())
await producer.add_to_stream(stream_data, "response_channel")
await cache.add_message_to_cache(token=token, source="bot", message_data=msg.dict())
# Delete messaage from queue after it has been processed
await consumer.delete_message(stream_channel="message_channel", message_id=message_id)
if __name__ == "__main__":
asyncio.run(main())
Next, we need to let the client know when we receive responses from the worker in the /chat socket endpoint. We do this by listening to the response stream. We do not need to include a while loop here as the socket will be listening as long as the connection is open.
Note that we also need to check which client the response is for by adding logic to check if the token connected is equal to the token in the response. Then we delete the message in the response queue once it's been read.
In server.src.redis create a new file named stream.py and add our StreamConsumer class like this:
from .config import Redis
class StreamConsumer:
def __init__(self, redis_client):
self.redis_client = redis_client
async def consume_stream(self, count: int, block: int, stream_channel):
response = await self.redis_client.xread(
streams={stream_channel: '0-0'}, count=count, block=block)
return response
async def delete_message(self, stream_channel, message_id):
await self.redis_client.xdel(stream_channel, message_id)
Next, update the /chat socket endpoint like so:
from ..redis.stream import StreamConsumer
@chat.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await manager.connect(websocket)
redis_client = await redis.create_connection()
producer = Producer(redis_client)
json_client = redis.create_rejson_connection()
consumer = StreamConsumer(redis_client)
try:
while True:
data = await websocket.receive_text()
stream_data = {}
stream_data[str(token)] = str(data)
await producer.add_to_stream(stream_data, "message_channel")
response = await consumer.consume_stream(stream_channel="response_channel", block=0)
print(response)
for stream, messages in response:
for message in messages:
response_token = [k.decode('utf-8')
for k, v in message[1].items()][0]
if token == response_token:
response_message = [v.decode('utf-8')
for k, v in message[1].items()][0]
print(message[0].decode('utf-8'))
print(token)
print(response_token)
await manager.send_personal_message(response_message, websocket)
await consumer.delete_message(stream_channel="response_channel", message_id=message[0].decode('utf-8'))
except WebSocketDisconnect:
manager.disconnect(websocket)
Refresh Token
Finally, we need to update the /refresh_token endpoint to get the chat history from the Redis database using our Cache lớp học.
In server.src.redis , add a cache.py file and add the code below:
from rejson import Path
class Cache:
def __init__(self, json_client):
self.json_client = json_client
async def get_chat_history(self, token: str):
data = self.json_client.jsonget(
str(token), Path.rootPath())
return data
Next, in server.src.routes.chat.py import the Cache class and update the /token endpoint to the below:
from ..redis.cache import Cache
@chat.get("/refresh_token")
async def refresh_token(request: Request, token: str):
json_client = redis.create_rejson_connection()
cache = Cache(json_client)
data = await cache.get_chat_history(token)
if data == None:
raise HTTPException(
status_code=400, detail="Session expired or does not exist")
else:
return data
Now, when we send a GET request to the /refresh_token endpoint with any token, the endpoint will fetch the data from the Redis database.
If the token has not timed out, the data will be sent to the user. Or it'll send a 400 response if the token is not found.
How to Test the Chat with multiple Clients in Postman
Finally, we will test the chat system by creating multiple chat sessions in Postman, connecting multiple clients in Postman, and chatting with the bot on the clients.
Lastly, we will try to get the chat history for the clients and hopefully get a proper response.
Recap
Let's have a quick recap as to what we have achieved with our chat system. The chat client creates a token for each chat session with a client. This token is used to identify each client, and each message sent by clients connected to or web server is queued in a Redis channel (message_chanel), identified by the token.
Our worker environment reads from this channel. It does not have any clue who the client is (except that it's a unique token) and uses the message in the queue to send requests to the Huggingface inference API.
When it gets a response, the response is added to a response channel and the chat history is updated. The client listening to the response_channel immediately sends the response to the client once it receives a response with its token.
If the socket is still open, this response is sent. If the socket is closed, we are certain that the response is preserved because the response is added to the chat history. The client can get the history, even if a page refresh happens or in the event of a lost connection.
Congratulations on getting this far! You have been able to build a working chat system.
In follow-up articles, I will focus on building a chat user interface for the client, creating unit and functional tests, fine-tuning our worker environment for faster response time with WebSockets and asynchronous requests, and ultimately deploying the chat application on AWS.
This Article is part of a series on building full-stack intelligent chatbots with tools like Python, React, Huggingface, Redis, and so on. You can follow the full series on my blog:blog.stephensanwo.dev - AI ChatBot Series**
You can download the full repository on My Github Repository
I wrote this tutorial in collaboration with Redis. Need help getting started with Redis? Try the following resources:
- Try Redis Cloud free of charge
- Watch this video on the benefits of Redis Cloud over other Redis providers
- Redis Developer Hub - tools, guides, and tutorials about Redis
- RedisInsight Desktop GUI
Học cách viết mã miễn phí. Chương trình giảng dạy mã nguồn mở của freeCodeCamp đã giúp hơn 40.000 người có được việc làm với tư cách là nhà phát triển. Bắt đầu