How to do RTSP streaming while running face recognition on the K230

2 Answers

Please refer to the following example:

from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
from media.vencoder import *
from media.sensor import *
from media.display import *
from media.media import *
import time, os
import multimedia as mm
import nncase_runtime as nn
import ulab.numpy as np
import aidemo
import gc

def rgb_to_yuv(rgb):
    """
    Convert a single RGB color value to a YUV color value.

    Parameters:
    rgb: tuple of int
        The input RGB color value, each component in [0, 255].

    Returns:
    tuple
        The converted YUV color value. With these coefficients the ranges
        are approximately Y [0, 255], U [-111, 111], V [-157, 157].
    """
    # Normalize the RGB values to [0, 1]
    r = rgb[0] / 255.0
    g = rgb[1] / 255.0
    b = rgb[2] / 255.0

    # Apply the BT.601 conversion formulas
    y = 0.299 * r + 0.587 * g + 0.114 * b
    u = -0.14713 * r - 0.28886 * g + 0.436 * b
    v = 0.615 * r - 0.51499 * g - 0.10001 * b

    # Scale back to 8-bit magnitudes
    y = round(y * 255)
    u = round(u * 255)
    v = round(v * 255)

    return (y, u, v)
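
# Quick sanity check (not part of the demo): for yellow,
# rgb_to_yuv((255, 255, 0)) returns approximately (226, -111, 26).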


# Custom face detection class, inheriting from the AIBase base class
class FaceDetectionApp(AIBase):
    def __init__(self, kmodel_path, model_input_size, anchors, confidence_threshold=0.5, nms_threshold=0.2, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  # call the base class constructor
        self.kmodel_path = kmodel_path  # model file path
        self.model_input_size = model_input_size  # model input resolution
        self.confidence_threshold = confidence_threshold  # confidence threshold
        self.nms_threshold = nms_threshold  # NMS (non-maximum suppression) threshold
        self.anchors = anchors  # anchor data used for detection
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]  # resolution of the frames the sensor feeds to the AI, width aligned up to 16
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]  # display resolution, width aligned up to 16
        self.debug_mode = debug_mode  # whether debug mode is enabled
        self.ai2d = Ai2d(debug_mode)  # instantiate Ai2d for model preprocessing
        self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  # set the Ai2d input/output format and dtype

    # Configure preprocessing. Pad and resize are used here; Ai2d supports
    # crop/shift/pad/resize/affine, see /sdcard/app/libs/AI2D.py for details.
    def config_preprocess(self, input_image_size=None):
        with ScopedTiming("set preprocess config", self.debug_mode > 0):  # timer, enabled when debug_mode > 0
            ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size  # defaults to the size the sensor feeds to the AI; override via input_image_size
            top, bottom, left, right = self.get_padding_param()  # get the padding parameters
            self.ai2d.pad([0, 0, 0, 0, top, bottom, left, right], 0, [104, 117, 123])  # pad the edges
            self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)  # resize the image
            self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])  # build the preprocessing pipeline

    # Custom postprocessing for this task. results is the list of model output arrays; the face_det_post_process interface from the aidemo library is used here.
    def postprocess(self, results):
        with ScopedTiming("postprocess", self.debug_mode > 0):
            post_ret = aidemo.face_det_post_process(self.confidence_threshold, self.nms_threshold, self.model_input_size[1], self.anchors, self.rgb888p_size, results)
            if len(post_ret) == 0:
                return post_ret
            else:
                return post_ret[0]

    # Draw the detection results on the screen
    def draw_result(self, osd_img, dets):
        with ScopedTiming("display_draw", self.debug_mode > 0):
            if dets:
                osd_img.clear()  # clear the OSD image
                for det in dets:
                    # Convert the box coordinates to the display resolution
                    x, y, w, h = map(lambda x: int(round(x, 0)), det[:4])
                    x = x * self.display_size[0] // self.rgb888p_size[0]
                    y = y * self.display_size[1] // self.rgb888p_size[1]
                    w = w * self.display_size[0] // self.rgb888p_size[0]
                    h = h * self.display_size[1] // self.rgb888p_size[1]
                    osd_img.draw_rectangle(x, y, w, h, color=(255, 255, 0, 255), thickness=2)  # draw the box
            else:
                osd_img.clear()

    # Compute the letterbox padding parameters
    def get_padding_param(self):
        dst_w = self.model_input_size[0]  # model input width
        dst_h = self.model_input_size[1]  # model input height
        ratio_w = dst_w / self.rgb888p_size[0]  # width scale ratio
        ratio_h = dst_h / self.rgb888p_size[1]  # height scale ratio
        ratio = min(ratio_w, ratio_h)  # use the smaller ratio to preserve the aspect ratio
        new_w = int(ratio * self.rgb888p_size[0])  # scaled width
        new_h = int(ratio * self.rgb888p_size[1])  # scaled height
        dw = (dst_w - new_w) / 2  # horizontal padding (total / 2)
        dh = (dst_h - new_h) / 2  # vertical padding (total / 2)
        top = 0
        bottom = int(round(dh * 2 + 0.1))
        left = 0
        right = int(round(dw * 2 - 0.1))
        return top, bottom, left, right
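    # Worked example with the sizes used below (a quick check, not executed):
    # rgb888p_size = [1280, 720] and model_input_size = [320, 320] give
    # ratio = min(0.25, 0.444) = 0.25, so the frame scales to 320x180 and all
    # padding goes to the bottom: top = 0, bottom = 140, left = 0, right = 0.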

class RtspServer:
    def __init__(self,session_name="test",port=8554,video_type = mm.multi_media_type.media_h264,enable_audio=False,width=1280,height=720):
        self.session_name = session_name
        self.video_type = video_type
        self.enable_audio = enable_audio
        self.port = port
        self.rtspserver = mm.rtsp_server()
        self.venc_chn = VENC_CHN_ID_0
        self.start_stream = False
        self.width=ALIGN_UP(width, 16)
        self.height=height
        self.encoder = Encoder()
        self.encoder.SetOutBufs(self.venc_chn, 15, self.width, self.height)

    def start(self):
        chnAttr = ChnAttrStr(self.encoder.PAYLOAD_TYPE_H264, self.encoder.H264_PROFILE_MAIN, self.width, self.height)
        self.encoder.Create(self.venc_chn, chnAttr)
        self.rtspserver.rtspserver_init(self.port)
        self.rtspserver.rtspserver_createsession(self.session_name,self.video_type,self.enable_audio)
        self.rtspserver.rtspserver_start()
        self.encoder.Start(self.venc_chn)
        self.start_stream = True

    def stop(self):
        self.start_stream = False
        self.encoder.Stop(self.venc_chn)
        self.encoder.Destroy(self.venc_chn)
        self.rtspserver.rtspserver_stop()
        self.rtspserver.rtspserver_deinit()

    def get_rtsp_url(self):
        return self.rtspserver.rtspserver_getrtspurl(self.session_name)

    def _send_rtsp_img(self, rtsp_img):
        # Wrap the YUV420SP snapshot in a k_video_frame_info so the hardware
        # encoder can consume it by physical address
        frame_info = k_video_frame_info()
        frame_info.v_frame.width = rtsp_img.width()
        frame_info.v_frame.height = rtsp_img.height()
        frame_info.v_frame.pixel_format = Sensor.YUV420SP
        frame_info.pool_id = rtsp_img.poolid()
        frame_info.v_frame.phys_addr[0] = rtsp_img.phyaddr()
        # In YUV420SP the UV plane directly follows the Y plane in memory
        frame_info.v_frame.phys_addr[1] = frame_info.v_frame.phys_addr[0] + frame_info.v_frame.width*frame_info.v_frame.height

        # Encode the frame
        self.encoder.SendFrame(self.venc_chn, frame_info)
        streamData = StreamData()
        self.encoder.GetStream(self.venc_chn, streamData)  # fetch one encoded stream packet

        self.rtspserver.rtspserver_sendvideodata_byphyaddr(self.session_name, streamData.phy_addr[0], streamData.data_size[0], 1000)
        self.encoder.ReleaseStream(self.venc_chn, streamData)  # release the stream packet

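# Typical RtspServer usage (mirrored in ai_multi_camera_test below): construct
# the server before MediaManager.init() so its encoder output buffers are
# reserved, call start() once the sensor pipeline is ready, feed each YUV420SP
# snapshot to _send_rtsp_img(), and call stop() during shutdown.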

def ai_multi_camera_test():
    display_size=[1920,1080]
    # Set the model path and other parameters
    face_det_kmodel_path = "/sdcard/examples/kmodel/face_detection_320.kmodel"
    # Other parameters
    face_det_confidence_threshold = 0.5
    face_det_nms_threshold = 0.2
    face_det_anchor_len = 4200
    face_det_det_dim = 4
    face_det_anchors_path = "/sdcard/examples/utils/prior_data_320.bin"
    face_det_anchors = np.fromfile(face_det_anchors_path, dtype=np.float)
    face_det_anchors = face_det_anchors.reshape((face_det_anchor_len, face_det_det_dim))
    face_det_rgb888p_size = [1280, 720]
    # Initialize the custom face detection instance
    face_det = FaceDetectionApp(face_det_kmodel_path, model_input_size=[320, 320], anchors=face_det_anchors, confidence_threshold=face_det_confidence_threshold, nms_threshold=face_det_nms_threshold, rgb888p_size=face_det_rgb888p_size, display_size=display_size, debug_mode=0)

    rtsp_size=[1280,720]
    rtspserver = RtspServer(session_name="test", port=8554, enable_audio=False, width=rtsp_size[0], height=rtsp_size[1])  # create the RTSP server object


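    # A single physical sensor provides several independent output channels:
    # chn0 (1080p YUV420SP) is bound straight to the display video layer,
    # chn2 (720p YUV420SP) feeds the H.264 encoder for RTSP, and
    # chn1 (RGB888P) is snapshotted and passed to the face detection model.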
    sensor = Sensor()
    sensor.reset()
    # sensor channel 0: full-size frames for the display
    sensor.set_framesize(width = display_size[0], height = display_size[1],chn=CAM_CHN_ID_0)
    sensor.set_pixformat(Sensor.YUV420SP,chn=CAM_CHN_ID_0)

    # sensor channel 2: 720p frames for the RTSP encoder
    sensor.set_framesize(width = rtsp_size[0], height = rtsp_size[1],chn=CAM_CHN_ID_2)
    sensor.set_pixformat(Sensor.YUV420SP, chn=CAM_CHN_ID_2)

    # sensor channel 1: RGB888P frames for the face detection model
    sensor.set_framesize(width = face_det_rgb888p_size[0], height = face_det_rgb888p_size[1], chn=CAM_CHN_ID_1)
    sensor.set_pixformat(Sensor.RGBP888, chn=CAM_CHN_ID_1)

    sensor_bind_info = sensor.bind_info(x = 0, y = 0, chn = CAM_CHN_ID_0)
    Display.bind_layer(**sensor_bind_info, layer = Display.LAYER_VIDEO1)


    osd_img = image.Image(display_size[0], display_size[1], image.ARGB8888)
    # use hdmi as display output
    Display.init(Display.LT9611, to_ide = True)

    MediaManager.init()
    # sensor start run
    sensor.run()

    face_det.config_preprocess()  # configure preprocessing
    rtspserver.start()  # start the RTSP server
    print("rtsp server start:", rtspserver.get_rtsp_url())  # print the RTSP URL

    rtsp_img = None
    ai_img = None
    clock = time.clock()
    color_argb = (255, 255, 0, 255)
    # indices 1..3 pick out the R, G, B components (A,R,G,B ordering assumed)
    color_yuv = rgb_to_yuv((color_argb[1], color_argb[2], color_argb[3]))
    try:
        while 1:
            os.exitpoint()
            clock.tick()

            rtsp_img = sensor.snapshot(chn=CAM_CHN_ID_2)  # frame to be streamed over RTSP
            if (rtsp_img == -1):
                continue

            ai_img = sensor.snapshot(chn=CAM_CHN_ID_1)  # frame for face detection
            if (ai_img == -1):
                continue

            face_res=face_det.run(ai_img.to_numpy_ref())

            osd_img.clear()
            for det in face_res:
                x_ori, y_ori, w_ori, h_ori = map(lambda x: int(round(x, 0)), det[:4])

                # Scale the box from AI resolution to the RTSP frame resolution
                x_rtsp = x_ori * rtsp_size[0] // face_det_rgb888p_size[0]
                y_rtsp = y_ori * rtsp_size[1] // face_det_rgb888p_size[1]
                w_rtsp = w_ori * rtsp_size[0] // face_det_rgb888p_size[0]
                h_rtsp = h_ori * rtsp_size[1] // face_det_rgb888p_size[1]
                rtsp_img.draw_rectangle(x_rtsp, y_rtsp, w_rtsp, h_rtsp, color=color_yuv, thickness=2)  # draw on the streamed frame

                # Scale the box from AI resolution to the display resolution
                x_osd = x_ori * display_size[0] // face_det_rgb888p_size[0]
                y_osd = y_ori * display_size[1] // face_det_rgb888p_size[1]
                w_osd = w_ori * display_size[0] // face_det_rgb888p_size[0]
                h_osd = h_ori * display_size[1] // face_det_rgb888p_size[1]
                osd_img.draw_rectangle(x_osd, y_osd, w_osd, h_osd, color=color_argb, thickness=2)  # draw on the OSD layer


            rtspserver._send_rtsp_img(rtsp_img)  # encode this frame and push it over RTSP

            Display.show_image(osd_img, 0, 0, Display.LAYER_OSD3)
            gc.collect()
#            print(clock.fps())
    except KeyboardInterrupt as e:
        print("user stop: ", e)
    except BaseException as e:
        print(f"Exception {e}")
    finally:
        face_det.deinit()
        sensor.stop()
        # deinit lcd
        Display.deinit()
        time.sleep_ms(50)
        # deinit media buffer
        MediaManager.deinit()
        rtspserver.stop()


if __name__ == "__main__":
    os.exitpoint(os.EXITPOINT_ENABLE)
    ai_multi_camera_test()
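
Once the script is running, the URL printed at startup (for the session configured above it should look like rtsp://<board-ip>:8554/test) can be opened from another machine on the same network with a player such as VLC or ffplay, e.g. ffplay rtsp://<board-ip>:8554/test.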

while (true)
{
    printf("Thank you very much");
}

Do you want to use the MicroPython solution or the RTOS solution?

I'd like to use the MicroPython solution. This is for a smart classroom graduation project: it should use face recognition for attendance while concurrently streaming RTSP for remote monitoring, all from a single camera.