运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

本文详细阐述了如何在fastapi应用中,利用其`lifespan`事件管理器,高效且优雅地集成多个异步tcp服务器。通过正确使用`asyncio.create_task`在应用启动时启动后台服务,并在应用关闭时实现这些服务的平滑终止,确保fastapi与自定义tcp服务在同一个事件循环中协同工作,实现数据从tcp到websocket的无缝转发。

1. 引言:FastAPI与异步服务的融合

在构建现代异步应用时,我们常常需要将Web服务(如基于FastAPI)与自定义的后台服务(如TCP服务器)结合起来。FastAPI以其高性能和异步特性而闻名,而python的asyncio库则为构建并发网络应用提供了强大的支持。本文将探讨如何在同一个Python进程和事件循环中,无缝地运行一个FastAPI应用和多个异步TCP服务器,并实现数据在它们之间的流转,例如将TCP接收到的数据通过websocket广播给客户端。

2. 理解FastAPI的Lifespan事件管理器

FastAPI提供了lifespan事件管理器,这是一个基于contextlib.asynccontextmanager的强大工具,用于在应用程序启动和关闭时执行异步操作。其核心在于yield关键字:

  • yield之前的部分:在应用程序启动时执行。这里适合进行资源初始化、数据库连接、启动后台任务等操作。
  • yield之后的部分:在应用程序关闭时执行。这里适合进行资源清理、关闭连接、停止后台任务等操作。

常见误区:将需要持续运行的后台任务的启动逻辑放置在yield之后。这样做会导致任务仅在应用程序关闭时才尝试启动,而非在应用程序运行期间。

3. 正确集成异步TCP服务器的策略

为了让TCP服务器与FastAPI应用同时运行,并共享同一个事件循环,我们需要遵循以下策略:

  1. 使用 asyncio.create_task() 启动后台任务:将TCP服务器的启动协程包装成一个asyncio.Task。这会立即调度协程在事件循环中运行,而不会阻塞lifespan的启动流程。
  2. 在 yield 之前启动任务:确保所有需要随应用生命周期运行的后台任务都在lifespan的yield语句之前被创建并启动。
  3. 在 yield 之后实现优雅关闭:当应用收到关闭信号时(例如Ctrl+C或进程终止),lifespan的yield之后的部分会被执行。此时,我们应该取消之前启动的后台任务,并等待它们完成清理工作,以确保资源被正确释放。

4. 完整的代码示例

以下是根据上述策略修改后的代码,包括server.py, globals.py, websocket_manager.py 和 main.py。

websocket_manager.py (WebSocket连接管理)

# websocket_manager.py from fastapi import WebSocket from typing import List  class WebSocketManager:     """     管理活跃的WebSocket连接,并提供广播功能。     """     def __init__(self):         self.active_connections: List[WebSocket] = []      async def connect(self, websocket: WebSocket):         """建立WebSocket连接并将其添加到活跃连接列表。"""         await websocket.accept()         self.active_connections.append(websocket)      def disconnect(self, websocket: WebSocket):         """从活跃连接列表中移除断开的WebSocket连接。"""         if websocket in self.active_connections:             self.active_connections.remove(websocket)      async def broadcast(self, data: str):         """向所有活跃的WebSocket连接广播数据。"""         # 遍历时创建一个副本以避免在迭代过程中修改列表         for connection in list(self.active_connections):             try:                 await connection.send_text(data)             except Exception as e:                 print(f"Error broadcasting to WebSocket: {e}. Disconnecting...")                 self.disconnect(connection) # 广播失败则断开连接

globals.py (全局变量)

# globals.py import threading from websocket_manager import WebSocketManager  # 示例:全局数据存储和锁(当前示例中未使用,但保留结构) data_storage = {} data_lock = threading.Lock() # 注意:在asyncio环境中,通常应使用asyncio.Lock  # WebSocket管理器实例,供其他模块访问 websocket_manager = WebSocketManager()

server.py (异步TCP服务器)

# server.py import asyncio import globals  async def handle_client(reader: asyncio.streamReader, writer: asyncio.StreamWriter):     """     处理单个TCP客户端连接。     从客户端读取数据,并通过WebSocketManager广播。     """     peername = writer.get_extra_info('peername')     print(f"TCP client connected from {peername}")     try:         while True:             data = await reader.read(1024) # 读取最多1024字节             if not data:                 print(f"TCP client {peername} disconnected.")                 break             # 将接收到的原始数据解码为UTF-8字符串并广播             message = data.decode('utf-8', errors='ignore')             print(f"Received from TCP {peername}: {message}")             await globals.websocket_manager.broadcast(message)     except asyncio.CancelledError:         print(f"TCP client handler for {peername} cancelled.")     except Exception as e:         print(f"Error handling TCP client {peername}: {e}")     finally:         writer.close()         await writer.wait_closed()         print(f"TCP client writer for {peername} closed.")  async def run_tcp_server_task(port: int):     """     启动一个TCP服务器,并在事件循环中运行。     此函数设计为可取消的后台任务。     """     server = None     try:         print(f"Starting TCP server on 0.0.0.0:{port}...")         server = await asyncio.start_server(handle_client, '0.0.0.0', port)         async with server:             await server.serve_forever() # 阻塞直到任务被取消     except asyncio.CancelledError:         print(f"TCP server on port {port} task cancelled.")     except Exception as e:         print(f"Error in TCP server on port {port}: {e}")     finally:         if server:             server.close() # 关闭服务器套接字             await server.wait_closed() # 等待服务器完全关闭             print(f"TCP server on port {port} closed.")

main.py (FastAPI应用入口)

# main.py from fastapi import FastAPI, WebSocket import asyncio from contextlib import asynccontextmanager import globals # 导入全局变量 from server import run_tcp_server_task # 导入TCP服务器启动函数  @asynccontextmanager async def startup_event(app: FastAPI):     """     FastAPI应用的生命周期事件管理器。     在应用启动时启动TCP服务器,在应用关闭时停止它们。     """     print("Application startup: Initializing and starting background tasks...")      # 定义需要启动的TCP服务器端口     ports = [8001, 8002, 8003]     # 为每个TCP服务器创建一个后台任务     # 这些任务会在当前事件循环中并发运行     tcp_server_tasks = [asyncio.create_task(run_tcp_server_task(port)) for port in ports]      # `yield` 标志着应用启动完成,可以开始处理请求     yield      # `yield` 之后的部分在应用关闭时执行     print("Application shutdown: Stopping background tasks...")     # 取消所有TCP服务器任务     for task in tcp_server_tasks:         task.cancel()     # 等待所有任务完成取消和清理工作     # `return_exceptions=True` 确保即使有任务取消失败也不会阻塞其他任务     await asyncio.gather(*tcp_server_tasks, return_exceptions=True)     print("All background tasks stopped gracefully.")  # 使用lifespan事件管理器创建FastAPI应用 app = FastAPI(lifespan=startup_event)  @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket):     """     FastAPI的WebSocket端点。     管理WebSocket连接,并在连接断开时进行清理。     """     print("WebSocket connection established.")     await globals.websocket_manager.connect(websocket)     try:         # 保持WebSocket连接活跃,或处理来自客户端的WebSocket消息         # 在此示例中,我们只接收消息以保持连接开放,实际应用中可能需要处理消息         while True:             await websocket.receive_text()     except Exception as e:         print(f"WebSocket Error: {e}")     finally:         globals.websocket_manager.disconnect(websocket)         print("WebSocket connection closed.")  # 运行应用:uvicorn main:app --reload

5. 代码工作原理与注意事项

  1. lifespan的正确使用

    运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

    AppMall应用商店

    AI应用商店,提供即时交付、按需付费的人工智能应用服务

    运行异步TCP服务器与FastAPI:统一事件循环下的应用集成56

    查看详情 运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

    • 在main.py的startup_event中,asyncio.create_task(run_tcp_server_task(port))在yield之前被调用。这确保了TCP服务器任务在FastAPI应用启动时立即开始运行,并作为后台任务与FastAPI的http/WebSocket服务共享同一个事件循环。
    • 当应用收到关闭信号时,yield之后的代码块被执行。此时,我们通过task.cancel()向每个TCP服务器任务发送取消信号,并使用asyncio.gather(*tcp_server_tasks, return_exceptions=True)等待所有任务优雅地完成其清理工作。
  2. TCP服务器的优雅关闭

    • run_tcp_server_task函数内部使用了await server.serve_forever(),这是一个阻塞调用。当其所在的任务被取消时,它会抛出asyncio.CancelledError。
    • try…except asyncio.CancelledError…finally块确保了即使任务被取消,server.close()和await server.wait_closed()也能被执行,从而正确关闭TCP服务器的套接字。
  3. WebSocketManager

    • 负责管理所有活跃的WebSocket连接。当TCP服务器接收到数据时,它会通过globals.websocket_manager.broadcast()将数据发送给所有连接的WebSocket客户端。
    • 在广播过程中,加入了错误处理,如果向某个WebSocket发送数据失败,会将其从活跃连接中移除,提高健壮性。
  4. 全局变量 (globals.py)

    • 用于在不同模块间共享WebSocketManager实例。
    • 注意,如果需要共享其他可变状态并在多个异步任务中访问,应优先使用asyncio.Lock而非threading.Lock,以避免阻塞事件循环。在本例中,data_storage和data_lock未被实际使用。
  5. 运行应用

    • 使用uvicorn main:app –reload命令即可启动FastAPI应用。Uvicorn会自动管理事件循环,并执行lifespan事件。

6. 总结

通过本文的指导,我们学习了如何利用FastAPI的lifespan事件管理器,在同一个事件循环中有效地运行FastAPI应用和多个异步TCP服务器。关键在于理解yield的语义,并使用asyncio.create_task来调度后台任务,同时实现任务的优雅启动和关闭。这种模式对于构建需要集成多种网络服务类型的复杂异步应用至关重要,它确保了资源的有效利用和应用的健壮性。

上一篇
下一篇
text=ZqhQzanResources