目录
1 了解 WebSockets
2 为 WebSockets 设置 FastAPI
3 管理 WebSocket 连接
4 实现实时通知
5 处理身份验证和授权
6 扩展 WebSocket 应用程序
7 性能注意事项
8 总结
在当今快节奏的数字环境中,实时功能已成为现代 Web 应用程序的关键。用户希望获得即时更新、实时通知和无缝交互,而无需重新加载页面。这就是 WebSockets 发挥作用的地方,它在客户端和服务器之间提供了一个全双工、双向的通信通道。
FastAPI 是一个现代的高性能 Web 框架,用于使用 Python 构建 API,为 WebSockets 提供了出色的支持。
在本文中,我们将深入探讨如何利用 FastAPI 和 WebSockets 在你的应用程序中创建强大的实时功能和通知。
了解 WebSockets
在我们深入研究实现细节之前,让我们简要回顾一下什么是 WebSockets 以及为什么它们对于实时通信至关重要。
WebSockets 是一种协议,它通过单个 TCP 连接在客户端(通常是 Web 浏览器)和服务器之间实现双向通信。与遵循请求-响应模型的传统 HTTP 请求不同,WebSockets 允许连续、实时的数据交换,而无需轮询或长轮询技术。
WebSockets 说明
WebSockets 的主要优点包括:
- 低延迟:消息是即时发送和接收的。
- 减少服务器负载:无需持续轮询。
- 双向通信:客户端和服务器都可以启动数据传输。
- 高效用于实时应用程序:非常适合聊天应用程序、实时更新和通知。
为 WebSockets 设置 FastAPI
让我们先设置一个支持 WebSocket 的基本 FastAPI 应用程序。首先,确保你已安装 FastAPI 及其依赖项:
pip install fastapi[all]
现在,让我们创建一个带有 WebSocket 端点的简单 FastAPI 应用程序:
from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
except WebSocketDisconnect:
print("Client disconnected")
此基本设置在 上创建一个 WebSocket 终端节点。当客户端连接时,服务器接受连接并进入一个循环,接收消息并将其回显给客户端。/ws
管理 WebSocket 连接
在实际应用程序中,您可能需要管理多个 WebSocket 连接。让我们创建一个连接管理器来处理这个问题:
from fastapi import FastAPI, WebSocket
from typing import List
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
app = FastAPI()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"Client {client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client {client_id} left the chat")
此实现允许多个客户端连接并将消息广播到所有连接的客户端。
实施实时通知
现在我们已经有了基本的 WebSocket 设置,让我们实现一个实时通知系统。我们将创建一个简单的任务管理系统,用户可以在其中创建任务,其他用户将在添加新任务时收到实时通知。
首先,让我们定义我们的数据模型:
from pydantic import BaseModel
from typing import List, Dict
from datetime import datetime
class Task(BaseModel):
id: int
title: str
description: str
created_at: datetime
class NotificationManager:
def __init__(self):
self.subscriptions: Dict[str, List[WebSocket]] = {}
async def subscribe(self, topic: str, websocket: WebSocket):
if topic not in self.subscriptions:
self.subscriptions[topic] = []
self.subscriptions[topic].append(websocket)
def unsubscribe(self, topic: str, websocket: WebSocket):
if topic in self.subscriptions:
self.subscriptions[topic].remove(websocket)
async def notify(self, topic: str, message: str):
if topic in self.subscriptions:
for websocket in self.subscriptions[topic]:
await websocket.send_text(message)
notification_manager = NotificationManager()
现在,让我们更新 FastAPI 应用程序以包含任务创建和实时通知:
from fastapi import FastAPI, WebSocket, HTTPException
from typing import List, Dict
from datetime import datetime
app = FastAPI()
tasks: List[Task] = []
@app.post("/tasks")
async def create_task(task: Task):
task.created_at = datetime.now()
tasks.append(task)
await notification_manager.notify("tasks", f"New task created: {task.title}")
return {"message": "Task created successfully"}
@app.get("/tasks")
async def get_tasks():
return tasks
@app.websocket("/ws/notifications/{topic}")
async def websocket_endpoint(websocket: WebSocket, topic: str):
await websocket.accept()
await notification_manager.subscribe(topic, websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
notification_manager.unsubscribe(topic, websocket)
在此实施中,客户端可以订阅特定主题(在本例中为 “任务”)的通知。创建新任务时,所有订阅的客户端都会收到实时通知。
处理身份验证和授权
在生产环境中,您需要保护 WebSocket 连接。FastAPI 可以轻松地将身份验证和授权与 WebSockets 集成。以下是使用 JWT 令牌的示例:
from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
# ... (previous imports and code)
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
return User(username=username)
except JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
@app.websocket("/ws/secure/{topic}")
async def websocket_endpoint(
websocket: WebSocket,
topic: str,
token: str = Depends(oauth2_scheme)
):
user = get_current_user(token)
await websocket.accept()
await notification_manager.subscribe(topic, websocket)
try:
while True:
data = await websocket.receive_text()
await notification_manager.notify(topic, f"{user.username}: {data}")
except WebSocketDisconnect:
notification_manager.unsubscribe(topic, websocket)
此设置可确保只有经过身份验证的用户才能连接到 WebSocket 终端节点并接收通知。
扩展 WebSocket 应用程序
随着应用程序的增长,您可能需要扩展 WebSocket 实现。以下是一些需要考虑的策略:
- Redis Pub/Sub:使用 Redis 作为消息代理来处理跨多个服务器实例的 WebSocket 通信。
- 负载平衡:实施粘性会话以确保客户端保持与同一服务器的连接。
- 水平扩展:使用 AWS ElastiCache 或 Azure Cache for Redis 等服务在多个节点之间分配 WebSocket 连接。
以下是通过 FastAPI 和 WebSockets 使用 Redis 进行发布/订阅的基本示例:
import aioredis
from fastapi import FastAPI, WebSocket
app = FastAPI()
async def get_redis():
redis = await aioredis.create_redis_pool("redis://localhost")
return redis
@app.websocket("/ws/redis/{channel}")
async def websocket_endpoint(websocket: WebSocket, channel: str):
await websocket.accept()
redis = await get_redis()
try:
channel = await redis.subscribe(channel)
async for message in channel[0].iter():
await websocket.send_text(message.decode())
finally:
await redis.unsubscribe(channel)
redis.close()
await redis.wait_closed()
@app.post("/publish/{channel}")
async def publish(channel: str, message: str):
redis = await get_redis()
await redis.publish(channel, message)
return {"message": "Published"}
此实现允许您跨多个实例扩展 WebSocket 应用程序,同时保持实时通信。
性能注意事项
在 FastAPI 中使用 WebSockets 时,请牢记以下性能提示:
- 连接限制:为您的服务器可以处理的并发 WebSocket 连接数量设置适当的限制。
- 消息大小:实施消息大小限制以防止潜在的 DoS 攻击。
- 检测信号:实施定期检测信号,以保持连接处于活动状态并尽早检测断开连接。
- 压缩:使用 WebSocket 压缩来减少大型负载的带宽使用。
以下是实现检测信号和连接限制的示例:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
app = FastAPI()
MAX_CONNECTIONS = 1000
HEARTBEAT_INTERVAL = 30 # seconds
class ConnectionManager:
def __init__(self):
self.active_connections = set()
async def connect(self, websocket: WebSocket):
if len(self.active_connections) >= MAX_CONNECTIONS:
await websocket.close(code=1008) # Connection limit exceeded
return
await websocket.accept()
self.active_connections.add(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_heartbeat(self, websocket: WebSocket):
while True:
try:
await asyncio.sleep(HEARTBEAT_INTERVAL)
await websocket.send_text("heartbeat")
except WebSocketDisconnect:
break
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
heartbeat_task = asyncio.create_task(manager.send_heartbeat(websocket))
try:
while True:
data = await websocket.receive_text()
# Process received data
except WebSocketDisconnect:
manager.disconnect(websocket)
finally:
heartbeat_task.cancel()
此实现包括连接限制和心跳,以保持稳定高效的 WebSocket 连接。
结论
FastAPI 对 WebSockets 的支持为在 Web 应用程序中构建实时功能和通知提供了强大的基础。通过使用 WebSockets,您可以创建响应迅速、高效且可扩展的系统,以满足现代用户的需求。
在本文中,我们介绍了:
- 为 WebSockets 设置 FastAPI
- 管理多个 WebSocket 连接
- 实施实时通知
- 处理身份验证和授权
- 扩展 WebSocket 应用程序
- 性能注意事项
在继续构建和优化实时应用程序时,请记住考虑安全性、可扩展性和性能等因素。借助 FastAPI 和 WebSockets,您可以使用工具创建强大的高性能实时功能,让您的用户满意并使您的应用程序与众不同。