webSocketStore.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. // src/stores/websocket.ts
  2. import { defineStore } from "pinia";
  3. import { $root as protobuf } from "@/common/proto/proto";
  4. import * as Constant from "@/common/constant/Constant";
  5. import { useWalletStore } from "@/stores/modules/walletStore";
  6. import { getMessageApi } from "@/api/path/im.api";
  7. import { MSG_TYPE, MSG_TYPE_MAP } from "@/common/constant/msgType";
  8. import {
  9. setMessageHook,
  10. handleMessageHook,
  11. } from "@/views/im/hook/messagesHook";
  12. const IM_PATH = import.meta.env.VITE_IM_PATH_FIlE;
  13. export const useWebSocketStore = defineStore("webSocketStore", {
  14. // 状态定义
  15. state: () => ({
  16. socket: null, // socket实例
  17. peer: null, // peer实例
  18. lockConnection: false, // 锁定连接
  19. reconnectAttempts: 0, // 重连次数
  20. maxReconnectAttempts: 10, // 最大重连次数
  21. reconnectInterval: 3000, // 重连间隔
  22. messages: [], // 消息列表
  23. toUserInfo: {},
  24. unreadMessages: [], // 未读消息列表
  25. uuid: null,
  26. onMessageCallbacks: [],
  27. // 心跳检测配置
  28. heartCheck: {
  29. timeout: 10000, // 连接丢失后,多长时间内没有收到服务端的消息,则认为连接已断开
  30. timeoutObj: null, // 定时器对象
  31. serverTimeoutObj: null, // 服务器定时器对象
  32. num: 3, // 重连次数
  33. },
  34. }),
  35. persist: {
  36. key: "toUserInfo",
  37. storage: localStorage,
  38. paths: ["toUserInfo"], // 正确的简单路径格式
  39. serializer: {
  40. serialize: (state) => JSON.stringify(state.toUserInfo),
  41. deserialize: (str) => ({ toUserInfo: JSON.parse(str) }),
  42. },
  43. },
  44. // 计算属性
  45. getters: {
  46. isConnected: (state) => state.socket?.readyState === WebSocket.OPEN,
  47. hasUnreadMessages: (state) => state.unreadMessages.length > 0,
  48. },
  49. // 方法
  50. actions: {
  51. // 获取消息
  52. async getMessages(params) {
  53. const { data } = await getMessageApi({
  54. messageType: params.messageType,
  55. uuid: params.uuid,
  56. friendUsername: params.friendUsername,
  57. });
  58. this.messages =
  59. data?.map((item) => {
  60. item.avatar = item.avatar ? IM_PATH + item.avatar : item.avatar;
  61. return item;
  62. }) || [];
  63. },
  64. // 发送消息
  65. sendMessage(messageData) {
  66. if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
  67. console.error("WebSocket未连接");
  68. return false;
  69. }
  70. // 获取url上uuid参数
  71. const walletStore = useWalletStore();
  72. let data = {
  73. ...messageData,
  74. fromUsername: walletStore.username,
  75. from: walletStore.account,
  76. to: this.toUserInfo.uuid,
  77. };
  78. console.log("发送消息=", data)
  79. try {
  80. const MessageType = protobuf.lookupType("protocol.Message");
  81. const messagePB = MessageType.create(data);
  82. const buffer = MessageType.encode(messagePB).finish();
  83. this.socket.send(buffer);
  84. // 发送完成后:添加头像
  85. data.avatar = walletStore.avatar;
  86. setMessageHook(data, this);
  87. return true;
  88. } catch (error) {
  89. console.error("消息编码错误:", error);
  90. return false;
  91. }
  92. },
  93. // 初始化socket
  94. startHeartbeat() {
  95. const self = this;
  96. const _num = this.heartCheck.num;
  97. this.heartCheck.timeoutObj && clearTimeout(this.heartCheck.timeoutObj);
  98. this.heartCheck.serverTimeoutObj &&
  99. clearTimeout(this.heartCheck.serverTimeoutObj);
  100. this.heartCheck.timeoutObj = setTimeout(() => {
  101. if (this.socket?.readyState === WebSocket.OPEN) {
  102. const data = {
  103. type: "heatbeat",
  104. content: "ping",
  105. };
  106. const MessageType = protobuf.lookupType("protocol.Message");
  107. const messagePB = MessageType.create(data);
  108. const buffer = MessageType.encode(messagePB).finish();
  109. this.socket.send(buffer);
  110. }
  111. self.heartCheck.serverTimeoutObj = setTimeout(() => {
  112. _num--;
  113. if (_num <= 0) {
  114. console.log("the ping num is more then 3, close socket!");
  115. this.socket?.close();
  116. }
  117. }, self.heartCheck.timeout);
  118. }, this.heartCheck.timeout);
  119. },
  120. resetHeartbeat() {
  121. this.heartCheck.timeoutObj && clearTimeout(this.heartCheck.timeoutObj);
  122. this.heartCheck.serverTimeoutObj &&
  123. clearTimeout(this.heartCheck.serverTimeoutObj);
  124. this.heartCheck.num = 3;
  125. },
  126. // 链接ws
  127. connect(userUuid) {
  128. if (!userUuid) return;
  129. console.log("开始连接...");
  130. this.disconnect(); // 确保先断开现有连接
  131. this.peer = new RTCPeerConnection();
  132. this.socket = new WebSocket(
  133. `${import.meta.env.VITE_PRO_IM_WSS}?user=${userUuid}`
  134. );
  135. // 连接成功
  136. this.socket.onopen = () => {
  137. this.startHeartbeat();
  138. console.log("WebSocket连接成功");
  139. };
  140. this.socket.onmessage = (event) => {
  141. this.startHeartbeat();
  142. this.handleMessage(event.data);
  143. };
  144. this.socket.onclose = () => {
  145. console.log("连接关闭,尝试重新连接...");
  146. this.resetHeartbeat();
  147. this.reconnect();
  148. };
  149. this.socket.onerror = (error) => {
  150. console.error("WebSocket错误:", error);
  151. this.resetHeartbeat();
  152. this.reconnect();
  153. };
  154. },
  155. // 处理消息
  156. handleMessage(data) {
  157. const MessageType = protobuf.lookupType("protocol.Message");
  158. const reader = new FileReader();
  159. reader.onload = (event) => {
  160. try {
  161. const messagePB = MessageType.decode(
  162. new Uint8Array(event.target?.result)
  163. );
  164. const message = MessageType.toObject(messagePB, {
  165. longs: String,
  166. enums: String,
  167. bytes: String,
  168. });
  169. console.log("收到消息:", message);
  170. // 处理消息
  171. handleMessageHook(message, this);
  172. if (message.type === "heatbeat") return;
  173. if (message.type === Constant.MESSAGE_TRANS_TYPE) {
  174. this.dealWebRtcMessage(message);
  175. return;
  176. }
  177. // 其他消息处理逻辑...
  178. } catch (error) {
  179. console.error("消息解码错误:", error);
  180. }
  181. };
  182. reader.readAsArrayBuffer(data);
  183. },
  184. // 断线重连
  185. reconnect() {
  186. if (
  187. this.lockConnection ||
  188. this.reconnectAttempts >= this.maxReconnectAttempts
  189. ) {
  190. return;
  191. }
  192. this.lockConnection = true;
  193. this.reconnectAttempts++;
  194. console.log(
  195. `重新连接中... (尝试 ${this.reconnectAttempts}/${this.maxReconnectAttempts})`
  196. );
  197. setTimeout(() => {
  198. this.connect(localStorage.uuid);
  199. this.lockConnection = false;
  200. }, this.reconnectInterval);
  201. },
  202. // 重连
  203. disconnect() {
  204. if (this.socket) {
  205. this.resetHeartbeat();
  206. this.socket.close();
  207. this.socket = null;
  208. }
  209. if (this.peer) {
  210. this.peer.close();
  211. this.peer = null;
  212. }
  213. this.reconnectAttempts = 0;
  214. },
  215. // 发送WebRTC消息
  216. dealWebRtcMessage(message) {
  217. // 实现WebRTC消息处理逻辑
  218. },
  219. handleMessage(data) {
  220. const MessageType = protobuf.lookupType("protocol.Message");
  221. const reader = new FileReader();
  222. reader.onload = (event) => {
  223. try {
  224. const messagePB = MessageType.decode(new Uint8Array(event.target?.result));
  225. const message = MessageType.toObject(messagePB, {
  226. longs: String,
  227. enums: String,
  228. bytes: String,
  229. });
  230. console.log("收到消息:", message);
  231. handleMessageHook(message, this);
  232. // 调用所有回调函数
  233. this.onMessageCallbacks.forEach(cb => {
  234. try {
  235. cb(message);
  236. } catch (e) {
  237. console.error("消息回调错误:", e);
  238. }
  239. });
  240. if (message.type === "heatbeat") return;
  241. if (message.type === Constant.MESSAGE_TRANS_TYPE) {
  242. this.dealWebRtcMessage(message);
  243. return;
  244. }
  245. // 其他消息处理逻辑...
  246. } catch (error) {
  247. console.error("消息解码错误:", error);
  248. }
  249. };
  250. reader.readAsArrayBuffer(data);
  251. },
  252. addOnMessageCallback(cb) {
  253. if (typeof cb === "function") {
  254. this.onMessageCallbacks.push(cb);
  255. }
  256. },
  257. removeOnMessageCallback(cb) {
  258. this.onMessageCallbacks = this.onMessageCallbacks.filter(item => item !== cb);
  259. },
  260. },
  261. });