admin
2025-04-25 a01457b5471049edc3d8302c0d978cb8b53e79c9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include <iostream>
#include <opencv2/opencv.hpp>
#include <chrono>
#include <thread>
#include <iomanip>
#include <string>
#include <stdio.h>
#include <fstream>
#include <future>
#include <json/json.h>
#include "affine.h"
#include "PreProcessFn.h"
 
using namespace std;
 
// 异步初始化线程池(改为异步任务)
void AsyncInitThreadPool(PreProcessFn& ppf, const Json::Value& ipcsNode)
    std::vector<std::queue<Mat>> queMatVec(ipcsNode["cameras"].size());//视频队列1
    std::vector<std::queue<Mat>> queMatVec2(ipcsNode["cameras"].size());//视频队列2
 
    std::vector<std::future<void>> getVideoTasks;//视频取流任务队列
    std::vector<std::future<void>> recognitionTasks;//视频处理任务队列
    std::vector<std::future<void>> pushVideoTasks;//视频推流处理任务队列
    //保证程序一直运行
    while (true)
    {
        ppf.asyncStop = false;
 
        for (auto i = 0; i < ipcsNode["cameras"].size(); ++i) {
            Json::Value ipcNode = ipcsNode["cameras"][i];
            std::string id = ipcNode["id"].asString();
            std::string ipccode = ipcNode["code"].asString();     
            std::string name = ipcNode["name"].asString();       
            std::string video = ipcNode["video"].asString();
            long skipN = stol(ipcNode["skipN"].asString()); 
            ppf.skipN = skipN;
            std::string push_type = ipcNode["push_type"].asString();
            std::string fps = ipcNode["fps"].asString();
            std::string toRtmp = ipcNode["toRtmp"].asString();
            std::string interval = ipcNode["interval"].asString();
            Json::Value modelArray = ipcNode["model"];//模型节点    
 
               
            if (!modelArray.isNull()) {
                 // 创建并启动异步任务
                getVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnGetVideoMatToQueue, &ppf,video, std::ref(queMatVec[i])));
                recognitionTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnImRecognitionPro, &ppf, ipcNode,modelArray, std::ref(queMatVec[i]), std::ref(queMatVec2[i])));
               if (push_type == "rabbitmq") {
                    pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoInRabbitMQ, &ppf, std::ref(queMatVec2[i]), ipccode));
               }
               else if (push_type == "stream") {
                   pushVideoTasks.push_back(std::async(std::launch::async, &PreProcessFn::fnPushVideoToUrl, &ppf, std::ref(queMatVec2[i]),toRtmp,fps, ipccode));
               }
            }  
 
            //初始化时加个延迟
            std::this_thread::sleep_for(std::chrono::milliseconds(50));// 延迟等待一段时间       
        }
        
        for (auto& task : getVideoTasks) {
            try {
                task.get();
            }
            catch (const std::exception& ex) {
                std::cerr << "拉流异步异步任务出错: " << ex.what() << std::endl;
            }
        }
 
        for (auto& task : recognitionTasks) {
            try {
                task.get();
            }
            catch (const std::exception& ex) {
                std::cerr << "处理视频帧异步异步任务出错: " << ex.what() << std::endl;
            }
        }
        
        for (auto& task : pushVideoTasks) {
            try {
                task.get();
            }
            catch (const std::exception& ex) {
                std::cerr << "推流异步异步任务出错: " << ex.what() << std::endl;
            }
        }
 
        queMatVec.clear();
        queMatVec2.clear();
        getVideoTasks.clear();
        recognitionTasks.clear();
        pushVideoTasks.clear();
 
        std::this_thread::sleep_for(std::chrono::seconds(1));// 等待一段时间
    }
}
 
int main()
{  
 
   //printHello();
    try
    {
        // 读取配置文件
        std::ifstream config_file("config.json", std::ifstream::binary);
        if (!config_file.is_open()) {
            std::cerr << "读取json文件出错" << std::endl;
            return -1;
        }
        
        Json::Value root;
        config_file >> root; 
        
        // 初始化线程池(改为异步任务)
        PreProcessFn ppf;
        ppf.coalCode= root["ipcs"]["coal_code"].asString();//煤矿编号;
        ppf.warningFilePath = root["ipcs"]["warning_path"].asString();//录像保存路径
        ppf.redispath = root["redis"]["ip"].asString();
        ppf.redisport = root["redis"]["port"].asString();  
        ppf.redispass = root["redis"]["pwd"].asString();
        ppf.mysqlpath = root["mysql"]["ip"].asString();
        ppf.mysqlport = root["mysql"]["port"].asString();
        ppf.mysqluser = root["mysql"]["user"].asString();
        ppf.mysqlpass = root["mysql"]["pwd"].asString();
        ppf.mysqldatabase = root["mysql"]["dbname"].asString();
        ppf.rabbitpath = root["rabbit"]["ip"].asString();
        ppf.rabbitport = root["rabbit"]["port"].asString();
        ppf.rabbituser = root["rabbit"]["user"].asString();
        ppf.rabbitpass = root["rabbit"]["pwd"].asString();
        //ppf.deley = stol(root["ipcs"]["deley"].asString());//跳帧 
        ppf.recordtime = stoi(root["ipcs"]["record_time"].asString());//录像时间
        //ppf.RecIntervalime = stoi(root["ipcs"]["RecIntervalime"].asString());//录像间隔时间 
//加载模型标签颜色
                for (int i = 0; i < root["label_colors"].size(); i++)
                {
                    Json::Value members = root["label_colors"][i];
 
                    int nsize = members.size();
                    std::vector<int> numbers;
                    for (int j = 0; j < nsize; j++)
                    {
                        int crood = members[j].asInt();
                        numbers.push_back(crood);
                    }
                    ppf.c_list.push_back(numbers);
                }
        AsyncInitThreadPool(ppf, root); 
 
    }
    catch(const std::exception& e)
    {
        std::cerr << e.what() << '\n';
    }    
    return 0;
}