重现步骤
期待结果和实际结果
软硬件版本信息
错误日志
尝试解决过程
补充材料
这是代码块：

# 整合版程序 - 视频播放与手势音频双模式切换
import os
import ujson
import _thread
import gc
import sys
import time
import uctypes
from media.media import *
from mpp.mp4_format import *
from mpp.mp4_format_struct import *
from media.pyaudio import *
import media.g711 as g711
from mpp.payload_struct import *
import media.vdecoder as vdecoder
from media.display import *
from machine import TOUCH
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import media.wave as wave
import nncase_runtime as nn
import ulab.numpy as np
import aicube
# 全局状态管理
class SystemState:
    """Shared mutable state: the current UI mode plus static media lookup tables."""

    def __init__(self):
        # Mode flag: 1 = video playback, 2 = gesture recognition
        self.cur_state = 1
        # MP4 files available for playback in video mode
        self.video_files = [
            "/sdcard/examples/utils/1_1.mp4",
            "/sdcard/examples/utils/2_2.mp4",
        ]
        # Gesture label -> WAV file to play when that gesture is recognized
        self.audio_gesture_map = {
            "fist": "/sdcard/examples/utils/tset.wav",
            "five": "/sdcard/examples/utils/123.wav",
            "gun": "/sdcard/examples/utils/gun.wav",
            "love": "/sdcard/examples/utils/love.wav",
            "one": "/sdcard/examples/utils/one.wav",
            "six": "/sdcard/examples/utils/six.wav",
            "three": "/sdcard/examples/utils/three.wav",
            "yeah": "/sdcard/examples/utils/yeah.wav",
            "thumbUp": "/sdcard/examples/utils/thumbUp.wav",
        }
        # Audio-playback run flag (NOTE(review): nothing visible in this file reads it)
        self.audio_run = True
        # Most recently recognized gesture label, consumed by the audio path
        self.latest_gesture = None
# 资源管理
class ResourceManager:
    """Owns the per-mode hardware resources (video decoder, gesture models,
    audio output stream) and provides paired init/release helpers."""

    def __init__(self):
        self.display = None        # reserved; never assigned in this file
        self.vdec = None           # H.264 decoder, video mode only
        self.hkc = None            # HandKeyPointClass pipeline, gesture mode only
        self.output_stream = None  # PyAudio output stream, gesture mode only
        self.p = PyAudio()

    def init_video(self):
        """Create the H.264 decoder and bind its output to the LCD video layer."""
        self.vdec = vdecoder.Decoder(K_PT_H264)
        self.vdec.create()
        bind_info = self.vdec.bind_info(
            width=ALIGN_UP(800, 16),  # VO layer width must be 16-byte aligned
            height=480,
            chn=self.vdec.get_vdec_channel()
        )
        Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)
        self.vdec.start()

    def init_gesture(self):
        """Load the hand detection/keypoint kmodels and open the audio output stream."""
        hand_det_kmodel = "/sdcard/examples/kmodel/hand_det.kmodel"
        hand_kp_kmodel = "/sdcard/examples/kmodel/handkp_det.kmodel"
        anchors = [26, 27, 53, 52, 75, 71, 80, 99, 106, 82, 99, 134, 140, 113, 161, 172, 245, 276]
        self.hkc = HandKeyPointClass(
            hand_det_kmodel, hand_kp_kmodel,
            det_input_size=[512, 512], kp_input_size=[256, 256],
            labels=["hand"], anchors=anchors,
            confidence_threshold=0.4, nms_threshold=0.5,
            rgb888p_size=[1920, 1080], display_size=[800, 480]
        )
        self.output_stream = self.p.open(
            format=paInt16, channels=2,
            rate=16000, output=True,
            frames_per_buffer=int(0.3 * 16000)  # 0.3 s chunks at 16 kHz
        )

    def release_video(self):
        """Stop and destroy the video decoder; safe to call repeatedly."""
        if self.vdec:
            self.vdec.stop()
            self.vdec.destroy()
            self.vdec = None
        gc.collect()

    def release_gesture(self):
        """Close the audio stream and deinit the gesture models; safe to call repeatedly."""
        # NOTE(review): this creates a stray attribute on ResourceManager — the flag
        # declared by the rest of the program is SystemState.audio_run. Verify which
        # object the audio code is supposed to observe.
        self.audio_run = False
        if self.output_stream:
            self.output_stream.stop_stream()
            self.output_stream.close()
            # Fix: drop the reference so a second release (the KeyboardInterrupt
            # handler in __main__ calls this unconditionally) does not call
            # stop_stream() on an already-closed stream.
            self.output_stream = None
        if self.hkc:
            self.hkc.hand_det.deinit()
            self.hkc.hand_kp.deinit()
            self.hkc = None
        gc.collect()
# 核心逻辑
class CoreSystem:
    """Top-level controller: owns the shared state, the resource manager, the
    touch panel and the AI pipeline, and runs either the video-playback loop
    or the gesture-recognition/audio loop."""

    def __init__(self):
        self.state = SystemState()
        self.res_mgr = ResourceManager()
        self.tp = TOUCH(0)
        self.pl = PipeLine(rgb888p_size=[1920, 1080], display_size=[800, 480], display_mode="lcd")

    def clear_cache(self):
        """Multi-level cache cleanup performed between mode switches."""
        # Force a garbage-collection pass first
        gc.collect()
        # Re-create the media buffer pools
        MediaManager.deinit()
        time.sleep_ms(50)
        MediaManager.init()
        # Remove temporary cache directories if present.
        # NOTE(review): MicroPython builds often lack os.path / os.removedirs —
        # confirm these exist on this firmware or this raises AttributeError.
        temp_dirs = ["/tmp/video_cache", "/tmp/audio_cache"]
        for d in temp_dirs:
            if os.path.exists(d):
                os.removedirs(d)

    def check_touch(self):
        """Return True when the touch panel currently reports at least one point."""
        p = self.tp.read()
        return len(p) > 0

    def video_play_loop(self):
        """Video playback main loop; exits when cur_state leaves mode 1."""
        try:
            while self.state.cur_state == 1:
                os.exitpoint()
                if self.check_touch():
                    self.switch_mode()
                # NOTE(review): demuxer_mp4 only parses the file's track info and
                # returns — no frame is ever read and fed to the decoder, so the
                # bound display layer never advances (likely the reported hang).
                self.demuxer_mp4(self.state.video_files[0], self.res_mgr.vdec)
        finally:
            self.res_mgr.release_video()

    def gesture_audio_loop(self):
        """Gesture recognition + audio feedback loop; exits when cur_state leaves mode 2."""
        self.pl.create()
        clock = time.clock()
        try:
            while self.state.cur_state == 2:
                os.exitpoint()
                clock.tick()
                img = self.pl.get_frame()
                # Fix: the pipeline object is created by ResourceManager.init_gesture
                # as self.res_mgr.hkc — the original `self.hkc` does not exist on
                # CoreSystem and raised AttributeError on the first frame.
                det_boxes, gesture_res = self.res_mgr.hkc.run(img)
                self.res_mgr.hkc.draw_result(self.pl, det_boxes, gesture_res)
                self.audio_output_thread()
                self.pl.show_image()
                print(f"FPS: {clock.fps()}")
        finally:
            self.res_mgr.release_gesture()
            self.pl.destroy()

    def audio_output_thread(self):
        """Play the WAV mapped to the latest gesture, then clear the gesture.

        Despite the name this runs synchronously: it blocks the gesture loop
        until the whole file has been written to the output stream.
        NOTE(review): draw_result assigns a module-level `latest_gesture` global;
        nothing visible ever writes state.latest_gesture, so this branch may
        never fire — verify the producer side.
        """
        if self.state.latest_gesture:
            wav_path = self.state.audio_gesture_map.get(self.state.latest_gesture)
            if wav_path and os.path.exists(wav_path):
                wf = wave.open(wav_path, "rb")
                chunk = int(0.3 * 16000)  # 0.3 s of 16 kHz audio per write
                data = wf.read_frames(chunk)
                while data:
                    self.res_mgr.output_stream.write(data)
                    data = wf.read_frames(chunk)
                wf.close()
            # Clear even when the gesture has no mapped/existing file, so an
            # unknown gesture cannot retrigger this branch forever.
            self.state.latest_gesture = None

    def switch_mode(self):
        """Toggle between video mode (1) and gesture mode (2), rebuilding resources."""
        if self.state.cur_state == 1:
            # Switch to gesture mode
            self.state.cur_state = 2
            self.clear_cache()
            self.res_mgr.init_gesture()
            _thread.start_new_thread(self.gesture_audio_loop, ())
        else:
            # Switch back to video mode.
            # NOTE(review): gesture_audio_loop never polls the touch panel, so
            # nothing currently calls switch_mode while in mode 2 — this branch
            # appears unreachable; verify intent.
            self.state.cur_state = 1
            self.clear_cache()
            self.res_mgr.init_video()
            _thread.start_new_thread(self.video_play_loop, ())

    def demuxer_mp4(self, filename, vdec):
        """Open `filename`, validate it contains a supported video track, and
        print track info.

        NOTE(review): this is only the header/track-parsing half of a demuxer —
        it never calls kd_mp4_get_frame to feed `vdec`, and never calls
        kd_mp4_destroy on the handle (leak). The playback stall reported below
        the code is consistent with this.
        """
        mp4_cfg = k_mp4_config_s()
        video_info = k_mp4_video_info_s()
        video_track = False
        audio_info = k_mp4_audio_info_s()
        audio_track = False
        mp4_handle = k_u64_ptr()
        mp4_cfg.config_type = K_MP4_CONFIG_DEMUXER
        # The K230 SDK reuses muxer_config.file_name for the demuxer's input path.
        mp4_cfg.muxer_config.file_name[:] = bytes(filename, 'utf-8')
        mp4_cfg.muxer_config.fmp4_flag = 0
        ret = kd_mp4_create(mp4_handle, mp4_cfg)
        if ret:
            raise OSError("kd_mp4_create failed:", filename)
        file_info = k_mp4_file_info_s()
        kd_mp4_get_file_info(mp4_handle.value, file_info)
        for i in range(file_info.track_num):
            track_info = k_mp4_track_info_s()
            ret = kd_mp4_get_track_by_index(mp4_handle.value, i, track_info)
            if (ret < 0):
                raise ValueError("kd_mp4_get_track_by_index failed")
            if (track_info.track_type == K_MP4_STREAM_VIDEO):
                if (track_info.video_info.codec_id == K_MP4_CODEC_ID_H264 or track_info.video_info.codec_id == K_MP4_CODEC_ID_H265):
                    video_track = True
                    video_info = track_info.video_info
                    print(" codec_id: ", video_info.codec_id)
                    print(" track_id: ", video_info.track_id)
                    print(" width: ", video_info.width)
                    print(" height: ", video_info.height)
                else:
                    print("video not support codecid:", track_info.video_info.codec_id)
            elif (track_info.track_type == K_MP4_STREAM_AUDIO):
                if (track_info.audio_info.codec_id == K_MP4_CODEC_ID_G711A or track_info.audio_info.codec_id == K_MP4_CODEC_ID_G711U):
                    audio_track = True
                    audio_info = track_info.audio_info
                    print(" codec_id: ", audio_info.codec_id)
                    print(" track_id: ", audio_info.track_id)
                    print(" channels: ", audio_info.channels)
                    print(" sample_rate: ", audio_info.sample_rate)
                    print(" bit_per_sample: ", audio_info.bit_per_sample)
                else:
                    print("audio not support codecid:", track_info.audio_info.codec_id)
        if (video_track == False):
            raise ValueError("video track not found")
class HandKeyPointClass:
    """Two-stage gesture pipeline: a palm detector followed by a 21-keypoint
    model + gesture classifier per detected palm.

    NOTE(review): HandDetApp and HandKPClassApp are referenced but not defined
    in this paste — they must come from the original example file.
    """

    def __init__(self, hand_det_kmodel, hand_kp_kmodel, det_input_size, kp_input_size, labels, anchors,
                 confidence_threshold=0.25, nms_threshold=0.3, nms_option=False, strides=[8, 16, 32],
                 rgb888p_size=[1280, 720], display_size=[1920, 1080], debug_mode=0):
        # Palm detection model path
        self.hand_det_kmodel = hand_det_kmodel
        # Hand keypoint model path
        self.hand_kp_kmodel = hand_kp_kmodel
        # Palm detection model input resolution
        self.det_input_size = det_input_size
        # Hand keypoint model input resolution
        self.kp_input_size = kp_input_size
        self.labels = labels
        # Detection anchors
        self.anchors = anchors
        # Confidence threshold
        self.confidence_threshold = confidence_threshold
        # NMS threshold
        self.nms_threshold = nms_threshold
        self.nms_option = nms_option
        self.strides = strides
        # Resolution of the image the sensor feeds to the AI, width 16-byte aligned
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
        # Video output (VO) resolution, width 16-byte aligned
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
        # Debug mode flag
        self.debug_mode = debug_mode
        self.hand_det = HandDetApp(self.hand_det_kmodel, self.labels, model_input_size=self.det_input_size,
                                   anchors=self.anchors, confidence_threshold=self.confidence_threshold,
                                   nms_threshold=self.nms_threshold, nms_option=self.nms_option,
                                   strides=self.strides, rgb888p_size=self.rgb888p_size,
                                   display_size=self.display_size, debug_mode=0)
        self.hand_kp = HandKPClassApp(self.hand_kp_kmodel, model_input_size=self.kp_input_size,
                                      rgb888p_size=self.rgb888p_size, display_size=self.display_size)
        self.hand_det.config_preprocess()

    def run(self, input_np):
        """Detect palms in input_np, then run keypoint/gesture inference on each
        box that passes the size/border filters. Returns (boxes, gesture_res)."""
        det_boxes = self.hand_det.run(input_np)
        boxes = []
        gesture_res = []
        for det_box in det_boxes:
            x1, y1, x2, y2 = det_box[2], det_box[3], det_box[4], det_box[5]
            w, h = int(x2 - x1), int(y2 - y1)
            # Skip boxes that are too small or hug the frame border (unreliable crops)
            if (h < (0.1 * self.rgb888p_size[1])):
                continue
            if (w < (0.25 * self.rgb888p_size[0]) and ((x1 < (0.03 * self.rgb888p_size[0])) or (x2 > (0.97 * self.rgb888p_size[0])))):
                continue
            if (w < (0.15 * self.rgb888p_size[0]) and ((x1 < (0.01 * self.rgb888p_size[0])) or (x2 > (0.99 * self.rgb888p_size[0])))):
                continue
            self.hand_kp.config_preprocess(det_box)
            results_show, gesture = self.hand_kp.run(input_np)
            gesture_res.append((results_show, gesture))
            boxes.append(det_box)
        return boxes, gesture_res

    def draw_result(self, pl, dets, gesture_res):
        """Draw detection boxes, the 21 keypoints, the 5 finger skeletons and the
        gesture label onto the OSD layer."""
        # NOTE(review): this global is written here but nothing visible reads it;
        # the audio path reads SystemState.latest_gesture instead — verify wiring.
        global latest_gesture
        pl.osd_img.clear()
        if len(dets) > 0:
            for k in range(len(dets)):
                det_box = dets[k]
                x1, y1, x2, y2 = det_box[2], det_box[3], det_box[4], det_box[5]
                w, h = int(x2 - x1), int(y2 - y1)
                # Same size/border filters as run(), so boxes and gesture_res stay aligned
                if (h < (0.1 * self.rgb888p_size[1])):
                    continue
                if (w < (0.25 * self.rgb888p_size[0]) and ((x1 < (0.03 * self.rgb888p_size[0])) or (x2 > (0.97 * self.rgb888p_size[0])))):
                    continue
                if (w < (0.15 * self.rgb888p_size[0]) and ((x1 < (0.01 * self.rgb888p_size[0])) or (x2 > (0.99 * self.rgb888p_size[0])))):
                    continue
                # Scale the box from sensor resolution to display resolution
                w_det = int(float(x2 - x1) * self.display_size[0] // self.rgb888p_size[0])
                h_det = int(float(y2 - y1) * self.display_size[1] // self.rgb888p_size[1])
                x_det = int(x1 * self.display_size[0] // self.rgb888p_size[0])
                y_det = int(y1 * self.display_size[1] // self.rgb888p_size[1])
                pl.osd_img.draw_rectangle(x_det, y_det, w_det, h_det, color=(255, 0, 255, 0), thickness=2)
                results_show = gesture_res[k][0]
                # Fix: use floor division — range() requires an int, and `/` yields
                # a float (TypeError) in Python 3 / MicroPython.
                for i in range(len(results_show) // 2):
                    pl.osd_img.draw_circle(results_show[i * 2], results_show[i * 2 + 1], 1, color=(255, 0, 255, 0), fill=False)
                # One polyline per finger (4 segments from the wrist point), 8 values apart
                for i in range(5):
                    j = i * 8
                    if i == 0:
                        R = 255; G = 0; B = 0
                    if i == 1:
                        R = 255; G = 0; B = 255
                    if i == 2:
                        R = 255; G = 255; B = 0
                    if i == 3:
                        R = 0; G = 255; B = 0
                    if i == 4:
                        R = 0; G = 0; B = 255
                    pl.osd_img.draw_line(results_show[0], results_show[1], results_show[j + 2], results_show[j + 3], color=(255, R, G, B), thickness=3)
                    pl.osd_img.draw_line(results_show[j + 2], results_show[j + 3], results_show[j + 4], results_show[j + 5], color=(255, R, G, B), thickness=3)
                    pl.osd_img.draw_line(results_show[j + 4], results_show[j + 5], results_show[j + 6], results_show[j + 7], color=(255, R, G, B), thickness=3)
                    pl.osd_img.draw_line(results_show[j + 6], results_show[j + 7], results_show[j + 8], results_show[j + 9], color=(255, R, G, B), thickness=3)
                gesture_str = gesture_res[k][1]
                latest_gesture = gesture_str
                pl.osd_img.draw_string_advanced(x_det, y_det - 50, 32, " " + str(gesture_str), color=(255, 0, 255, 0))
if __name__ == "__main__":
    # Allow soft interrupts (IDE stop / Ctrl-C) to break out of the loops via os.exitpoint().
    os.exitpoint(os.EXITPOINT_ENABLE)
    system = CoreSystem()
    # Bring up the ST7701 LCD (mirrored to the IDE) before initializing media buffers.
    Display.init(Display.ST7701, to_ide=True)
    MediaManager.init()
    # Start in video mode: create the decoder, then run playback on its own thread.
    system.res_mgr.init_video()
    _thread.start_new_thread(system.video_play_loop, ())
    # Main guard loop: worker threads do the work; this thread idles and handles teardown.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Release both modes unconditionally; the release helpers check for
        # None before touching resources that were never initialized.
        system.res_mgr.release_video()
        system.res_mgr.release_gesture()
        Display.deinit()
        MediaManager.deinit()
现在缓冲区似乎有问题，运行时输出如下：
MicroPython v1.1 on 2024-09-15; k230_canmv_01studio with K230
buffer pool : 3
input_pool_id:3,input_pool_size:1044480,input_pool_cnt:4
output_pool_id:4,output_pool_size:3133440,output_pool_cnt:6
buffer pool : 1
一直卡在这儿不动