LivePublicScreenChatWebsocket.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package com.izouma.meta.websocket;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.izouma.meta.config.Constants;
  6. import com.izouma.meta.domain.MetaVisitor;
  7. import com.izouma.meta.domain.PublicScreenChat;
  8. import com.izouma.meta.dto.PublicScreenChatExceptionMsg;
  9. import com.izouma.meta.repo.MetaVisitorRepo;
  10. import com.izouma.meta.repo.PublicScreenChatRepo;
  11. import com.izouma.meta.service.ContentAuditService;
  12. import com.izouma.meta.utils.ApplicationContextUtil;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.apache.commons.collections.CollectionUtils;
  15. import org.apache.commons.lang.StringUtils;
  16. import org.springframework.stereotype.Service;
  17. import javax.websocket.*;
  18. import javax.websocket.server.PathParam;
  19. import javax.websocket.server.ServerEndpoint;
  20. import java.time.LocalDateTime;
  21. import java.util.List;
  22. import java.util.Map;
  23. import java.util.Objects;
  24. import java.util.Set;
  25. import java.util.concurrent.ConcurrentHashMap;
  26. @Service
  27. @ServerEndpoint(value = "/websocket/live/public/screen/{userId}")
  28. @Slf4j
  29. public class LivePublicScreenChatWebsocket {
  30. /**
  31. * 当前在线的客户端map
  32. */
  33. private static final Map<String, Session> clients = new ConcurrentHashMap();
  34. private static final Map<String, MetaVisitor> user = new ConcurrentHashMap<>();
  35. private PublicScreenChatRepo publicScreenChatRepo;
  36. private final String PREFIX = "meta-live-chat:";
  37. private ContentAuditService contentAuditService;
  38. private WebsocketCommon websocketCommon;
  39. private MetaVisitorRepo metaVisitorRepo;
  40. private void init() {
  41. if (Objects.isNull(metaVisitorRepo)) {
  42. metaVisitorRepo = (MetaVisitorRepo) ApplicationContextUtil.getBean("metaVisitorRepo");
  43. }
  44. if (Objects.isNull(publicScreenChatRepo)) {
  45. publicScreenChatRepo = (PublicScreenChatRepo) ApplicationContextUtil.getBean("publicScreenChatRepo");
  46. }
  47. if (Objects.isNull(contentAuditService)) {
  48. contentAuditService = (ContentAuditService) ApplicationContextUtil.getBean("contentAuditService");
  49. }
  50. if (Objects.isNull(websocketCommon)) {
  51. websocketCommon = (WebsocketCommon) ApplicationContextUtil.getBean("websocketCommon");
  52. }
  53. }
  54. @OnOpen
  55. public void onOpen(@PathParam("userId") String userId, Session session) {
  56. init();
  57. MetaVisitor metaVisitor = metaVisitorRepo.findById(Long.parseLong(userId)).orElse(null);
  58. if (Objects.isNull(metaVisitor)) {
  59. log.error("当前游客信息不存在!");
  60. return;
  61. }
  62. if (!user.containsKey(userId)) {
  63. user.put(userId, metaVisitor);
  64. }
  65. // 判断当前玩家是否在其他地方登陆
  66. if (clients.containsKey(PREFIX.concat(metaVisitor.getNickname()))) {
  67. String msg = String.format("已在别处登陆,sessionId为[%S],正在为您关闭本连接", session.getId());
  68. exceptionHandle(userId, new PublicScreenChatExceptionMsg(1, msg));
  69. try {
  70. log.info("关闭session连接");
  71. clients.get(PREFIX.concat(userId)).close();
  72. } catch (Exception e) {
  73. exceptionHandle(userId, new PublicScreenChatExceptionMsg(1, String.format("session close throw exception[%S]", e)));
  74. return;
  75. }
  76. }
  77. log.info("现在来连接的sessionId:" + session.getId() + "玩家id:" + userId + "玩家昵称" + metaVisitor.getNickname());
  78. clients.put(PREFIX.concat(userId), session);
  79. String format = String.format("玩家[%S][%S]进入大厅", userId, metaVisitor.getNickname());
  80. PublicScreenChat publicScreenChat;
  81. try {
  82. publicScreenChat = savePublicScreenChat(metaVisitor, format, true);
  83. } catch (Exception e) {
  84. String errMsg = String.format("玩家进入大厅,保存信息发生异常[%S]", e);
  85. exceptionHandle(userId, new PublicScreenChatExceptionMsg(2, errMsg));
  86. return;
  87. }
  88. websocketCommon.sendMessageToOther(clients, JSON.toJSONString(publicScreenChat), PREFIX.concat(userId));
  89. }
  90. @OnError
  91. public void onError(Session session, Throwable error) {
  92. error.printStackTrace();
  93. // 异常处理
  94. log.error(String.format("sessionId[%S]的服务端发生了错误:[%S]", session.getId(), error));
  95. }
  96. @OnClose
  97. public void onClose(@PathParam("userId") String userId, Session session) {
  98. init();
  99. MetaVisitor metaVisitor = user.get(userId);
  100. if (Objects.isNull(metaVisitor)) {
  101. metaVisitor = metaVisitorRepo.findById(Long.parseLong(userId)).orElse(null);
  102. }
  103. if (Objects.isNull(metaVisitor)) {
  104. String errMsg = "游客信息为空!";
  105. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  106. return;
  107. }
  108. log.info(String.format("sessionId:[%S] userId:[%S] is closed", session.getId(), userId));
  109. String format = String.format("玩家[%S][%S]离开大厅", userId, metaVisitor.getNickname());
  110. PublicScreenChat publicScreenChat;
  111. try {
  112. publicScreenChat = savePublicScreenChat(metaVisitor, format, true);
  113. } catch (Exception e) {
  114. String errMsg = String.format("玩家离开大厅,保存信息发生异常[%S]", e);
  115. exceptionHandle(userId, new PublicScreenChatExceptionMsg(2, errMsg));
  116. return;
  117. }
  118. websocketCommon.sendMessageToOther(clients, JSON.toJSONString(publicScreenChat), PREFIX.concat(userId));
  119. clients.remove(PREFIX.concat(userId));
  120. }
  121. @OnMessage
  122. public void onMessage(@PathParam("userId") String userId, String message, Session session) {
  123. init();
  124. MetaVisitor metaVisitor = metaVisitorRepo.findById(Long.parseLong(userId)).orElse(null);
  125. if (Objects.isNull(metaVisitor)) {
  126. String errMsg = "游客信息为空!";
  127. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  128. return;
  129. }
  130. if (metaVisitor.isTaboo()) {
  131. String errMsg = "当前用户已被禁言";
  132. log.info(errMsg);
  133. exceptionHandle(userId, new PublicScreenChatExceptionMsg(5, errMsg));
  134. return;
  135. }
  136. if (StringUtils.isBlank(message)) {
  137. String errMsg = "Illegal parameter : message can not be null";
  138. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  139. return;
  140. }
  141. JSONObject jsonObject = JSON.parseObject(message);
  142. if (!jsonObject.containsKey("type")) {
  143. String errMsg = "Illegal parameter : type can not be null";
  144. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  145. return;
  146. }
  147. String type = jsonObject.getString("type");
  148. if (StringUtils.isBlank(type)) {
  149. String errMsg = "Illegal parameter : type can not be null";
  150. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  151. return;
  152. }
  153. // 心跳检测
  154. if (Constants.HEART_RECEIVE.equals(type)) {
  155. log.info(String.format("sessionId:[%S] userId:[%S] 连接正常", session.getId(), userId));
  156. websocketCommon.sendMessageToSingle(clients, Constants.HEART_RETURN, PREFIX.concat(userId));
  157. return;
  158. }
  159. // 查询需要撤回的消息
  160. if (Constants.MESSAGE_RECALL.equals(type)) {
  161. List<PublicScreenChat> publicScreenChats = publicScreenChatRepo.findAllByTypeAndRecallAndDel(2, 2, false);
  162. websocketCommon.sendMessageToSingle(clients, JSONObject.toJSONString(publicScreenChats), PREFIX.concat(userId));
  163. return;
  164. }
  165. // 消息撤回确定
  166. if (Constants.MESSAGE_RECALL_CONFIRM.equals(type)) {
  167. if (!jsonObject.containsKey("ids")) {
  168. String errMsg = "Illegal parameter : ids can not be null";
  169. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  170. return;
  171. }
  172. JSONArray jsonArray = jsonObject.getJSONArray("ids");
  173. List<Long> ids = jsonArray.toJavaList(Long.class);
  174. if (CollectionUtils.isEmpty(ids)) {
  175. String errMsg = "Illegal parameter : ids can not be null";
  176. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  177. return;
  178. }
  179. ids.forEach(id -> {
  180. publicScreenChatRepo.recallConfirm(id);
  181. });
  182. websocketCommon.sendMessageToSingle(clients, "消息已被撤回", PREFIX.concat(userId));
  183. return;
  184. }
  185. // 正常发送消息
  186. if (Constants.MESSAGE_NORMAL.equals(type)) {
  187. if (!jsonObject.containsKey("value")) {
  188. String errMsg = "Illegal parameter : value can not be null";
  189. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  190. return;
  191. }
  192. String value = jsonObject.getString("value");
  193. if (StringUtils.isBlank(value)) {
  194. String errMsg = "Illegal parameter : value can not be null";
  195. exceptionHandle(userId, new PublicScreenChatExceptionMsg(4, errMsg));
  196. return;
  197. }
  198. log.info(String.format("收到来自userId[%S]的消息[%S]", userId, value));
  199. if (!contentAuditService.auditText(value)) {
  200. savePublicScreenChat(metaVisitor, value, false);
  201. exceptionHandle(userId, new PublicScreenChatExceptionMsg(3, "消息包含非法内容"));
  202. return;
  203. }
  204. PublicScreenChat publicScreenChat;
  205. try {
  206. publicScreenChat = savePublicScreenChat(metaVisitor, value, true);
  207. } catch (Exception e) {
  208. String errMsg = String.format("玩家发送消息,保存信息发生异常[%S]", e);
  209. exceptionHandle(userId, new PublicScreenChatExceptionMsg(2, errMsg));
  210. return;
  211. }
  212. sendMessageToAll(clients, JSON.toJSONString(publicScreenChat), PREFIX.concat(userId));
  213. }
  214. }
  215. /**
  216. * 给所有用户发消息
  217. *
  218. * @param clients 在线客户端
  219. * @param message 消息
  220. */
  221. public void sendMessageToAll(Map<String, Session> clients, String message, String userId) {
  222. PublicScreenChat publicScreenChat = JSON.parseObject(message, PublicScreenChat.class);
  223. Set<String> userIds = clients.keySet();
  224. userIds.forEach(id -> {
  225. try {
  226. publicScreenChat.setMyself(id.equals(userId));
  227. log.info(String.format("服务器给所有在线用户发送消息,当前在线人员为[%S]。消息:[%S]", id, JSON.toJSONString(publicScreenChat)));
  228. clients.get(id).getBasicRemote().sendText(JSON.toJSONString(publicScreenChat));
  229. } catch (Exception e) {
  230. log.error(String.format("send message [%S] to [%S] throw exception [%S]:", JSON.toJSONString(publicScreenChat), id, e));
  231. }
  232. });
  233. }
  234. private PublicScreenChat savePublicScreenChat(MetaVisitor metaVisitor, String messageInfo, boolean illegal) {
  235. PublicScreenChat publicScreenChat = new PublicScreenChat();
  236. publicScreenChat.setUserId(metaVisitor.getId().toString());
  237. publicScreenChat.setSex(metaVisitor.getSex());
  238. publicScreenChat.setNickname(metaVisitor.getNickname());
  239. publicScreenChat.setMessageInfo(messageInfo);
  240. publicScreenChat.setTime(LocalDateTime.now());
  241. publicScreenChat.setIllegal(illegal);
  242. publicScreenChat.setRecall(1);
  243. publicScreenChat.setType(2);
  244. return publicScreenChatRepo.save(publicScreenChat);
  245. }
  246. private void exceptionHandle(String userId, PublicScreenChatExceptionMsg msg) {
  247. log.error(JSON.toJSONString(msg));
  248. // 推送消息给该玩家
  249. websocketCommon.sendMessageToSingle(clients, JSON.toJSONString(msg), PREFIX.concat(userId));
  250. }
  251. }