package gb28181

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	// "net/url"

	"github.com/ghettovoice/gosip/sip"
	"github.com/goccy/go-json"
	"go.uber.org/zap"
	"gorm.io/gorm"
	. "m7s.live/engine/v4"
	"m7s.live/engine/v4/db"
	"m7s.live/engine/v4/log"
	"m7s.live/plugin/gb28181/v4/utils"
	"m7s.live/plugin/ps/v4"
)

// QUERY_RECORD_TIMEOUT is the timeout handed to RecordQueryLink.WaitResult by
// Channel.QueryRecord.
var QUERY_RECORD_TIMEOUT = time.Second * 5

var (
	// ChannelParentMap caches channel ID -> ChannelDevice (row id, device id,
	// parent id). It is filled by UpdateChanelInfo and read by
	// UpdateChanelStatus.
	ChannelParentMap sync.Map
)

// LiveRoom is the stream-path prefix Invite uses for live streams.
const LiveRoom = "live"

// persistChannelLiveState gates the t_sdp_channel writes performed by
// UpdateChanelState and UpdateChanelLiveSubSp. Both writes were switched off
// by an unconditional early return; the flag keeps them switched off while
// making the intent explicit and the code reachable for the compiler and vet.
const persistChannelLiveState = false

// PullStream is one established INVITE dialog together with the options it was
// created with. Invite stores instances in PullStreams keyed by stream path.
type PullStream struct {
	opt       *InviteOptions
	channel   *Channel
	inviteRes sip.Response
}

// CreateRequest builds an in-dialog request: it starts from a fresh channel
// request and overwrites From, To and Call-ID with the headers of the INVITE
// response. Headers missing from the response are left as generated.
func (p *PullStream) CreateRequest(method sip.RequestMethod) (req sip.Request) {
	res := p.inviteRes
	req = p.channel.CreateRequst(method)
	if from, ok := res.From(); ok {
		req.ReplaceHeaders(from.Name(), []sip.Header{from})
	}
	if to, ok := res.To(); ok {
		req.ReplaceHeaders(to.Name(), []sip.Header{to})
	}
	if callID, ok := res.CallID(); ok {
		req.ReplaceHeaders(callID.Name(), []sip.Header{callID})
	}
	return
}

// Bye sends BYE for this dialog. Regardless of the SIP outcome it resets the
// channel state to idle for live streams and hands the media port back to the
// allocator when one was taken. It returns the response status code, or 500
// when the request itself failed.
func (p *PullStream) Bye() int {
	req := p.CreateRequest(sip.BYE)
	resp, err := p.channel.Device.SipRequestForResponse(req)
	if p.opt.IsLive() {
		p.channel.State.Store(0)
		// Propagate the new channel state.
		p.channel.UpdateChanelState()
	}
	if p.opt.recyclePort != nil {
		p.opt.recyclePort(p.opt.MediaPort)
	}
	if err != nil {
		return http.StatusInternalServerError
	}
	return int(resp.StatusCode())
}

// info sends a MANSRTSP body inside a SIP INFO request on this dialog and
// returns the response status code, or the code derived from the error.
func (p *PullStream) info(body string) int {
	d := p.channel.Device
	req := p.CreateRequest(sip.INFO)
	contentType := sip.ContentType("Application/MANSRTSP")
	req.AppendHeader(&contentType)
	req.SetBody(body, true)

	resp, err := d.SipRequestForResponse(req)
	if err != nil {
		log.Warnf("Send info to stream error: %v, stream=%s, body=%s", err, p.opt.StreamPath, body)
		return getSipRespErrorCode(err)
	}
	return int(resp.StatusCode())
}

// Pause pauses playback.
func (p *PullStream) Pause() int {
	body := fmt.Sprintf(`PAUSE RTSP/1.0
CSeq: %d
PauseTime: now
`, p.channel.Device.SN)
	return p.info(body)
}

// Resume resumes playback from the current position.
func (p *PullStream) Resume() int {
	d := p.channel.Device
	body := fmt.Sprintf(`PLAY RTSP/1.0
CSeq: %d
Range: npt=now-
`, d.SN)
	return p.info(body)
}

// PlayAt seeks playback to the given offset.
// second: offset in seconds relative to the start of the recording.
func (p *PullStream) PlayAt(second uint) int {
	d := p.channel.Device
	body := fmt.Sprintf(`PLAY RTSP/1.0
CSeq: %d
Range: npt=%d-
`, d.SN, second)
	return p.info(body)
}

// PlayForward changes the playback speed.
// speed: 0.25, 0.5, 1, 2 or 4; the negative values mean reverse playback.
func (p *PullStream) PlayForward(speed float32) int {
	d := p.channel.Device
	body := fmt.Sprintf(`PLAY RTSP/1.0
CSeq: %d
Scale: %0.6f
`, d.SN, speed)
	return p.info(body)
}

// Channel is one GB28181 channel of a device plus its runtime state.
type Channel struct {
	Device      *Device      `json:"-" yaml:"-"` // owning device
	State       atomic.Int32 `json:"-" yaml:"-"` // 0: idle, 1: inviting, 2: playing / talking
	LiveSubSP   string       // live sub stream, via rtsp
	GpsTime     time.Time    // GPS timestamp
	Longitude   string       // longitude
	Latitude    string       // latitude
	*log.Logger `json:"-" yaml:"-"`
	ChannelInfo
}

// DeviceIdInfo carries the device_id / parent_id pair of a channel row.
type DeviceIdInfo struct {
	DeviceId string `json:"device_id"`
	ParentId int64  `json:"parent_id"`
}

// getDeviceIdByChannelId looks up the device id stored in t_sdp_delivery for
// the given channel id. It returns "" when the query fails.
func getDeviceIdByChannelId(channelId string) string {
	database := db.DB()
	deviceId := ""
	err := database.Raw("select device_id from t_sdp_delivery where channel_id=?", channelId).Find(&deviceId).Error
	if err != nil {
		fmt.Println("通过通道编号没有找到父设备编号:", err)
		return ""
	}
	return deviceId
}

// UpdateChanelState writes the channel's live state to
// t_sdp_channel.live_status. The write is currently disabled (see
// persistChannelLiveState), so this is a no-op. "AudioOut" channels are never
// persisted.
func (c *Channel) UpdateChanelState() {
	if !persistChannelLiveState || c.Model == "AudioOut" {
		return
	}
	GB28181Plugin.Info("更新通道state", zap.String("channelId", c.DeviceID), zap.Int32("state", c.State.Load()))
	updateMap := map[string]any{"live_status": c.State.Load()}
	err := db.DB().Table("t_sdp_channel").Where("channel_id=?", c.DeviceID).Updates(updateMap).Error
	if err != nil {
		GB28181Plugin.Error("更新通道state失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
	}
}

// UpdateChanelLiveSubSp writes LiveSubSP to t_sdp_channel.live_sub_sp. The
// write is currently disabled (see persistChannelLiveState), so this is a
// no-op. "AudioOut" channels are never persisted.
func (c *Channel) UpdateChanelLiveSubSp() {
	if !persistChannelLiveState || c.Model == "AudioOut" {
		return
	}
	GB28181Plugin.Info("更新通道LiveSubSp", zap.String("channelId", c.DeviceID), zap.Int32("state", c.State.Load()))
	updateMap := map[string]any{"live_sub_sp": c.LiveSubSP}
	err := db.DB().Table("t_sdp_channel").Where("channel_id=?", c.DeviceID).Updates(updateMap).Error
	if err != nil {
		GB28181Plugin.Error("更新通道LiveSubSp失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
	}
}

// ChannelDevice is the subset of a t_sdp_channel row that is cached in
// ChannelParentMap.
type ChannelDevice struct {
	Id       int64  `json:"id"`
	DeviceId string `json:"device_id"`
	ParentId int64  `json:"parent_id"`
}

// syncNVRCameraState mirrors the channel's ON/OFF status into
// t_sdp_delivery.state for deviceID and publishes the new state through
// SendMessageToRedis. It returns false, without touching anything, when the
// status is neither ChannelOnStatus nor ChannelOffStatus.
func (c *Channel) syncNVRCameraState(deviceID string) bool {
	GB28181Plugin.Info("更新nvr下摄像头设备状态", zap.String("channelId", c.DeviceID), zap.Any("status", c.Status))
	var state int
	switch c.Status {
	case ChannelOnStatus:
		state = 1
	case ChannelOffStatus:
		state = 0
	default:
		return false
	}
	err := db.DB().Table("t_sdp_delivery").Where("device_id=?", deviceID).Updates(map[string]any{"state": state}).Error
	if err != nil {
		GB28181Plugin.Error("更新nvr下摄像头设备状态:", zap.String("id", c.DeviceID), zap.String("error", err.Error()))
	}
	// The notification is sent even when the database update failed.
	SendMessageToRedis(deviceID, state)
	return true
}

// UpdateChanelInfo persists the channel's catalog information to
// t_sdp_channel, refreshes ChannelParentMap and, for a camera behind an NVR,
// also syncs the camera's online state. "AudioOut" channels are skipped.
func (c *Channel) UpdateChanelInfo() {
	if c.Model == "AudioOut" {
		return
	}
	// Log the pointer: Channel embeds an atomic value and must not be copied.
	GB28181Plugin.Info("更新通道信息", zap.String("channelId", c.DeviceID), zap.Any("info", c))
	database := db.DB()

	// Resolve row id, device id and parent id from the channel id.
	var channelParent ChannelDevice
	err := database.Raw("select id, device_id,parent_id from t_sdp_channel where channel_id=?", c.DeviceID).Find(&channelParent).Error
	if err != nil {
		GB28181Plugin.Info("查询通道失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
		return
	}

	updateMap := map[string]any{}
	if c.ParentID != "" && c.ParentID != channelParent.DeviceId {
		// The reported parent differs from the channel's own device: the
		// channel belongs to a device behind an NVR.
		var parentID int64
		if err := database.Raw("select id from t_sdp_delivery where device_id = ?", c.ParentID).Find(&parentID).Error; err != nil {
			GB28181Plugin.Error("查询nvr设备失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
		} else {
			updateMap["parent_id"] = parentID
		}
		channelParent.ParentId = parentID
		ChannelParentMap.Store(c.DeviceID, channelParent)

		// NOTE(review): an unrecognised status aborts the whole update,
		// including the channel row below. Kept as-is; confirm it is intended.
		if parentID != 0 && !c.syncNVRCameraState(channelParent.DeviceId) {
			return
		}
	} else {
		ChannelParentMap.Store(c.DeviceID, channelParent)
	}

	updateMap["live_status"] = c.State.Load()
	updateMap["status"] = c.ChannelInfo.Status
	updateMap["address"] = c.Address
	updateMap["civil_code"] = c.CivilCode
	updateMap["gps_time"] = c.GpsTime
	updateMap["longitude"] = c.Longitude
	updateMap["latitude"] = c.Latitude
	updateMap["live_sub_sp"] = c.LiveSubSP
	updateMap["manufacturer"] = c.Manufacturer
	updateMap["model"] = c.Model
	updateMap["name"] = c.Name
	updateMap["owner"] = c.Owner
	updateMap["parental"] = c.Parental
	updateMap["port"] = c.Port
	updateMap["safety_way"] = c.SafetyWay
	updateMap["register_way"] = c.RegisterWay
	updateMap["secrecy"] = c.Secrecy
	err = database.Table("t_sdp_channel").Where("id=?", channelParent.Id).Updates(updateMap).Error
	if err != nil {
		GB28181Plugin.Error("更新通道失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
	}
}

// UpdateChanelStatus syncs the online state of a camera behind an NVR. The
// parent relation is taken from ChannelParentMap, falling back to a database
// lookup on a cache miss. Channels whose parent id is 0 and "AudioOut"
// channels are left untouched.
func (c *Channel) UpdateChanelStatus() {
	if c.Model == "AudioOut" {
		return
	}
	GB28181Plugin.Info("更新通道status", zap.String("channelId", c.DeviceID), zap.Any("status", c.Status))

	var channelParent ChannelDevice
	if v, ok := ChannelParentMap.Load(c.DeviceID); ok {
		channelParent = v.(ChannelDevice)
		GB28181Plugin.Info("找到设备信息", zap.String("channelId", c.DeviceID))
	} else {
		GB28181Plugin.Info("没找到设备信息", zap.String("channelId", c.DeviceID))
		err := db.DB().Raw("select id,device_id,parent_id from t_sdp_channel where channel_id=?", c.DeviceID).Find(&channelParent).Error
		if err != nil {
			GB28181Plugin.Info("通道查找父设备", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
			return
		}
	}

	if channelParent.ParentId != 0 {
		// A non-zero parent id marks a camera behind an NVR.
		c.syncNVRCameraState(channelParent.DeviceId)
	}
}

// GetIsRecordByChannelId reports the is_record flag of a channel. Values are
// cached in DeviceRecordState; on a cache miss the flag is read from
// t_sdp_delivery and cached. A database error other than "record not found"
// yields 1 and is not cached.
func GetIsRecordByChannelId(channelId string) int {
	// TODO: cached values need to be invalidated through the invite API when
	// the setting changes.
	if value, ok := DeviceRecordState.Load(channelId); ok {
		if cached, isInt := value.(int); isInt {
			return cached
		}
	}
	var isRecord int
	err := db.DB().Raw("select is_record from t_sdp_delivery where channel_id=?", channelId).Find(&isRecord).Error
	if err != nil && err != gorm.ErrRecordNotFound {
		return 1
	}
	DeviceRecordState.Store(channelId, isRecord)
	return isRecord
}

// GetInvistMode derives the effective invite mode. "AudioOut" channels are
// always on-subscribe; automatic mode is kept only for channels whose
// is_record flag is 1; any other mode is returned unchanged.
func GetInvistMode(invistMode int, channelMode string, channelId string) int {
	if channelMode == "AudioOut" {
		return INVIDE_MODE_ONSUBSCRIBE
	}
	if invistMode != INVIDE_MODE_AUTO {
		return invistMode
	}
	if GetIsRecordByChannelId(channelId) == 1 {
		return INVIDE_MODE_AUTO
	}
	return INVIDE_MODE_ONSUBSCRIBE
}

// SetAllDeviceOffline sets state=0 on every t_sdp_delivery row with type=2
// that belongs to this SIP server (conf.SipId).
func SetAllDeviceOffline() {
	err := db.DB().Exec("update t_sdp_delivery set state=0 where sip_id=? and type=2", conf.SipId).Error
	if err != nil {
		fmt.Println("更新所有设备离线失败", err)
		return
	}
	fmt.Println("成功更新所有设备离线")
}

// MarshalJSON serialises the channel's catalog fields plus its live state
// (as "LiveStatus"); Device and the logger are omitted.
func (c *Channel) MarshalJSON() ([]byte, error) {
	m := map[string]any{
		"DeviceID":     c.DeviceID,
		"ParentID":     c.ParentID,
		"Name":         c.Name,
		"Manufacturer": c.Manufacturer,
		"Model":        c.Model,
		"Owner":        c.Owner,
		"CivilCode":    c.CivilCode,
		"Address":      c.Address,
		"Port":         c.Port,
		"Parental":     c.Parental,
		"SafetyWay":    c.SafetyWay,
		"RegisterWay":  c.RegisterWay,
		"Secrecy":      c.Secrecy,
		"Status":       c.Status,
		"Longitude":    c.Longitude,
		"Latitude":     c.Latitude,
		"GpsTime":      c.GpsTime,
		"LiveSubSP":    c.LiveSubSP,
		"LiveStatus":   c.State.Load(),
	}
	return json.Marshal(m)
}

// ChannelInfo holds the catalog fields of a channel.
type ChannelInfo struct {
	DeviceID     string // channel ID
	ParentID     string
	Name         string
	Manufacturer string
	Model        string
	Owner        string
	CivilCode    string
	Address      string
	Port         int
	Parental     int
	SafetyWay    int
	RegisterWay  int
	Secrecy      int
	Status       ChannelStatus
}

// ChannelStatus is the online status reported for a channel.
type ChannelStatus string

const (
	ChannelOnStatus  = "ON"
	ChannelOffStatus = "OFF"
)

// CreateRequst builds a new out-of-dialog SIP request addressed to this
// channel. It increments the device's SN and uses it as the CSeq number.
//
// The request URI host is conf.Realm when the first nine characters of the
// channel ID equal it. Otherwise the device's network address is used, with
// the channel's own port substituted when one is set. Channel IDs shorter
// than nine characters are treated as belonging to a different domain.
func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
	d := channel.Device
	d.SN++

	callId := sip.CallID(utils.RandNumString(10))
	userAgent := sip.UserAgentHeader("Monibuca")
	maxForwards := sip.MaxForwards(70) // Max-Forwards default value
	cseq := sip.CSeq{
		SeqNo:      uint32(d.SN),
		MethodName: Method,
	}
	port := sip.Port(conf.SipPort)
	serverAddr := sip.Address{
		Uri: &sip.SipUri{
			FUser: sip.String{Str: conf.Serial},
			FHost: d.SipIP,
			FPort: &port,
		},
		Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
	}

	// Targets outside our own domain must be addressed as user@host.
	host := conf.Realm
	if len(channel.DeviceID) < 9 || channel.DeviceID[0:9] != host {
		if channel.Port != 0 {
			deviceIp := d.NetAddr
			// Strip the port when present; an address without ":" is used whole.
			if i := strings.LastIndex(deviceIp, ":"); i >= 0 {
				deviceIp = deviceIp[:i]
			}
			host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
		} else {
			host = d.NetAddr
		}
	}

	channelAddr := sip.Address{
		Uri: &sip.SipUri{FUser: sip.String{Str: channel.DeviceID}, FHost: host},
	}
	req = sip.NewRequest(
		"",
		Method,
		channelAddr.Uri,
		"SIP/2.0",
		[]sip.Header{
			serverAddr.AsFromHeader(),
			channelAddr.AsToHeader(),
			&callId,
			&userAgent,
			&cseq,
			&maxForwards,
			serverAddr.AsContactHeader(),
		},
		"",
		nil,
	)

	req.SetTransport(conf.SipNetwork)
	req.SetDestination(d.NetAddr)
	return req
}

// QueryRecord asks the device for the recordings of this channel between
// startTime and endTime (decimal integers passed as strings) and waits for
// the result collected by RecordQueryLink.
func (channel *Channel) QueryRecord(startTime, endTime string) ([]*Record, error) {
	d := channel.Device
	request := d.CreateRequest(sip.MESSAGE)
	contentType := sip.ContentType("Application/MANSCDP+xml")
	request.AppendHeader(&contentType)

	// Parse errors are ignored on purpose to keep the existing contract: an
	// unparsable bound is sent as 0.
	start, _ := strconv.ParseInt(startTime, 10, 0)
	end, _ := strconv.ParseInt(endTime, 10, 0)
	body := BuildRecordInfoXML(d.SN, channel.DeviceID, start, end)
	request.SetBody(body, true)

	// Register the waiter before sending so that no reply can be missed.
	resultCh := RecordQueryLink.WaitResult(d.ID, channel.DeviceID, d.SN, QUERY_RECORD_TIMEOUT)
	resp, err := d.SipRequestForResponse(request)
	if err != nil {
		return nil, fmt.Errorf("query error: %w", err)
	}
	if resp.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("query error, status=%d", resp.StatusCode())
	}
	// RecordQueryLink applies its own timeout, so a result is always
	// delivered and no extra timeout guard is needed here.
	r := <-resultCh
	return r.list, r.err
}

// Control sends a MANSCDP DeviceControl message carrying PTZCmd to the
// channel. It returns the response status code, or 408 when the request
// failed.
func (channel *Channel) Control(PTZCmd string) int {
	d := channel.Device
	request := d.CreateRequest(sip.MESSAGE)
	contentType := sip.ContentType("Application/MANSCDP+xml")
	request.AppendHeader(&contentType)
	body := fmt.Sprintf(`<?xml version="1.0"?>
<Control>
<CmdType>DeviceControl</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<PTZCmd>%s</PTZCmd>
</Control>`, d.SN, channel.DeviceID, PTZCmd)
	request.SetBody(body, true)

	resp, err := d.SipRequestForResponse(request)
	if err != nil {
		return http.StatusRequestTimeout
	}
	return int(resp.StatusCode())
}

// Invite sends an INVITE for this channel and, on 200 OK, starts a PS
// publisher on the negotiated media port, registers the dialog in PullStreams
// and acknowledges the response.
//
// For live streams the channel state acts as the guard against concurrent
// invites: it is switched 0 -> 1 atomically on entry (304 is returned when it
// was not idle) and ends up 2 when the call returns without error, or 0
// otherwise.
//
// Reference for the SDP "f" field:
//
//	f = v/codec/resolution/frame rate/bitrate type/bitrate a/codec/bitrate/sample rate
//
// v: video parameters follow, separated by "/".
//   - codec (decimal string): 1 MPEG-4, 2 H.264, 3 SVAC, 4 3GP
//   - resolution (decimal string): 1 QCIF, 2 CIF, 3 4CIF, 4 D1, 5 720P, 6 1080P/I
//   - frame rate (decimal string): 0~99
//   - bitrate type (decimal string): 1 constant (CBR), 2 variable (VBR)
//   - bitrate (decimal string): 0~100000 (1 means 1 kbps)
//
// a: audio parameters follow, separated by "/".
//   - codec (decimal string): 1 G.711, 2 G.723.1, 3 G.729, 4 G.722.1
//   - bitrate (decimal string): 1 5.3 kbps (G.723.1), 2 6.3 kbps (G.723.1),
//     3 8 kbps (G.729), 4 16 kbps (G.722.1), 5 24 kbps (G.722.1),
//     6 32 kbps (G.722.1), 7 48 kbps (G.722.1), 8 64 kbps (G.711)
//   - sample rate (decimal string): 1 8 kHz (G.711/G.723.1/G.729),
//     2 14 kHz (G.722.1), 3 16 kHz (G.722.1), 4 32 kHz (G.722.1)
//
// Note 1: "decimal string" means a decimal number between "0" and
// "4294967296".
//
// Note 2: parameters are separated by "/" and the separator must never be
// omitted; two adjacent separators mean that the parameter has no value.
//
// Note 3: the f field must always keep its complete structure. With video
// only, the audio part stays as "a///":
//
//	f = v/codec/resolution/frame rate/bitrate type/bitrate a///
//
// With audio only, the video part stays as "v/":
//
//	f = v/a/codec/bitrate/sample rate
//
// No whitespace separates the video and audio parts. The resolution parameter
// can be used to tell apart streams of different resolution from one device.
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
	if opt.IsLive() {
		if !channel.State.CompareAndSwap(0, 1) {
			return http.StatusNotModified, nil
		}
		channel.UpdateChanelState()
		defer func() {
			if err != nil {
				GB28181Plugin.Error("Invite", zap.Error(err), zap.String("streamPath", opt.StreamPath))
				channel.State.Store(0)
				// No automatic retry here (duplicate publish / busy device
				// must not trigger another pull).
				return
			}
			channel.State.Store(2)
			channel.UpdateChanelState()
		}()
	}

	d := channel.Device
	streamPath := fmt.Sprintf("%s/%s", LiveRoom, channel.DeviceID)
	s := "Play"
	opt.CreateSSRC()
	if opt.Record() {
		s = "Playback"
		streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
	}
	switch {
	case opt.StreamPath != "":
		streamPath = opt.StreamPath
	case channel.DeviceID == "":
		streamPath = "gb28181/" + d.ID
	default:
		opt.StreamPath = streamPath
	}

	// Sign the stream path automatically when a publish key is configured.
	if EngineConfig.Publish.Key != "" {
		expireTime16 := strconv.FormatInt(time.Now().Unix()+315360000, 16)
		m := md5.New()
		m.Write([]byte(EngineConfig.Publish.Key + streamPath + expireTime16))
		// Any query already present is replaced by the signature arguments.
		basePath, _, _ := strings.Cut(streamPath, "?")
		streamPath = fmt.Sprintf("%s?%s=%s&%s=%s", basePath, EngineConfig.Publish.SecretArgName, hex.EncodeToString(m.Sum(nil)), EngineConfig.Publish.ExpireArgName, expireTime16)
		opt.StreamPath = streamPath
	}

	if opt.dump == "" {
		opt.dump = conf.DumpPath
	}

	protocol := ""
	networkType := "udp"
	reusePort := true
	if conf.IsMediaNetworkTCP() {
		networkType = "tcp"
		protocol = "TCP/"
		if conf.tcpPorts.Valid {
			opt.MediaPort, err = conf.tcpPorts.GetUnUseTcpPort()
			opt.recyclePort = conf.tcpPorts.Recycle
			reusePort = false
		}
		channel.Info("media port", zap.Uint16("mediaPort", opt.MediaPort), zap.Bool("reusePort", reusePort), zap.Error(err))
	} else if conf.udpPorts.Valid {
		opt.MediaPort, err = conf.udpPorts.GetPort()
		opt.recyclePort = conf.udpPorts.Recycle
		reusePort = false
	}
	if err != nil {
		// TODO: an EOF error has been observed on this path.
		return http.StatusInternalServerError, err
	}
	if opt.MediaPort == 0 {
		opt.MediaPort = conf.MediaPort
	}

	sdpInfo := []string{
		"v=0",
		fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.MediaIP),
		"s=" + s,
		"u=" + channel.DeviceID + ":0",
		"c=IN IP4 " + d.MediaIP,
		opt.String(),
		fmt.Sprintf("m=video %d %sRTP/AVP 96", opt.MediaPort, protocol),
		"a=recvonly",
		"a=rtpmap:96 PS/90000",
		"y=" + opt.ssrc,
	}
	if conf.IsMediaNetworkTCP() {
		sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
	}

	invite := channel.CreateRequst(sip.INVITE)
	contentType := sip.ContentType("application/sdp")
	invite.AppendHeader(&contentType)
	invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true)

	subject := sip.GenericHeader{
		HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
	}
	invite.AppendHeader(&subject)

	inviteRes, err := d.SipRequestForResponse(invite)
	if err != nil {
		channel.Error("invite", zap.Error(err), zap.String("msg", invite.String()))
		if opt.recyclePort != nil {
			channel.Info("recycle port", zap.Uint16("mediaPort", opt.MediaPort))
			opt.recyclePort(opt.MediaPort)
		}
		// A live pull rejected because the device is busy triggers a reboot
		// request for the device.
		if opt.IsLive() && strings.Contains(err.Error(), "Busy here Creat Media failed") {
			channel.Info("reboot device", zap.String("channelId", channel.DeviceID))
			go SendRebootMessage(channel.DeviceID)
		}
		return http.StatusInternalServerError, err
	}
	go DeleteRebootMessage(channel.DeviceID)

	code = int(inviteRes.StatusCode())
	channel.Info("invite response", zap.Int("status code", code))
	if code != http.StatusOK {
		return
	}

	// Pick the SSRC ("y=") and the media transport ("m=") out of the answer.
	for _, l := range strings.Split(inviteRes.Body(), "\r\n") {
		ls := strings.Split(l, "=")
		if len(ls) < 2 || len(ls[1]) == 0 {
			continue
		}
		switch ls[0] {
		case "y":
			if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
				opt.SSRC = uint32(_ssrc)
			} else {
				channel.Error("read invite response y ", zap.Error(err))
			}
		case "m":
			netinfo := strings.Split(ls[1], " ")
			if len(netinfo) < 3 {
				// Malformed media line: keep the transport we asked for.
				channel.Error("read invite response m ", zap.String("line", l))
				continue
			}
			if strings.ToUpper(netinfo[2]) == "TCP/RTP/AVP" {
				channel.Debug("Device support tcp")
			} else {
				channel.Debug("Device not support tcp")
				networkType = "udp"
			}
		}
	}

	var psPuber ps.PSPublisher
	err = psPuber.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, reusePort)
	if err != nil {
		channel.Error("psPuber.Receive", zap.Error(err))
		if opt.recyclePort != nil {
			channel.Info("recycle port", zap.Uint16("mediaPort", opt.MediaPort))
			opt.recyclePort(opt.MediaPort)
		}
		return
	}

	if opt.IsLive() {
		// Live streams of channels with recording enabled are closed after
		// 10 seconds without data.
		if GetIsRecordByChannelId(channel.DeviceID) == 1 {
			psPuber.Stream.IdleTimeout = time.Second * 10
			psPuber.Stream.DelayCloseTimeout = time.Second * 10
		}
	} else {
		// Playback streams never expire.
		psPuber.Stream.IdleTimeout = 0
		psPuber.Stream.DelayCloseTimeout = 0
	}

	// Register under the bare stream path, without the query part.
	streamPath, _, _ = strings.Cut(streamPath, "?")
	PullStreams.Store(streamPath, &PullStream{
		opt:       opt,
		channel:   channel,
		inviteRes: inviteRes,
	})
	// If the ACK fails the stream ends up with no track and is closed, which
	// recycles the port.
	err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil))
	if err != nil {
		channel.Error("srv.Send", zap.Error(err))
	}
	return
}

// Bye terminates the pull stream registered under streamPath: it sends BYE
// and closes the published stream. An empty streamPath addresses the
// channel's default live stream ("live/<channelID>", as registered by
// Invite), with "<deviceID>/<channelID>" tried as a fallback for callers that
// still use that layout. It returns 200 when a stream was found and 404
// otherwise.
func (channel *Channel) Bye(streamPath string) int {
	channel.Info("bye", zap.String("streamPath", streamPath))
	candidates := []string{streamPath}
	if streamPath == "" {
		candidates = []string{
			fmt.Sprintf("%s/%s", LiveRoom, channel.DeviceID),
			fmt.Sprintf("%s/%s", channel.Device.ID, channel.DeviceID),
		}
	}
	for _, path := range candidates {
		pulled, loaded := PullStreams.LoadAndDelete(path)
		if !loaded {
			continue
		}
		pulled.(*PullStream).Bye()
		if stream := Streams.Get(path); stream != nil {
			stream.Close()
		}
		return http.StatusOK
	}
	return http.StatusNotFound
}

// Pause pauses the pull stream registered under streamPath on the device and
// pauses the local stream. It returns the device's status code, or 404 when
// no such pull stream exists.
func (channel *Channel) Pause(streamPath string) int {
	pulled, loaded := PullStreams.Load(streamPath)
	if !loaded {
		return http.StatusNotFound
	}
	r := pulled.(*PullStream).Pause()
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Pause()
	}
	return r
}

// Resume resumes the pull stream registered under streamPath on the device
// and resumes the local stream. It returns the device's status code, or 404
// when no such pull stream exists.
func (channel *Channel) Resume(streamPath string) int {
	pulled, loaded := PullStreams.Load(streamPath)
	if !loaded {
		return http.StatusNotFound
	}
	r := pulled.(*PullStream).Resume()
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Resume()
	}
	return r
}

// PlayAt seeks the pull stream registered under streamPath to the given
// second and resumes the local stream. It returns the device's status code,
// or 404 when no such pull stream exists.
func (channel *Channel) PlayAt(streamPath string, second uint) int {
	pulled, loaded := PullStreams.Load(streamPath)
	if !loaded {
		return http.StatusNotFound
	}
	r := pulled.(*PullStream).PlayAt(second)
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Resume()
	}
	return r
}

// PlayForward changes the playback speed of the pull stream registered under
// streamPath and resumes the local stream. It returns the device's status
// code, or 404 when no such pull stream exists.
func (channel *Channel) PlayForward(streamPath string, speed float32) int {
	pulled, loaded := PullStreams.Load(streamPath)
	if !loaded {
		return http.StatusNotFound
	}
	r := pulled.(*PullStream).PlayForward(speed)
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Resume()
	}
	return r
}

// TryAutoInvite starts Invite in a new goroutine when opt is not a live
// request or when the channel currently accepts a live invite.
func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
	condition := !opt.IsLive() || channel.CanInvite()
	// Logged at info level (was debug).
	channel.Info("TryAutoInvite", zap.Any("opt", opt), zap.Bool("condition", condition))
	if condition {
		go channel.Invite(opt)
	}
}

// CanInvite reports whether the channel may be invited automatically: it must
// be idle, have a 20-character ID and not be OFF. When conf.InviteIDs is set,
// the channel's type code must additionally match one of its comma-separated
// entries, each either a single type or an inclusive "start-end" range.
func (channel *Channel) CanInvite() bool {
	if channel.State.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == ChannelOffStatus {
		return false
	}
	if conf.InviteIDs == "" {
		return true
	}

	// Characters 11~13 of the ID encode the device type.
	typeID := channel.DeviceID[10:13]

	// Format: start-end,type1,type2
	for _, tok := range strings.Split(conf.InviteIDs, ",") {
		first, second, isRange := strings.Cut(tok, "-")
		if isRange {
			if typeID >= first && typeID <= second {
				return true
			}
		} else if typeID == first {
			return true
		}
	}
	return false
}

// getSipRespErrorCode returns the SIP status code carried by a
// *sip.RequestError, or 500 for any other error.
func getSipRespErrorCode(err error) int {
	if re, ok := err.(*sip.RequestError); ok {
		return int(re.Code)
	}
	return http.StatusInternalServerError
}