引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
WebSocket 配置类
注册 WebSocket 扫描类到容器中(注册该对象主要用于扫描带有@ServerEndpoint 的类,如果使用外置的 tomcat 就不需要该对象)
1 2 3 4 5 6 7 8
| @Configuration public class WebSocketStompConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
|
编写 WebSocket 服务类(类似 Controller,接受处理前端的请求)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| @ServerEndpoint("/websocket") @Component public class WebSocketServer { private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
@OnOpen public void onOpen(Session session,@PathParam("userId") String userId) { System.out.println(userId+"连接服务器"); }
@OnClose public void onClose(Session session) { System.out.println(session.getId()+"断开连接"); }
@OnMessage public void onMessage(String message, Session session) { try { JSONObject jsonObject = JSON.parseObject(message); String id=jsonObject.getString("toUserId"); Session toSession=sessionMap.get(id); if (toSession!=null){ toSession.getBasicRemote().sendText(message); }else { session.getBasicRemote().sendText("该用户不在线"); }
} catch (IOException e) { e.printStackTrace(); } }
@OnError public void onError(Session session, Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); error.printStackTrace(); }
}
|
总结:
1、添加依赖
2、将扫描类注册到容器中
3、写 WebSocket 的服务类(类似 Controller)(这里是重点)
一、类上用到注解
1、@ServerEndpoint
作用范围:类
功能:使用了该注解,就代表类成为了 WebSocket 服务类,相当于 Controller
参数:value //该属性值写的就是映射路径
2、@Component
WebSocket 服务类需要添加到容器中
二、方法绑定事件使用的注解
1、@OnOpen
作用范围:方法上
功能:方法上添加该注解后,每次有客户端连接当前服务器,就会执行该方法
2、@OnClose
作用范围:方法上
功能:方法上添加该注解后,每次有客户端断开与当前服务类的连接时,就会执行该方法
3、@OnMessage
作用范围:方法上
功能:方法上添加该注解后,每次有客户端发送消息给当前服务类时,就会执行该方法
4、@OnError
作用范围:方法上
功能:方法上添加该注解后,每次客户端与当前服务类的连接或通信发生错误时,就会执行该方法
三、使用到的对象
Session
作用:每个客户端连接服务类,都会产生一个 Session 对象,客户端与服务器之间进行通信就是用该对象
四、参数绑定(和 Springmvc 的参数绑定差不多)
1、有什么用?
可以绑定 Session 对象,这样就可以知道当前事件是哪个客户端触发的
例如:
用户 1 发送消息到服务端
用户 1 断开与服务器的连接
2、什么时候可以用
当方法上使用了绑定事件的注解时,就可以使用参数绑定
3、可以绑定的参数的类型:
1、Session 每个客户端连接服务器特有的对象
2、String 在@OnMessage 方法里绑定,可以得到客户端发来的消息
五、服务器推送消息与断开连接
1、推送消息
session.getBasicRemote().sendText("内容");
2、断开连接
session.close();
注意:Session 对象是哪个客户端的,那么消息就是推送给谁的;关闭连接也是如此
聊天室实现

| package com.dzgu.myblog.service.impl;
import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dzgu.myblog.dto.ChatRecordDTO; import com.dzgu.myblog.dto.RecallMessageDTO; import com.dzgu.myblog.dto.WebsocketMessageDTO; import com.dzgu.myblog.entity.ChatRecord; import com.dzgu.myblog.enums.FilePathEnum; import com.dzgu.myblog.mapper.ChatRecordMapper; import com.dzgu.myblog.strategy.context.UploadStrategyContext; import com.dzgu.myblog.utils.BeanCopyUtils; import com.dzgu.myblog.utils.HTMLUtils; import com.dzgu.myblog.utils.IpUtils; import com.dzgu.myblog.vo.VoiceVO; import lombok.extern.slf4j.Slf4j; import org.omg.CORBA.PUBLIC_MEMBER; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service;
import javax.websocket.*; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpointConfig; import java.io.IOException; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArraySet;
import static com.dzgu.myblog.enums.ChatTypeEnum.*; import static com.dzgu.myblog.enums.FilePathEnum.VOICE;
@Service @ServerEndpoint(value = "/websocket",configurator = WebSocketServiceImpl.ChatConfigurator.class) @Slf4j public class WebSocketServiceImpl {
private Session session; private static ChatRecordMapper chatRecordMapper; private static UploadStrategyContext uploadStrategyContext;
private static CopyOnWriteArraySet<WebSocketServiceImpl> webSocketSet = new CopyOnWriteArraySet<>(); @Autowired public void setChatRecordMapper(ChatRecordMapper chatRecordMapper) { WebSocketServiceImpl.chatRecordMapper = chatRecordMapper; } @Autowired public void setUploadStrategyContext(UploadStrategyContext uploadStrategyContext) { WebSocketServiceImpl.uploadStrategyContext = uploadStrategyContext; }
public static class ChatConfigurator extends ServerEndpointConfig.Configurator {
public static String HEADER_NAME = "X-Real-IP";
@Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { try { String firstFoundHeader = request.getHeaders().get(HEADER_NAME.toLowerCase()).get(0); sec.getUserProperties().put(HEADER_NAME, firstFoundHeader); } catch (Exception e) { sec.getUserProperties().put(HEADER_NAME, "未知ip"); } } }
@OnOpen public void onOpen(Session session, EndpointConfig endpointConfig) throws IOException { this.session = session; webSocketSet.add(this); updateOnlineCount(); ChatRecordDTO chatRecordDTO = listChartRecords(endpointConfig); WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder() .type(HISTORY_RECORD.getType()) .data(chatRecordDTO) .build(); synchronized (session) { session.getBasicRemote().sendText(JSON.toJSONString(messageDTO)); } }
@OnMessage public void onMessage(String message, Session session) throws IOException { WebsocketMessageDTO messageDTO = JSON.parseObject(message, WebsocketMessageDTO.class); switch (Objects.requireNonNull(getChatType(messageDTO.getType()))) { case SEND_MESSAGE: ChatRecord chatRecord = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), ChatRecord.class); chatRecord.setContent(HTMLUtils.deleteTag(chatRecord.getContent())); chatRecordMapper.insert(chatRecord); messageDTO.setData(chatRecord); broadcastMessage(messageDTO); break; case RECALL_MESSAGE: RecallMessageDTO recallMessage = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), RecallMessageDTO.class); chatRecordMapper.deleteById(recallMessage.getId()); broadcastMessage(messageDTO); break; case HEART_BEAT: messageDTO.setData("pong"); session.getBasicRemote().sendText(JSON.toJSONString(JSON.toJSONString(messageDTO))); default: break; } }
@OnClose public void onClose() throws IOException { webSocketSet.remove(this); updateOnlineCount(); }
private ChatRecordDTO listChartRecords(EndpointConfig endpointConfig) { List<ChatRecord> chatRecordList = chatRecordMapper.selectList(new LambdaQueryWrapper<ChatRecord>() .ge(ChatRecord::getCreateTime, DateUtil.offsetHour(new Date(), -12))); String ipAddress = endpointConfig.getUserProperties().get(ChatConfigurator.HEADER_NAME).toString(); return ChatRecordDTO.builder() .chatRecordList(chatRecordList) .ipAddress(ipAddress) .ipSource(IpUtils.getIpSource(ipAddress)) .build();
}
@Async public void updateOnlineCount() throws IOException { WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder() .type(ONLINE_COUNT.getType()) .data(webSocketSet.size()) .build(); broadcastMessage(messageDTO); }
public void sendVoice(VoiceVO voiceVO) { String content = uploadStrategyContext.executeUploadStrategy(voiceVO.getFile(), FilePathEnum.VOICE.getPath()); voiceVO.setContent(content); ChatRecord chatRecord = BeanCopyUtils.copyObject(voiceVO, ChatRecord.class); chatRecordMapper.insert(chatRecord); WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder() .type(VOICE_MESSAGE.getType()) .data(chatRecord) .build(); try { broadcastMessage(messageDTO); } catch (IOException e) { e.printStackTrace(); } }
private void broadcastMessage(WebsocketMessageDTO messageDTO) throws IOException { for (WebSocketServiceImpl webSocketService : webSocketSet) { webSocketService.session.getBasicRemote().sendText(JSON.toJSONString(messageDTO)); } } }
|