如何在 Shiny 应用中处理长时间运行任务并保持 UI 响应性

如何在 Shiny 应用中处理长时间运行任务并保持 UI 响应性

在 Shiny for Python 应用中,长时间运行的任务(如循环发送串口数据)会阻塞主事件循环,导致用户界面失去响应,无法及时处理其他输入(如停止按钮)。本文将详细介绍如何利用 Python 的 threading 模块和 threading.Event 对象,将耗时操作放到独立的线程中执行,从而确保 Shiny 应用的核心响应性,使用户能够随时中断正在进行的任务。

1. 问题背景:阻塞式操作与 Shiny 应用的响应性

在开发基于 shiny for python 的交互式应用时,我们经常需要处理一些耗时的操作,例如通过串口发送一系列指令来控制外部设备。如果这些操作直接放在 @reactive.effect 或 @reactive.event 装饰器修饰的函数内部,并且包含了阻塞式的循环或长时间的延迟(如 time.sleep() 或忙等待 while 循环),就会导致整个 shiny 应用的用户界面(ui)失去响应。

考虑一个控制流体泵的场景:用户点击“启动”按钮(p1),应用开始按照预设的流量曲线循环发送串口指令。如果用户希望在传输过程中随时点击“停止”按钮(p2)来中断传输,那么一个阻塞式的启动逻辑将无法满足需求。原始实现中,p1 按钮对应的 _ 函数内部包含一个 while 循环,每次发送指令后都会等待两秒。这意味着在循环完成之前,p2 按钮的点击事件将无法被 Shiny 应用的主事件循环及时捕获和处理,导致停止指令被排队,直到当前传输循环结束后才能执行。

原始的阻塞式代码示例(存在响应性问题):

import time import serial from shiny import reactive  # 假设 ser 已经初始化为串口对象 ser = serial.Serial("COM6", 115200)  @reactive.Effect @reactive.event(input.p1) def _():     y = yg.get() # 从 reactive value yg 获取电压数组      for e in y: # 遍历数组         msg = "1:1:"+str(e)+":100" # 格式化驱动电压消息         ser.write(bytes(msg,'utf-8')) # 发送消息         t0 = time.time() # 记录时间戳          while(((time.time()-t0)<=2)): # 忙等待,直到2秒后             pass     ser.write(bytes("0:1",'utf-8')) # 传输结束后停止泵  @reactive.Effect @reactive.event(input.p2) def _():     #print("1:0")     ser.write(bytes("0:1",'utf-8')) # 停止泵

问题分析: 上述 input.p1 对应的 _ 函数内部的 for 循环和 while 忙等待是导致问题的根源。在 Shiny 应用中,所有 reactive.Effect 和 reactive.event 装饰器修饰的函数都在同一个主线程中执行。当一个函数长时间运行时,它会独占主线程,阻止其他事件(如 input.p2 的点击)被处理,从而导致 UI 卡顿和失去响应。

2. 解决方案:利用多线程实现非阻塞操作

为了解决主线程阻塞问题,我们可以将耗时操作从主线程中剥离,放到一个独立的后台线程中执行。Python 的 threading 模块提供了实现这一目标的工具,特别是 threading.Thread 用于创建新线程,以及 threading.Event 用于线程间的信号通信。

核心思路:

  1. 创建一个独立的函数,包含需要长时间运行的逻辑(如串口数据传输循环)。
  2. 使用 threading.Thread 将这个函数包装成一个新线程。
  3. 利用 threading.Event 对象作为信号量,实现主线程与子线程之间的通信。主线程可以在需要停止任务时设置 Event,子线程则周期性检查 Event 的状态以决定是否继续执行。

改进后的代码实现:

如何在 Shiny 应用中处理长时间运行任务并保持 UI 响应性

挖错网

一款支持文本、图片、视频纠错和AIGC检测的内容审核校对平台。

如何在 Shiny 应用中处理长时间运行任务并保持 UI 响应性29

查看详情 如何在 Shiny 应用中处理长时间运行任务并保持 UI 响应性

import serial import time import numpy as np import threading as th from shiny import App, ui, reactive  # 假设 ser 已经初始化 ser = serial.Serial("COM6", 115200)  # 定义一个全局的 Event 对象,用于线程间通信 sflag = th.Event()  # 辅助函数:发送串口消息 def transmit(e):     """     根据给定的电压值 e 格式化消息并发送到串口。     """     msg = "1:1:"+str(e)+":100"     # print(msg) # 调试用     ser.write(bytes(msg,'utf-8'))  # 后台线程执行的函数:定时发送数据 def rtimer(y, sflag):     """     在独立线程中执行的函数,循环遍历数组 y 并发送数据。     每隔2秒发送一次,直到数组遍历完毕或 sflag 被设置。     """     i = 0     while i < np.size(y) and not sflag.is_set():         transmit(y[i])         i += 1         time.sleep(2) # 使用 time.sleep() 在子线程中安全等待      # 循环结束后,如果不是因为 sflag 停止,则发送停止指令     # 但由于 p2 也会发送停止指令,此处可以根据实际需求调整     if not sflag.is_set(): # 如果是正常完成,而不是被中断         ser.write(bytes("0:1",'utf-8')) # 停止泵  # p1 按钮的响应函数:启动传输线程 @reactive.Effect() @reactive.event(input.p1) def start_pump_transmission():     """     处理 p1 按钮点击事件,启动数据传输线程。     """     y = yg.get() # 从 reactive value yg 获取数据     sflag.clear() # 启动前清除停止信号,确保线程可以运行     # 创建并启动新线程     timer_thread = th.Thread(target=rtimer, args=[y, sflag])     timer_thread.start()  # p2 按钮的响应函数:停止传输 @reactive.Effect() @reactive.event(input.p2) def stop_pump_transmission():     """     处理 p2 按钮点击事件,设置停止信号并立即发送停止指令。     """     sflag.set() # 设置停止信号,通知后台线程停止     ser.write(bytes("1:0",'utf-8')) # 立即发送停止泵的指令

代码解释:

  1. sflag = th.Event(): 创建一个 Event 对象,它包含一个内部标志,默认是 False。
    • sflag.clear(): 将内部标志设置为 False。
    • sflag.set(): 将内部标志设置为 True。
    • sflag.is_set(): 检查内部标志是否为 True。
  2. transmit(e) 函数: 这是一个简单的辅助函数,用于格式化并发送串口消息。它与主线程或子线程的执行逻辑无关,因此可以被任一线程调用。
  3. rtimer(y, sflag) 函数: 这是在独立线程中执行的核心逻辑。
    • 它接收数据数组 y 和 sflag 作为参数。
    • while i < np.size(y) and not sflag.is_set()::循环条件不仅检查是否遍历完数组,还检查 sflag 是否被设置。如果 sflag 被设置(即 sflag.is_set() 为 True),循环将立即终止。
    • time.sleep(2): 在子线程中使用 time.sleep() 是安全的,因为它只会阻塞当前子线程,而不会阻塞主线程和 UI。
  4. start_pump_transmission() (@reactive.event(input.p1)):
    • 在启动新任务之前,调用 sflag.clear() 确保停止信号被清除,以便新线程能够正常运行。
    • th.Thread(target=rtimer, args=[y, sflag]):创建一个新的线程实例,指定其目标函数为 rtimer,并将 y 和 sflag 作为参数传递给它。
    • timer_thread.start():启动新线程。此时,rtimer 函数将在一个独立的后台线程中运行,而主线程则继续处理 Shiny 应用的 UI 事件。
  5. stop_pump_transmission() (@reactive.event(input.p2)):
    • sflag.set():当用户点击“停止”按钮时,主线程会立即执行此操作,设置 sflag 的内部标志为 True。
    • 后台线程在下一次循环迭代时检查 sflag.is_set() 会发现标志已设置,从而跳出循环,实现任务的平滑终止。
    • ser.write(bytes(“1:0”,’utf-8′)):同时,主线程可以立即发送停止泵的串口指令,确保物理设备能尽快停止。

3. 优点与注意事项

优点:

  • 保持 UI 响应性: 长时间运行的任务被移至后台线程,主线程不再被阻塞,Shiny 应用的 UI 保持流畅和响应。
  • 即时中断: 用户可以随时点击“停止”按钮,后台任务会迅速响应停止信号并终止。
  • 清晰的任务控制: threading.Event 提供了一种简单有效的线程间通信机制,用于控制后台任务的生命周期。

注意事项:

  • 线程安全: 当多个线程访问和修改共享资源(如全局变量、数据库连接、串口对象)时,需要特别注意线程安全。在本例中,ser 对象在主线程和子线程中都被访问,但由于 transmit 函数和 stop_pump_transmission 函数是串行地对 ser 进行写操作(通常 ser.write 是原子操作或底层有锁),且 sflag 专门用于协调,因此风险较低。但在更复杂的场景中,可能需要使用 threading.Lock 来保护共享资源。
  • 错误处理: 在后台线程中发生的异常不会自动传播到主线程。应在 rtimer 函数内部添加适当的 try-except 块来捕获和处理潜在的错误。
  • 资源清理: 确保在应用关闭或任务结束后,正确关闭串口等资源。
  • 替代方案:asyncio: 对于 I/O 密集型任务(如串口通信、网络请求),Python 的 asyncio 模块通常是比 threading 更现代、更高效的解决方案。然而,asyncio 需要整个应用架构都支持异步,如果现有代码是同步阻塞式的,使用 threading 可能是更直接的“打补丁”方式。Shiny for Python 本身是基于 asyncio 构建的,因此将同步阻塞任务放入线程是避免阻塞其事件循环的有效方法。

4. 总结

在 Shiny for Python 应用中,处理耗时或阻塞式操作的关键在于将其从主事件循环中分离。通过利用 Python 的 threading 模块,我们可以将这些任务放到独立的后台线程中执行,并使用 threading.Event 等机制进行线程间的有效通信,从而实现非阻塞的 UI 体验和对任务的精确控制。这种方法不仅解决了 UI 响应性问题,也使得应用能够更好地处理复杂的实时交互场景,如本例中对流体泵的即时启停控制。

react python app 工具 点击事件 有锁 Python 架构 for while try 全局变量 循环 Event 线程 多线程 主线程 Thread 并发 对象 事件 异步 input th 数据库 ui

上一篇
下一篇