重现步骤
描述中代码会在补充材料中提供
1.要实现的任务是对人体的识别与物体识别同时进行 人体用的是k230的例程代码 物体识别用的是嘉南社区的训练之后提供的代码。
2.通过works中的代码初始化以及创建识别线程分别调用人体识别以及物体识别。
期待结果和实际结果
1.两个线程同时运行不报错为期望效果
2.同时对人体以及物体进行识别。
软硬件版本信息
k230_canmv_01studio with K230 固件0.4
错误日志
尝试解决过程
(比较菜所以根本不会改)因为有多个摄像头尝试更改不同的摄像头使能不起作用
补充材料
#work 线程创建
#AI识别模块的工作文件
import sys #导入线程模块
import time#导入定时器模块
import persondata #人体探测模 #关键物体探测模块 #网络传输模块#设备动作控制模块
import det_video
import _thread
def excute(Mod):
#global Mod
print("开始工作:",Mod)
#初始化参数
persondata.PersonDetection(Mod)
det_video.detection(Mod)
#设置线程,线程1:人体识别;线程2:关键物体识别;线程3:根据结果进行操作
#_thread.start_new_thread(func,("1",)) #开启线程1,参数必须是元组
#_thread.start_new_thread(func,("2",)) #开启线程2,参数必须是元组
while (Mod[2] != 0):
#persondata.run(Mod)
#det_video.func1(Mod)
_thread.start_new_thread(det_video.func1,("1",)) #开启线程1,参数必须是元组
_thread.start_new_thread(persondata.run,("2",)) #开启线程2,参数必须是元组
time.sleep(0.01) #防止CPU满跑
print("退出工作状态!")
if __name__=="__main__":
Mod=[0,0,1,0]
excute(Mod)
# 人体识别 persondata.py
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
import ujson
from media.media import *
from time import *
import nncase_runtime as nn
import ulab.numpy as np
import time
import utime
import image
import random
import gc
import sys
import aicube
'''
定义全局字典
'''
global_data={}
# 自定义人体检测类
class PersonDetectionApp(AIBase):
def __init__(self,kmodel_path,model_input_size,labels,anchors,confidence_threshold=0.2,nms_threshold=0.5,nms_option=False,strides=[8,16,32],rgb888p_size=[224,224],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
self.kmodel_path=kmodel_path
# 模型输入分辨率
self.model_input_size=model_input_size
# 标签
self.labels=labels
# 检测anchors设置
self.anchors=anchors
# 特征图降采样倍数
self.strides=strides
# 置信度阈值设置
self.confidence_threshold=confidence_threshold
# nms阈值设置
self.nms_threshold=nms_threshold
self.nms_option=nms_option
# sensor给到AI的图像分辨率
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
# 显示分辨率
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
self.debug_mode=debug_mode
# Ai2d实例,用于实现模型预处理
self.ai2d=Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
# 配置预处理操作,这里使用了pad和resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,您可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
top,bottom,left,right=self.get_padding_param()
self.ai2d.pad([0,0,0,0,top,bottom,left,right], 0, [0,0,0])
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
# 这里使用了aicube模型的后处理接口anchorbasedet_post_preocess
dets = aicube.anchorbasedet_post_process(results[0], results[1], results[2], self.model_input_size, self.rgb888p_size, self.strides, len(self.labels), self.confidence_threshold, self.nms_threshold, self.anchors, self.nms_option)
return dets
# 绘制结果
def draw_result(self,pl,dets):
det_data=0.30 # 判断阈值大于则为yes 反之为no
with ScopedTiming("display_draw",self.debug_mode >0):
if dets:
pl.osd_img.clear()
for det_box in dets:
x1, y1, x2, y2 = det_box[2],det_box[3],det_box[4],det_box[5]
w = float(x2 - x1) * self.display_size[0] // self.rgb888p_size[0]
h = float(y2 - y1) * self.display_size[1] // self.rgb888p_size[1]
x1 = int(x1 * self.display_size[0] // self.rgb888p_size[0])
y1 = int(y1 * self.display_size[1] // self.rgb888p_size[1])
x2 = int(x2 * self.display_size[0] // self.rgb888p_size[0])
y2 = int(y2 * self.display_size[1] // self.rgb888p_size[1])
if (h<(0.1*self.display_size[0])):
continue
if (w<(0.25*self.display_size[0]) and ((x1<(0.03*self.display_size[0])) or (x2>(0.97*self.display_size[0])))):
continue
if (w<(0.15*self.display_size[0]) and ((x1<(0.01*self.display_size[0])) or (x2>(0.99*self.display_size[0])))):
continue
pl.osd_img.draw_rectangle(x1 , y1 , int(w) , int(h), color=(255, 0, 255, 0), thickness = 2)
if (str(round(det_box[1],2))>str(round(det_data,2))) :
det_p="yes"
if (str(round(det_box[1],2))<=str(round(det_data,2))) :
det_p="no"
print((x1+int(w))/2,(y1+int(h))/2,"person",str(round(det_box[1],2)),det_p)
x=(x1+int(w))/2
y=(y1+int(h))/2
local=(x,y)
global_data["local"]=local
global_data["person"]=(str(round(det_box[1],2)),det_p)
pl.osd_img.draw_string_advanced( x1 , y1-50,32, " " + self.labels[det_box[0]] + " " + str(round(det_box[1],2)), color=(255,0, 255, 0))
else:
pl.osd_img.clear()
global_data["local"]=(0,0)
global_data["person"]=(0,0)
# 计算padding参数
def get_padding_param(self):
dst_w = self.model_input_size[0]
dst_h = self.model_input_size[1]
input_width = self.rgb888p_size[0]
input_high = self.rgb888p_size[1]
ratio_w = dst_w / input_width
ratio_h = dst_h / input_high
if ratio_w < ratio_h:
ratio = ratio_w
else:
ratio = ratio_h
new_w = (int)(ratio * input_width)
new_h = (int)(ratio * input_high)
dw = (dst_w - new_w) / 2
dh = (dst_h - new_h) / 2
top = int(round(dh - 0.1))
bottom = int(round(dh + 0.1))
left = int(round(dw - 0.1))
right = int(round(dw - 0.1))
return top, bottom, left, right
def PersonDetection(Mod):
global pl
global person_det
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="lcd"
# k230保持不变,k230d可调整为[640,360]
rgb888p_size = [1920, 1080]
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 模型路径
kmodel_path="/sdcard/examples/kmodel/person_detect_yolov5n.kmodel"
# 其它参数设置
confidence_threshold = 0.2
nms_threshold = 0.6
labels = ["person"]
anchors = [10, 13, 16, 30, 33, 23, 30, 61, 62, 45, 59, 119, 116, 90, 156, 198, 373, 326]
# 初始化PipeLine
pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
pl.create()
# 初始化自定义人体检测实例
person_det=PersonDetectionApp(kmodel_path,model_input_size=[640,640],labels=labels,anchors=anchors,confidence_threshold=confidence_threshold,nms_threshold=nms_threshold,nms_option=False,strides=[8,16,32],rgb888p_size=rgb888p_size,display_size=display_size,debug_mode=0)
person_det.config_preprocess()
def run(Mod):
global pl
global person_det
while True:
with ScopedTiming("total",0):
# 获取当前帧数据
img=pl.get_frame()
# 推理当前帧
res=person_det.run(img)
# 绘制结果到PipeLine的osd图像
person_det.draw_result(pl,res)
# 显示当前的绘制结果
pl.show_image()
gc.collect()
person_det.deinit()
pl.destroy()
if __name__=="__main__":
PersonDetection(1)
run(1)
#关键物体识别 det_video.py
import os
import ujson
import aicube
from libs.PipeLine import ScopedTiming
from libs.Utils import *
from media.sensor import *
from media.display import *
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
global_data={}
display_mode="lcd"
if display_mode=="lcd":
DISPLAY_WIDTH = ALIGN_UP(800, 16)
DISPLAY_HEIGHT = 480
else:
DISPLAY_WIDTH = ALIGN_UP(1920, 16)
DISPLAY_HEIGHT = 1080
OUT_RGB888P_WIDTH = ALIGN_UP(1280, 16)
OUT_RGB888P_HEIGH = 720
root_path="/sdcard/mp_deployment_source/"
config_path=root_path+"deploy_config.json"
deploy_conf={}
debug_mode=1
def two_side_pad_param(input_size,output_size):
ratio_w = output_size[0] / input_size[0] # 宽度缩放比例
ratio_h = output_size[1] / input_size[1] # 高度缩放比例
ratio = min(ratio_w, ratio_h) # 取较小的缩放比例
new_w = int(ratio * input_size[0]) # 新宽度
new_h = int(ratio * input_size[1]) # 新高度
dw = (output_size[0] - new_w) / 2 # 宽度差
dh = (output_size[1] - new_h) / 2 # 高度差
top = int(round(dh - 0.1))
bottom = int(round(dh + 0.1))
left = int(round(dw - 0.1))
right = int(round(dw - 0.1))
return top, bottom, left, right,ratio
def read_deploy_config(config_path):
# 打开JSON文件以进行读取deploy_config
with open(config_path, 'r') as json_file:
try:
# 从文件中加载JSON数据
config = ujson.load(json_file)
except ValueError as e:
print("JSON 解析错误:", e)
return config
def detection(Mod):
global width
global height
global ai2d_builder
global kpu
global model_type
global kmodel_frame_size
global frame_size
global strides, num_classes, confidence_threshold, nms_threshold, anchors, nms_option,osd_img,labels
print("det_infer start")
# 使用json读取内容初始化部署变量
deploy_conf=read_deploy_config(config_path)
kmodel_name=deploy_conf["kmodel_path"]
labels=deploy_conf["categories"]
confidence_threshold= deploy_conf["confidence_threshold"]
nms_threshold = deploy_conf["nms_threshold"]
img_size=deploy_conf["img_size"]
num_classes=deploy_conf["num_classes"]
color_four=get_colors(num_classes)
nms_option = deploy_conf["nms_option"]
model_type = deploy_conf["model_type"]
if model_type == "AnchorBaseDet":
anchors = deploy_conf["anchors"][0] + deploy_conf["anchors"][1] + deploy_conf["anchors"][2]
kmodel_frame_size = img_size
frame_size = [OUT_RGB888P_WIDTH,OUT_RGB888P_HEIGH]
strides = [8,16,32]
# 计算padding值
top, bottom, left, right,ratio=two_side_pad_param(frame_size,kmodel_frame_size)
# 初始化kpu
kpu = nn.kpu()
kpu.load_kmodel(root_path+kmodel_name)
# 初始化ai2d
ai2d = nn.ai2d()
ai2d.set_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
ai2d.set_pad_param(True, [0,0,0,0,top,bottom,left,right], 0, [114,114,114])
ai2d.set_resize_param(True, nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel )
ai2d_builder = ai2d.build([1,3,OUT_RGB888P_HEIGH,OUT_RGB888P_WIDTH], [1,3,kmodel_frame_size[1],kmodel_frame_size[0]])
# 初始化并配置sensor
global sensor
sensor = Sensor(id=1,fps=30)
sensor.reset()
# 设置镜像
sensor.set_hmirror(False)
# 设置翻转
sensor.set_vflip(False)
# 通道0直接给到显示VO,格式为YUV420
sensor.set_framesize(width = DISPLAY_WIDTH, height = DISPLAY_HEIGHT)
sensor.set_pixformat(PIXEL_FORMAT_YUV_SEMIPLANAR_420)
# 通道2给到AI做算法处理,格式为RGB888
sensor.set_framesize(width = OUT_RGB888P_WIDTH , height = OUT_RGB888P_HEIGH, chn=CAM_CHN_ID_2)
sensor.set_pixformat(PIXEL_FORMAT_RGB_888_PLANAR, chn=CAM_CHN_ID_2)
# 绑定通道0的输出到vo
sensor_bind_info = sensor.bind_info(x = 0, y = 0, chn = CAM_CHN_ID_0)
Display.bind_layer(**sensor_bind_info, layer = Display.LAYER_VIDEO1)
if display_mode=="lcd":
# 设置为ST7701显示,默认800x480
Display.init(Display.ST7701, to_ide = True)
else:
# 设置为LT9611显示,默认1920x1080
Display.init(Display.LT9611, to_ide = True)
#创建OSD图像
osd_img = image.Image(DISPLAY_WIDTH, DISPLAY_HEIGHT, image.ARGB8888)
# media初始化
MediaManager.init()
# 启动sensor
def func1(Mod):
global sensor
global ai2d_builder
global kpu
global kmodel_frame_size
sensor.run()
rgb888p_img = None
ai2d_input_tensor = None
data = np.ones((1,3,kmodel_frame_size[1],kmodel_frame_size[0]),dtype=np.uint8)
ai2d_output_tensor = nn.from_numpy(data)
while True:
det_data=0.5
with ScopedTiming("total",debug_mode < 0):
rgb888p_img = sensor.snapshot(chn=CAM_CHN_ID_2)
if rgb888p_img.format() == image.RGBP888:
ai2d_input = rgb888p_img.to_numpy_ref()
ai2d_input_tensor = nn.from_numpy(ai2d_input)
# 使用ai2d进行预处理
ai2d_builder.run(ai2d_input_tensor, ai2d_output_tensor)
# 设置模型输入
kpu.set_input_tensor(0, ai2d_output_tensor)
# 模型推理
kpu.run()
# 获取模型输出
results = []
for i in range(kpu.outputs_size()):
out_data = kpu.get_output_tensor(i)
result = out_data.to_numpy()
result = result.reshape((result.shape[0]*result.shape[1]*result.shape[2]*result.shape[3]))
del out_data
results.append(result)
# 使用aicube模块封装的接口进行后处理
det_boxes = aicube.anchorbasedet_post_process( results[0], results[1], results[2], kmodel_frame_size, frame_size, strides, num_classes, confidence_threshold, nms_threshold, anchors, nms_option)
# 绘制结果
osd_img.clear()
if det_boxes:
for det_boxe in det_boxes:
x1, y1, x2, y2 = det_boxe[2],det_boxe[3],det_boxe[4],det_boxe[5]
x=int(x1 * DISPLAY_WIDTH // OUT_RGB888P_WIDTH)
y=int(y1 * DISPLAY_HEIGHT // OUT_RGB888P_HEIGH)
w = int((x2 - x1) * DISPLAY_WIDTH // OUT_RGB888P_WIDTH)
h = int((y2 - y1) * DISPLAY_HEIGHT // OUT_RGB888P_HEIGH)
osd_img.draw_rectangle(x, y, w, h, color=color_four[det_boxe[0]][1:])
text=labels[det_boxe[0]] + " " + str(round(det_boxe[1],2))
label = labels[det_boxe[0]]
score = str(round(det_boxe[1],2))
if (str(round(det_boxe[1],2))>str(round(det_data,2))) :
det_p="yes"
if (str(round(det_boxe[1],2))<=str(round(det_data,2))) :
det_p="no"
print(((int(x1 * DISPLAY_WIDTH // OUT_RGB888P_WIDTH)+int(w)))/2,((int(y1 * DISPLAY_HEIGHT // OUT_RGB888P_HEIGH))+int(h))/2,"gou",score,det_p)
#x=(x1+int(w))/2
#y=(y1+int(h))/2
local=(x,y)
global_data["local"]=local
global_data["gou"]=(str(round(det_boxe[1],2)),det_p)
osd_img.draw_string_advanced(x,y-40,32,text, color=color_four[det_boxe[0]][1:])
Display.show_image(osd_img, 0, 0, Display.LAYER_OSD3)
gc.collect()
rgb888p_img = None
#del ai2d_input_tensor
#del ai2d_output_tensor
##停止摄像头输出
#sensor.stop()
##去初始化显示设备
#Display.deinit()
##释放媒体缓冲区
#MediaManager.deinit()
#gc.collect()
#time.sleep(1)
#nn.shrink_memory_pool()
#print("det_infer end")
#return 0
if __name__=="__main__":
detection(1)
func1(1)