引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
WebSocket 配置类
注册 WebSocket 扫描类到容器中(注册该对象主要用于扫描带有@ServerEndpoint 的类,如果使用外置的 tomcat 就不需要该对象)
1 2 3 4 5 6 7 8
| @Configuration public class WebSocketStompConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
|
编写 WebSocket 服务类(类似 Controller,接受处理前端的请求)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| @ServerEndpoint("/websocket") @Component public class WebSocketServer { private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
@OnOpen public void onOpen(Session session,@PathParam("userId") String userId) { System.out.println(userId+"连接服务器"); }
@OnClose public void onClose(Session session) { System.out.println(session.getId()+"断开连接"); }
@OnMessage public void onMessage(String message, Session session) { try { JSONObject jsonObject = JSON.parseObject(message); String id=jsonObject.getString("toUserId"); Session toSession=sessionMap.get(id); if (toSession!=null){ toSession.getBasicRemote().sendText(message); }else { session.getBasicRemote().sendText("该用户不在线"); }
} catch (IOException e) { e.printStackTrace(); } }
@OnError public void onError(Session session, Throwable error) { log.error("用户错误:"+this.userId+",原因:"+error.getMessage()); error.printStackTrace(); }
}
|
总结:
1、添加依赖
2、将扫描类注册到容器中
3、写 WebSocket 的服务类(类似 Controller)(这里是重点)
一、类上用到注解
1、@ServerEndpoint
作用范围:类
功能:使用了该注解,就代表类成为了 WebSocket 服务类,相当于 Controller
参数:value //该属性值写的就是映射路径
2、@Component
WebSocket 服务类需要添加到容器中
二、方法绑定事件使用的注解
1、@OnOpen
作用范围:方法上
功能:方法上添加该注解后,每次有客户端连接当前服务器,就会执行该方法
2、@OnClose
作用范围:方法上
功能:方法上添加该注解后,每次有客户端断开与当前服务类的连接时,就会执行该方法
3、@OnMessage
作用范围:方法上
功能:方法上添加该注解后,每次有客户端发送消息给当前服务类时,就会执行该方法
4、@OnError·
作用范围:方法上
功能:方法上添加该注解后,每次有客户端断开与当前服务类的连接出错时,就会执行该方法
三、使用到的对象
Session
作用:每个客户端连接服务类,都会产生一个 Session 对象,客户端与服务器之间进行通信就是用该对象
四、参数绑定(和 Springmvc 的参数绑定差不多)
1、有什么用?
可以绑定 Session 对象,这样就可以知道当前事件是哪个客户端触发的
例如:
用户 1 发送消息到服务端
用户 1 断开与服务器的连接
2、什么时候可以用
当方法上使用类绑定事件注解,就可以使用参数绑定
3、可以绑定的参数的类型:
1、Session 每个客户端连接服务器特有的对象
2、String 在@OnMessage 方法里绑定,可以得到客户端发来的消息
五、服务器推送消息与断开连接
1、推送消息
session.getBasicRemote().sendText(“内容”);
2、断开连接
session.close();
注意:Session 对象是哪个客户端的,那么消息就是消息就是推送给谁的;关闭连接也是
聊天室实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
| package com.dzgu.myblog.service.impl;
import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dzgu.myblog.dto.ChatRecordDTO; import com.dzgu.myblog.dto.RecallMessageDTO; import com.dzgu.myblog.dto.WebsocketMessageDTO; import com.dzgu.myblog.entity.ChatRecord; import com.dzgu.myblog.enums.FilePathEnum; import com.dzgu.myblog.mapper.ChatRecordMapper; import com.dzgu.myblog.strategy.context.UploadStrategyContext; import com.dzgu.myblog.utils.BeanCopyUtils; import com.dzgu.myblog.utils.HTMLUtils; import com.dzgu.myblog.utils.IpUtils; import com.dzgu.myblog.vo.VoiceVO; import lombok.extern.slf4j.Slf4j; import org.omg.CORBA.PUBLIC_MEMBER; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service;
import javax.websocket.*; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpointConfig; import java.io.IOException; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArraySet;
import static com.dzgu.myblog.enums.ChatTypeEnum.*; import static com.dzgu.myblog.enums.FilePathEnum.VOICE;
@Service @ServerEndpoint(value = "/websocket",configurator = WebSocketServiceImpl.ChatConfigurator.class) @Slf4j public class WebSocketServiceImpl {
private Session session; private static ChatRecordMapper chatRecordMapper; private static UploadStrategyContext uploadStrategyContext;
private static CopyOnWriteArraySet<WebSocketServiceImpl> webSocketSet = new CopyOnWriteArraySet<>(); @Autowired public void setChatRecordMapper(ChatRecordMapper chatRecordMapper) { WebSocketServiceImpl.chatRecordMapper = chatRecordMapper; } @Autowired public void setUploadStrategyContext(UploadStrategyContext uploadStrategyContext) { WebSocketServiceImpl.uploadStrategyContext = uploadStrategyContext; }
public static class ChatConfigurator extends ServerEndpointConfig.Configurator {
public static String HEADER_NAME = "X-Real-IP";
@Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { try { String firstFoundHeader = request.getHeaders().get(HEADER_NAME.toLowerCase()).get(0); sec.getUserProperties().put(HEADER_NAME, firstFoundHeader); } catch (Exception e) { sec.getUserProperties().put(HEADER_NAME, "未知ip"); } } }
@OnOpen public void onOpen(Session session, EndpointConfig endpointConfig) throws IOException { this.session = session; webSocketSet.add(this); updateOnlineCount(); ChatRecordDTO chatRecordDTO = listChartRecords(endpointConfig); WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder() .type(HISTORY_RECORD.getType()) .data(chatRecordDTO) .build(); synchronized (session) { session.getBasicRemote().sendText(JSON.toJSONString(messageDTO)); } }
@OnMessage public void onMessage(String message, Session session) throws IOException { WebsocketMessageDTO messageDTO = JSON.parseObject(message, WebsocketMessageDTO.class); switch (Objects.requireNonNull(getChatType(messageDTO.getType()))) { case SEND_MESSAGE: ChatRecord chatRecord = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), ChatRecord.class); chatRecord.setContent(HTMLUtils.deleteTag(chatRecord.getContent())); chatRecordMapper.insert(chatRecord); messageDTO.setData(chatRecord); broadcastMessage(messageDTO); break; case RECALL_MESSAGE: RecallMessageDTO recallMessage = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), RecallMessageDTO.class); chatRecordMapper.deleteById(recallMessage.getId()); broadcastMessage(messageDTO); break; case HEART_BEAT: messageDTO.setData("pong"); session.getBasicRemote().sendText(JSON.toJSONString(JSON.toJSONString(messageDTO))); default: break; } }
@OnClose public void onClose() throws IOException { webSocketSet.remove(this); updateOnlineCount(); }
private ChatRecordDTO listChartRecords(EndpointConfig endpointConfig) { List<ChatRecord> chatRecordList = chatRecordMapper.selectList(new LambdaQueryWrapper<ChatRecord>() .ge(ChatRecord::getCreateTime, DateUtil.offsetHour(new Date(), -12))); String ipAddress = endpointConfig.getUserProperties().get(ChatConfigurator.HEADER_NAME).toString(); return ChatRecordDTO.builder() .chatRecordList(chatRecordList) .ipAddress(ipAddress) .ipSource(IpUtils.getIpSource(ipAddress)) .build();
}
@Async public void updateOnlineCount() throws IOException { WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder() .type(ONLINE_COUNT.getType()) .data(webSocketSet.size()) .build(); broadcastMessage(messageDTO); }
public void sendVoice(VoiceVO voiceVO) { String content = uploadStrategyContext.executeUploadStrategy(voiceVO.getFile(), FilePathEnum.VOICE.getPath()); voiceVO.setContent(content); ChatRecord chatRecord = BeanCopyUtils.copyObject(voiceVO, ChatRecord.class); chatRecordMapper.insert(chatRecord); WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder() .type(VOICE_MESSAGE.getType()) .data(chatRecord) .build(); try { broadcastMessage(messageDTO); } catch (IOException e) { e.printStackTrace(); } }
private void broadcastMessage(WebsocketMessageDTO messageDTO) throws IOException { for (WebSocketServiceImpl webSocketService : webSocketSet) { webSocketService.session.getBasicRemote().sendText(JSON.toJSONString(messageDTO)); } } }
|