期待结果和实际结果
比如说视频播放时,识别到物体暂停播放,处理完后再重新播放
软硬件版本信息
嘉立创庐山派开发板
2025年3月28号的固件(带着LCKFB的固件)
期待结果和实际结果
比如说视频播放时,识别到物体暂停播放,处理完后再重新播放
软硬件版本信息
嘉立创庐山派开发板
2025年3月28号的固件(带着LCKFB的固件)
我只能完成:先播放一次视频,然后持续检测,没有遇到物体退出识别,然后循环播放。
import time, os, sys # 导入时间、操作系统和系统模块
import ujson # 导入微json模块,用于处理JSON数据
import aicube # 导入AI立方体模块,用于AI相关操作
from libs.PipeLine import ScopedTiming # 从管道库导入ScopedTiming,用于计时
from libs.Utils import * # 从工具库导入所有函数
from media.sensor import * # 从媒体传感器库导入所有函数
from media.display import * # 从媒体显示库导入所有函数
from media.media import * # 从媒体库导入所有函数
import nncase_runtime as nn # 导入NNCase运行时库,用于神经网络操作
import ulab.numpy as np # 导入ulab的numpy模块,用于数值计算
import image # 导入图像处理模块
import gc # 导入垃圾收集模块
from machine import UART # 从机器库导入UART,用于串口通信
from machine import FPIOA # 从机器库导入FPIOA,用于引脚配置
from media.player import * # 从媒体播放器库导入所有函数
# --- Pin configuration: route UART1 onto physical pins 3/4 ---
fpioa = FPIOA()  # pin-multiplexer controller
fpioa.set_function(3, FPIOA.UART1_TXD)  # pin 3 -> UART1 TX
fpioa.set_function(4, FPIOA.UART1_RXD)  # pin 4 -> UART1 RX
# UART1 link to the lower-level MCU: 9600 baud, 8 data bits, no parity, 1 stop bit.
uart = UART(UART.UART1, baudrate=9600, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)
display_mode = "hdmi"  # display output: "lcd" (ST7701 800x480) or "hdmi" (LT9611 1080p)
if display_mode == "lcd":
    DISPLAY_WIDTH = ALIGN_UP(800, 16)  # width aligned up to a multiple of 16
    DISPLAY_HEIGHT = 480
else:
    DISPLAY_WIDTH = ALIGN_UP(1920, 16)
    DISPLAY_HEIGHT = 1080
# Resolution of the RGB888-planar camera channel fed to the AI pipeline.
OUT_RGB888P_WIDTH = ALIGN_UP(1280, 16)
OUT_RGB888P_HEIGH = 720
root_path = "/sdcard/mp_deployment_source/"  # model deployment directory on SD card
config_path = root_path + "deploy_config.json"  # deployment config file
deploy_conf = {}  # module-level placeholder (detection() reads its own local copy)
debug_mode = 1  # >0 enables ScopedTiming output in detection()
# Map display-category names to the detector label indices belonging to them.
# NOTE(review): indices presumably match "categories" in deploy_config.json — confirm.
garbage_class_map = {
    "有害垃圾": [1, 2],  # hazardous waste
    "可回收垃圾": [3, 4, 10],  # recyclable waste
    "厨余垃圾": [5, 6, 7],  # kitchen waste
    "其他垃圾": [8, 9]  # other waste
}
def two_side_pad_param(input_size, output_size):
    """Compute letterbox padding for scaling input_size into output_size.

    The image is scaled by the smaller of the two axis ratios (aspect
    preserved) and centred; the remainder is split into two-sided padding.

    Args:
        input_size: (width, height) of the source frame.
        output_size: (width, height) of the model input.

    Returns:
        (top, bottom, left, right, ratio) — integer pad sizes per edge and
        the scale factor applied to the source frame.
    """
    ratio_w = output_size[0] / input_size[0]  # width scale
    ratio_h = output_size[1] / input_size[1]  # height scale
    ratio = min(ratio_w, ratio_h)  # keep aspect: use the tighter axis
    new_w = int(ratio * input_size[0])  # scaled width
    new_h = int(ratio * input_size[1])  # scaled height
    dw = (output_size[0] - new_w) / 2  # horizontal slack per side
    dh = (output_size[1] - new_h) / 2  # vertical slack per side
    # The -0.1/+0.1 nudges split an odd slack as (floor, ceil) so the two
    # sides always sum to the full remainder.
    top = int(round(dh - 0.1))
    bottom = int(round(dh + 0.1))
    left = int(round(dw - 0.1))
    # Bug fix: was `dw - 0.1` (copied from `left`), which dropped one pixel
    # of padding whenever dw was fractional, so left+right+new_w != output
    # width and the letterbox was off-centre.
    right = int(round(dw + 0.1))
    return top, bottom, left, right, ratio
def read_deploy_config(config_path):
    """Load and parse the deployment JSON config.

    Args:
        config_path: path to deploy_config.json.

    Returns:
        The parsed config dict, or None if the file could not be parsed.
    """
    # Bug fix: previously `config` was only bound inside the try, so a
    # parse failure fell through to `return config` and raised NameError
    # instead of reporting the JSON error cleanly.
    config = None
    with open(config_path, 'r') as json_file:
        try:
            config = ujson.load(json_file)
        except ValueError as e:
            print("JSON 解析错误:", e)
    return config
start_play = False  # True while a video is actively playing; cleared by player_event on EOF
Play_count = False  # False until the first (single) playback has completed
def player_event(event, data):
    """Player callback: clear the busy flag when playback reaches EOF.

    Runs on the Player's decode thread; play_mp4_test polls start_play.
    """
    global start_play
    if event == K_PLAYER_EVENT_EOF:  # end-of-file notification from Player
        start_play = False  # unblock the wait loop in play_mp4_test
def play_mp4_test(filename):
    """Play an MP4 file on the HDMI (LT9611) display.

    First call (Play_count is False): plays the file once, marks
    Play_count, and returns. Any later call: loops the file forever
    (never returns).

    Args:
        filename: absolute path of the MP4 file on the SD card.
    """
    global start_play
    global Play_count
    player = Player(Display.LT9611)  # HDMI output

    def _play_once():
        # Load and play the file once, blocking until player_event
        # clears start_play on EOF, then tear the player down.
        global start_play
        player.load(filename)
        player.set_event_callback(player_event)
        player.start()
        start_play = True
        while start_play:  # wait for the EOF callback
            time.sleep(0.1)
        player.stop()

    if not Play_count:
        # Single playback on the first invocation.
        _play_once()
        print("播放结束")
        Play_count = True
        return
    # Later invocations: loop playback forever.
    # (The original also had an unreachable `Play_count = True` after
    # this loop; removed.)
    while True:
        print("开始循环播放视频")
        _play_once()
        time.sleep(0.1)  # brief settle time between loops
def detection():
    """Run the garbage-detection main loop.

    Loads the kmodel described by deploy_config.json, captures frames from
    the camera, runs anchor-based detection, draws results and status panels
    on an OSD layer, and exchanges single-byte status codes with the
    lower-level MCU over UART (0x01-0x04 bin-full warnings, 0x00 clear,
    0x05-0x08 sort-success feedback).

    Returns:
        None if no object was detected for 15 s (caller then resumes video
        playback), 0 after normal teardown (exception/interrupt path).
    """
    print("det_infer start")
    deploy_conf = read_deploy_config(config_path)  # model/deploy settings
    kmodel_name = deploy_conf["kmodel_path"]
    labels = deploy_conf["categories"]  # NOTE(review): int() is applied to a label below, so these appear to be numeric strings — confirm against deploy_config.json
    confidence_threshold = deploy_conf["confidence_threshold"]
    nms_threshold = deploy_conf["nms_threshold"]
    img_size = deploy_conf["img_size"]  # model input (w, h)
    num_classes = deploy_conf["num_classes"]
    color_four = get_colors(num_classes)  # per-class draw colors
    nms_option = deploy_conf["nms_option"]
    model_type = deploy_conf["model_type"]
    if model_type == "AnchorBaseDet":
        # Flatten the three anchor groups into one list for post-processing.
        anchors = deploy_conf["anchors"][0] + deploy_conf["anchors"][1] + deploy_conf["anchors"][2]
    kmodel_frame_size = img_size
    frame_size = [OUT_RGB888P_WIDTH, OUT_RGB888P_HEIGH]  # camera AI-channel size
    strides = [8, 16, 32]  # detector feature-map strides
    # Letterbox padding from camera frame to model input.
    top, bottom, left, right, ratio = two_side_pad_param(frame_size, kmodel_frame_size)
    kpu = nn.kpu()  # NPU inference context
    kpu.load_kmodel(root_path + kmodel_name)
    ai2d = nn.ai2d()  # hardware pre-processor (pad + resize)
    ai2d.set_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
    ai2d.set_pad_param(True, [0, 0, 0, 0, top, bottom, left, right], 0, [114, 114, 114])  # grey letterbox fill
    ai2d.set_resize_param(True, nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
    ai2d_builder = ai2d.build([1, 3, OUT_RGB888P_HEIGH, OUT_RGB888P_WIDTH], [1, 3, kmodel_frame_size[1], kmodel_frame_size[0]])
    # --- Camera: display channel (chn 0) + AI channel (chn 2) ---
    sensor = Sensor()
    sensor.reset()
    sensor.set_hmirror(False)
    sensor.set_vflip(False)
    sensor.set_framesize(width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT)  # preview channel
    sensor.set_pixformat(PIXEL_FORMAT_YUV_SEMIPLANAR_420)
    sensor.set_framesize(width=OUT_RGB888P_WIDTH, height=OUT_RGB888P_HEIGH, chn=CAM_CHN_ID_2)  # AI channel
    sensor.set_pixformat(PIXEL_FORMAT_RGB_888_PLANAR, chn=CAM_CHN_ID_2)
    # Bind the preview channel straight to a video layer (no CPU copy).
    sensor_bind_info = sensor.bind_info(x=0, y=0, chn=CAM_CHN_ID_0)
    Display.bind_layer(**sensor_bind_info, layer=Display.LAYER_VIDEO1)
    if display_mode == "lcd":
        Display.init(Display.ST7701, to_ide=True)
    else:
        Display.init(Display.LT9611, to_ide=True)
    osd_img = image.Image(DISPLAY_WIDTH, DISPLAY_HEIGHT, image.ARGB8888)  # overlay for boxes/text
    MediaManager.init()
    sensor.run()
    rgb888p_img = None
    ai2d_input_tensor = None
    # Pre-allocated output tensor reused for every ai2d run.
    data = np.ones((1, 3, kmodel_frame_size[1], kmodel_frame_size[0]), dtype=np.uint8)
    ai2d_output_tensor = nn.from_numpy(data)
    last_send_time = time.time()  # last UART label transmission
    last_display_time = 0  # when the "detected" panel was last refreshed
    current_display_class = None  # category name shown on the OSD
    show_warning = False  # bin-full warning code from the MCU (b'\x01'..b'\x04') or False
    show_warning_in = False  # True while showing MCU sort-success feedback
    warning_start_time = 0  # when the feedback display started
    # OSD bookkeeping: item number, type, count, success status.
    amount_of_garbage = 1
    Garbage_Identification_Number = 1
    category_of_feedback_garbage = None  # category reported back by the MCU
    try:
        # Baseline for the 15 s no-detection timeout.
        Garbage_identification_time_old = time.time()
        while True:
            os.exitpoint()  # allow IDE stop requests
            data = uart.read()  # poll the MCU (non-blocking; None when idle)
            # Bin-full warning codes.
            if data == b'\x01':
                print(data)  # echoed to the CanMV IDE serial console
                show_warning = b'\x01'
            elif data == b'\x02':
                print(data)
                show_warning = b'\x02'
            elif data == b'\x03':
                print(data)
                show_warning = b'\x03'
            elif data == b'\x04':
                print(data)
                show_warning = b'\x04'
            elif data == b'\x00':
                show_warning = False  # clear warning
            # MCU feedback: sorting completed for a category.
            elif data == b'\x05':
                show_warning_in = True
                category_of_feedback_garbage = "可回收垃圾"
                warning_start_time = time.time()
            elif data == b'\x06':
                show_warning_in = True
                category_of_feedback_garbage = "有害垃圾"
                warning_start_time = time.time()
            elif data == b'\x07':
                show_warning_in = True
                category_of_feedback_garbage = "厨余垃圾"
                warning_start_time = time.time()
            elif data == b'\x08':
                show_warning_in = True
                category_of_feedback_garbage = "其他垃圾"
                warning_start_time = time.time()
            # Feedback panel stays up for 1 s, then bump the item counter.
            if show_warning_in and time.time() - warning_start_time > 1:
                show_warning_in = False
                Garbage_Identification_Number += 1
            time.sleep(0.1)  # yield CPU between frames
            # Timestamps for the 15 s timeout check below.
            Garbage_identification_time_new = time.time()
            start_time = time.time()
            with ScopedTiming("total", debug_mode > 0):
                rgb888p_img = sensor.snapshot(chn=CAM_CHN_ID_2)  # AI-channel frame
                if rgb888p_img.format() == image.RGBP888:
                    # Pre-process on ai2d, then run the NPU.
                    ai2d_input = rgb888p_img.to_numpy_ref()  # zero-copy view of the frame
                    ai2d_input_tensor = nn.from_numpy(ai2d_input)
                    ai2d_builder.run(ai2d_input_tensor, ai2d_output_tensor)
                    kpu.set_input_tensor(0, ai2d_output_tensor)
                    kpu.run()
                    # Collect and flatten all model outputs.
                    results = []
                    for i in range(kpu.outputs_size()):
                        out_data = kpu.get_output_tensor(i)
                        result = out_data.to_numpy()
                        result = result.reshape((result.shape[0] * result.shape[1] * result.shape[2] * result.shape[3]))
                        del out_data  # release tensor reference promptly
                        results.append(result)
                    # Decode boxes: entries are (label_idx, score, x1, y1, x2, y2).
                    det_boxes = aicube.anchorbasedet_post_process(results[0], results[1], results[2], kmodel_frame_size, frame_size, strides, num_classes, confidence_threshold, nms_threshold, anchors, nms_option)
                    osd_img.clear()
                    highest_confidence_label = None
                    if det_boxes:
                        det_boxes.sort(key=lambda x: x[1], reverse=True)  # by confidence, descending
                        det_boxes = det_boxes[:3]  # draw at most 3 boxes
                        for det_boxe in det_boxes:
                            x1, y1, x2, y2 = det_boxe[2], det_boxe[3], det_boxe[4], det_boxe[5]
                            # Scale from AI-channel coords to display coords.
                            x = int(x1 * DISPLAY_WIDTH // OUT_RGB888P_WIDTH)
                            y = int(y1 * DISPLAY_HEIGHT // OUT_RGB888P_HEIGH)
                            w = int((x2 - x1) * DISPLAY_WIDTH // OUT_RGB888P_WIDTH)
                            h = int((y2 - y1) * DISPLAY_HEIGHT // OUT_RGB888P_HEIGH)
                            osd_img.draw_rectangle(x, y, w, h, color=color_four[det_boxe[0]][1:])
                            text = labels[det_boxe[0]] + " " + str(round(det_boxe[1], 2))
                            osd_img.draw_string_advanced(x, y - 40, 32, text, color=color_four[det_boxe[0]][1:])
                        # Something was seen: reset the idle-timeout baseline.
                        Garbage_identification_time_old = time.time()
                        print("检查点:旧时间成功更新")
                    highest_confidence_label = labels[det_boxes[0][0]] if det_boxes else None
                    # No detection for 15 s: return so the caller resumes video playback.
                    if Garbage_identification_time_new - Garbage_identification_time_old >= 15:
                        print("已超时,即将退出")
                        return None
                    current_time = time.time()
                    # Rate-limit UART label transmission to once per second.
                    if current_time - last_send_time >= 1.0:
                        if highest_confidence_label is not None:
                            uart.write(highest_confidence_label)  # send raw label string to the MCU
                            print(highest_confidence_label)
                            for category, label_numbers in garbage_class_map.items():
                                if int(highest_confidence_label) in label_numbers:
                                    current_display_class = category
                                    last_display_time = current_time
                                    break
                        last_send_time = current_time
                    # --- OSD panel 1: detected category ---
                    osd_img.draw_rectangle(5, 20, 670, 82, color=(255, 255, 255), thickness=5)
                    if current_display_class and current_time - last_display_time < 1.0:
                        osd_img.draw_string_advanced(20, 25, 62, "检测到:", color=(255, 255, 255))
                        osd_img.draw_string_advanced(240, 25, 62, current_display_class, color=(255, 255, 255))
                    else:
                        # Nothing recent: show the "detecting" placeholder.
                        osd_img.draw_string_advanced(20, 25, 62, "智能检测中...", color=(255, 255, 255))
                    # --- OSD panel 2: bin-full warning ---
                    osd_img.draw_rectangle(5, 107, 670, 82, color=(255, 255, 255), thickness=5)
                    if show_warning == b'\x01':
                        osd_img.draw_string_advanced(20, 112, 62, "可回收垃圾,满载警告!", color=(255, 255, 0), scale=2)
                    elif show_warning == b'\x02':
                        osd_img.draw_string_advanced(20, 112, 62, "有害垃圾,满载警告!", color=(255, 255, 0), scale=2)
                    elif show_warning == b'\x03':
                        osd_img.draw_string_advanced(20, 112, 62, "厨余垃圾,满载警告!", color=(255, 255, 0), scale=2)
                    elif show_warning == b'\x04':
                        osd_img.draw_string_advanced(20, 112, 62, "其他垃圾,满载警告!", color=(255, 255, 0), scale=2)
                    else:
                        # No warning active: show "status normal".
                        osd_img.draw_string_advanced(20, 112, 62, "机体状态:一切正常!", color=(255, 255, 255))
                    # --- OSD panels 3-5: sorting status / data / counters ---
                    osd_img.draw_rectangle(5, 194, 670, 82, color=(255, 255, 255), thickness=5)
                    osd_img.draw_rectangle(5, 281, 670, 82, color=(255, 255, 255), thickness=5)
                    osd_img.draw_rectangle(5, 368, 670, 82, color=(255, 255, 255), thickness=5)
                    if show_warning_in == True:
                        osd_img.draw_string_advanced(20, 199, 62, "分类情况:分类成功", color=(255, 255, 255), scale=2)
                        osd_img.draw_string_advanced(20, 373, 62, "序号:", color=(255, 255, 255), scale=2)
                        osd_img.draw_string_advanced(226, 373, 62, str(Garbage_Identification_Number), color=(255, 255, 255), scale=2)
                        osd_img.draw_string_advanced(296, 373, 62, "数量:", color=(255, 255, 255), scale=2)
                        osd_img.draw_string_advanced(502, 373, 63, str(amount_of_garbage), color=(255, 255, 255), scale=2)
                        osd_img.draw_string_advanced(20, 281, 62, "数据情况:", color=(255, 255, 255))
                        osd_img.draw_string_advanced(315, 281, 62, category_of_feedback_garbage, color=(255, 255, 255), scale=2)
                    else:
                        # No feedback pending: show "waiting" placeholders.
                        osd_img.draw_string_advanced(20, 199, 62, "分类情况:等待中...", color=(255, 255, 255))
                        osd_img.draw_string_advanced(20, 281, 62, "数据情况:等待中...", color=(255, 255, 255))
                        osd_img.draw_string_advanced(20, 373, 62, "数据情况:等待记录...", color=(255, 255, 255))
                    Display.show_image(osd_img, 0, 0, Display.LAYER_OSD3)
                    gc.collect()
                rgb888p_img = None  # drop the frame reference
            # Pace the loop to roughly 10 Hz.
            elapsed_time = time.time() - start_time
            if elapsed_time < 0.1:
                time.sleep(0.1 - elapsed_time)
    except KeyboardInterrupt as e:
        print("用户停止: ", e)
    except BaseException as e:
        print(f"异常: {e}")
    finally:
        # Teardown order matters: stop producers before releasing buffers.
        uart.deinit()  # NOTE(review): uart.deinit() is called twice in this block — likely redundant; confirm intent
        if isinstance(sensor, Sensor):
            sensor.stop()
        Display.deinit()
        os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
        time.sleep_ms(100)
        uart.deinit()
        MediaManager.deinit()
        del ai2d_input_tensor
        del ai2d_output_tensor
        gc.collect()
        time.sleep(1)
        nn.shrink_memory_pool()  # return NPU memory to the system
    print("det_infer end")
    return 0
if __name__ == "__main__":
    # Flow: play the intro video once, run detection until it times out
    # (no object seen for 15 s), then loop the video forever.
    play_mp4_test("/sdcard/test/test.mp4")  # first call: single playback
    print("检查点:第一次播放视频结束")
    detection()  # returns after the 15 s idle timeout
    print("检查点:已退出垃圾检测")
    play_mp4_test("/sdcard/test/test.mp4")  # second call: loops forever
以下是player代码
from media.media import *
from mpp.mp4_format import *
from mpp.mp4_format_struct import *
from media.pyaudio import *
import media.g711 as g711
from mpp.payload_struct import *
import media.vdecoder as vdecoder
from media.display import *
import uctypes
import time
import _thread
import os
# Event codes delivered to the user callback set via set_event_callback().
K_PLAYER_EVENT_EOF = 0       # playback reached end of file
K_PLAYER_EVENT_PROGRESS = 1  # progress notification
DIV = 25  # audio chunks per second (chunk size = sample_rate // DIV)
# Player state-machine values (play_status).
PLAY_START = 0
PLAY_STOP = 1
PLAY_PAUSE = 2
class Player:
    """MP4 demuxer/player for CanMV K230.

    Demuxes an MP4 file with the kd_mp4 C API, decodes H.264/H.265 video
    through the hardware vdecoder (bound directly to a display layer) and
    G.711A/U audio through g711 + PyAudio. Playback runs on a background
    thread started by start(); the callback set via set_event_callback()
    receives K_PLAYER_EVENT_EOF when the file ends.
    """
    def __init__(self,display_type = None,display_to_ide = True):
        """Create an idle player.

        Args:
            display_type: Display backend (e.g. Display.LT9611); None
                defaults to Display.VIRT at start() time.
            display_to_ide: mirror output to the CanMV IDE when supported.
        """
        self.mp4_cfg = k_mp4_config_s()          # demuxer configuration struct
        self.video_info = k_mp4_video_info_s()   # filled by load()
        self.video_track = False                 # file has a supported video track
        self.audio_info = k_mp4_audio_info_s()   # filled by load()
        self.audio_track = False                 # file has a supported audio track
        self.mp4_handle = k_u64_ptr()            # opaque kd_mp4 handle
        self.play_status = PLAY_STOP
        self.display_type = display_type
        self.display_to_ide = display_to_ide
    def _init_media_buffer(self):
        """Create decoders, init the display and the shared VB buffer pool.

        Must run before MediaManager.init() consumers start; called by start().
        """
        if (self.audio_track):
            CHUNK = self.audio_info.sample_rate//DIV  # samples per audio chunk
            self.pyaudio = PyAudio()
            self.pyaudio.initialize(CHUNK)
            if (self.audio_info.codec_id == K_MP4_CODEC_ID_G711A):
                self.adec = g711.Decoder(K_PT_G711A,CHUNK)
            elif (self.audio_info.codec_id == K_MP4_CODEC_ID_G711U):
                self.adec = g711.Decoder(K_PT_G711U,CHUNK)
        if (self.video_track):
            if (self.video_info.codec_id == K_MP4_CODEC_ID_H264):
                self.vdec = vdecoder.Decoder(K_PT_H264)
            elif (self.video_info.codec_id == K_MP4_CODEC_ID_H265):
                self.vdec = vdecoder.Decoder(K_PT_H265)
        if (not self.audio_track):
            # PyAudio is initialised even without audio — NOTE(review):
            # presumably required for the shared media buffer setup; confirm.
            self.pyaudio = PyAudio()
            self.pyaudio.initialize(48000//25)
        if (self.display_type == None):
            self.display_type = Display.VIRT
        if (self.display_type == Display.VIRT):
            Display.init(self.display_type,width = self.video_info.width, height = self.video_info.height)
        else:
            Display.init(self.display_type,to_ide = self.display_to_ide)
        MediaManager.init()  # initialise the VB buffer pool
        if (self.video_track):
            self.vdec.create()
            # Bind the decoder output channel straight to a video layer.
            bind_info = self.vdec.bind_info(width=self.video_info.width,height=self.video_info.height,chn=self.vdec.get_vdec_channel())
            Display.bind_layer(**bind_info, layer = Display.LAYER_VIDEO1)
        if (self.audio_track):
            self.adec.create()
    def _deinit_media_buffer(self):
        """Destroy decoders, deinit the display and release VB buffers."""
        if (self.video_track):
            self.vdec.destroy()
        if (self.audio_track):
            self.pyaudio.terminate()
            self.adec.destroy()
        else:
            self.pyaudio.terminate()
        if (self.video_track):
            # Allow sleep during the settle delay, then restore exit points.
            os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
            time.sleep(1)
            os.exitpoint(os.EXITPOINT_ENABLE)
            Display.deinit()
        self.video_track = False
        self.audio_track = False
        MediaManager.deinit()  # release the VB buffer pool
    def _do_file_data(self):
        """Playback thread body: demux frames and feed the decoders.

        Runs until EOF or until play_status leaves START/PAUSE, then fires
        the K_PLAYER_EVENT_EOF callback.
        """
        frame_data = k_mp4_frame_data_s()
        # Record the starting wall-clock time (ms).
        start_system_time = time.ticks_ms()
        # Record the starting video timestamp.
        start_video_timestamp = 0
        while(self.play_status == PLAY_START or self.play_status == PLAY_PAUSE):
            if (self.play_status == PLAY_PAUSE):
                time.sleep(0.1)  # paused: idle without consuming frames
            else:
                ret = kd_mp4_get_frame(self.mp4_handle.value, frame_data)
                if (ret < 0):
                    raise OSError("get frame data failed")
                if (frame_data.eof):
                    break
                if (frame_data.codec_id == K_MP4_CODEC_ID_H264 or frame_data.codec_id == K_MP4_CODEC_ID_H265):
                    # Pace video playback against the wall clock (only when
                    # there is no audio track to act as the master clock).
                    if (not self.audio_track):
                        # Elapsed time according to the video timestamps.
                        video_timestamp_elapsed = frame_data.time_stamp - start_video_timestamp
                        # Elapsed wall-clock time.
                        current_system_time = time.ticks_ms()
                        system_time_elapsed = current_system_time - start_system_time
                        # If we are ahead of the video timeline, delay.
                        if system_time_elapsed < video_timestamp_elapsed:
                            time.sleep_ms(video_timestamp_elapsed - system_time_elapsed)
                    data = uctypes.bytes_at(frame_data.data,frame_data.data_length)
                    self.vdec.decode(data)
                elif(frame_data.codec_id == K_MP4_CODEC_ID_G711A or frame_data.codec_id == K_MP4_CODEC_ID_G711U):
                    data = uctypes.bytes_at(frame_data.data,frame_data.data_length)
                    self.audio_out_stream.write(self.adec.decode(data))
        # Notify the user callback that playback finished.
        self.callback(K_PLAYER_EVENT_EOF,0)
    def debug_codec_info(self):
        """Print which video/audio codecs the loaded file uses."""
        if (self.video_track):
            if (self.video_info.codec_id == K_MP4_CODEC_ID_H264):
                print("video track h264")
            elif (self.video_info.codec_id == K_MP4_CODEC_ID_H265):
                print("video track h265")
        if (self.audio_track):
            if (self.audio_info.codec_id == K_MP4_CODEC_ID_G711A):
                print("audio track g711a")
            elif (self.audio_info.codec_id == K_MP4_CODEC_ID_G711U):
                print("audio track g711u")
    def load(self,filename):
        """Open an MP4 file and discover its tracks.

        Args:
            filename: path of the MP4 file.

        Raises:
            OSError: if the demuxer cannot be created.
            ValueError: if a track cannot be read, or the video exceeds the
                ST7701 panel's 800x480 limit.
        """
        self.mp4_cfg.config_type = K_MP4_CONFIG_DEMUXER
        self.mp4_cfg.muxer_config.file_name[:] = bytes(filename, 'utf-8')
        self.mp4_cfg.muxer_config.fmp4_flag = 0
        ret = kd_mp4_create(self.mp4_handle, self.mp4_cfg)
        if ret:
            raise OSError("kd_mp4_create failed:",filename)
        file_info = k_mp4_file_info_s()
        kd_mp4_get_file_info(self.mp4_handle.value, file_info)
        #print("=====file_info: track_num:",file_info.track_num,"duration:",file_info.duration)
        # Walk every track; remember the first supported video/audio track.
        for i in range(file_info.track_num):
            track_info = k_mp4_track_info_s()
            ret = kd_mp4_get_track_by_index(self.mp4_handle.value, i, track_info)
            if (ret < 0):
                raise ValueError("kd_mp4_get_track_by_index failed")
            if (track_info.track_type == K_MP4_STREAM_VIDEO):
                if (track_info.video_info.codec_id == K_MP4_CODEC_ID_H264 or track_info.video_info.codec_id == K_MP4_CODEC_ID_H265):
                    self.video_track = True
                    self.video_info = track_info.video_info
                    print("  codec_id: ", self.video_info.codec_id)
                    print("  track_id: ", self.video_info.track_id)
                    print("  width: ", self.video_info.width)
                    print("  height: ", self.video_info.height)
                else:
                    print("video not support codecid:",track_info.video_info.codec_id)
            elif (track_info.track_type == K_MP4_STREAM_AUDIO):
                if (track_info.audio_info.codec_id == K_MP4_CODEC_ID_G711A or track_info.audio_info.codec_id == K_MP4_CODEC_ID_G711U):
                    self.audio_track = True
                    self.audio_info = track_info.audio_info
                    print("  codec_id: ", self.audio_info.codec_id)
                    print("  track_id: ", self.audio_info.track_id)
                    print("  channels: ", self.audio_info.channels)
                    print("  sample_rate: ", self.audio_info.sample_rate)
                    print("  bit_per_sample: ", self.audio_info.bit_per_sample)
                    #self.audio_info.channels = 2
                else:
                    print("audio not support codecid:",track_info.audio_info.codec_id)
        self.debug_codec_info()
        if (self.video_track == True and self.display_type == Display.ST7701):
            if (self.video_info.width > 800 or self.video_info.height > 480):
                raise ValueError("Display.ST7701 max support 800x480")
            else:
                self.video_info.width = ALIGN_UP(self.video_info.width, 16)
    def start(self):
        """Start playback: init buffers/decoders and spawn the demux thread."""
        self._init_media_buffer()
        if (self.video_track):
            self.vdec.start()
        if (self.audio_track):
            self.audio_out_stream = self.pyaudio.open(format=paInt16,
                                    channels=self.audio_info.channels,
                                    rate=self.audio_info.sample_rate,
                                    output=True,
                                    frames_per_buffer=self.audio_info.sample_rate//DIV)
        self.play_status = PLAY_START
        #self._do_file_data()
        _thread.start_new_thread(self._do_file_data,())
    def stop(self):
        """Stop playback and release all resources (display, decoders, mp4)."""
        Display.unbind_layer(layer = Display.LAYER_VIDEO1)
        self.play_status = PLAY_STOP  # signals the demux thread to exit
        if (self.video_track):
            self.vdec.stop()
        ret = kd_mp4_destroy(self.mp4_handle.value)
        if (ret < 0):
            raise OSError("destroy mp4 failed.")
        if (self.audio_track):
            self.audio_out_stream.stop_stream()
            self.audio_out_stream.close()
        self._deinit_media_buffer()
    def pause(self):
        """Pause playback (demux thread idles but keeps running)."""
        self.play_status = PLAY_PAUSE
    def resume(self):
        """Resume playback after pause()."""
        self.play_status = PLAY_START
    def set_event_callback(self,callback):
        """Register callback(event, data) for player events (e.g. EOF)."""
        self.callback = callback
    def destroy_mp4(self):
        """Destroy only the mp4 demuxer handle (without full stop())."""
        ret = kd_mp4_destroy(self.mp4_handle.value)
        if (ret < 0):
            raise OSError("destroy mp4 failed.")