#include #include #include #include #include #include #include #include #include #include #include "affine.h" #include "PreProcessFn.h" using namespace std; // 异步初始化线程池(改为异步任务) void AsyncInitThreadPool(PreProcessFn& ppf, const Json::Value& ipcsNode) { std::vector> queMatVec(ipcsNode["cameras"].size());//视频队列1 std::vector> queMatVec2(ipcsNode["cameras"].size());//视频队列2 std::vector> getVideoTasks;//视频取流任务队列 std::vector> recognitionTasks;//视频处理任务队列 std::vector> pushVideoTasks;//视频推流处理任务队列 //保证程序一直运行 while (true) { ppf.asyncStop = false; for (auto i = 0; i < ipcsNode["cameras"].size(); ++i) { Json::Value ipcNode = ipcsNode["cameras"][i]; std::string id = ipcNode["id"].asString(); std::string ipccode = ipcNode["code"].asString(); std::string name = ipcNode["name"].asString(); std::string video = ipcNode["video"].asString(); long skipN = stol(ipcNode["skipN"].asString()); ppf.skipN = skipN; std::string push_type = ipcNode["push_type"].asString(); std::string fps = ipcNode["fps"].asString(); std::string toRtmp = ipcNode["toRtmp"].asString(); std::string interval = ipcNode["interval"].asString(); Json::Value modelArray = ipcNode["model"];//模型节点 if (!modelArray.isNull()) { // 创建并启动异步任务 getVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnGetVideoMatToQueue, &ppf,video, std::ref(queMatVec[i]))); recognitionTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnImRecognitionPro, &ppf, ipcNode,modelArray, std::ref(queMatVec[i]), std::ref(queMatVec2[i]))); if (push_type == "rabbitmq") { pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoInRabbitMQ, &ppf, std::ref(queMatVec2[i]), ipccode)); } else if (push_type == "stream") { pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoToUrl, &ppf, std::ref(queMatVec2[i]),toRtmp,fps, ipccode)); } } //初始化时加个延迟 std::this_thread::sleep_for(std::chrono::milliseconds(50));// 延迟等待一段时间 } for (auto& task : getVideoTasks) { try { task.get(); } catch (const std::exception& ex) { std::cerr << "拉流异步异步任务出错: " << ex.what() << std::endl; } } for (auto& task : recognitionTasks) { try { task.get(); } catch (const std::exception& ex) { std::cerr << "处理视频帧异步异步任务出错: " << ex.what() << std::endl; } } for (auto& task : pushVideoTasks) { try { task.get(); } catch (const std::exception& ex) { std::cerr << "推流异步异步任务出错: " << ex.what() << std::endl; } } queMatVec.clear(); queMatVec2.clear(); getVideoTasks.clear(); recognitionTasks.clear(); pushVideoTasks.clear(); std::this_thread::sleep_for(std::chrono::seconds(1));// 等待一段时间 } } int main() { //printHello(); try { // 读取配置文件 std::ifstream config_file("config.json", std::ifstream::binary); if (!config_file.is_open()) { std::cerr << "读取json文件出错" << std::endl; return -1; } Json::Value root; config_file >> root; // 初始化线程池(改为异步任务) PreProcessFn ppf; ppf.coalCode= root["ipcs"]["coal_code"].asString();//煤矿编号; ppf.warningFilePath = root["ipcs"]["warning_path"].asString();//录像保存路径 ppf.redispath = root["redis"]["ip"].asString(); ppf.redisport = root["redis"]["port"].asString(); ppf.redispass = root["redis"]["pwd"].asString(); ppf.mysqlpath = root["mysql"]["ip"].asString(); ppf.mysqlport = root["mysql"]["port"].asString(); ppf.mysqluser = root["mysql"]["user"].asString(); ppf.mysqlpass = root["mysql"]["pwd"].asString(); ppf.mysqldatabase = root["mysql"]["dbname"].asString(); ppf.rabbitpath = root["rabbit"]["ip"].asString(); ppf.rabbitport = root["rabbit"]["port"].asString(); ppf.rabbituser = root["rabbit"]["user"].asString(); ppf.rabbitpass = root["rabbit"]["pwd"].asString(); //ppf.deley = stol(root["ipcs"]["deley"].asString());//跳帧 ppf.recordtime = stoi(root["ipcs"]["record_time"].asString());//录像时间 //ppf.RecIntervalime = stoi(root["ipcs"]["RecIntervalime"].asString());//录像间隔时间 //加载模型标签颜色 for (int i = 0; i < root["label_colors"].size(); i++) { Json::Value members = root["label_colors"][i]; int nsize = members.size(); std::vector numbers; for (int j = 0; j < nsize; j++) { int crood = members[j].asInt(); numbers.push_back(crood); } ppf.c_list.push_back(numbers); } AsyncInitThreadPool(ppf, root); } catch(const std::exception& e) { std::cerr << e.what() << '\n'; } return 0; }