#include <iostream>
|
#include <opencv2/opencv.hpp>
|
#include <chrono>
|
#include <thread>
|
#include <iomanip>
|
#include <string>
|
#include <stdio.h>
|
#include <fstream>
|
#include <future>
|
#include <json/json.h>
|
#include "affine.h"
|
#include "PreProcessFn.h"
|
|
using namespace std;
|
|
// 异步初始化线程池(改为异步任务)
|
void AsyncInitThreadPool(PreProcessFn& ppf, const Json::Value& ipcsNode)
|
{
|
std::vector<std::queue<Mat>> queMatVec(ipcsNode["ipc"].size());//视频队列1
|
std::vector<std::queue<Mat>> queMatVec2(ipcsNode["ipc"].size());//视频队列2
|
|
std::vector<std::future<void>> getVideoTasks;//视频取流任务队列
|
std::vector<std::future<void>> recognitionTasks;//视频处理任务队列
|
std::vector<std::future<void>> pushVideoTasks;//视频推流处理任务队列
|
//保证程序一直运行
|
while (true)
|
{
|
ppf.asyncStop = false;
|
|
for (auto i = 0; i < ipcsNode["ipc"].size(); ++i) {
|
Json::Value ipcNode = ipcsNode["ipc"][i];
|
std::string id = ipcNode["id"].asString();
|
std::string ipccode = ipcNode["ipccode"].asString();
|
std::string video = ipcNode["video"].asString();
|
std::string pushflow = ipcNode["pushflow"].asString();
|
Json::Value modelsNode = ipcNode["models"];//模型节点
|
|
if (!modelsNode.isNull()) {
|
Json::Value modelArray = modelsNode["model"];
|
|
// 创建并启动异步任务
|
getVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnGetVideoMatToQueue, &ppf,video, std::ref(queMatVec[i])));
|
recognitionTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnImRecognitionPro, &ppf, ipcNode,modelArray, std::ref(queMatVec[i]), std::ref(queMatVec2[i])));
|
pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoInRabbitMQ, &ppf, std::ref(queMatVec2[i]), ipccode));
|
//pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideo, &ppf, std::ref(queMatVec2[i]), ipccode));
|
}
|
|
//初始化时加个延迟
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));// 延迟等待一段时间
|
}
|
|
for (auto& task : getVideoTasks) {
|
try {
|
task.get();
|
}
|
catch (const std::exception& ex) {
|
std::cerr << "拉流异步异步任务出错: " << ex.what() << std::endl;
|
}
|
}
|
|
for (auto& task : recognitionTasks) {
|
try {
|
task.get();
|
}
|
catch (const std::exception& ex) {
|
std::cerr << "处理视频帧异步异步任务出错: " << ex.what() << std::endl;
|
}
|
}
|
|
for (auto& task : pushVideoTasks) {
|
try {
|
task.get();
|
}
|
catch (const std::exception& ex) {
|
std::cerr << "推流异步异步任务出错: " << ex.what() << std::endl;
|
}
|
}
|
|
queMatVec.clear();
|
queMatVec2.clear();
|
getVideoTasks.clear();
|
recognitionTasks.clear();
|
pushVideoTasks.clear();
|
|
std::this_thread::sleep_for(std::chrono::seconds(1));// 等待一段时间
|
}
|
}
|
|
int main()
|
{
|
|
//printHello();
|
try
|
{
|
// 读取配置文件
|
std::ifstream config_file("config.json", std::ifstream::binary);
|
if (!config_file.is_open()) {
|
std::cerr << "读取json文件出错" << std::endl;
|
return -1;
|
}
|
|
Json::Value root;
|
config_file >> root;
|
|
// 初始化线程池(改为异步任务)
|
PreProcessFn ppf;
|
ppf.coalCode= root["ipcs"]["coalCode"].asString();//煤矿编号;
|
ppf.warningFilePath = root["ipcs"]["warningFilePath"].asString();//录像保存路径
|
ppf.redispath = root["ipcs"]["redispath"].asString();
|
ppf.redisport = root["ipcs"]["redisport"].asString();
|
ppf.redispass = root["ipcs"]["redispass"].asString();
|
ppf.mysqlpath = root["ipcs"]["mysqlpath"].asString();
|
ppf.mysqlport = root["ipcs"]["mysqlport"].asString();
|
ppf.mysqluser = root["ipcs"]["mysqluser"].asString();
|
ppf.mysqlpass = root["ipcs"]["mysqlpass"].asString();
|
ppf.mysqldatabase = root["ipcs"]["mysqldatabase"].asString();
|
ppf.rabbitpath = root["ipcs"]["rabbitpath"].asString();
|
ppf.rabbitport = root["ipcs"]["rabbitport"].asString();
|
ppf.rabbituser = root["ipcs"]["rabbituser"].asString();
|
ppf.rabbitpass = root["ipcs"]["rabbitpass"].asString();
|
ppf.deley = stol(root["ipcs"]["deley"].asString());//跳帧
|
ppf.recordtime = stoi(root["ipcs"]["recordtime"].asString());//录像时间
|
ppf.RecIntervalime = stoi(root["ipcs"]["RecIntervalime"].asString());//录像间隔时间
|
|
AsyncInitThreadPool(ppf, root["ipcs"]);
|
|
}
|
catch(const std::exception& e)
|
{
|
std::cerr << e.what() << '\n';
|
}
|
return 0;
|
}
|