#include #include #include #include #include #include #include #include #include #include #include "affine.h" #include "PreProcessFn.h" using namespace std; // 异步初始化线程池(改为异步任务) void AsyncInitThreadPool(PreProcessFn& ppf, const Json::Value& ipcsNode) { std::vector> queMatVec(ipcsNode["ipc"].size());//视频队列1 std::vector> queMatVec2(ipcsNode["ipc"].size());//视频队列2 std::vector> getVideoTasks;//视频取流任务队列 std::vector> recognitionTasks;//视频处理任务队列 std::vector> pushVideoTasks;//视频推流处理任务队列 //保证程序一直运行 while (true) { ppf.asyncStop = false; for (auto i = 0; i < ipcsNode["ipc"].size(); ++i) { Json::Value ipcNode = ipcsNode["ipc"][i]; std::string id = ipcNode["id"].asString(); std::string ipccode = ipcNode["ipccode"].asString(); std::string video = ipcNode["video"].asString(); std::string pushflow = ipcNode["pushflow"].asString(); Json::Value modelsNode = ipcNode["models"];//模型节点 if (!modelsNode.isNull()) { Json::Value modelArray = modelsNode["model"]; // 创建并启动异步任务 getVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnGetVideoMatToQueue, &ppf,video, std::ref(queMatVec[i]))); recognitionTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnImRecognitionPro, &ppf, ipcNode,modelArray, std::ref(queMatVec[i]), std::ref(queMatVec2[i]))); pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoInRabbitMQ, &ppf, std::ref(queMatVec2[i]), ipccode)); //pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideo, &ppf, std::ref(queMatVec2[i]), ipccode)); } //初始化时加个延迟 std::this_thread::sleep_for(std::chrono::milliseconds(50));// 延迟等待一段时间 } for (auto& task : getVideoTasks) { try { task.get(); } catch (const std::exception& ex) { std::cerr << "拉流异步异步任务出错: " << ex.what() << std::endl; } } for (auto& task : recognitionTasks) { try { task.get(); } catch (const std::exception& ex) { std::cerr << "处理视频帧异步异步任务出错: " << ex.what() << std::endl; } } for (auto& task : pushVideoTasks) { try { task.get(); } catch (const std::exception& ex) { std::cerr << "推流异步异步任务出错: " << ex.what() << std::endl; } } queMatVec.clear(); queMatVec2.clear(); getVideoTasks.clear(); recognitionTasks.clear(); pushVideoTasks.clear(); std::this_thread::sleep_for(std::chrono::seconds(1));// 等待一段时间 } } int main() { //printHello(); try { // 读取配置文件 std::ifstream config_file("config.json", std::ifstream::binary); if (!config_file.is_open()) { std::cerr << "读取json文件出错" << std::endl; return -1; } Json::Value root; config_file >> root; // 初始化线程池(改为异步任务) PreProcessFn ppf; ppf.coalCode= root["ipcs"]["coalCode"].asString();//煤矿编号; ppf.warningFilePath = root["ipcs"]["warningFilePath"].asString();//录像保存路径 ppf.redispath = root["ipcs"]["redispath"].asString(); ppf.redisport = root["ipcs"]["redisport"].asString(); ppf.redispass = root["ipcs"]["redispass"].asString(); ppf.mysqlpath = root["ipcs"]["mysqlpath"].asString(); ppf.mysqlport = root["ipcs"]["mysqlport"].asString(); ppf.mysqluser = root["ipcs"]["mysqluser"].asString(); ppf.mysqlpass = root["ipcs"]["mysqlpass"].asString(); ppf.mysqldatabase = root["ipcs"]["mysqldatabase"].asString(); ppf.rabbitpath = root["ipcs"]["rabbitpath"].asString(); ppf.rabbitport = root["ipcs"]["rabbitport"].asString(); ppf.rabbituser = root["ipcs"]["rabbituser"].asString(); ppf.rabbitpass = root["ipcs"]["rabbitpass"].asString(); ppf.deley = stol(root["ipcs"]["deley"].asString());//跳帧 ppf.recordtime = stoi(root["ipcs"]["recordtime"].asString());//录像时间 ppf.RecIntervalime = stoi(root["ipcs"]["RecIntervalime"].asString());//录像间隔时间 AsyncInitThreadPool(ppf, root["ipcs"]); } catch(const std::exception& e) { std::cerr << e.what() << '\n'; } return 0; }