diff --git a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java index 185d42e..6516b34 100644 --- a/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java +++ b/ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SysNoticeController.java @@ -1,15 +1,14 @@ package com.ruoyi.web.controller.system; +import java.io.IOException; import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.stream.Collectors; import com.alibaba.fastjson2.JSONObject; import com.ruoyi.common.constant.WebsocketConst; -import com.ruoyi.framework.websocket.OneToManyWebSocket; -import com.ruoyi.framework.websocket.WebSocketServer; -import com.ruoyi.framework.websocket.WebSocketUsers; +import com.ruoyi.framework.websocket.WebSocket; import com.ruoyi.system.domain.SysNoticeUser; -import org.checkerframework.checker.units.qual.A; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.validation.annotation.Validated; @@ -43,9 +42,6 @@ public class SysNoticeController extends BaseController @Autowired private ISysNoticeService noticeService; - @Autowired - private OneToManyWebSocket oneToManyWebSocket; - /** * 获取通知公告列表 */ @@ -76,11 +72,15 @@ public class SysNoticeController extends BaseController @PostMapping public AjaxResult add(@Validated @RequestBody SysNotice notice) { - notice.setCreateBy(getUsername()); - noticeService.insertNotice(notice); - //推送消息插入中间表 - Long userId = getLoginUser().getUserId();//当前登陆者 - insertNoticeUser(userId,notice,noticeService,oneToManyWebSocket,null); + try { + notice.setCreateBy(getUsername()); + noticeService.insertNotice(notice); + //推送消息插入中间表 + Long userId = getLoginUser().getUserId();//当前登陆者 + insertNoticeUser(userId,notice,noticeService,null); + }catch (Exception e){ + e.printStackTrace(); + } return success(); } @@ -139,9 +139,8 @@ public class SysNoticeController extends BaseController * @param userId 发送对象排除自身 * @param notice 发送信息对象 * @param noticeService 接口 - * @param oneToManyWebSocket WebSocket服务 */ - public static void insertNoticeUser(Long userId,SysNotice notice,ISysNoticeService noticeService,OneToManyWebSocket oneToManyWebSocket,List userIds){ + public static void insertNoticeUser(Long userId,SysNotice notice,ISysNoticeService noticeService,List userIds) throws IOException { List sysNoticeUsers = new ArrayList(); JSONObject obj = new JSONObject(); @@ -150,11 +149,12 @@ public class SysNoticeController extends BaseController obj.put(WebsocketConst.MSG_CONTENT, notice.getNoticeContent()); if(!"3".equals(notice.getNoticeType())){//系统通知 或 通知公告 - oneToManyWebSocket.sendMessage(obj.toString(),String.valueOf(userId)); + WebSocket.sendInfo(obj.toString(),null); SysNoticeUser sysNoticeUser = null; - Map users = oneToManyWebSocket.getUsers();//获取在线用户 - for(String userid:users.keySet()){ + CopyOnWriteArraySet webSocketSet = WebSocket.getWebSocketSet();//获取在线用户 + for (WebSocket item : webSocketSet) { + String userid = item.sid; if(!userid.equals(String.valueOf(userId))){ sysNoticeUser = new SysNoticeUser(); sysNoticeUser.setNoticeId(notice.getNoticeId()); @@ -167,11 +167,13 @@ public class SysNoticeController extends BaseController } }else{ if(userIds!=null&&userIds.size()>0){ - oneToManyWebSocket.sendMessageByUserIds(obj.toString(),userIds); - + for(String uid:userIds){ + WebSocket.sendInfo(obj.toString(),uid); + } SysNoticeUser sysNoticeUser = null; - Map users = oneToManyWebSocket.getUsers();//获取在线用户 - for(String userid:users.keySet()){ + CopyOnWriteArraySet webSocketSet = WebSocket.getWebSocketSet();//获取在线用户 + for (WebSocket item : webSocketSet) { + String userid = item.sid; if(userIds.contains(userid)){ sysNoticeUser = new SysNoticeUser(); sysNoticeUser.setNoticeId(notice.getNoticeId()); diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/OneToManyWebSocket.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/OneToManyWebSocket.java deleted file mode 100644 index c4fb082..0000000 --- a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/OneToManyWebSocket.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.ruoyi.framework.websocket; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - * 前后端交互的类实现消息的接收推送(自己发送给所有人(不包括自己)) - * - */ -@ServerEndpoint(value = "/websocket/message/{userId}") -@Component -public class OneToManyWebSocket { - /** - * WebSocketUsers 日志控制器 - */ - private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class); - - /** 记录当前在线连接数 */ - private static AtomicInteger onlineCount = new AtomicInteger(0); - - /** 存放所有在线的客户端 key 为用户Id */ - private static final Map clients = new ConcurrentHashMap<>(); - - /** - * 连接建立成功调用的方法 - */ - @OnOpen - public void onOpen(Session session,@PathParam("userId") String userId) { - if(clients.containsKey(userId)){ - clients.remove(userId); - clients.put(userId, session); - }else{ - onlineCount.incrementAndGet(); // 在线数加1 - clients.put(userId, session); - //sendMessage("你好啊客户端"+userId,session); - } - } - - /** - * 连接关闭调用的方法 - */ - @OnClose - public void onClose(Session session) { - for (String userId : clients.keySet()) { - if(clients.get(userId).equals(session)){ - clients.remove(session); - onlineCount.decrementAndGet(); // 在线数减1 - } - } - } - - /** - * 收到客户端消息后调用的方法 - * - * @param message - * 客户端发送过来的消息 - */ - @OnMessage - public void onMessage(String message, Session session) { - LOGGER.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); - //this.sendMessage(message, session); - } - - @OnError - public void onError(Session session, Throwable error) { - LOGGER.error("发生错误"); - error.printStackTrace(); - } - - /** - * 获取在线用户列表 - * - * @return 返回用户集合 - */ - public static Map getUsers() - { - return clients; - } - - - /** - * 群发消息 - * @param message - * @param userId - */ - public static void sendMessage(String message,String userId) { - for (Map.Entry sessionEntry : clients.entrySet()) { - Session toSession = sessionEntry.getValue(); - String suid = sessionEntry.getKey(); - // 排除掉自己 - if (!suid.equals(userId)) { - LOGGER.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); - toSession.getAsyncRemote().sendText(message); - } - } - } - - /** - * 群发消息 - * @param message - * @param userIds - */ - - public static void sendMessageByUserIds(String message, List userIds) { - - for (String id : userIds) { - Session session = clients.get(id); - if(session!=null){ - LOGGER.info("服务端给客户端[{}]发送消息{}", session.getId(), message); - try { - session.getAsyncRemote().sendText(message); - }catch (Exception e){ - LOGGER.info("数据发送失败!疑似断开连接", session.getId(), message); - clients.remove(id); - } - } - } - } - - public Session getUserSession(String userId){ - return clients.get(userId); - } -} diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocket.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocket.java new file mode 100644 index 0000000..520e1a6 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocket.java @@ -0,0 +1,129 @@ +package com.ruoyi.framework.websocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +// @ServerEndpoint 声明并创建了webSocket端点, 并且指明了请求路径 +// id 为客户端请求时携带的参数, 用于服务端区分客户端使用 +@ServerEndpoint(value = "/websocket/message/{sid}") +@Component +public class WebSocket { + + // 日志对象 + private static final Logger log = LoggerFactory.getLogger(WebSocket.class); + + // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 + private static int onlineCount = 0; + + // concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 + private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); + // private static ConcurrentHashMap websocketList = new ConcurrentHashMap<>(); + + // 与某个客户端的连接会话,需要通过它来给客户端发送数据 + private Session session; + + // 接收sid + public String sid = ""; + + /* + * 客户端创建连接时触发 + * */ + @OnOpen + public void onOpen(Session session, @PathParam("sid") String sid) { + this.session = session; + webSocketSet.add(this); // 加入set中 + addOnlineCount(); // 在线数加1 + log.info("有新窗口开始监听:" + sid + ", 当前在线人数为" + getOnlineCount()); + this.sid = sid; + log.info("连接成功"); + } + + /** + * 客户端连接关闭时触发 + **/ + @OnClose + public void onClose() { + webSocketSet.remove(this); // 从set中删除 + subOnlineCount(); // 在线数减1 + log.info("有一连接关闭!当前在线人数为" + getOnlineCount()); + } + + /** + * 接收到客户端消息时触发 + */ + @OnMessage + public void onMessage(String message, Session session) { + log.info("收到来自窗口" + sid + "的信息:" + message); + // 群发消息 + /*for (WebSocket item : webSocketSet) { + try { + item.sendMessage(message); + } catch (IOException e) { + e.printStackTrace(); + } + }*/ + } + + /** + * 连接发生异常时候触发 + */ + @OnError + public void onError(Session session, Throwable error) { + log.error("发生错误"); + error.printStackTrace(); + } + + public static CopyOnWriteArraySet getWebSocketSet() + { + return webSocketSet; + } + + + /** + * 实现服务器主动推送(向浏览器发消息) + */ + public void sendMessage(String message) throws IOException { + log.info("服务器消息推送:"+message); + this.session.getBasicRemote().sendText(message); + } + + /** + * 发送消息到所有客户端 + * 指定sid则向指定客户端发消息 + * 不指定sid则向所有客户端发送消息 + * */ + public static void sendInfo(String message, String sid) throws IOException { + log.info("推送消息到窗口" + sid + ",推送内容:" + message); + for (WebSocket item : webSocketSet) { + try { + // 这里可以设定只推送给这个sid的,为null则全部推送 + if (sid == null) { + item.sendMessage(message); + } else if (item.sid.equals(sid)) { + item.sendMessage(message); + } + } catch (IOException e) { + continue; + } + } + } + + public static synchronized int getOnlineCount() { + return onlineCount; + } + + public static synchronized void addOnlineCount() { + WebSocket.onlineCount++; + } + + public static synchronized void subOnlineCount() { + WebSocket.onlineCount--; + } +} +