#include <iostream>
|
#include <opencv2/opencv.hpp>
|
#include <chrono>
|
#include <thread>
|
#include <iomanip>
|
#include <string>
|
#include <stdio.h>
|
#include <fstream>
|
#include <future>
|
#include <json/json.h>
|
#include "affine.h"
|
#include "PreProcessFn.h"
|
|
using namespace std;
|
|
// 异步初始化线程池(改为异步任务)
|
void AsyncInitThreadPool(PreProcessFn& ppf, const Json::Value& ipcsNode)
|
{
|
std::vector<std::queue<Mat>> queMatVec(ipcsNode["cameras"].size());//视频队列1
|
std::vector<std::queue<Mat>> queMatVec2(ipcsNode["cameras"].size());//视频队列2
|
|
std::vector<std::future<void>> getVideoTasks;//视频取流任务队列
|
std::vector<std::future<void>> recognitionTasks;//视频处理任务队列
|
std::vector<std::future<void>> pushVideoTasks;//视频推流处理任务队列
|
//保证程序一直运行
|
while (true)
|
{
|
ppf.asyncStop = false;
|
|
for (auto i = 0; i < ipcsNode["cameras"].size(); ++i) {
|
Json::Value ipcNode = ipcsNode["cameras"][i];
|
std::string id = ipcNode["id"].asString();
|
std::string ipccode = ipcNode["code"].asString();
|
std::string name = ipcNode["name"].asString();
|
std::string video = ipcNode["video"].asString();
|
long skipN = stol(ipcNode["skipN"].asString());
|
ppf.skipN = skipN;
|
std::string push_type = ipcNode["push_type"].asString();
|
std::string fps = ipcNode["fps"].asString();
|
std::string toRtmp = ipcNode["toRtmp"].asString();
|
std::string interval = ipcNode["interval"].asString();
|
Json::Value modelArray = ipcNode["model"];//模型节点
|
|
|
if (!modelArray.isNull()) {
|
// 创建并启动异步任务
|
getVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnGetVideoMatToQueue, &ppf,video, std::ref(queMatVec[i])));
|
recognitionTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnImRecognitionPro, &ppf, ipcNode,modelArray, std::ref(queMatVec[i]), std::ref(queMatVec2[i])));
|
if (push_type == "rabbitmq") {
|
pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoInRabbitMQ, &ppf, std::ref(queMatVec2[i]), ipccode));
|
}
|
else if (push_type == "stream") {
|
pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoToUrl, &ppf, std::ref(queMatVec2[i]),toRtmp,fps, ipccode));
|
}
|
}
|
|
//初始化时加个延迟
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));// 延迟等待一段时间
|
}
|
|
for (auto& task : getVideoTasks) {
|
try {
|
task.get();
|
}
|
catch (const std::exception& ex) {
|
std::cerr << "拉流异步异步任务出错: " << ex.what() << std::endl;
|
}
|
}
|
|
for (auto& task : recognitionTasks) {
|
try {
|
task.get();
|
}
|
catch (const std::exception& ex) {
|
std::cerr << "处理视频帧异步异步任务出错: " << ex.what() << std::endl;
|
}
|
}
|
|
for (auto& task : pushVideoTasks) {
|
try {
|
task.get();
|
}
|
catch (const std::exception& ex) {
|
std::cerr << "推流异步异步任务出错: " << ex.what() << std::endl;
|
}
|
}
|
|
queMatVec.clear();
|
queMatVec2.clear();
|
getVideoTasks.clear();
|
recognitionTasks.clear();
|
pushVideoTasks.clear();
|
|
std::this_thread::sleep_for(std::chrono::seconds(1));// 等待一段时间
|
}
|
}
|
|
int main()
|
{
|
|
//printHello();
|
try
|
{
|
// 读取配置文件
|
std::ifstream config_file("config.json", std::ifstream::binary);
|
if (!config_file.is_open()) {
|
std::cerr << "读取json文件出错" << std::endl;
|
return -1;
|
}
|
|
Json::Value root;
|
config_file >> root;
|
|
// 初始化线程池(改为异步任务)
|
PreProcessFn ppf;
|
ppf.coalCode= root["ipcs"]["coal_code"].asString();//煤矿编号;
|
ppf.warningFilePath = root["ipcs"]["warning_path"].asString();//录像保存路径
|
ppf.redispath = root["redis"]["ip"].asString();
|
ppf.redisport = root["redis"]["port"].asString();
|
ppf.redispass = root["redis"]["pwd"].asString();
|
ppf.mysqlpath = root["mysql"]["ip"].asString();
|
ppf.mysqlport = root["mysql"]["port"].asString();
|
ppf.mysqluser = root["mysql"]["user"].asString();
|
ppf.mysqlpass = root["mysql"]["pwd"].asString();
|
ppf.mysqldatabase = root["mysql"]["dbname"].asString();
|
ppf.rabbitpath = root["rabbit"]["ip"].asString();
|
ppf.rabbitport = root["rabbit"]["port"].asString();
|
ppf.rabbituser = root["rabbit"]["user"].asString();
|
ppf.rabbitpass = root["rabbit"]["pwd"].asString();
|
//ppf.deley = stol(root["ipcs"]["deley"].asString());//跳帧
|
ppf.recordtime = stoi(root["ipcs"]["record_time"].asString());//录像时间
|
//ppf.RecIntervalime = stoi(root["ipcs"]["RecIntervalime"].asString());//录像间隔时间
|
//加载模型标签颜色
|
for (int i = 0; i < root["label_colors"].size(); i++)
|
{
|
Json::Value members = root["label_colors"][i];
|
|
int nsize = members.size();
|
std::vector<int> numbers;
|
for (int j = 0; j < nsize; j++)
|
{
|
int crood = members[j].asInt();
|
numbers.push_back(crood);
|
}
|
ppf.c_list.push_back(numbers);
|
}
|
AsyncInitThreadPool(ppf, root);
|
|
}
|
catch(const std::exception& e)
|
{
|
std::cerr << e.what() << '\n';
|
}
|
return 0;
|
}
|