This commit is contained in:
xd 2024-04-25 20:19:53 +08:00
parent b12b8adb80
commit a3a60c28be
3 changed files with 152 additions and 154 deletions

View File

@ -1,15 +1,14 @@
package com.ruoyi.web.controller.system;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.constant.WebsocketConst;
import com.ruoyi.framework.websocket.OneToManyWebSocket;
import com.ruoyi.framework.websocket.WebSocketServer;
import com.ruoyi.framework.websocket.WebSocketUsers;
import com.ruoyi.framework.websocket.WebSocket;
import com.ruoyi.system.domain.SysNoticeUser;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
@ -43,9 +42,6 @@ public class SysNoticeController extends BaseController
@Autowired
private ISysNoticeService noticeService;
@Autowired
private OneToManyWebSocket oneToManyWebSocket;
/**
* 获取通知公告列表
*/
@ -76,11 +72,15 @@ public class SysNoticeController extends BaseController
@PostMapping
public AjaxResult add(@Validated @RequestBody SysNotice notice)
{
notice.setCreateBy(getUsername());
noticeService.insertNotice(notice);
//推送消息插入中间表
Long userId = getLoginUser().getUserId();//当前登陆者
insertNoticeUser(userId,notice,noticeService,oneToManyWebSocket,null);
try {
notice.setCreateBy(getUsername());
noticeService.insertNotice(notice);
//推送消息插入中间表
Long userId = getLoginUser().getUserId();//当前登陆者
insertNoticeUser(userId,notice,noticeService,null);
}catch (Exception e){
e.printStackTrace();
}
return success();
}
@ -139,9 +139,8 @@ public class SysNoticeController extends BaseController
* @param userId 发送对象排除自身
* @param notice 发送信息对象
* @param noticeService 接口
* @param oneToManyWebSocket WebSocket服务
*/
public static void insertNoticeUser(Long userId,SysNotice notice,ISysNoticeService noticeService,OneToManyWebSocket oneToManyWebSocket,List<String> userIds){
public static void insertNoticeUser(Long userId,SysNotice notice,ISysNoticeService noticeService,List<String> userIds) throws IOException {
List<SysNoticeUser> sysNoticeUsers = new ArrayList<SysNoticeUser>();
JSONObject obj = new JSONObject();
@ -150,11 +149,12 @@ public class SysNoticeController extends BaseController
obj.put(WebsocketConst.MSG_CONTENT, notice.getNoticeContent());
if(!"3".equals(notice.getNoticeType())){//系统通知 通知公告
oneToManyWebSocket.sendMessage(obj.toString(),String.valueOf(userId));
WebSocket.sendInfo(obj.toString(),null);
SysNoticeUser sysNoticeUser = null;
Map<String, Session> users = oneToManyWebSocket.getUsers();//获取在线用户
for(String userid:users.keySet()){
CopyOnWriteArraySet<WebSocket> webSocketSet = WebSocket.getWebSocketSet();//获取在线用户
for (WebSocket item : webSocketSet) {
String userid = item.sid;
if(!userid.equals(String.valueOf(userId))){
sysNoticeUser = new SysNoticeUser();
sysNoticeUser.setNoticeId(notice.getNoticeId());
@ -167,11 +167,13 @@ public class SysNoticeController extends BaseController
}
}else{
if(userIds!=null&&userIds.size()>0){
oneToManyWebSocket.sendMessageByUserIds(obj.toString(),userIds);
for(String uid:userIds){
WebSocket.sendInfo(obj.toString(),uid);
}
SysNoticeUser sysNoticeUser = null;
Map<String, Session> users = oneToManyWebSocket.getUsers();//获取在线用户
for(String userid:users.keySet()){
CopyOnWriteArraySet<WebSocket> webSocketSet = WebSocket.getWebSocketSet();//获取在线用户
for (WebSocket item : webSocketSet) {
String userid = item.sid;
if(userIds.contains(userid)){
sysNoticeUser = new SysNoticeUser();
sysNoticeUser.setNoticeId(notice.getNoticeId());

View File

@ -1,133 +0,0 @@
package com.ruoyi.framework.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* 前后端交互的类实现消息的接收推送(自己发送给所有人(不包括自己))
*
*/
@ServerEndpoint(value = "/websocket/message/{userId}")
@Component
public class OneToManyWebSocket {
/**
* WebSocketUsers 日志控制器
*/
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);
/** 记录当前在线连接数 */
private static AtomicInteger onlineCount = new AtomicInteger(0);
/** 存放所有在线的客户端 key 为用户Id */
private static final Map<String, Session> clients = new ConcurrentHashMap<>();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {
if(clients.containsKey(userId)){
clients.remove(userId);
clients.put(userId, session);
}else{
onlineCount.incrementAndGet(); // 在线数加1
clients.put(userId, session);
//sendMessage("你好啊客户端"+userId,session);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
for (String userId : clients.keySet()) {
if(clients.get(userId).equals(session)){
clients.remove(session);
onlineCount.decrementAndGet(); // 在线数减1
}
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message
* 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
LOGGER.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
//this.sendMessage(message, session);
}
@OnError
public void onError(Session session, Throwable error) {
LOGGER.error("发生错误");
error.printStackTrace();
}
/**
* 获取在线用户列表
*
* @return 返回用户集合
*/
public static Map<String, Session> getUsers()
{
return clients;
}
/**
* 群发消息
* @param message
* @param userId
*/
public static void sendMessage(String message,String userId) {
for (Map.Entry<String, Session> sessionEntry : clients.entrySet()) {
Session toSession = sessionEntry.getValue();
String suid = sessionEntry.getKey();
// 排除掉自己
if (!suid.equals(userId)) {
LOGGER.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
toSession.getAsyncRemote().sendText(message);
}
}
}
/**
* 群发消息
* @param message
* @param userIds
*/
public static void sendMessageByUserIds(String message, List<String> userIds) {
for (String id : userIds) {
Session session = clients.get(id);
if(session!=null){
LOGGER.info("服务端给客户端[{}]发送消息{}", session.getId(), message);
try {
session.getAsyncRemote().sendText(message);
}catch (Exception e){
LOGGER.info("数据发送失败!疑似断开连接", session.getId(), message);
clients.remove(id);
}
}
}
}
public Session getUserSession(String userId){
return clients.get(userId);
}
}

View File

@ -0,0 +1,129 @@
package com.ruoyi.framework.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
// @ServerEndpoint 声明并创建了webSocket端点, 并且指明了请求路径
// id 为客户端请求时携带的参数, 用于服务端区分客户端使用
@ServerEndpoint(value = "/websocket/message/{sid}")
@Component
public class WebSocket {
// 日志对象
private static final Logger log = LoggerFactory.getLogger(WebSocket.class);
// 静态变量用来记录当前在线连接数应该把它设计成线程安全的
private static int onlineCount = 0;
// concurrent包的线程安全Set用来存放每个客户端对应的MyWebSocket对象
private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
// private static ConcurrentHashMap<String,WebSocket> websocketList = new ConcurrentHashMap<>();
// 与某个客户端的连接会话需要通过它来给客户端发送数据
private Session session;
// 接收sid
public String sid = "";
/*
* 客户端创建连接时触发
* */
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
this.session = session;
webSocketSet.add(this); // 加入set中
addOnlineCount(); // 在线数加1
log.info("有新窗口开始监听:" + sid + ", 当前在线人数为" + getOnlineCount());
this.sid = sid;
log.info("连接成功");
}
/**
* 客户端连接关闭时触发
**/
@OnClose
public void onClose() {
webSocketSet.remove(this); // 从set中删除
subOnlineCount(); // 在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 接收到客户端消息时触发
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到来自窗口" + sid + "的信息:" + message);
// 群发消息
/*for (WebSocket item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}*/
}
/**
* 连接发生异常时候触发
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
public static CopyOnWriteArraySet<WebSocket> getWebSocketSet()
{
return webSocketSet;
}
/**
* 实现服务器主动推送(向浏览器发消息)
*/
public void sendMessage(String message) throws IOException {
log.info("服务器消息推送:"+message);
this.session.getBasicRemote().sendText(message);
}
/**
* 发送消息到所有客户端
* 指定sid则向指定客户端发消息
* 不指定sid则向所有客户端发送消息
* */
public static void sendInfo(String message, String sid) throws IOException {
log.info("推送消息到窗口" + sid + ",推送内容:" + message);
for (WebSocket item : webSocketSet) {
try {
// 这里可以设定只推送给这个sid的为null则全部推送
if (sid == null) {
item.sendMessage(message);
} else if (item.sid.equals(sid)) {
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
}