image.png

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

WebSocket 配置类

注册 WebSocket 扫描类到容器中(注册该对象主要用于扫描带有@ServerEndpoint 的类,如果使用外置的 tomcat 就不需要该对象)

1
2
3
4
5
6
7
8
@Configuration
public class WebSocketStompConfig {
//这个bean的注册,用于扫描带有@ServerEndpoint的注解成为websocket ,如果你使用外置的tomcat就不需要该配置文件
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}

编写 WebSocket 服务类(类似 Controller,接受处理前端的请求)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@ServerEndpoint("/websocket") //映射路径,类时配置Controller的映射路径
@Component //添加到容器
public class WebSocketServer {
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); //线程安全的Map集合

/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("userId") String userId) {//@PathParam是拿到请求url中带来的参数
System.out.println(userId+"连接服务器");
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
System.out.println(session.getId()+"断开连接");
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
try {
JSONObject jsonObject = JSON.parseObject(message);
String id=jsonObject.getString("toUserId");
Session toSession=sessionMap.get(id);
if (toSession!=null){
toSession.getBasicRemote().sendText(message);//推送消息给客户端
}else {
session.getBasicRemote().sendText("该用户不在线");//推送消息给客户端
}

} catch (IOException e) {
e.printStackTrace();
}
}

/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
error.printStackTrace();
}

}

总结:
1、添加依赖
2、将扫描类注册到容器中
3、写 WebSocket 的服务类(类似 Controller)(这里是重点)

一、类上用到注解

1、@ServerEndpoint
作用范围:类
功能:使用了该注解,就代表类成为了 WebSocket 服务类,相当于 Controller
参数:value //该属性值写的就是映射路径

2、@Component
WebSocket 服务类需要添加到容器中

二、方法绑定事件使用的注解

1、@OnOpen
作用范围:方法上
功能:方法上添加该注解后,每次有客户端连接当前服务器,就会执行该方法

2、@OnClose
作用范围:方法上
功能:方法上添加该注解后,每次有客户端断开与当前服务类的连接时,就会执行该方法

3、@OnMessage
作用范围:方法上
功能:方法上添加该注解后,每次有客户端发送消息给当前服务类时,就会执行该方法

4、@OnError·
作用范围:方法上
功能:方法上添加该注解后,每次有客户端断开与当前服务类的连接出错时,就会执行该方法

三、使用到的对象

Session
作用:每个客户端连接服务类,都会产生一个 Session 对象,客户端与服务器之间进行通信就是用该对象

四、参数绑定(和 Springmvc 的参数绑定差不多)

1、有什么用?
可以绑定 Session 对象,这样就可以知道当前事件是哪个客户端触发的
例如:
用户 1 发送消息到服务端
用户 1 断开与服务器的连接

2、什么时候可以用
当方法上使用类绑定事件注解,就可以使用参数绑定

3、可以绑定的参数的类型:
1、Session 每个客户端连接服务器特有的对象
2、String 在@OnMessage 方法里绑定,可以得到客户端发来的消息

五、服务器推送消息与断开连接

1、推送消息
session.getBasicRemote().sendText(“内容”);

2、断开连接
session.close();

注意:Session 对象是哪个客户端的,那么消息就是消息就是推送给谁的;关闭连接也是

聊天室实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package com.dzgu.myblog.service.impl;

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.dzgu.myblog.dto.ChatRecordDTO;
import com.dzgu.myblog.dto.RecallMessageDTO;
import com.dzgu.myblog.dto.WebsocketMessageDTO;
import com.dzgu.myblog.entity.ChatRecord;
import com.dzgu.myblog.enums.FilePathEnum;
import com.dzgu.myblog.mapper.ChatRecordMapper;
import com.dzgu.myblog.strategy.context.UploadStrategyContext;
import com.dzgu.myblog.utils.BeanCopyUtils;
import com.dzgu.myblog.utils.HTMLUtils;
import com.dzgu.myblog.utils.IpUtils;
import com.dzgu.myblog.vo.VoiceVO;
import lombok.extern.slf4j.Slf4j;
import org.omg.CORBA.PUBLIC_MEMBER;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.websocket.*;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.server.ServerEndpointConfig;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArraySet;

import static com.dzgu.myblog.enums.ChatTypeEnum.*;
import static com.dzgu.myblog.enums.FilePathEnum.VOICE;

/**
* @description: websocket服务
* @Author: dzgu
* @Date: 2021/11/14 13:23
*/
@Service
@ServerEndpoint(value = "/websocket",configurator = WebSocketServiceImpl.ChatConfigurator.class)
@Slf4j
public class WebSocketServiceImpl {
/**
* 用户session
*/
private Session session;
private static ChatRecordMapper chatRecordMapper;
private static UploadStrategyContext uploadStrategyContext;
/**
* 用户session集合
*/
private static CopyOnWriteArraySet<WebSocketServiceImpl> webSocketSet = new CopyOnWriteArraySet<>();
@Autowired
public void setChatRecordMapper(ChatRecordMapper chatRecordMapper) {
WebSocketServiceImpl.chatRecordMapper = chatRecordMapper;
}
@Autowired
public void setUploadStrategyContext(UploadStrategyContext uploadStrategyContext) {
WebSocketServiceImpl.uploadStrategyContext = uploadStrategyContext;
}

/**
* 获取客户端真实ip
*/
public static class ChatConfigurator extends ServerEndpointConfig.Configurator {

public static String HEADER_NAME = "X-Real-IP";

@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
try {
String firstFoundHeader = request.getHeaders().get(HEADER_NAME.toLowerCase()).get(0);
sec.getUserProperties().put(HEADER_NAME, firstFoundHeader);
} catch (Exception e) {
sec.getUserProperties().put(HEADER_NAME, "未知ip");
}
}
}

/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, EndpointConfig endpointConfig) throws IOException {
// 加入连接
this.session = session;
webSocketSet.add(this);
// 异步更新所有用户显示的在线人数
updateOnlineCount();
// 加载历史聊天记录
ChatRecordDTO chatRecordDTO = listChartRecords(endpointConfig);
// 发送消息
WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder()
.type(HISTORY_RECORD.getType())
.data(chatRecordDTO)
.build();
synchronized (session) {
session.getBasicRemote().sendText(JSON.toJSONString(messageDTO));
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
WebsocketMessageDTO messageDTO = JSON.parseObject(message, WebsocketMessageDTO.class);
switch (Objects.requireNonNull(getChatType(messageDTO.getType()))) {
case SEND_MESSAGE:
// 发送消息
ChatRecord chatRecord = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), ChatRecord.class);
// 过滤html标签
chatRecord.setContent(HTMLUtils.deleteTag(chatRecord.getContent()));
chatRecordMapper.insert(chatRecord);
messageDTO.setData(chatRecord);
// 广播消息
broadcastMessage(messageDTO);
break;
case RECALL_MESSAGE:
// 撤回消息
RecallMessageDTO recallMessage = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), RecallMessageDTO.class);
// 删除记录
chatRecordMapper.deleteById(recallMessage.getId());
// 广播消息
broadcastMessage(messageDTO);
break;
case HEART_BEAT:
// 心跳消息
messageDTO.setData("pong");
session.getBasicRemote().sendText(JSON.toJSONString(JSON.toJSONString(messageDTO)));
default:
break;
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() throws IOException {
// 更新在线人数
webSocketSet.remove(this);
updateOnlineCount();
}


/**
* 加载历史聊天记录
*
* @param endpointConfig 配置
* @return 加载历史聊天记录
*/
private ChatRecordDTO listChartRecords(EndpointConfig endpointConfig) {
// 获取聊天历史记录
List<ChatRecord> chatRecordList = chatRecordMapper.selectList(new LambdaQueryWrapper<ChatRecord>()
.ge(ChatRecord::getCreateTime, DateUtil.offsetHour(new Date(), -12)));
// 获取当前用户ip
String ipAddress = endpointConfig.getUserProperties().get(ChatConfigurator.HEADER_NAME).toString();
return ChatRecordDTO.builder()
.chatRecordList(chatRecordList)
.ipAddress(ipAddress)
.ipSource(IpUtils.getIpSource(ipAddress))
.build();

}

/**
* 更新在线人数
*
* @throws IOException io异常
*/
@Async
public void updateOnlineCount() throws IOException {
//当前在线人数
WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder()
.type(ONLINE_COUNT.getType())
.data(webSocketSet.size())
.build();
// 广播消息
broadcastMessage(messageDTO);
}

/**
* 发送语音
*
* @param voiceVO 语音路径
*/
public void sendVoice(VoiceVO voiceVO) {
// 上传语音文件
String content = uploadStrategyContext.executeUploadStrategy(voiceVO.getFile(), FilePathEnum.VOICE.getPath());
voiceVO.setContent(content);
// 保存记录
ChatRecord chatRecord = BeanCopyUtils.copyObject(voiceVO, ChatRecord.class);
chatRecordMapper.insert(chatRecord);
// 发送消息
WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder()
.type(VOICE_MESSAGE.getType())
.data(chatRecord)
.build();
// 广播消息
try {
broadcastMessage(messageDTO);
} catch (IOException e) {
e.printStackTrace();
}
}

/**
* 广播消息
*
* @param messageDTO 消息dto
* @throws IOException io异常
*/
private void broadcastMessage(WebsocketMessageDTO messageDTO) throws IOException {
for (WebSocketServiceImpl webSocketService : webSocketSet) {
webSocketService.session.getBasicRemote().sendText(JSON.toJSONString(messageDTO));
}
}
}