程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

FastAPI 和 WebSockets — 构建实时功能和通知

balukai 2025-02-28 14:48:30 文章精选 14 ℃

目录

1 了解 WebSockets

2 为 WebSockets 设置 FastAPI

3 管理 WebSocket 连接

4 实现实时通知

5 处理身份验证和授权

6 扩展 WebSocket 应用程序

7 性能注意事项

8 总结

在当今快节奏的数字环境中,实时功能已成为现代 Web 应用程序的关键。用户希望获得即时更新、实时通知和无缝交互,而无需重新加载页面。这就是 WebSockets 发挥作用的地方,它在客户端和服务器之间提供了一个全双工、双向的通信通道。

FastAPI 是一个现代的高性能 Web 框架,用于使用 Python 构建 API,为 WebSockets 提供了出色的支持。

在本文中,我们将深入探讨如何利用 FastAPI 和 WebSockets 在你的应用程序中创建强大的实时功能和通知。

了解 WebSockets

在我们深入研究实现细节之前,让我们简要回顾一下什么是 WebSockets 以及为什么它们对于实时通信至关重要。

WebSockets 是一种协议,它通过单个 TCP 连接在客户端(通常是 Web 浏览器)和服务器之间实现双向通信。与遵循请求-响应模型的传统 HTTP 请求不同,WebSockets 允许连续、实时的数据交换,而无需轮询或长轮询技术。

WebSockets 说明

WebSockets 的主要优点包括:

  1. 低延迟:消息是即时发送和接收的。
  2. 减少服务器负载:无需持续轮询。
  3. 双向通信:客户端和服务器都可以启动数据传输。
  4. 高效用于实时应用程序:非常适合聊天应用程序、实时更新和通知。

为 WebSockets 设置 FastAPI

让我们先设置一个支持 WebSocket 的基本 FastAPI 应用程序。首先,确保你已安装 FastAPI 及其依赖项:

pip install fastapi[all]

现在,让我们创建一个带有 WebSocket 端点的简单 FastAPI 应用程序:

from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

此基本设置在 上创建一个 WebSocket 终端节点。当客户端连接时,服务器接受连接并进入一个循环,接收消息并将其回显给客户端。/ws

管理 WebSocket 连接

在实际应用程序中,您可能需要管理多个 WebSocket 连接。让我们创建一个连接管理器来处理这个问题:

from fastapi import FastAPI, WebSocket
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Client {client_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client {client_id} left the chat")

此实现允许多个客户端连接并将消息广播到所有连接的客户端。

实施实时通知

现在我们已经有了基本的 WebSocket 设置,让我们实现一个实时通知系统。我们将创建一个简单的任务管理系统,用户可以在其中创建任务,其他用户将在添加新任务时收到实时通知。

首先,让我们定义我们的数据模型:

from pydantic import BaseModel
from typing import List, Dict
from datetime import datetime

class Task(BaseModel):
    id: int
    title: str
    description: str
    created_at: datetime

class NotificationManager:
    def __init__(self):
        self.subscriptions: Dict[str, List[WebSocket]] = {}

    async def subscribe(self, topic: str, websocket: WebSocket):
        if topic not in self.subscriptions:
            self.subscriptions[topic] = []
        self.subscriptions[topic].append(websocket)

    def unsubscribe(self, topic: str, websocket: WebSocket):
        if topic in self.subscriptions:
            self.subscriptions[topic].remove(websocket)

    async def notify(self, topic: str, message: str):
        if topic in self.subscriptions:
            for websocket in self.subscriptions[topic]:
                await websocket.send_text(message)

notification_manager = NotificationManager()

现在,让我们更新 FastAPI 应用程序以包含任务创建和实时通知:

from fastapi import FastAPI, WebSocket, HTTPException
from typing import List, Dict
from datetime import datetime

app = FastAPI()

tasks: List[Task] = []

@app.post("/tasks")
async def create_task(task: Task):
    task.created_at = datetime.now()
    tasks.append(task)
    await notification_manager.notify("tasks", f"New task created: {task.title}")
    return {"message": "Task created successfully"}

@app.get("/tasks")
async def get_tasks():
    return tasks

@app.websocket("/ws/notifications/{topic}")
async def websocket_endpoint(websocket: WebSocket, topic: str):
    await websocket.accept()
    await notification_manager.subscribe(topic, websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        notification_manager.unsubscribe(topic, websocket)

在此实施中,客户端可以订阅特定主题(在本例中为 “任务”)的通知。创建新任务时,所有订阅的客户端都会收到实时通知。

处理身份验证和授权

在生产环境中,您需要保护 WebSocket 连接。FastAPI 可以轻松地将身份验证和授权与 WebSockets 集成。以下是使用 JWT 令牌的示例:

from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel

# ... (previous imports and code)

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid authentication credentials")
        return User(username=username)
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")

@app.websocket("/ws/secure/{topic}")
async def websocket_endpoint(
    websocket: WebSocket,
    topic: str,
    token: str = Depends(oauth2_scheme)
):
    user = get_current_user(token)
    await websocket.accept()
    await notification_manager.subscribe(topic, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await notification_manager.notify(topic, f"{user.username}: {data}")
    except WebSocketDisconnect:
        notification_manager.unsubscribe(topic, websocket)

此设置可确保只有经过身份验证的用户才能连接到 WebSocket 终端节点并接收通知。

扩展 WebSocket 应用程序

随着应用程序的增长,您可能需要扩展 WebSocket 实现。以下是一些需要考虑的策略:

  1. Redis Pub/Sub:使用 Redis 作为消息代理来处理跨多个服务器实例的 WebSocket 通信。
  2. 负载平衡:实施粘性会话以确保客户端保持与同一服务器的连接。
  3. 水平扩展:使用 AWS ElastiCache 或 Azure Cache for Redis 等服务在多个节点之间分配 WebSocket 连接。

以下是通过 FastAPI 和 WebSockets 使用 Redis 进行发布/订阅的基本示例:

import aioredis
from fastapi import FastAPI, WebSocket

app = FastAPI()

async def get_redis():
    redis = await aioredis.create_redis_pool("redis://localhost")
    return redis

@app.websocket("/ws/redis/{channel}")
async def websocket_endpoint(websocket: WebSocket, channel: str):
    await websocket.accept()
    redis = await get_redis()
    try:
        channel = await redis.subscribe(channel)
        async for message in channel[0].iter():
            await websocket.send_text(message.decode())
    finally:
        await redis.unsubscribe(channel)
        redis.close()
        await redis.wait_closed()

@app.post("/publish/{channel}")
async def publish(channel: str, message: str):
    redis = await get_redis()
    await redis.publish(channel, message)
    return {"message": "Published"}

此实现允许您跨多个实例扩展 WebSocket 应用程序,同时保持实时通信。

性能注意事项

在 FastAPI 中使用 WebSockets 时,请牢记以下性能提示:

  1. 连接限制:为您的服务器可以处理的并发 WebSocket 连接数量设置适当的限制。
  2. 消息大小:实施消息大小限制以防止潜在的 DoS 攻击。
  3. 检测信号:实施定期检测信号,以保持连接处于活动状态并尽早检测断开连接。
  4. 压缩:使用 WebSocket 压缩来减少大型负载的带宽使用。

以下是实现检测信号和连接限制的示例:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

MAX_CONNECTIONS = 1000
HEARTBEAT_INTERVAL = 30  # seconds

class ConnectionManager:
    def __init__(self):
        self.active_connections = set()

    async def connect(self, websocket: WebSocket):
        if len(self.active_connections) >= MAX_CONNECTIONS:
            await websocket.close(code=1008)  # Connection limit exceeded
            return
        await websocket.accept()
        self.active_connections.add(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_heartbeat(self, websocket: WebSocket):
        while True:
            try:
                await asyncio.sleep(HEARTBEAT_INTERVAL)
                await websocket.send_text("heartbeat")
            except WebSocketDisconnect:
                break

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    heartbeat_task = asyncio.create_task(manager.send_heartbeat(websocket))
    try:
        while True:
            data = await websocket.receive_text()
            # Process received data
    except WebSocketDisconnect:
        manager.disconnect(websocket)
    finally:
        heartbeat_task.cancel()

此实现包括连接限制和心跳,以保持稳定高效的 WebSocket 连接。

结论

FastAPI 对 WebSockets 的支持为在 Web 应用程序中构建实时功能和通知提供了强大的基础。通过使用 WebSockets,您可以创建响应迅速、高效且可扩展的系统,以满足现代用户的需求。

在本文中,我们介绍了:

  1. 为 WebSockets 设置 FastAPI
  2. 管理多个 WebSocket 连接
  3. 实施实时通知
  4. 处理身份验证和授权
  5. 扩展 WebSocket 应用程序
  6. 性能注意事项

在继续构建和优化实时应用程序时,请记住考虑安全性、可扩展性和性能等因素。借助 FastAPI 和 WebSockets,您可以使用工具创建强大的高性能实时功能,让您的用户满意并使您的应用程序与众不同。

最近发表
标签列表