I want to continue the implementation so that after the 2_2 video has played, a second touch on the screen triggers gesture recognition and audio playback.
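Roughly the flow I have in mind, as a minimal sketch (play_video() and gesture_audio_mode() are only placeholders for the existing video_play_loop / gesture_audio_loop logic; touch polling follows the same TOUCH(0).read() pattern used in the full code below):

import time
from machine import TOUCH

tp = TOUCH(0)

def wait_for_touch():
    # Block until at least one touch point is reported, then debounce briefly
    while len(tp.read()) == 0:
        time.sleep_ms(20)
    time.sleep_ms(200)

def run_sequence():
    play_video("/sdcard/examples/utils/1_1.mp4")    # stage 1: first clip (placeholder helper)
    wait_for_touch()
    play_video("/sdcard/examples/utils/2_2.mp4")    # stage 2: second clip after a touch
    wait_for_touch()
    gesture_audio_mode()                            # stage 3: gesture recognition + audio (placeholder helper)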

Here is the code block:

# Integrated program - video playback and gesture/audio dual-mode switching
import os
import ujson
import _thread
import gc
import sys
import time
import uctypes
from media.media import *
from mpp.mp4_format import *
from mpp.mp4_format_struct import *
from media.pyaudio import *
import media.g711 as g711
from mpp.payload_struct import *
import media.vdecoder as vdecoder
from media.display import *
from machine import TOUCH
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import media.wave as wave  
import nncase_runtime as nn
import ulab.numpy as np
import aicube

# Global state management
class SystemState:
    def __init__(self):
        self.cur_state = 1  # 1: video mode, 2: gesture mode
        self.video_files = [
            "/sdcard/examples/utils/1_1.mp4",
            "/sdcard/examples/utils/2_2.mp4"
        ]
        self.audio_gesture_map = {
            "fist": "/sdcard/examples/utils/tset.wav",
            "five": "/sdcard/examples/utils/123.wav",
            "gun": "/sdcard/examples/utils/gun.wav",
            "love": "/sdcard/examples/utils/love.wav",
            "one": "/sdcard/examples/utils/one.wav",
            "six": "/sdcard/examples/utils/six.wav",
            "three": "/sdcard/examples/utils/three.wav",
            "yeah": "/sdcard/examples/utils/yeah.wav",
            "thumbUp": "/sdcard/examples/utils/thumbUp.wav"
        }
        self.audio_run = True
        self.latest_gesture = None

# Resource management
class ResourceManager:
    def __init__(self):
        self.display = None
        self.vdec = None
        self.hkc = None
        self.output_stream = None
        self.p = PyAudio()
        
    def init_video(self):
        """初始化视频解码资源"""
        self.vdec = vdecoder.Decoder(K_PT_H264)
        self.vdec.create()
        bind_info = self.vdec.bind_info(
            width=ALIGN_UP(800, 16),
            height=480,
            chn=self.vdec.get_vdec_channel()
        )
        Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)
        self.vdec.start()
        
    def init_gesture(self):
        """初始化手势识别资源[2,6](@ref)"""
        hand_det_kmodel = "/sdcard/examples/kmodel/hand_det.kmodel"
        hand_kp_kmodel = "/sdcard/examples/kmodel/handkp_det.kmodel"
        anchors = [26,27,53,52,75,71,80,99,106,82,99,134,140,113,161,172,245,276]
        
        self.hkc = HandKeyPointClass(
            hand_det_kmodel, hand_kp_kmodel,
            det_input_size=[512,512], kp_input_size=[256,256],
            labels=["hand"], anchors=anchors,
            confidence_threshold=0.4, nms_threshold=0.5,
            rgb888p_size=[1920,1080], display_size=[800,480]
        )
        self.output_stream = self.p.open(
            format=paInt16, channels=2,
            rate=16000, output=True,
            frames_per_buffer=int(0.3 * 16000)
        )

    def release_video(self):
        """释放视频资源"""
        if self.vdec:
            self.vdec.stop()
            self.vdec.destroy()
            self.vdec = None
            gc.collect()

    def release_gesture(self):
        """释放手势资源[6,9](@ref)"""
        self.audio_run = False
        if self.output_stream:
            self.output_stream.stop_stream()
            self.output_stream.close()
        if self.hkc:
            self.hkc.hand_det.deinit()
            self.hkc.hand_kp.deinit()
            self.hkc = None
            gc.collect()

# Core logic
class CoreSystem:
    def __init__(self):
        self.state = SystemState()
        self.res_mgr = ResourceManager()
        self.tp = TOUCH(0)
        self.pl = PipeLine(rgb888p_size=[1920,1080], display_size=[800,480], display_mode="lcd")
        
    def clear_cache(self):
        """多级缓存清理[2,6](@ref)"""
        # 强制垃圾回收
        gc.collect()
        # 清理媒体缓存
        MediaManager.deinit()
        time.sleep_ms(50)
        MediaManager.init()
        # Remove temporary files
        temp_dirs = ["/tmp/video_cache", "/tmp/audio_cache"]
        for d in temp_dirs:
            if os.path.exists(d):
                os.removedirs(d)

    def check_touch(self):
        """触摸检测"""
        p = self.tp.read()
        return len(p) > 0

    def video_play_loop(self):
        """视频播放主循环"""
        try:
            while self.state.cur_state == 1:
                os.exitpoint()
                if self.check_touch():
                    self.switch_mode()
                self.demuxer_mp4(self.state.video_files[0], self.res_mgr.vdec)
        finally:
            self.res_mgr.release_video()

    def gesture_audio_loop(self):
        """手势音频主循环[6,9](@ref)"""
        self.pl.create()
        clock = time.clock()
        try:
            while self.state.cur_state == 2:
                os.exitpoint()
                clock.tick()
                img = self.pl.get_frame()
                det_boxes, gesture_res = self.res_mgr.hkc.run(img)
                self.res_mgr.hkc.draw_result(self.pl, det_boxes, gesture_res)
                self.audio_output_thread()
                self.pl.show_image()
                print(f"FPS: {clock.fps()}")
        finally:
            self.res_mgr.release_gesture()
            self.pl.destroy()

    def audio_output_thread(self):
        """音频输出线程[8,9](@ref)"""
        if self.state.latest_gesture:
            wav_path = self.state.audio_gesture_map.get(self.state.latest_gesture)
            if wav_path and os.path.exists(wav_path):
                wf = wave.open(wav_path, "rb")
                chunk = int(0.3 * 16000)
                data = wf.read_frames(chunk)
                while data:
                    self.res_mgr.output_stream.write(data)
                    data = wf.read_frames(chunk)
                wf.close()
            self.state.latest_gesture = None

    def switch_mode(self):
        """模式切换处理[2,6](@ref)"""
        if self.state.cur_state == 1:
            # Switch to gesture mode
            self.state.cur_state = 2
            self.clear_cache()
            self.res_mgr.init_gesture()
            _thread.start_new_thread(self.gesture_audio_loop, ())
        else:
            # Switch back to video mode
            self.state.cur_state = 1
            self.clear_cache()
            self.res_mgr.init_video()
            _thread.start_new_thread(self.video_play_loop, ())

    def demuxer_mp4(self, filename, vdec):
        """视频解复用器[原程序逻辑]"""
        # ... [保持原有demuxer_mp4函数实现不变] ...
        mp4_cfg = k_mp4_config_s()
        video_info = k_mp4_video_info_s()
        video_track = False
        audio_info = k_mp4_audio_info_s()
        audio_track = False
        mp4_handle = k_u64_ptr()
    
        mp4_cfg.config_type = K_MP4_CONFIG_DEMUXER
        mp4_cfg.muxer_config.file_name[:] = bytes(filename, 'utf-8')
        mp4_cfg.muxer_config.fmp4_flag = 0
    
        ret = kd_mp4_create(mp4_handle, mp4_cfg)
        if ret:
            raise OSError("kd_mp4_create failed:",filename)
    
        file_info = k_mp4_file_info_s()
        kd_mp4_get_file_info(mp4_handle.value, file_info)
        #print("=====file_info: track_num:",file_info.track_num,"duration:",file_info.duration)
    
        for i in range(file_info.track_num):
            track_info = k_mp4_track_info_s()
            ret = kd_mp4_get_track_by_index(mp4_handle.value, i, track_info)
            if (ret < 0):
                raise ValueError("kd_mp4_get_track_by_index failed")
    
            if (track_info.track_type == K_MP4_STREAM_VIDEO):
                if (track_info.video_info.codec_id == K_MP4_CODEC_ID_H264 or track_info.video_info.codec_id == K_MP4_CODEC_ID_H265):
                    video_track = True
                    video_info = track_info.video_info
                    print("    codec_id: ", video_info.codec_id)
                    print("    track_id: ", video_info.track_id)
                    print("    width: ", video_info.width)
                    print("    height: ", video_info.height)
                else:
                    print("video not support codecid:",track_info.video_info.codec_id)
            elif (track_info.track_type == K_MP4_STREAM_AUDIO):
                if (track_info.audio_info.codec_id == K_MP4_CODEC_ID_G711A or track_info.audio_info.codec_id == K_MP4_CODEC_ID_G711U):
                    audio_track = True
                    audio_info = track_info.audio_info
                    print("    codec_id: ", audio_info.codec_id)
                    print("    track_id: ", audio_info.track_id)
                    print("    channels: ", audio_info.channels)
                    print("    sample_rate: ", audio_info.sample_rate)
                    print("    bit_per_sample: ", audio_info.bit_per_sample)
                    #audio_info.channels = 2
                else:
                    print("audio not support codecid:",track_info.audio_info.codec_id)
    
        if (video_track == False):
            raise ValueError("video track not found")
       

class HandKeyPointClass:
    """手势识别类[保持原有类实现不变]"""
    # ... [原有HandKeyPointClass完整实现] ...
    def __init__(self,hand_det_kmodel,hand_kp_kmodel,det_input_size,kp_input_size,labels,anchors,confidence_threshold=0.25,nms_threshold=0.3,nms_option=False,strides=[8,16,32],rgb888p_size=[1280,720],display_size=[1920,1080],debug_mode=0):
        # Path to the palm detection model
        self.hand_det_kmodel=hand_det_kmodel
        # Path to the hand keypoint model
        self.hand_kp_kmodel=hand_kp_kmodel
        # Input resolution of the palm detection model
        self.det_input_size=det_input_size
        # Input resolution of the hand keypoint model
        self.kp_input_size=kp_input_size
        self.labels=labels
        # anchors
        self.anchors=anchors
        # Confidence threshold
        self.confidence_threshold=confidence_threshold
        # NMS threshold
        self.nms_threshold=nms_threshold
        self.nms_option=nms_option
        self.strides=strides
        # Resolution of the image the sensor feeds to the AI pipeline; width aligned up to 16 bytes
        self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
        # Video output (VO) resolution; width aligned up to 16 bytes
        self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
        # Debug mode
        self.debug_mode=debug_mode
        self.hand_det=HandDetApp(self.hand_det_kmodel,self.labels,model_input_size=self.det_input_size,anchors=self.anchors,confidence_threshold=self.confidence_threshold,nms_threshold=self.nms_threshold,nms_option=self.nms_option,strides=self.strides,rgb888p_size=self.rgb888p_size,display_size=self.display_size,debug_mode=0)
        self.hand_kp=HandKPClassApp(self.hand_kp_kmodel,model_input_size=self.kp_input_size,rgb888p_size=self.rgb888p_size,display_size=self.display_size)
        self.hand_det.config_preprocess()

    # run function
    def run(self,input_np):
        # Run palm detection
        det_boxes=self.hand_det.run(input_np)
        boxes=[]
        gesture_res=[]
        for det_box in det_boxes:
            # Run keypoint recognition for each detected palm
            x1, y1, x2, y2 = det_box[2],det_box[3],det_box[4],det_box[5]
            w,h= int(x2 - x1),int(y2 - y1)
            if (h<(0.1*self.rgb888p_size[1])):
                continue
            if (w<(0.25*self.rgb888p_size[0]) and ((x1<(0.03*self.rgb888p_size[0])) or (x2>(0.97*self.rgb888p_size[0])))):
                continue
            if (w<(0.15*self.rgb888p_size[0]) and ((x1<(0.01*self.rgb888p_size[0])) or (x2>(0.99*self.rgb888p_size[0])))):
                continue
            self.hand_kp.config_preprocess(det_box)
            results_show,gesture=self.hand_kp.run(input_np)
            gesture_res.append((results_show,gesture))
            boxes.append(det_box)

        return boxes,gesture_res


    # Draw results: keypoints, palm detection boxes and recognized gestures
    def draw_result(self,pl,dets,gesture_res):
        global latest_gesture
        pl.osd_img.clear()
        if len(dets)>0:
            for k in range(len(dets)):
                det_box=dets[k]
                x1, y1, x2, y2 = det_box[2],det_box[3],det_box[4],det_box[5]
                w,h= int(x2 - x1),int(y2 - y1)
                if (h<(0.1*self.rgb888p_size[1])):
                    continue
                if (w<(0.25*self.rgb888p_size[0]) and ((x1<(0.03*self.rgb888p_size[0])) or (x2>(0.97*self.rgb888p_size[0])))):
                    continue
                if (w<(0.15*self.rgb888p_size[0]) and ((x1<(0.01*self.rgb888p_size[0])) or (x2>(0.99*self.rgb888p_size[0])))):
                    continue
                w_det = int(float(x2 - x1) * self.display_size[0] // self.rgb888p_size[0])
                h_det = int(float(y2 - y1) * self.display_size[1] // self.rgb888p_size[1])
                x_det = int(x1*self.display_size[0] // self.rgb888p_size[0])
                y_det = int(y1*self.display_size[1] // self.rgb888p_size[1])
                pl.osd_img.draw_rectangle(x_det, y_det, w_det, h_det, color=(255, 0, 255, 0), thickness = 2)

                results_show=gesture_res[k][0]
                for i in range(len(results_show)//2):
                    pl.osd_img.draw_circle(results_show[i*2], results_show[i*2+1], 1, color=(255, 0, 255, 0),fill=False)
                for i in range(5):
                    j = i*8
                    if i==0:
                        R = 255; G = 0; B = 0
                    if i==1:
                        R = 255; G = 0; B = 255
                    if i==2:
                        R = 255; G = 255; B = 0
                    if i==3:
                        R = 0; G = 255; B = 0
                    if i==4:
                        R = 0; G = 0; B = 255
                    pl.osd_img.draw_line(results_show[0], results_show[1], results_show[j+2], results_show[j+3], color=(255,R,G,B), thickness = 3)
                    pl.osd_img.draw_line(results_show[j+2], results_show[j+3], results_show[j+4], results_show[j+5], color=(255,R,G,B), thickness = 3)
                    pl.osd_img.draw_line(results_show[j+4], results_show[j+5], results_show[j+6], results_show[j+7], color=(255,R,G,B), thickness = 3)
                    pl.osd_img.draw_line(results_show[j+6], results_show[j+7], results_show[j+8], results_show[j+9], color=(255,R,G,B), thickness = 3)

                gesture_str=gesture_res[k][1]
                latest_gesture = gesture_str  # note: updates the module-level global, not SystemState.latest_gesture read by audio_output_thread
                pl.osd_img.draw_string_advanced( x_det , y_det-50,32, " " + str(gesture_str), color=(255,0, 255, 0))

if __name__ == "__main__":
    os.exitpoint(os.EXITPOINT_ENABLE)
    system = CoreSystem()
    Display.init(Display.ST7701, to_ide=True)
    MediaManager.init()
    
    # Start in video mode
    system.res_mgr.init_video()
    _thread.start_new_thread(system.video_play_loop, ())
    
    # Main guard loop
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        system.res_mgr.release_video()
        system.res_mgr.release_gesture()
        Display.deinit()
        MediaManager.deinit()

Now there seems to be a problem with the buffer pool.

MicroPython v1.1 on 2024-09-15; k230_canmv_01studio with K230
buffer pool : 3
input_pool_id:3,input_pool_size:1044480,input_pool_cnt:4
output_pool_id:4,output_pool_size:3133440,output_pool_cnt:6
buffer pool : 1
It just hangs here and does not move.
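A guess I want to try next (not verified on hardware): clear_cache() calls MediaManager.deinit() / MediaManager.init() while the H.264 decoder is still running and the video layer is still bound, so the buffer pool may never be released and re-created cleanly. A sketch of switch_mode() that releases the current mode's resources before recycling the media buffers:

    def switch_mode(self):
        if self.state.cur_state == 1:
            self.state.cur_state = 2          # lets video_play_loop fall out of its while condition
            self.res_mgr.release_video()      # stop/destroy the decoder before touching media buffers
            self.clear_cache()
            self.res_mgr.init_gesture()
            _thread.start_new_thread(self.gesture_audio_loop, ())
        else:
            self.state.cur_state = 1
            self.res_mgr.release_gesture()    # close the audio stream and deinit the models first
            self.clear_cache()
            self.res_mgr.init_video()
            _thread.start_new_thread(self.video_play_loop, ())

If this ordering is used, release_gesture() would also need to set self.output_stream = None after closing it, so the finally block in gesture_audio_loop does not close the stream a second time.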