消息推送功能完成

This commit is contained in:
xd 2024-04-29 15:10:48 +08:00
parent 115e610fbe
commit 766607c3f1
14 changed files with 568 additions and 66 deletions

View File

@ -2,17 +2,22 @@ package com.ruoyi.web.controller.system;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.constant.CacheConstants;
import com.ruoyi.common.constant.RedisConstant;
import com.ruoyi.common.constant.WebsocketConst; import com.ruoyi.common.constant.WebsocketConst;
import com.ruoyi.common.core.domain.model.LoginUser; import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.utils.DateUtils; import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.framework.websocket.WebSocket; import com.ruoyi.framework.websocket.WebSocket;
import com.ruoyi.system.domain.NoticeUserSelect; import com.ruoyi.system.domain.NoticeUserSelect;
import com.ruoyi.system.domain.SysNoticeUser; import com.ruoyi.system.domain.SysNoticeUser;
import com.ruoyi.system.domain.SysUserOnline;
import com.ruoyi.web.utils.SendNotice.NoticeUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.DeleteMapping;
@ -31,6 +36,7 @@ import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.system.domain.SysNotice; import com.ruoyi.system.domain.SysNotice;
import com.ruoyi.system.service.ISysNoticeService; import com.ruoyi.system.service.ISysNoticeService;
import javax.annotation.PostConstruct;
import javax.websocket.Session; import javax.websocket.Session;
/** /**
@ -80,7 +86,7 @@ public class SysNoticeController extends BaseController
noticeService.insertNotice(notice); noticeService.insertNotice(notice);
//推送消息插入中间表 //推送消息插入中间表
LoginUser loginUser = getLoginUser();//当前登陆者 LoginUser loginUser = getLoginUser();//当前登陆者
insertNoticeUser(loginUser,notice,noticeService,null); NoticeUtil.sendNotices(loginUser,notice,noticeService,null);
}catch (Exception e){ }catch (Exception e){
return error("系统异常"); return error("系统异常");
} }
@ -178,66 +184,6 @@ public class SysNoticeController extends BaseController
return success(noticeService.selectNoticeById(noticeId)); return success(noticeService.selectNoticeById(noticeId));
} }
/**
* 推送消息插入中间表
* @param loginUser 发送对象排除自身
* @param notice 发送信息对象
* @param noticeService 接口
*/
public static void insertNoticeUser(LoginUser loginUser,SysNotice notice,ISysNoticeService noticeService,List<String> userNames) throws IOException {
List<SysNoticeUser> sysNoticeUsers = new ArrayList<SysNoticeUser>();
JSONObject obj = new JSONObject();
obj.put(WebsocketConst.MSG_ID, notice.getNoticeId());
obj.put(WebsocketConst.MSG_TITLE, notice.getNoticeTitle());
obj.put(WebsocketConst.MSG_CONTENT, notice.getNoticeContent());
if(!"3".equals(notice.getNoticeType())){//系统通知 通知公告
WebSocket.sendInfo(obj.toString(),null);
SysNoticeUser sysNoticeUser = null;
CopyOnWriteArraySet<WebSocket> webSocketSet = WebSocket.getWebSocketSet();//获取在线用户
Date sendTime = DateUtils.getNowDate();
for (WebSocket item : webSocketSet) {
String username = item.sid;
if(!username.equals(loginUser.getUsername())){
sysNoticeUser = new SysNoticeUser();
sysNoticeUser.setNoticeId(notice.getNoticeId());
sysNoticeUser.setUserId(username);
sysNoticeUser.setSendUser(loginUser.getUser().getNickName());
sysNoticeUser.setSendTime(sendTime);
sysNoticeUsers.add(sysNoticeUser);
}
}
if(!sysNoticeUsers.isEmpty()){
noticeService.insertNoticeUserBatch(sysNoticeUsers);
}
}else{
if(userNames!=null&&userNames.size()>0){
for(String names:userNames){
WebSocket.sendInfo(obj.toString(),names);
}
SysNoticeUser sysNoticeUser = null;
CopyOnWriteArraySet<WebSocket> webSocketSet = WebSocket.getWebSocketSet();//获取在线用户
Date sendTime = DateUtils.getNowDate();
for (WebSocket item : webSocketSet) {
String username = item.sid;
if(userNames.contains(username)){
sysNoticeUser = new SysNoticeUser();
sysNoticeUser.setNoticeId(notice.getNoticeId());
sysNoticeUser.setUserId(username);
sysNoticeUser.setSendUser(loginUser.getUser().getNickName());
sysNoticeUser.setSendTime(sendTime);
sysNoticeUsers.add(sysNoticeUser);
}
}
if(!sysNoticeUsers.isEmpty()){
noticeService.insertNoticeUserBatch(sysNoticeUsers);
}
}
}
}
/** /**
* 获取对应消息对象的 人员 * 获取对应消息对象的 人员
* @param noticeUserSelect * @param noticeUserSelect

View File

@ -0,0 +1,113 @@
package com.ruoyi.web.utils.SendNotice;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.constant.CacheConstants;
import com.ruoyi.common.constant.WebsocketConst;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.spring.SpringUtils;
import com.ruoyi.framework.websocket.redisWebsocket.WebsocketService;
import com.ruoyi.system.domain.SysNotice;
import com.ruoyi.system.domain.SysNoticeUser;
import com.ruoyi.system.service.ISysNoticeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.MessageSource;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
public class NoticeUtil {
@Autowired
private static RedisCache redis = SpringUtils.getBean(RedisCache.class);
@Autowired
private static WebsocketService wbService = SpringUtils.getBean(WebsocketService.class);
/**
* 推送消息插入中间表
* @param loginUser 发送对象排除自身
* @param notice 发送信息对象
* @param noticeService 接口
*/
public static void sendNotices(LoginUser loginUser, SysNotice notice, ISysNoticeService noticeService, List<String> userIds) throws IOException {
JSONObject obj = new JSONObject();
obj.put(WebsocketConst.MSG_ID, notice.getNoticeId());
obj.put(WebsocketConst.MSG_TITLE, notice.getNoticeTitle());
obj.put(WebsocketConst.MSG_CONTENT, notice.getNoticeContent());
if(!"3".equals(notice.getNoticeType())){//系统通知 通知公告
wbService.sendMessageAll("ruoyi",obj.toString());
insertNoticeUser(loginUser,notice,noticeService,null);
}else{
if(userIds!=null&&userIds.size()>0){
for(String userId:userIds){
wbService.sendMessageById("ruoyi",userId,obj.toString());
}
insertNoticeUser(loginUser,notice,noticeService,userIds);
}
}
}
public static void insertNoticeUser(LoginUser loginUser,SysNotice notice,ISysNoticeService noticeService, List<String> userIds){
List<SysNoticeUser> sysNoticeUsers = new ArrayList<SysNoticeUser>();
//发送日期
Date sendTime = DateUtils.getNowDate();
//获取在线用户
Collection<String> keys = redis.keys(CacheConstants.LOGIN_TOKEN_KEY + "*");
if(userIds!=null&&userIds.size()>0){
for (String key : keys) {
LoginUser user = redis.getCacheObject(key);
String userId = user.getUsername();
if(userIds.contains(userId)){
SysNoticeUser sysnoticeuser = noticeSet(loginUser,notice,userId,sendTime);
sysNoticeUsers.add(sysnoticeuser);
}
}
if(!sysNoticeUsers.isEmpty()){
noticeService.insertNoticeUserBatch(sysNoticeUsers);
}
}else{
for (String key : keys) {
LoginUser user = redis.getCacheObject(key);
String userId = user.getUsername();
if(!userId.equals(loginUser.getUsername())){
SysNoticeUser sysnoticeuser = noticeSet(loginUser,notice,userId,sendTime);
sysNoticeUsers.add(sysnoticeuser);
}
}
if(!sysNoticeUsers.isEmpty()){
noticeService.insertNoticeUserBatch(sysNoticeUsers);
}
}
}
/**
* 消息用户中间表设置
* @param loginUser 当前登录者
* @param notice 消息实体类
* @param userId 在线用户账号
* @param sendTime 发送时间
* @return
*/
public static SysNoticeUser noticeSet(LoginUser loginUser,SysNotice notice,String userId,Date sendTime){
SysNoticeUser sysNoticeUser = new SysNoticeUser();
sysNoticeUser.setNoticeId(notice.getNoticeId());
sysNoticeUser.setUserId(userId);
sysNoticeUser.setSendUser(loginUser.getUser().getNickName());
sysNoticeUser.setSendTime(sendTime);
return sysNoticeUser;
}
}

View File

@ -0,0 +1,6 @@
package com.ruoyi.common.constant;
public class RedisConstant {
public static final String SYS_TOPIC = "sys_topic";//系统消息
public static final String SYS_USER_TOPIC = "sys_user_topic";//用户消息
}

View File

@ -34,7 +34,11 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId> <artifactId>spring-boot-starter-websocket</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>java-websocket</artifactId>
<version>1.3.3</version>
</dependency>
<!-- 阿里数据库连接池 --> <!-- 阿里数据库连接池 -->
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
@ -64,7 +68,12 @@
<groupId>com.ruoyi</groupId> <groupId>com.ruoyi</groupId>
<artifactId>ruoyi-system</artifactId> <artifactId>ruoyi-system</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,50 @@
package com.ruoyi.framework.websocket.redisWebsocket;
import com.ruoyi.common.constant.RedisConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @author
*/
@Configuration
public class RedisMessageListenerConfig {
@Autowired
private RedisReceiver redisReceiver;
/**
* 监听redis中的订阅信息
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
//添加redis消息队列监听监听im-topic消息主题的消息使用messageListenerAdapter()中设置的类和方法处理消息
redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), new PatternTopic(RedisConstant.SYS_USER_TOPIC));
//同上一样
redisMessageListenerContainer.addMessageListener(messageAllListenerAdapter(), new PatternTopic(RedisConstant.SYS_TOPIC));
return redisMessageListenerContainer;
}
/**
* 添加订阅消息处理类通过反射获取处理类中的处理方法
* 即使用RedisReceiver类中的sendMsg方法处理消息
* @return
*/
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(redisReceiver, "sendMsg");
}
@Bean
public MessageListenerAdapter messageAllListenerAdapter(){
return new MessageListenerAdapter(redisReceiver, "sendAllMsg");
}
}

View File

@ -0,0 +1,35 @@
package com.ruoyi.framework.websocket.redisWebsocket;
import com.alibaba.fastjson2.JSONObject;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 处理订阅redis的消息
* @author
*/
@Component
public class RedisReceiver {
@Resource
WebsocketEndpoint websocketEndpoint;
/**
* 处理一对一消息
* @param message 消息队列中的消息
*/
public void sendMsg(String message) {
SendMsg msg = JSONObject.parseObject(message, SendMsg.class);
websocketEndpoint.sendMessageById(msg.getProjectId(),msg.getUserId(),msg.getMsg());
}
/**
* 处理广播消息
* @param message
*/
public void sendAllMsg(String message){
SendMsgAll msg = JSONObject.parseObject(message, SendMsgAll.class);
websocketEndpoint.batchSendMessage(msg.getProjectId(),msg.getMsg());
}
}

View File

@ -0,0 +1,20 @@
package com.ruoyi.framework.websocket.redisWebsocket;
/**
* 按用户推送
* @author
*/
public class SendMsg extends SendMsgAll{
/**
* 用户ID
*/
private String userId;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
}

View File

@ -0,0 +1,46 @@
package com.ruoyi.framework.websocket.redisWebsocket;
/**
* 推送全部
*/
public class SendMsgAll {
/**
* websocket业务数据(json)
*/
private String msg;
/**
* 业务模块类型
*/
private String type;
/**
* 项目ID
*/
private String projectId;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getProjectId() {
return projectId;
}
public void setProjectId(String projectId) {
this.projectId = projectId;
}
}

View File

@ -0,0 +1,37 @@
package com.ruoyi.framework.websocket.redisWebsocket;
import javax.websocket.Session;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author
*/
public class WebSocketBean {
/**
* 连接session对象
*/
private Session session;
/**
* 连接错误次数
*/
private AtomicInteger erroerLinkCount = new AtomicInteger(0);
public int getErroerLinkCount() {
// 线程安全,以原子方式将当前值加1注意这里返回的是自增前的值
return erroerLinkCount.getAndIncrement();
}
public void cleanErrorNum() {
// 清空计数
erroerLinkCount = new AtomicInteger(0);
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
}

View File

@ -0,0 +1,35 @@
package com.ruoyi.framework.websocket.redisWebsocket;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.RedisConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
/**
* @author
*/
@Service
public class WebSocketServerImpl implements WebsocketService {
@Autowired
RedisTemplate<String,String> redisTemplate;
@Override
public void sendMessageAll(String projectId ,String message) {
SendMsgAll sendMsgAll = new SendMsgAll();
sendMsgAll.setProjectId(projectId);
sendMsgAll.setMsg(message);
redisTemplate.convertAndSend(RedisConstant.SYS_TOPIC, JSON.toJSONString(sendMsgAll));
}
@Override
public void sendMessageById(String projectId,String userId, String message) {
SendMsg sendMsg = new SendMsg();
sendMsg.setProjectId(projectId);
sendMsg.setUserId(userId);
sendMsg.setMsg(message);
redisTemplate.convertAndSend(RedisConstant.SYS_USER_TOPIC,JSON.toJSONString(sendMsg));
}
}

View File

@ -0,0 +1,25 @@
package com.ruoyi.framework.websocket.redisWebsocket;
/**
* 给客户端发送消息
* @author
*/
public interface WebsocketEndpoint {
/**
* 向所有在线用户群发消息
* @param projectId 项目ID
* @param message 发送给客户端的消息
*/
void batchSendMessage(String projectId,String message);
/**
* 发送给对应的用户
* @param userId 用户的ID
* @param projectId 项目ID
* @param message 发送的消息
*/
void sendMessageById(String projectId,String userId, String message);
}

View File

@ -0,0 +1,154 @@
package com.ruoyi.framework.websocket.redisWebsocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author
*/
@Service
@ServerEndpoint(value = "/websocket/message/{projectId}/{userId}")
@Component
public class WebsocketEndpointImpl implements WebsocketEndpoint {
private static Logger log = LoggerFactory.getLogger(WebsocketEndpointImpl.class);
/**
* 错误最大重试次数
*/
private static final int MAX_ERROR_NUM = 3;
/**
* 用来存放每个客户端对应的webSocket对象
*/
private static Map<String, Map<String, WebSocketBean>> webSocketInfo;
static {
// concurrent包的线程安全map
webSocketInfo = new ConcurrentHashMap<String, Map<String, WebSocketBean>>();
}
@OnOpen
public void onOpen(Session session, EndpointConfig config, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
WebSocketBean bean = new WebSocketBean();
bean.setSession(session);
Map<String,WebSocketBean> concurrentHashMap = new ConcurrentHashMap();
concurrentHashMap.put(userId,bean);
webSocketInfo.put(projectId, concurrentHashMap);
log.info("ws项目:"+projectId+",客户端连接服务器userId :" + userId + "当前连接数:" + countUser(projectId));
}
@OnClose
public void onClose(Session session, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
// 客户端断开连接移除websocket对象
Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
if(concurrentHashMap != null){concurrentHashMap.remove(userId);}
log.info("ws项目:"+projectId+",客户端断开连接,当前连接数:" + countUser(projectId));
}
@OnMessage
public void onMessage(Session session, String message, @PathParam("userId") String userId,@PathParam("projectId") String projectId) {
log.info("ws项目:"+projectId+",客户端 userId: " + userId + ",消息:" + message);
}
@OnError
public void onError(Session session, Throwable throwable) {
// log.error("ws发生错误" + throwable.getMessage(), throwable);
}
public void sendMessage(Session session, String message, String projectId, String userId) {
log.info("ws项目:"+projectId+",连接数:"+countUser(projectId)+",发送消息 " + session);
try {
// 发送消息
synchronized (session) {
if (session.isOpen()) {
session.getBasicRemote().sendText(message);
}
}
// 清空错误计数
this.cleanErrorNum(projectId, userId);
} catch (Exception e) {
log.error("ws项目:"+projectId+",用户:"+userId+",发送消息失败" + e.getMessage(), e);
int errorNum = this.getErroerLinkCount(projectId, userId);
// 小于最大重试次数重发
if (errorNum <= MAX_ERROR_NUM) {
sendMessage(session, message, projectId, userId);
} else {
log.error("ws发送消息失败超过最大次数");
// 清空错误计数
this.cleanErrorNum(projectId, userId);
}
}
}
@Override
public void batchSendMessage(String projectId,String message) {
Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
if(concurrentHashMap != null){
Set<Map.Entry<String, WebSocketBean>> set = concurrentHashMap.entrySet();
for(Map.Entry<String, WebSocketBean> map: set ){
sendMessage(map.getValue().getSession(), message,projectId, map.getKey());
}
}
}
@Override
public void sendMessageById(String projectId,String userId, String message) {
Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
if(concurrentHashMap != null){
WebSocketBean webSocketBean = concurrentHashMap.get(userId);
if (webSocketBean != null) {
sendMessage(webSocketBean.getSession(), message, projectId,userId);
}
}
}
/**
* 清空错误计数
*/
private void cleanErrorNum(String projectId, String userId){
Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
if(concurrentHashMap != null){
WebSocketBean webSocketBean = concurrentHashMap.get(userId);
if (webSocketBean != null) {
webSocketBean.cleanErrorNum();
}
}
}
/**
* 获取错误计数
*/
private int getErroerLinkCount(String projectId, String userId){
int errorNum = 0;
Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
if(concurrentHashMap != null){
WebSocketBean webSocketBean = concurrentHashMap.get(userId);
if (webSocketBean != null) {
errorNum = webSocketBean.getErroerLinkCount();
}
}
return errorNum;
}
private Integer countUser (String projectId){
int size = 0;
Map<String,WebSocketBean> concurrentHashMap = webSocketInfo.get(projectId);
if(concurrentHashMap != null) {
size = concurrentHashMap.size();
}
return size;
}
}

View File

@ -0,0 +1,24 @@
package com.ruoyi.framework.websocket.redisWebsocket;
/**
* 往Redis中存入消息
* @author
*/
public interface WebsocketService {
/**
* 向所有在线用户群发消息
* @param message 发送给客户端的消息
*/
void sendMessageAll(String projectId,String message);
/**
* 发送给对应的用户
* @param userId 用户的ID
* @param message 发送的消息
*/
void sendMessageById(String projectId,String userId, String message);
}

View File

@ -233,12 +233,14 @@
const webSocketApiUrl = 'ws://' + host + '/' + websocket_pattern; const webSocketApiUrl = 'ws://' + host + '/' + websocket_pattern;
// WebSocket, // WebSocket,
// /websocket/template-push/ websocket // /websocket/template-push/ websocket
let url = webSocketApiUrl + '/websocket/message/'+userName; let url = webSocketApiUrl + '/websocket/message/ruoyi/'+userName;
this.websock = new WebSocket(url) this.websock = new WebSocket(url)
this.websock.onopen = this.websocketOnopen this.websock.onopen = this.websocketOnopen
this.websock.onerror = this.websocketOnerror this.websock.onerror = this.websocketOnerror
this.websock.onmessage = this.websocketOnmessage this.websock.onmessage = this.websocketOnmessage
this.websock.onclose = this.websocketOnclose this.websock.onclose = this.websocketOnclose
console.log('WebSocket 连接地址:'+host)
}, },
websocketOnopen: function() { websocketOnopen: function() {
console.log('WebSocket连接成功') console.log('WebSocket连接成功')
@ -259,7 +261,7 @@
this.$notify({ this.$notify({
title: '消息', title: '消息',
type: 'warning', type: 'warning',
duration: 2000, duration: 5000,
dangerouslyUseHTMLString: true, dangerouslyUseHTMLString: true,
message: JSON.parse(e.data).noticeTitle message: JSON.parse(e.data).noticeTitle
}); });