其实使用消息队列也可以实现会话,直接前端监听指定的队列,使用rabbitmq的分组还可以实现不同群聊的效果。
4.0.0 org.example WebStock 1.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.7.8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-websocket org.projectlombok lombok 8 8 UTF-8 SpringBoot都已集成完毕了,不用使用原生的WebStock。
如果是工作中,要单独起一个服务来操作这个比较好,反正是微服务,多一个少一个没啥的
WebStock连接配置、
package com.quxiao.config; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.time.LocalDateTime; /** * ws消息处理类 */ @Component @Slf4j public class MyWsHandler extends AbstractWebSocketHandler { @Autowired WsService wsService; @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { log.info("建立ws连接"); WsSessionManager.add(session.getId(), session); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { log.info("发送文本消息"); // 获得客户端传来的消息 String payload = message.getPayload(); //这里也是一样,要取出前端传来的参数,判断发给谁.这里我就是发给了连接客户端自己. log.info("server 接收到消息 " + payload); wsService.sendMsg(session, "server 发送给的消息 " + payload + ",发送时间:" + LocalDateTime.now().toString()); String url = session.getUri().toString(); //使用?拼接参数,后端取出判断发给谁 System.out.println("获取到的参数:" + url.substring(url.indexOf('?') + 1)); } @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { log.info("发送二进制消息"); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.error("异常处理"); WsSessionManager.removeAndClose(session.getId()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { log.info("关闭ws连接"); WsSessionManager.removeAndClose(session.getId()); } }
package com.quxiao.config; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class WsSessionManager { /** * 保存连接 session 的地方 */ public static ConcurrentHashMap
SESSION_POOL = new ConcurrentHashMap<>(); /** * 添加 session * * @param key */ public static void add(String key, WebSocketSession session) { // 添加 session SESSION_POOL.put(key, session); } /** * 删除 session,会返回删除的 session * * @param key * @return */ public static WebSocketSession remove(String key) { // 删除 session return SESSION_POOL.remove(key); } /** * 删除并同步关闭连接 * * @param key */ public static void removeAndClose(String key) { WebSocketSession session = remove(key); if (session != null) { try { // 关闭连接 session.close(); } catch (IOException e) { // todo: 关闭出现异常处理 e.printStackTrace(); } } } /** * 获得 session * * @param key * @return */ public static WebSocketSession get(String key) { // 获得 session return SESSION_POOL.get(key); } } package com.quxiao.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private MyWsHandler myWsHandler; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry .addHandler(myWsHandler, "myWs") //允许跨域 .setAllowedOrigins("*"); } }
这里有几个后续逻辑:
(1)、连接时,通过前端的参数或者对其按登陆人信息绑定连接消息。
(2)、收到一个消息,发送给谁就得需要前端传来参数:例如某个群的id,然后通过群绑定人员,因为(1)中通过
人员id为key,value时连接信息。 直接遍历这个群下面的所有人员id,获取已经连接的信息,发送给他们。这里还要搞一个表存储群消息log。这样其他群员连接时,可以获取到以往的消息。
发送消息工具类
package com.quxiao.config; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; /** * ws操作相关服务 */ @Service @Slf4j public class WsServiceUtil { /** * 发送消息 * @param session * @param text * @return * @throws IOException */ public void sendMsg(WebSocketSession session, String text) throws IOException { session.sendMessage(new TextMessage(text)); } /** * 广播消息 * @param text * @return * @throws IOException */ public void broadcastMsg(String text) throws IOException { for (WebSocketSession session: WsSessionManager.SESSION_POOL.values()) { session.sendMessage(new TextMessage(text)); } } }
这里广播我就是遍历了储存连接消息的map容器。
package com.quxiao.controller; import com.quxiao.config.WsServiceUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletRequest; @RestController @RequestMapping("/put") public class MessageHandler { @Autowired WsServiceUtil wsServiceUtil; @PostMapping("/t1/{test}") public void t1(@PathVariable("test") String text) { try { wsServiceUtil.broadcastMsg(Thread.currentThread().getName() + ": " + text); } catch (Exception e) { throw new RuntimeException(e); } } }
My WebSocket
所以最重要的是这个消息发给谁
后端要做的就是需要根据不同的群、人员标识符去发送消息,
前端需要做的就是
传入不同的标识符,如果是私聊,就得传人员id,如果是群聊,就需要传入群id。