|
- package gb28181
- import (
- "fmt"
- "net/http"
- "strconv"
- //"net/url"
- "strings"
- "time"
- "gorm.io/gorm"
- "sync/atomic"
- "crypto/md5"
- "encoding/hex"
- "github.com/ghettovoice/gosip/sip"
- "github.com/goccy/go-json"
- "go.uber.org/zap"
- "m7s.live/engine/v4/db"
- . "m7s.live/engine/v4"
- "m7s.live/engine/v4/log"
- "m7s.live/plugin/gb28181/v4/utils"
- "m7s.live/plugin/ps/v4"
- "sync"
- )
// Package-level state used by the channel helpers in this file.
var (
	// QUERY_RECORD_TIMEOUT is the timeout handed to RecordQueryLink.WaitResult
	// when a record query is issued.
	QUERY_RECORD_TIMEOUT = 5 * time.Second

	// ChannelParentMap caches ChannelDevice values keyed by channel ID.
	ChannelParentMap sync.Map
)
- type PullStream struct {
- opt *InviteOptions
- channel *Channel
- inviteRes sip.Response
- }
- func (p *PullStream) CreateRequest(method sip.RequestMethod) (req sip.Request) {
- res := p.inviteRes
- req = p.channel.CreateRequst(method)
- from, _ := res.From()
- to, _ := res.To()
- callId, _ := res.CallID()
- req.ReplaceHeaders(from.Name(), []sip.Header{from})
- req.ReplaceHeaders(to.Name(), []sip.Header{to})
- req.ReplaceHeaders(callId.Name(), []sip.Header{callId})
- return
- }
- func (p *PullStream) Bye() int {
- req := p.CreateRequest(sip.BYE)
- resp, err := p.channel.Device.SipRequestForResponse(req)
- if p.opt.IsLive() {
- p.channel.State.Store(0)
- // 更新通道状态
- p.channel.UpdateChanelState()
- }
- if p.opt.recyclePort != nil {
- //fmt.Println("回收端口 1111111111111111111111111111111111111111",p.opt.MediaPort)
- p.opt.recyclePort(p.opt.MediaPort)
- }
- if err != nil {
- return http.StatusInternalServerError
- }
- return int(resp.StatusCode())
- }
- func (p *PullStream) info(body string) int {
- d := p.channel.Device
- req := p.CreateRequest(sip.INFO)
- contentType := sip.ContentType("Application/MANSRTSP")
- req.AppendHeader(&contentType)
- req.SetBody(body, true)
- resp, err := d.SipRequestForResponse(req)
- if err != nil {
- log.Warnf("Send info to stream error: %v, stream=%s, body=%s", err, p.opt.StreamPath, body)
- return getSipRespErrorCode(err)
- }
- return int(resp.StatusCode())
- }
- // 暂停播放
- func (p *PullStream) Pause() int {
- body := fmt.Sprintf(`PAUSE RTSP/1.0
- CSeq: %d
- PauseTime: now
- `, p.channel.Device.SN)
- return p.info(body)
- }
- // 恢复播放
- func (p *PullStream) Resume() int {
- d := p.channel.Device
- body := fmt.Sprintf(`PLAY RTSP/1.0
- CSeq: %d
- Range: npt=now-
- `, d.SN)
- return p.info(body)
- }
- // 跳转到播放时间
- // second: 相对于起始点调整到第 sec 秒播放
- func (p *PullStream) PlayAt(second uint) int {
- d := p.channel.Device
- body := fmt.Sprintf(`PLAY RTSP/1.0
- CSeq: %d
- Range: npt=%d-
- `, d.SN, second)
- return p.info(body)
- }
- // 快进/快退播放
- // speed 取值: 0.25 0.5 1 2 4 或者其对应的负数表示倒放
- func (p *PullStream) PlayForward(speed float32) int {
- d := p.channel.Device
- body := fmt.Sprintf(`PLAY RTSP/1.0
- CSeq: %d
- Scale: %0.6f
- `, d.SN, speed)
- return p.info(body)
- }
- type Channel struct {
- Device *Device `json:"-" yaml:"-"` // 所属设备
- State atomic.Int32 `json:"-" yaml:"-"` // 通道状态,0:空闲,1:正在invite,2:正在播放/对讲
- LiveSubSP string // 实时子码流,通过rtsp
- GpsTime time.Time // gps时间
- Longitude string // 经度
- Latitude string // 纬度
- *log.Logger `json:"-" yaml:"-"`
- ChannelInfo
- }
// DeviceIdInfo pairs a device ID with the ID of its parent.
type DeviceIdInfo struct {
	// DeviceId is serialized as "device_id".
	DeviceId string `json:"device_id"`
	// ParentId is serialized as "parent_id".
	ParentId int64 `json:"parent_id"`
}
// LiveRoom is the leading path segment used when Invite builds the default
// stream path of a live stream.
const LiveRoom = "live"
- // 通过通道id找设备id
- func getDeviceIdByChannelId(channelId string) string{
- database := db.DB()
- deviceId := ""
- err := database.Raw("select device_id from t_sdp_delivery where channel_id=?",channelId).Find(&deviceId).Error
- if err != nil{
- fmt.Println("通过通道编号没有找到父设备编号:",err)
- return ""
- }
- /*d := DeviceIdInfo{}
- err := database.Raw("select device_id,parent_id from t_sdp_channel where channel_id=?",channelId).Find(&d).Error
- if err != nil{
- fmt.Println("通过通道编号没有找到设备编号:",err)
- return ""
- }
- if d.ParentId == 0 {
- return d.DeviceId
- }else{
- err = database.Raw("select device_id from t_sdp_delivery where id=?",d.ParentId).Find(&deviceId).Error
- if err != nil{
- fmt.Println("通过通道编号没有找到父设备编号:",err)
- return ""
- }
- }*/
- return deviceId
- }
- // 更新通道state
- func(c *Channel) UpdateChanelState(){
- return
- if c.Model == "AudioOut"{
- return
- }
- GB28181Plugin.Info("更新通道state", zap.String("channelId", c.DeviceID), zap.Int32("state",c.State.Load()))
- //fmt.Println("更新通道state:",c.DeviceID,c.ParentID,c.Status)
- database := db.DB()
- updateMap := map[string]interface{}{}
- updateMap["live_status"] = c.State.Load()
- err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(updateMap).Error
- //err := database.Exec(`update t_sdp_channel set live_status=? where channel_id=?`,info.DeviceID,d.ID).Error
- if err != nil {
- GB28181Plugin.Error("更新通道state失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
- //fmt.Println("更新通道state失败:",err)
- }
- }
- // 更新通道LiveSubSp
- func(c *Channel) UpdateChanelLiveSubSp(){
- return
- if c.Model == "AudioOut"{
- return
- }
- GB28181Plugin.Info("更新通道LiveSubSp", zap.String("channelId", c.DeviceID), zap.Int32("state",c.State.Load()))
- //fmt.Println("更新通道state:",c.DeviceID,c.ParentID,c.Status)
- database := db.DB()
- updateMap := map[string]interface{}{}
- updateMap["live_sub_sp"] = c.LiveSubSP
- err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(updateMap).Error
- //err := database.Exec(`update t_sdp_channel set live_status=? where channel_id=?`,info.DeviceID,d.ID).Error
- if err != nil {
- GB28181Plugin.Error("更新通道LiveSubSp失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
- //fmt.Println("更新通道state失败:",err)
- }
- }
// ChannelDevice receives the id, device_id and parent_id columns selected
// from t_sdp_channel and is the value type cached in ChannelParentMap.
type ChannelDevice struct {
	// Id is serialized as "id".
	Id int64 `json:"id"`
	// DeviceId is serialized as "device_id".
	DeviceId string `json:"device_id"`
	// ParentId is serialized as "parent_id"; 0 is treated as "no parent".
	ParentId int64 `json:"parent_id"`
}
- // 更新通道信息
- func(c *Channel) UpdateChanelInfo(){
- if c.Model == "AudioOut"{
- return
- }
- //c.ParentID = "34020000001320000001"
- GB28181Plugin.Info("更新通道信息", zap.String("channelId", c.DeviceID), zap.Any("info", *c))
- database := db.DB()
- // 通过通道号查询设备号和父id
- channelParent := &ChannelDevice{}
- err := database.Raw("select id, device_id,parent_id from t_sdp_channel where channel_id=?",c.DeviceID).Find(channelParent).Error
- if err != nil{
- GB28181Plugin.Info("查询通道失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
- return
- }
- updateMap := map[string]interface{}{}
- // 更新父设备id
- if c.ParentID != "" && c.ParentID != channelParent.DeviceId{
- // 父设备id和本设备id不同,表示是nvr下设备
- parenetId := int64(0)
- err := database.Raw("select id from t_sdp_delivery where device_id = ?",c.ParentID).Find(&parenetId).Error
- if err == nil {
- updateMap["parent_id"] = parenetId
- }else{
- GB28181Plugin.Error("查询nvr设备失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
- }
- channelParent.ParentId = parenetId
- ChannelParentMap.Store(c.DeviceID,*channelParent)
- if parenetId != 0 {
- // 更新nvr设备下摄像头状态
- // 父设备不为0 ,表示是nvr下面的摄像头,更新摄像头状态
- GB28181Plugin.Info("更新nvr下摄像头设备状态", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
- status := 0
- updateMap := map[string]interface{}{}
- if c.Status == ChannelOnStatus{
- updateMap["state"] = 1
- status = 1
- }else if c.Status == ChannelOffStatus{
- updateMap["state"] = 0
- }else{
- return
- }
- err = database.Table("t_sdp_delivery").Where("device_id=?",channelParent.DeviceId).Updates(updateMap).Error
- if err != nil {
- GB28181Plugin.Error("更新nvr下摄像头设备状态:",zap.String("id", c.DeviceID),zap.String("error", err.Error()))
- }
- SendMessageToRedis(channelParent.DeviceId,status)
- }
- }else{
- ChannelParentMap.Store(c.DeviceID,*channelParent)
- }
- updateMap["live_status"] = c.State.Load()
- updateMap["status"] = c.ChannelInfo.Status
- updateMap["address"] = c.Address
- updateMap["civil_code"] = c.CivilCode
- updateMap["gps_time"] = c.GpsTime
- updateMap["longitude"] = c.Longitude
- updateMap["latitude"] = c.Latitude
- updateMap["live_sub_sp"] = c.LiveSubSP
- updateMap["manufacturer"] = c.Manufacturer
- updateMap["model"] = c.Model
- updateMap["name"] = c.Name
- updateMap["owner"] = c.Owner
- updateMap["parental"] = c.Parental
- updateMap["port"] = c.Port
- updateMap["safety_way"] = c.SafetyWay
- updateMap["register_way"] = c.RegisterWay
- updateMap["secrecy"] = c.Secrecy
- err = database.Table("t_sdp_channel").Where("id=?",channelParent.Id).Updates(updateMap).Error
- if err != nil {
- GB28181Plugin.Error("更新通道失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
- }
- //fmt.Println("111111111111111111111111111111111111111111")
- //c.UpdateChanelStatus()
- }
- // 更新通道状态
- func(c *Channel) UpdateChanelStatus(){
- if c.Model == "AudioOut"{
- return
- }
- //GB28181Plugin.Info("更新通道status", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
- GB28181Plugin.Info("更新通道status", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
- database := db.DB()
- //updateMap := map[string]interface{}{}
- //updateMap["status"] = c.ChannelInfo.Status
- /*err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(map[string]interface{}{"status":c.ChannelInfo.Status}).Error
- if err != nil {
- GB28181Plugin.Error("更新更新通道status失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
- }*/
- //TODO 缓存
- channelParent := ChannelDevice{}
- if v,ok := ChannelParentMap.Load(c.DeviceID);ok{
- channelParent = v.(ChannelDevice)
- GB28181Plugin.Info("找到设备信息", zap.String("channelId", c.DeviceID))
- }else{
- GB28181Plugin.Info("没找到设备信息", zap.String("channelId", c.DeviceID))
- err := database.Raw("select id,device_id,parent_id from t_sdp_channel where channel_id=?",c.DeviceID).Find(&channelParent).Error
- if err != nil{
- GB28181Plugin.Info("通道查找父设备", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
- return
- }
- }
- if channelParent.ParentId != 0 {
- // 父设备不为0 ,表示是nvr下面的摄像头,更新摄像头状态
- GB28181Plugin.Info("更新nvr下摄像头设备状态", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
- status := 0
- updateMap := map[string]interface{}{}
- if c.Status == ChannelOnStatus{
- updateMap["state"] = 1
- status = 1
- }else if c.Status == ChannelOffStatus{
- updateMap["state"] = 0
- }else{
- return
- }
- err := database.Table("t_sdp_delivery").Where("device_id=?",channelParent.DeviceId).Updates(updateMap).Error
- if err != nil {
- GB28181Plugin.Error("更新nvr下摄像头设备状态:",zap.String("id", c.DeviceID),zap.String("error", err.Error()))
- }
- SendMessageToRedis(channelParent.DeviceId,status)
- }
- }
- // 获取是否需要录像
- func GetIsRecordByChannelId(channelId string) int {
- // TODO 从内存中获取,修改后调用invist接口删除
- if value,ok := DeviceRecordState.Load(channelId);ok{
- return value.(int)
- }
-
- database := db.DB()
- isRecord := int(0)
- err := database.Raw("select is_record from t_sdp_delivery where channel_id=?",channelId).Find(&isRecord).Error
- if err != nil{
- if err == gorm.ErrRecordNotFound{
- DeviceRecordState.Store(channelId,isRecord)
- return isRecord
- }
- return int(1)
- }
- DeviceRecordState.Store(channelId,isRecord)
- return isRecord
- }
- // 根据是否录像标识获取invist模式
- func GetInvistMode(invistMode int,channelMode string,channelId string) int {
- if channelMode == "AudioOut"{
- return INVIDE_MODE_ONSUBSCRIBE
- }
-
- if invistMode == INVIDE_MODE_AUTO{
- isRecord := GetIsRecordByChannelId(channelId)
- if isRecord == 1 {
- return INVIDE_MODE_AUTO
- }else {
- return INVIDE_MODE_ONSUBSCRIBE
- }
- }
- return invistMode
- }
- func SetAllDeviceOffline(){
- database := db.DB()
- err := database.Exec("update t_sdp_delivery set state=0 where sip_id=? and type=2",conf.SipId).Error
- if err != nil{
- fmt.Println("更新所有设备离线失败")
- }else{
- fmt.Println("成功更新所有设备离线")
- }
- }
- func (c *Channel) MarshalJSON() ([]byte, error) {
- m := map[string]any{
- "DeviceID": c.DeviceID,
- "ParentID": c.ParentID,
- "Name": c.Name,
- "Manufacturer": c.Manufacturer,
- "Model": c.Model,
- "Owner": c.Owner,
- "CivilCode": c.CivilCode,
- "Address": c.Address,
- "Port": c.Port,
- "Parental": c.Parental,
- "SafetyWay": c.SafetyWay,
- "RegisterWay": c.RegisterWay,
- "Secrecy": c.Secrecy,
- "Status": c.Status,
- "Longitude": c.Longitude,
- "Latitude": c.Latitude,
- "GpsTime": c.GpsTime,
- "LiveSubSP": c.LiveSubSP,
- "LiveStatus": c.State.Load(),
- }
- return json.Marshal(m)
- }
- // Channel 通道
- type ChannelInfo struct {
- DeviceID string // 通道ID
- ParentID string
- Name string
- Manufacturer string
- Model string
- Owner string
- CivilCode string
- Address string
- Port int
- Parental int
- SafetyWay int
- RegisterWay int
- Secrecy int
- Status ChannelStatus
- }
// ChannelStatus is the status string of a channel.
type ChannelStatus string

// Status values a channel is compared against.
const (
	// ChannelOnStatus is the "ON" status.
	ChannelOnStatus = "ON"
	// ChannelOffStatus is the "OFF" status.
	ChannelOffStatus = "OFF"
)
- func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
- d := channel.Device
- d.SN++
- callId := sip.CallID(utils.RandNumString(10))
- userAgent := sip.UserAgentHeader("Monibuca")
- maxForwards := sip.MaxForwards(70) //增加max-forwards为默认值 70
- cseq := sip.CSeq{
- SeqNo: uint32(d.SN),
- MethodName: Method,
- }
- port := sip.Port(conf.SipPort)
- serverAddr := sip.Address{
- //DisplayName: sip.String{Str: d.serverConfig.Serial},
- Uri: &sip.SipUri{
- FUser: sip.String{Str: conf.Serial},
- FHost: d.SipIP,
- FPort: &port,
- },
- Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
- }
- //非同一域的目标地址需要使用@host
- host := conf.Realm
- if channel.DeviceID[0:9] != host {
- if channel.Port != 0 {
- deviceIp := d.NetAddr
- deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
- host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
- } else {
- host = d.NetAddr
- }
- }
- channelAddr := sip.Address{
- //DisplayName: sip.String{Str: d.serverConfig.Serial},
- Uri: &sip.SipUri{FUser: sip.String{Str: channel.DeviceID}, FHost: host},
- }
- req = sip.NewRequest(
- "",
- Method,
- channelAddr.Uri,
- "SIP/2.0",
- []sip.Header{
- serverAddr.AsFromHeader(),
- channelAddr.AsToHeader(),
- &callId,
- &userAgent,
- &cseq,
- &maxForwards,
- serverAddr.AsContactHeader(),
- },
- "",
- nil,
- )
- req.SetTransport(conf.SipNetwork)
- req.SetDestination(d.NetAddr)
- return req
- }
- func (channel *Channel) QueryRecord(startTime, endTime string) ([]*Record, error) {
- d := channel.Device
- request := d.CreateRequest(sip.MESSAGE)
- contentType := sip.ContentType("Application/MANSCDP+xml")
- request.AppendHeader(&contentType)
- // body := fmt.Sprintf(`<?xml version="1.0"?>
- // <Query>
- // <CmdType>RecordInfo</CmdType>
- // <SN>%d</SN>
- // <DeviceID>%s</DeviceID>
- // <StartTime>%s</StartTime>
- // <EndTime>%s</EndTime>
- // <Secrecy>0</Secrecy>
- // <Type>all</Type>
- // </Query>`, d.sn, channel.DeviceID, startTime, endTime)
- start, _ := strconv.ParseInt(startTime, 10, 0)
- end, _ := strconv.ParseInt(endTime, 10, 0)
- body := BuildRecordInfoXML(d.SN, channel.DeviceID, start, end)
- request.SetBody(body, true)
- resultCh := RecordQueryLink.WaitResult(d.ID, channel.DeviceID, d.SN, QUERY_RECORD_TIMEOUT)
- resp, err := d.SipRequestForResponse(request)
- if err != nil {
- return nil, fmt.Errorf("query error: %s", err)
- }
- if resp.StatusCode() != http.StatusOK {
- return nil, fmt.Errorf("query error, status=%d", resp.StatusCode())
- }
- // RecordQueryLink 中加了超时机制,该结果一定会返回
- // 所以此处不用再增加超时等保护机制
- r := <-resultCh
- return r.list, r.err
- }
- func (channel *Channel) Control(PTZCmd string) int {
- d := channel.Device
- request := d.CreateRequest(sip.MESSAGE)
- contentType := sip.ContentType("Application/MANSCDP+xml")
- request.AppendHeader(&contentType)
- body := fmt.Sprintf(`<?xml version="1.0"?>
- <Control>
- <CmdType>DeviceControl</CmdType>
- <SN>%d</SN>
- <DeviceID>%s</DeviceID>
- <PTZCmd>%s</PTZCmd>
- </Control>`, d.SN, channel.DeviceID, PTZCmd)
- request.SetBody(body, true)
- resp, err := d.SipRequestForResponse(request)
- if err != nil {
- return http.StatusRequestTimeout
- }
- return int(resp.StatusCode())
- }
- // Invite 发送Invite报文 invites a channel to play
- // 注意里面的锁保证不同时发送invite报文,该锁由channel持有
- /***
- f字段: f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
- 各项具体含义:
- v:后续参数为视频的参数;各参数间以 “/”分割;
- 编码格式:十进制整数字符串表示
- 1 –MPEG-4 2 –H.264 3 – SVAC 4 –3GP
- 分辨率:十进制整数字符串表示
- 1 – QCIF 2 – CIF 3 – 4CIF 4 – D1 5 –720P 6 –1080P/I
- 帧率:十进制整数字符串表示 0~99
- 码率类型:十进制整数字符串表示
- 1 – 固定码率(CBR) 2 – 可变码率(VBR)
- 码率大小:十进制整数字符串表示 0~100000(如 1表示1kbps)
- a:后续参数为音频的参数;各参数间以 “/”分割;
- 编码格式:十进制整数字符串表示
- 1 – G.711 2 – G.723.1 3 – G.729 4 – G.722.1
- 码率大小:十进制整数字符串
- 音频编码码率: 1 — 5.3 kbps (注:G.723.1中使用)
- 2 — 6.3 kbps (注:G.723.1中使用)
- 3 — 8 kbps (注:G.729中使用)
- 4 — 16 kbps (注:G.722.1中使用)
- 5 — 24 kbps (注:G.722.1中使用)
- 6 — 32 kbps (注:G.722.1中使用)
- 7 — 48 kbps (注:G.722.1中使用)
- 8 — 64 kbps(注:G.711中使用)
- 采样率:十进制整数字符串表示
- 1 — 8 kHz(注:G.711/ G.723.1/ G.729中使用)
- 2—14 kHz(注:G.722.1中使用)
- 3—16 kHz(注:G.722.1中使用)
- 4—32 kHz(注:G.722.1中使用)
- 注1:字符串说明
- 本节中使用的“十进制整数字符串”的含义为“0”~“4294967296” 之间的十进制数字字符串。
- 注2:参数分割标识
- 各参数间以“/”分割,参数间的分割符“/”不能省略;
- 若两个分割符 “/”间的某参数为空时(即两个分割符 “/”直接将相连时)表示无该参数值;
- 注3:f字段说明
- 使用f字段时,应保证视频和音频参数的结构完整性,即在任何时候,f字段的结构都应是完整的结构:
- f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
- 若只有视频时,音频中的各参数项可以不填写,但应保持 “a///”的结构:
- f = v/编码格式/分辨率/帧率/码率类型/码率大小a///
- 若只有音频时也类似处理,视频中的各参数项可以不填写,但应保持 “v/”的结构:
- f = v/a/编码格式/码率大小/采样率
- f字段中视、音频参数段之间不需空格分割。
- 可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
- */
- func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
- if opt.IsLive() {
- if !channel.State.CompareAndSwap(0, 1) {
- return 304, nil
- }else{
- channel.UpdateChanelState()
- }
-
- defer func() {
- if err != nil {
- GB28181Plugin.Error("Invite", zap.Error(err), zap.String("streamPath",opt.StreamPath ))
- channel.State.Store(0)
- // 重复发布和设备busy不重新拉流
- /*if strings.Contains(err.Error(),"Duplicate Publish") || strings.Contains(err.Error(),"Busy"){
- return
- }
- if GetInvistMode(conf.InviteMode,channel.Model,channel.DeviceID) == INVIDE_MODE_AUTO{
- //if conf.InviteMode == INVIDE_MODE_AUTO {
- time.AfterFunc(time.Second*10, func() {
- // 5秒后重试
- channel.Invite(opt)
- })
- }*/
- } else {
- channel.State.Store(2)
- channel.UpdateChanelState()
- }
- }()
- }
- d := channel.Device
- //streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
- streamPath := fmt.Sprintf("%s/%s",LiveRoom, channel.DeviceID)
- s := "Play"
- opt.CreateSSRC()
- if opt.Record() {
- s = "Playback"
- streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
- }
- if opt.StreamPath != "" {
- streamPath = opt.StreamPath
- } else if channel.DeviceID == "" {
- streamPath = "gb28181/" + d.ID
- } else {
- opt.StreamPath = streamPath
- }
- // 对steam path自动加上签名
- if EngineConfig.Publish.Key != ""{
- nowTime := time.Now().Unix()
- expireTime := nowTime + 315360000
- expireTime16 := strconv.FormatInt(expireTime, 16)
- m := md5.New()
- m.Write([]byte(EngineConfig.Publish.Key +streamPath + expireTime16))
- streamPathList := strings.Split(streamPath,"?")
- if len(streamPathList) > 0 {
- streamPath = fmt.Sprintf("%s?%s=%s&%s=%s",streamPathList[0],EngineConfig.Publish.SecretArgName,hex.EncodeToString(m.Sum(nil)),EngineConfig.Publish.ExpireArgName,expireTime16)
- opt.StreamPath = streamPath
- }
- }
-
- if opt.dump == "" {
- opt.dump = conf.DumpPath
- }
- protocol := ""
- networkType := "udp"
- reusePort := true
- if conf.IsMediaNetworkTCP() {
- networkType = "tcp"
- protocol = "TCP/"
- if conf.tcpPorts.Valid {
- opt.MediaPort, err = conf.tcpPorts.GetUnUseTcpPort()
- opt.recyclePort = conf.tcpPorts.Recycle
- reusePort = false
- }
- channel.Info("media port",zap.Uint16("mediaPort",opt.MediaPort),zap.Bool("reusePort",reusePort),zap.Error(err))
- } else {
- if conf.udpPorts.Valid {
- opt.MediaPort, err = conf.udpPorts.GetPort()
- opt.recyclePort = conf.udpPorts.Recycle
- reusePort = false
- }
- }
- if err != nil {
- // TODO 在这里返回的EOF
- return http.StatusInternalServerError, err
- }
- if opt.MediaPort == 0 {
- opt.MediaPort = conf.MediaPort
- }
- sdpInfo := []string{
- "v=0",
- fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.MediaIP),
- "s=" + s,
- "u=" + channel.DeviceID + ":0",
- "c=IN IP4 " + d.MediaIP,
- opt.String(),
- fmt.Sprintf("m=video %d %sRTP/AVP 96", opt.MediaPort, protocol),
- "a=recvonly",
- "a=rtpmap:96 PS/90000",
- "y=" + opt.ssrc,
- }
- if conf.IsMediaNetworkTCP() {
- sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
- }
- invite := channel.CreateRequst(sip.INVITE)
- contentType := sip.ContentType("application/sdp")
- invite.AppendHeader(&contentType)
- invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true)
- subject := sip.GenericHeader{
- HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
- }
- invite.AppendHeader(&subject)
- inviteRes, err := d.SipRequestForResponse(invite)
- // 回收端口
- /*defer func(){
- if err != nil {
- if opt.recyclePort != nil {
- channel.Info("recycle port", zap.Uint16("mediaPort",opt.MediaPort))
- opt.recyclePort(opt.MediaPort)
- }
- }
- }()*/
- if err != nil {
- channel.Error("invite", zap.Error(err), zap.String("msg", invite.String()))
- if opt.recyclePort != nil {
- channel.Info("recycle port", zap.Uint16("mediaPort",opt.MediaPort))
- opt.recyclePort(opt.MediaPort)
- }
- // 拉直播流失败
- if opt.IsLive(){
- // 设备busy,重启设备
- if strings.Contains(err.Error(),"Busy here Creat Media failed"){
- channel.Info("reboot device", zap.String("channelId",channel.DeviceID))
- go SendRebootMessage(channel.DeviceID)
- }
- }
- return http.StatusInternalServerError, err
- }else {
- go DeleteRebootMessage(channel.DeviceID)
- }
- code = int(inviteRes.StatusCode())
- channel.Info("invite response", zap.Int("status code", code))
- if code == http.StatusOK {
- ds := strings.Split(inviteRes.Body(), "\r\n")
- for _, l := range ds {
- if ls := strings.Split(l, "="); len(ls) > 1 {
- if ls[0] == "y" && len(ls[1]) > 0 {
- if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
- opt.SSRC = uint32(_ssrc)
- } else {
- channel.Error("read invite response y ", zap.Error(err))
- }
- // break
- }
- if ls[0] == "m" && len(ls[1]) > 0 {
- netinfo := strings.Split(ls[1], " ")
- if strings.ToUpper(netinfo[2]) == "TCP/RTP/AVP" {
- channel.Debug("Device support tcp")
- } else {
- channel.Debug("Device not support tcp")
- networkType = "udp"
- }
- }
- }
- }
- var psPuber ps.PSPublisher
-
- err = psPuber.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, reusePort)
- if err == nil {
- // 检查是否开启录像,开启录像的IdleTimeout设置为 默认10s
- isRecord := GetIsRecordByChannelId(channel.DeviceID)
- if isRecord == 1 {
- // 10秒无数据关闭
- // 需要录像,并且是直播
- if opt.IsLive(){
- psPuber.Stream.IdleTimeout = time.Second * 10
- psPuber.Stream.DelayCloseTimeout = time.Second * 10
- }
- }
- // 录播不过期
- if !opt.IsLive(){
- psPuber.Stream.IdleTimeout = 0
- psPuber.Stream.DelayCloseTimeout = 0
- }
- /*if !opt.IsLive() {
- // 10秒无数据关闭
- if psPuber.Stream.DelayCloseTimeout == 0 {
- psPuber.Stream.DelayCloseTimeout = time.Second * 10
- }
- if psPuber.Stream.IdleTimeout == 0 {
- psPuber.Stream.IdleTimeout = time.Second * 10
- }
- }*/
- // 保存的时候截断query内容,只保留stream path
- streamPathList := strings.Split(streamPath,"?")
- streamPath = streamPathList[0]
- PullStreams.Store(streamPath, &PullStream{
- opt: opt,
- channel: channel,
- inviteRes: inviteRes,
- })
- // 这里出错,会产生no track,close掉stream,自动回收端口
- err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil))
- if err != nil{
- channel.Error("srv.Send", zap.Error(err))
- }
- }else{
- channel.Error("psPuber.Receive", zap.Error(err))
- if opt.recyclePort != nil {
- channel.Info("recycle port", zap.Uint16("mediaPort",opt.MediaPort))
- opt.recyclePort(opt.MediaPort)
- }
- }
- }
- return
- }
- func (channel *Channel) Bye(streamPath string) int {
- channel.Info("bye", zap.String("streamPath",streamPath))
- d := channel.Device
- if streamPath == "" {
- streamPath = fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
- }
- if s, loaded := PullStreams.LoadAndDelete(streamPath); loaded {
- s.(*PullStream).Bye()
- if s := Streams.Get(streamPath); s != nil {
- s.Close()
- }
- // 直播将pull strem 设为0
- /*if s.(*PullStream).opt.IsLive(){
- channel.State.Store(0)
- }*/
- return http.StatusOK
- }
- return http.StatusNotFound
- }
- func (channel *Channel) Pause(streamPath string) int {
- if s, loaded := PullStreams.Load(streamPath); loaded {
- r := s.(*PullStream).Pause()
- if s := Streams.Get(streamPath); s != nil {
- s.Pause()
- }
- return r
- }
- return http.StatusNotFound
- }
- func (channel *Channel) Resume(streamPath string) int {
- if s, loaded := PullStreams.Load(streamPath); loaded {
- r := s.(*PullStream).Resume()
- if s := Streams.Get(streamPath); s != nil {
- s.Resume()
- }
- return r
- }
- return http.StatusNotFound
- }
- func (channel *Channel) PlayAt(streamPath string, second uint) int {
- if s, loaded := PullStreams.Load(streamPath); loaded {
- r := s.(*PullStream).PlayAt(second)
- if s := Streams.Get(streamPath); s != nil {
- s.Resume()
- }
- return r
- }
- return http.StatusNotFound
- }
- func (channel *Channel) PlayForward(streamPath string, speed float32) int {
- if s, loaded := PullStreams.Load(streamPath); loaded {
- return s.(*PullStream).PlayForward(speed)
- }
- if s := Streams.Get(streamPath); s != nil {
- s.Resume()
- }
- return http.StatusNotFound
- }
- func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
- condition := !opt.IsLive() || channel.CanInvite()
- // debug 改为了info
- channel.Info("TryAutoInvite", zap.Any("opt", opt), zap.Bool("condition", condition))
- if condition {
- go channel.Invite(opt)
- }
- }
- func (channel *Channel) CanInvite() bool {
- if channel.State.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == ChannelOffStatus {
- return false
- }
- if conf.InviteIDs == "" {
- return true
- }
- // 11~13位是设备类型编码
- typeID := channel.DeviceID[10:13]
- // format: start-end,type1,type2
- tokens := strings.Split(conf.InviteIDs, ",")
- for _, tok := range tokens {
- if first, second, ok := strings.Cut(tok, "-"); ok {
- if typeID >= first && typeID <= second {
- return true
- }
- } else {
- if typeID == first {
- return true
- }
- }
- }
- return false
- }
- func getSipRespErrorCode(err error) int {
- if re, ok := err.(*sip.RequestError); ok {
- return int(re.Code)
- } else {
- return http.StatusInternalServerError
- }
- }
|