|
|
@@ -1,8 +1,8 @@
|
|
|
-import { Inject, Injectable, Logger, OnModuleInit, forwardRef } from '@nestjs/common'
|
|
|
+import { Inject, Injectable, InternalServerErrorException, Logger, OnModuleInit, forwardRef } from '@nestjs/common'
|
|
|
import { InjectRepository } from '@nestjs/typeorm'
|
|
|
import { In, Like, MoreThan, Repository } from 'typeorm'
|
|
|
import { Danmu } from './entities/danmu.enitity'
|
|
|
-import axios from 'axios'
|
|
|
+import axios, { AxiosInstance } from 'axios'
|
|
|
import { Client } from 'tmi.js'
|
|
|
import { AccessToken, AccessTokenType } from './entities/access-token.entity'
|
|
|
import { StreamPlatform } from '../common/enums/stream-platform.enum'
|
|
|
@@ -14,12 +14,17 @@ import { FileService } from 'src/file/file.service'
|
|
|
import path = require('path')
|
|
|
import { HttpsProxyAgent } from 'https-proxy-agent'
|
|
|
import { SysConfigService } from '../sys-config/sys-config.service'
|
|
|
+import { getEncodeHeader } from '../utils/crypto'
|
|
|
+import * as WebSocket from 'ws'
|
|
|
+import { Proto } from './models/proto.model'
|
|
|
+import { Interval } from '@nestjs/schedule'
|
|
|
|
|
|
@Injectable()
|
|
|
export class DanmuService implements OnModuleInit {
|
|
|
private readonly logger = new Logger(DanmuService.name)
|
|
|
client: Client
|
|
|
-
|
|
|
+ biliApi: AxiosInstance
|
|
|
+ biliWs: { [roomId: number]: WebSocket } = {}
|
|
|
constructor(
|
|
|
@InjectRepository(Danmu)
|
|
|
private readonly danmuRepository: Repository<Danmu>,
|
|
|
@@ -31,7 +36,22 @@ export class DanmuService implements OnModuleInit {
|
|
|
private readonly roomService: RoomService,
|
|
|
private readonly fileService: FileService,
|
|
|
private readonly sysConfigService: SysConfigService
|
|
|
- ) {}
|
|
|
+ ) {
|
|
|
+ // axios 拦截器
|
|
|
+ this.biliApi = axios.create({
|
|
|
+ baseURL: 'https://live-open.biliapi.com'
|
|
|
+ // baseURL: "http://test-live-open.biliapi.net" //test
|
|
|
+ })
|
|
|
+
|
|
|
+ // 鉴权加密处理headers,下次请求自动带上
|
|
|
+ this.biliApi.interceptors.request.use((config) => {
|
|
|
+ const headers = getEncodeHeader(config.data, 'Cwgk7TsPkMtKtGO8rY0LndiF', 'bV9HiQ03OQ0Hk00l90N1QWyAFSI8qR')
|
|
|
+ Object.keys(headers).forEach((key) => {
|
|
|
+ config.headers[key] = headers[key]
|
|
|
+ })
|
|
|
+ return config
|
|
|
+ })
|
|
|
+ }
|
|
|
|
|
|
async onModuleInit() {
|
|
|
const channels = (await this.roomService.findAllActiveRoom())
|
|
|
@@ -141,6 +161,71 @@ export class DanmuService implements OnModuleInit {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ async handleBiliMessage(data: any) {
|
|
|
+ try {
|
|
|
+ const proto = new Proto()
|
|
|
+ proto.unpack(data)
|
|
|
+ this.logger.debug('received message from bilibili op=' + proto.op)
|
|
|
+ switch (proto.op) {
|
|
|
+ case 3:
|
|
|
+ this.logger.debug('收到心跳回复')
|
|
|
+ break
|
|
|
+ case 5:
|
|
|
+ this.logger.debug('收到弹幕')
|
|
|
+ this.logger.debug(proto.body)
|
|
|
+ const data = JSON.parse(proto.body)
|
|
|
+ if (data.cmd === 'LIVE_OPEN_PLATFORM_DM') {
|
|
|
+ const channel = `${data.data.room_id}`
|
|
|
+ const platformUserId = `${data.data.uid}`
|
|
|
+ const name = data.data.uname
|
|
|
+ const avatar = data.data.uface
|
|
|
+ const msgId = data.data.msg_id
|
|
|
+ const room = await this.roomService.findActiveRoom(channel)
|
|
|
+ if (room) {
|
|
|
+ let danmuUser = await this.danmuUserRepository.findOne({
|
|
|
+ where: { roomId: room.id, platform: StreamPlatform.Bilibili, platformUserId }
|
|
|
+ })
|
|
|
+ if (!danmuUser) {
|
|
|
+ danmuUser = await this.danmuUserRepository.save(
|
|
|
+ new DanmuUser({
|
|
|
+ roomId: room.id,
|
|
|
+ platform: StreamPlatform.Bilibili,
|
|
|
+ platformUserId,
|
|
|
+ name,
|
|
|
+ avatar
|
|
|
+ })
|
|
|
+ )
|
|
|
+ }
|
|
|
+ await this.danmuRepository.save(
|
|
|
+ new Danmu({
|
|
|
+ platform: StreamPlatform.Bilibili,
|
|
|
+ danmuUserId: danmuUser.id,
|
|
|
+ channel,
|
|
|
+ platformRoomId: channel,
|
|
|
+ platformUserId,
|
|
|
+ content: data.data.msg,
|
|
|
+ roomId: room.id,
|
|
|
+ gameId: room.currentGameId,
|
|
|
+ msgId
|
|
|
+ })
|
|
|
+ )
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break
|
|
|
+ case 8:
|
|
|
+ this.logger.debug('收到鉴权回复')
|
|
|
+ if (JSON.parse(proto.body).code !== 0) {
|
|
|
+ this.logger.error('鉴权失败' + proto.body)
|
|
|
+ } else {
|
|
|
+ this.logger.debug('鉴权成功')
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ } catch (error) {
|
|
|
+ this.logger.error(error.stack)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
async startDanmu(room: Room) {
|
|
|
if (room.platform === StreamPlatform.Twitch) {
|
|
|
try {
|
|
|
@@ -148,6 +233,34 @@ export class DanmuService implements OnModuleInit {
|
|
|
} catch (error) {
|
|
|
this.logger.error(`failed to join twitch channel ${room.channelId}, ${error}`)
|
|
|
}
|
|
|
+ } else if (room.platform === StreamPlatform.Bilibili) {
|
|
|
+ if (!room.code) {
|
|
|
+ throw new InternalServerErrorException('缺少身份码')
|
|
|
+ }
|
|
|
+ const { data } = await this.biliApi.post('/v2/app/start', { app_id: 1700371901242, code: room.code })
|
|
|
+ if (data.code === 0) {
|
|
|
+ this.logger.log(`开启直播间成功`)
|
|
|
+ room.config = data.data
|
|
|
+ await this.roomService.updateRoom(room.id, {
|
|
|
+ config: data.data
|
|
|
+ })
|
|
|
+ const ws = new WebSocket(room.config.websocket_info.wss_link[0])
|
|
|
+ ws.on('error', this.logger.error)
|
|
|
+
|
|
|
+ ws.on('open', function open() {
|
|
|
+ const proto = new Proto()
|
|
|
+ proto.op = 7
|
|
|
+ proto.body = room.config.websocket_info.auth_body
|
|
|
+ ws.send(proto.pack())
|
|
|
+ })
|
|
|
+
|
|
|
+ ws.on('message', this.handleBiliMessage.bind(this))
|
|
|
+
|
|
|
+ this.biliWs[room.id] = ws
|
|
|
+ } else {
|
|
|
+ this.logger.error(`开启直播间失败: ${JSON.stringify(data)}`)
|
|
|
+ throw new InternalServerErrorException(data.message)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -158,6 +271,19 @@ export class DanmuService implements OnModuleInit {
|
|
|
} catch (error) {
|
|
|
this.logger.error(`failed to leave twitch channel ${room.channelId}, ${error}`)
|
|
|
}
|
|
|
+ } else if (room.platform === StreamPlatform.Bilibili) {
|
|
|
+ const { data } = await this.biliApi.post('/v2/app/end', {
|
|
|
+ app_id: 1700371901242,
|
|
|
+ game_id: room.config.game_info.game_id
|
|
|
+ })
|
|
|
+ if (data.code !== 0) {
|
|
|
+ this.logger.error(`结束直播间失败: ${JSON.stringify(data)}`)
|
|
|
+ }
|
|
|
+ this.logger.log(`结束直播间成功`)
|
|
|
+ if (this.biliWs[room.id]) {
|
|
|
+ this.biliWs[room.id].close()
|
|
|
+ delete this.biliWs[room.id]
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -232,4 +358,27 @@ export class DanmuService implements OnModuleInit {
|
|
|
danmuUser.name = user.display_name
|
|
|
return await this.danmuUserRepository.save(danmuUser)
|
|
|
}
|
|
|
+
|
|
|
+ @Interval(10000)
|
|
|
+ async heartbeat() {
|
|
|
+ Object.keys(this.biliWs).forEach(async (roomId) => {
|
|
|
+ try {
|
|
|
+ const room = await this.roomService.findById(Number(roomId))
|
|
|
+ const { data } = await this.biliApi.post('/v2/app/batchHeartbeat', {
|
|
|
+ game_ids: [room.config.game_info.game_id]
|
|
|
+ })
|
|
|
+ if (data.code === 0) {
|
|
|
+ this.logger.debug('心跳发送成功')
|
|
|
+ } else {
|
|
|
+ this.logger.error(`心跳发送失败: ${JSON.stringify(data)}`)
|
|
|
+ }
|
|
|
+ const ws = this.biliWs[roomId]
|
|
|
+ const proto = new Proto()
|
|
|
+ proto.op = 2
|
|
|
+ ws.send(proto.pack())
|
|
|
+ } catch (error) {
|
|
|
+ this.logger.error(error)
|
|
|
+ }
|
|
|
+ })
|
|
|
+ }
|
|
|
}
|