package com.ruoyi.framework.websocket; import com.alibaba.fastjson2.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.*; import java.util.concurrent.Semaphore; import static jdk.nashorn.internal.runtime.regexp.joni.Config.log; /** * websocket 消息处理 * * @author ruoyi */ @Component @ServerEndpoint("/websocket/message") public class WebSocketServer { /** * WebSocketServer 日志控制器 */ private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class); /** * 默认最多允许同时在线人数100 */ public static int socketMaxOnlineCount = 100; private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount); /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session) throws Exception { boolean semaphoreFlag = false; // 尝试获取信号量 semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore); if (!semaphoreFlag) { // 未获取到信号量 LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount); WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount); session.close(); } else { // 添加用户 WebSocketUsers.put(session.getId(), session); LOGGER.info("\n 建立连接 - {}", session); LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size()); WebSocketUsers.sendMessageToUserByText(session, "连接成功"); } } /** * 连接关闭时处理 */ @OnClose public void onClose(Session session) { LOGGER.info("\n 关闭连接 - {}", session); Set strings = configPageSession.keySet(); if(strings.size()>0){ for (String string : strings) { List sessions = configPageSession.get(string); Iterator iterator = sessions.iterator(); while (iterator.hasNext()){ Session next = iterator.next(); if(next.equals(session)){ iterator.remove(); } } } } // 移除用户 WebSocketUsers.remove(session.getId()); // 获取到信号量则需释放 SemaphoreUtils.release(socketSemaphore); } /** * 抛出异常时处理 */ @OnError public void onError(Session session, Throwable exception) throws Exception { if (session.isOpen()) { // 关闭连接 session.close(); } String sessionId = session.getId(); LOGGER.info("\n 连接异常 - {}", sessionId); LOGGER.info("\n 异常信息 - {}", exception); // 移出用户 WebSocketUsers.remove(sessionId); // 获取到信号量则需释放 SemaphoreUtils.release(socketSemaphore); } private static List mainPageSession = new ArrayList<>(); private static Map> configPageSession = new HashMap<>();//key: 摄像头id,value:session /** * 服务器接收到客户端消息时调用的方法 */ @OnMessage public void onMessage(String message, Session session) { if (message.startsWith("main")) { mainPageSession.add(session); } if (message.startsWith("config")) { String[] configs = message.split(":"); if (configs.length == 2) { List sessions = configPageSession.get(configs[1]); if (sessions == null) { sessions = new ArrayList<>(); } sessions.add(session); configPageSession.put(configs[1], sessions); } } String msg = message.replace("你", "我").replace("吗", ""); WebSocketUsers.sendMessageToUserByText(session, msg); } public static synchronized void sendPicMessage(String message, String topicId) throws EncodeException, IOException { // LOGGER.info("main方法启动参数topicId{}",topicId); if(mainPageSession.size()>0){ Iterator mainIterator = mainPageSession.iterator(); while(mainIterator.hasNext()){ Session session = mainIterator.next(); if (session != null && session.isOpen()) { /* WebsocketObiect websocketObiect=new WebsocketObiect(); websocketObiect.setMessage(message); websocketObiect.setTopicId(topicId); session.getBasicRemote().sendText(JSON.toJSONString(websocketObiect));*/ // System.out.println("websocketmain成功发送"); } else { mainIterator.remove(); } } } // LOGGER.info("config方法启动参数topicId{}",topicId); if (topicId != null) { List sessions = configPageSession.get(topicId); if(sessions!=null&&sessions.size()>0){ Iterator iterator = sessions.iterator(); while(iterator.hasNext()){ Session session = iterator.next(); if (session != null && session.isOpen()) { session.getBasicRemote().sendText(message); // System.out.println("websocketconfig成功发送"); } else { iterator.remove(); } } } } } }