package gb28181
import (
"fmt"
"net/http"
"strconv"
//"net/url"
"strings"
"time"
"gorm.io/gorm"
"sync/atomic"
"crypto/md5"
"encoding/hex"
"github.com/ghettovoice/gosip/sip"
"github.com/goccy/go-json"
"go.uber.org/zap"
"m7s.live/engine/v4/db"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/log"
"m7s.live/plugin/gb28181/v4/utils"
"m7s.live/plugin/ps/v4"
"sync"
)
// Package-level settings and caches used by the channel helpers in this file.
var (
	// QUERY_RECORD_TIMEOUT is the timeout passed to RecordQueryLink.WaitResult
	// when QueryRecord waits for a device's record list.
	QUERY_RECORD_TIMEOUT = 5 * time.Second

	// ChannelParentMap caches ChannelDevice values keyed by channel ID. It is
	// filled by UpdateChanelInfo and read by UpdateChanelStatus.
	ChannelParentMap sync.Map
)
// PullStream describes one stream that was set up by Channel.Invite. Values
// are stored in PullStreams keyed by stream path and are used to send
// follow-up requests (BYE, INFO) for that stream.
type PullStream struct {
// opt holds the options the INVITE was issued with (stream path, media port, port recycler).
opt *InviteOptions
// channel is the channel the stream is pulled from.
channel *Channel
// inviteRes is the response to the INVITE; CreateRequest copies its From, To and Call-ID headers.
inviteRes sip.Response
}
// CreateRequest builds a follow-up request of the given method for this pull
// stream. It starts from a fresh request created by Channel.CreateRequst and
// then overwrites the From, To and Call-ID headers with those found in the
// stored INVITE response, so the new request carries the same identifiers as
// the INVITE exchange.
//
// A header that is absent from the INVITE response (or a missing response)
// leaves the freshly generated header untouched instead of installing a nil
// header on the request.
func (p *PullStream) CreateRequest(method sip.RequestMethod) (req sip.Request) {
	req = p.channel.CreateRequst(method)

	res := p.inviteRes
	if res == nil {
		return req
	}
	if from, ok := res.From(); ok {
		req.ReplaceHeaders(from.Name(), []sip.Header{from})
	}
	if to, ok := res.To(); ok {
		req.ReplaceHeaders(to.Name(), []sip.Header{to})
	}
	if callID, ok := res.CallID(); ok {
		req.ReplaceHeaders(callID.Name(), []sip.Header{callID})
	}
	return req
}
// Bye sends a BYE for this pull stream and performs the local cleanup.
//
// The cleanup happens whether or not the BYE succeeded: a live stream resets
// the channel state to 0 (idle), and the media port is handed back through
// the recycler when one was registered. The return value is the status code
// of the BYE response, or 500 when the request could not be completed.
func (p *PullStream) Bye() int {
	bye := p.CreateRequest(sip.BYE)
	resp, sendErr := p.channel.Device.SipRequestForResponse(bye)

	if p.opt.IsLive() {
		p.channel.State.Store(0)
		// Persist the new channel state.
		p.channel.UpdateChanelState()
	}
	if recycle := p.opt.recyclePort; recycle != nil {
		recycle(p.opt.MediaPort)
	}

	if sendErr == nil {
		return int(resp.StatusCode())
	}
	return http.StatusInternalServerError
}
// info sends body to the device in an INFO request with content type
// Application/MANSRTSP and returns the response status code. When the request
// fails, the failure is logged and translated by getSipRespErrorCode.
func (p *PullStream) info(body string) int {
	device := p.channel.Device

	req := p.CreateRequest(sip.INFO)
	ct := sip.ContentType("Application/MANSRTSP")
	req.AppendHeader(&ct)
	req.SetBody(body, true)

	resp, err := device.SipRequestForResponse(req)
	if err == nil {
		return int(resp.StatusCode())
	}
	log.Warnf("Send info to stream error: %v, stream=%s, body=%s", err, p.opt.StreamPath, body)
	return getSipRespErrorCode(err)
}
// Pause asks the device to pause playback by sending a MANSRTSP PAUSE command
// through an INFO request. It returns the status code reported by info.
func (p *PullStream) Pause() int {
	const pauseFormat = `PAUSE RTSP/1.0
CSeq: %d
PauseTime: now
`
	return p.info(fmt.Sprintf(pauseFormat, p.channel.Device.SN))
}
// Resume asks the device to continue playback from the current position by
// sending a MANSRTSP PLAY command with "Range: npt=now-". It returns the
// status code reported by info.
func (p *PullStream) Resume() int {
	const resumeFormat = `PLAY RTSP/1.0
CSeq: %d
Range: npt=now-
`
	return p.info(fmt.Sprintf(resumeFormat, p.channel.Device.SN))
}
// PlayAt seeks playback to a new position.
// second is the offset, in seconds from the start point, to play from.
// It returns the status code reported by info.
func (p *PullStream) PlayAt(second uint) int {
	const seekFormat = `PLAY RTSP/1.0
CSeq: %d
Range: npt=%d-
`
	return p.info(fmt.Sprintf(seekFormat, p.channel.Device.SN, second))
}
// PlayForward changes the playback speed (fast forward / rewind).
// speed takes the values 0.25, 0.5, 1, 2 or 4; the corresponding negative
// values mean reverse playback. It returns the status code reported by info.
func (p *PullStream) PlayForward(speed float32) int {
	const scaleFormat = `PLAY RTSP/1.0
CSeq: %d
Scale: %0.6f
`
	return p.info(fmt.Sprintf(scaleFormat, p.channel.Device.SN, speed))
}
// Channel is one channel of a device. It combines runtime state (State,
// Device, the embedded logger) with the descriptive attributes embedded from
// ChannelInfo. JSON output is produced by the custom MarshalJSON method.
type Channel struct {
Device *Device `json:"-" yaml:"-"` // device this channel belongs to
State atomic.Int32 `json:"-" yaml:"-"` // channel state, 0: idle, 1: INVITE in progress, 2: playing / talking
LiveSubSP string // live sub-stream, via RTSP
GpsTime time.Time // GPS time
Longitude string // longitude
Latitude string // latitude
*log.Logger `json:"-" yaml:"-"`
ChannelInfo
}
// DeviceIdInfo pairs a device ID with a parent row ID. In the visible part of
// this file it is only referenced from commented-out code in
// getDeviceIdByChannelId.
type DeviceIdInfo struct{
DeviceId string `json:"device_id"`
ParentId int64 `json:"parent_id"`
}
// LiveRoom is the first path segment of the default stream path that
// Channel.Invite builds for live streams ("live/<channel ID>").
const LiveRoom = "live"
// getDeviceIdByChannelId looks up the device ID for a channel ID in
// t_sdp_delivery. It returns "" when the query reports an error; the error is
// printed to stdout.
//
// An earlier, now disabled variant read device_id and parent_id from
// t_sdp_channel and, for a non-zero parent_id, resolved the parent's
// device_id from t_sdp_delivery by row id.
func getDeviceIdByChannelId(channelId string) string {
	var deviceId string
	lookup := db.DB().Raw("select device_id from t_sdp_delivery where channel_id=?", channelId)
	if err := lookup.Find(&deviceId).Error; err != nil {
		fmt.Println("通过通道编号没有找到父设备编号:", err)
		return ""
	}
	return deviceId
}
// UpdateChanelState is meant to persist the channel's State as live_status in
// t_sdp_channel.
//
// NOTE: the function currently returns on its first statement, so it is a
// no-op and everything below that return is unreachable (go vet reports it).
// The body is kept so the behavior can be re-enabled by removing the return.
func(c *Channel) UpdateChanelState(){
// Persistence is disabled: nothing below this line runs.
return
// "AudioOut" channels are skipped.
if c.Model == "AudioOut"{
return
}
GB28181Plugin.Info("更新通道state", zap.String("channelId", c.DeviceID), zap.Int32("state",c.State.Load()))
database := db.DB()
updateMap := map[string]interface{}{}
updateMap["live_status"] = c.State.Load()
err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(updateMap).Error
// A failed update is only logged.
if err != nil {
GB28181Plugin.Error("更新通道state失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
}
}
// UpdateChanelLiveSubSp is meant to persist the channel's LiveSubSP as
// live_sub_sp in t_sdp_channel.
//
// NOTE: the function currently returns on its first statement, so it is a
// no-op and everything below that return is unreachable (go vet reports it).
// The body is kept so the behavior can be re-enabled by removing the return.
func(c *Channel) UpdateChanelLiveSubSp(){
// Persistence is disabled: nothing below this line runs.
return
// "AudioOut" channels are skipped.
if c.Model == "AudioOut"{
return
}
GB28181Plugin.Info("更新通道LiveSubSp", zap.String("channelId", c.DeviceID), zap.Int32("state",c.State.Load()))
database := db.DB()
updateMap := map[string]interface{}{}
updateMap["live_sub_sp"] = c.LiveSubSP
err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(updateMap).Error
// A failed update is only logged.
if err != nil {
GB28181Plugin.Error("更新通道LiveSubSp失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
}
}
// ChannelDevice receives the id, device_id and parent_id columns selected
// from t_sdp_channel for a channel. Values of this type are cached in
// ChannelParentMap keyed by channel ID.
type ChannelDevice struct {
// Id is the row id in t_sdp_channel; UpdateChanelInfo updates the row by this id.
Id int64 `json:"id"`
// DeviceId is the device_id column of the channel row.
DeviceId string `json:"device_id"`
// ParentId is non-zero when a parent row was resolved in t_sdp_delivery (NVR case).
ParentId int64 `json:"parent_id"`
}
// UpdateChanelInfo writes the channel's current attributes to t_sdp_channel.
//
// Steps:
//  1. "AudioOut" channels are skipped.
//  2. The channel row (id, device_id, parent_id) is read by channel ID.
//  3. When the channel reports a ParentID that differs from the row's
//     device_id, the channel is treated as a camera behind an NVR: the
//     parent's row id is looked up in t_sdp_delivery, the result is cached in
//     ChannelParentMap and, if a parent was found, the camera's state in
//     t_sdp_delivery is updated and published through SendMessageToRedis.
//  4. The channel row is updated with the current attributes.
//
// The channel is logged through its pointer: Channel contains an
// atomic.Int32, which must not be copied, and dereferencing it for the log
// call read State without synchronisation.
func (c *Channel) UpdateChanelInfo() {
	if c.Model == "AudioOut" {
		return
	}
	GB28181Plugin.Info("更新通道信息", zap.String("channelId", c.DeviceID), zap.Any("info", c))
	database := db.DB()

	// Look up the device ID and parent ID by channel ID.
	channelParent := &ChannelDevice{}
	err := database.Raw("select id, device_id,parent_id from t_sdp_channel where channel_id=?", c.DeviceID).Find(channelParent).Error
	if err != nil {
		GB28181Plugin.Info("查询通道失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
		return
	}

	updateMap := map[string]interface{}{}
	if c.ParentID != "" && c.ParentID != channelParent.DeviceId {
		// The parent device ID differs from this device's ID, so this is a
		// device behind an NVR.
		parenetId := int64(0)
		lookupErr := database.Raw("select id from t_sdp_delivery where device_id = ?", c.ParentID).Find(&parenetId).Error
		if lookupErr == nil {
			updateMap["parent_id"] = parenetId
		} else {
			GB28181Plugin.Error("查询nvr设备失败", zap.String("channelId", c.DeviceID), zap.String("error", lookupErr.Error()))
		}
		channelParent.ParentId = parenetId
		ChannelParentMap.Store(c.DeviceID, *channelParent)

		if parenetId != 0 {
			// A non-zero parent means a camera behind an NVR: update the
			// camera's state in t_sdp_delivery.
			GB28181Plugin.Info("更新nvr下摄像头设备状态", zap.String("channelId", c.DeviceID), zap.Any("status", c.Status))
			status := 0
			deliveryUpdate := map[string]interface{}{}
			if c.Status == ChannelOnStatus {
				deliveryUpdate["state"] = 1
				status = 1
			} else if c.Status == ChannelOffStatus {
				deliveryUpdate["state"] = 0
			} else {
				// NOTE(review): an unknown status leaves the function here,
				// which also skips the t_sdp_channel update below. This
				// mirrors the existing behavior; confirm it is intended.
				return
			}
			if err := database.Table("t_sdp_delivery").Where("device_id=?", channelParent.DeviceId).Updates(deliveryUpdate).Error; err != nil {
				GB28181Plugin.Error("更新nvr下摄像头设备状态:", zap.String("id", c.DeviceID), zap.String("error", err.Error()))
			}
			SendMessageToRedis(channelParent.DeviceId, status)
		}
	} else {
		ChannelParentMap.Store(c.DeviceID, *channelParent)
	}

	updateMap["live_status"] = c.State.Load()
	updateMap["status"] = c.ChannelInfo.Status
	updateMap["address"] = c.Address
	updateMap["civil_code"] = c.CivilCode
	updateMap["gps_time"] = c.GpsTime
	updateMap["longitude"] = c.Longitude
	updateMap["latitude"] = c.Latitude
	updateMap["live_sub_sp"] = c.LiveSubSP
	updateMap["manufacturer"] = c.Manufacturer
	updateMap["model"] = c.Model
	updateMap["name"] = c.Name
	updateMap["owner"] = c.Owner
	updateMap["parental"] = c.Parental
	updateMap["port"] = c.Port
	updateMap["safety_way"] = c.SafetyWay
	updateMap["register_way"] = c.RegisterWay
	updateMap["secrecy"] = c.Secrecy

	err = database.Table("t_sdp_channel").Where("id=?", channelParent.Id).Updates(updateMap).Error
	if err != nil {
		GB28181Plugin.Error("更新通道失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
	}
}
// UpdateChanelStatus propagates the channel's ON/OFF status to the camera row
// in t_sdp_delivery when the channel has a parent (a camera behind an NVR).
//
// The channel's ChannelDevice is taken from ChannelParentMap when cached and
// read from t_sdp_channel otherwise. "AudioOut" channels, channels without a
// parent, and statuses other than ON/OFF cause no update. After the update
// attempt the new state is published through SendMessageToRedis.
func (c *Channel) UpdateChanelStatus() {
	if c.Model == "AudioOut" {
		return
	}
	GB28181Plugin.Info("更新通道status", zap.String("channelId", c.DeviceID), zap.Any("status", c.Status))
	database := db.DB()

	// TODO: cache
	var parent ChannelDevice
	if cached, hit := ChannelParentMap.Load(c.DeviceID); hit {
		parent = cached.(ChannelDevice)
		GB28181Plugin.Info("找到设备信息", zap.String("channelId", c.DeviceID))
	} else {
		GB28181Plugin.Info("没找到设备信息", zap.String("channelId", c.DeviceID))
		lookup := database.Raw("select id,device_id,parent_id from t_sdp_channel where channel_id=?", c.DeviceID)
		if err := lookup.Find(&parent).Error; err != nil {
			GB28181Plugin.Info("通道查找父设备", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
			return
		}
	}

	// Only channels with a parent (cameras behind an NVR) have a state to update.
	if parent.ParentId == 0 {
		return
	}
	GB28181Plugin.Info("更新nvr下摄像头设备状态", zap.String("channelId", c.DeviceID), zap.Any("status", c.Status))

	var state int
	switch c.Status {
	case ChannelOnStatus:
		state = 1
	case ChannelOffStatus:
		state = 0
	default:
		return
	}

	changes := map[string]interface{}{"state": state}
	if err := database.Table("t_sdp_delivery").Where("device_id=?", parent.DeviceId).Updates(changes).Error; err != nil {
		GB28181Plugin.Error("更新nvr下摄像头设备状态:", zap.String("id", c.DeviceID), zap.String("error", err.Error()))
	}
	SendMessageToRedis(parent.DeviceId, state)
}
// GetIsRecordByChannelId reports the is_record flag for a channel.
//
// The value is served from DeviceRecordState when present; otherwise it is
// read from t_sdp_delivery and cached. A gorm.ErrRecordNotFound error is
// treated like a successful lookup of the zero value; any other query error
// yields 1 without caching.
//
// TODO (from the original): serve from memory and have the invite interface
// drop the cached entry after a change.
func GetIsRecordByChannelId(channelId string) int {
	if cached, ok := DeviceRecordState.Load(channelId); ok {
		return cached.(int)
	}

	var isRecord int
	err := db.DB().Raw("select is_record from t_sdp_delivery where channel_id=?", channelId).Find(&isRecord).Error
	if err != nil && err != gorm.ErrRecordNotFound {
		return 1
	}
	DeviceRecordState.Store(channelId, isRecord)
	return isRecord
}
// GetInvistMode derives the effective invite mode for a channel.
//
//   - "AudioOut" channels always use INVIDE_MODE_ONSUBSCRIBE.
//   - Any configured mode other than INVIDE_MODE_AUTO is returned unchanged.
//   - With INVIDE_MODE_AUTO, the mode stays AUTO only when the channel's
//     is_record flag (GetIsRecordByChannelId) is 1; otherwise it becomes
//     INVIDE_MODE_ONSUBSCRIBE.
func GetInvistMode(invistMode int, channelMode string, channelId string) int {
	switch {
	case channelMode == "AudioOut":
		return INVIDE_MODE_ONSUBSCRIBE
	case invistMode != INVIDE_MODE_AUTO:
		return invistMode
	case GetIsRecordByChannelId(channelId) == 1:
		return INVIDE_MODE_AUTO
	default:
		return INVIDE_MODE_ONSUBSCRIBE
	}
}
// SetAllDeviceOffline sets state=0 on every t_sdp_delivery row with type=2
// whose sip_id equals conf.SipId, and prints the outcome to stdout.
func SetAllDeviceOffline() {
	result := db.DB().Exec("update t_sdp_delivery set state=0 where sip_id=? and type=2", conf.SipId)
	if result.Error != nil {
		fmt.Println("更新所有设备离线失败")
		return
	}
	fmt.Println("成功更新所有设备离线")
}
// MarshalJSON renders the channel as a flat JSON object containing the
// ChannelInfo attributes, the position fields, LiveSubSP and the current
// State under the key "LiveStatus".
func (c *Channel) MarshalJSON() ([]byte, error) {
	fields := make(map[string]any, 19)

	// Attributes embedded from ChannelInfo.
	fields["DeviceID"] = c.DeviceID
	fields["ParentID"] = c.ParentID
	fields["Name"] = c.Name
	fields["Manufacturer"] = c.Manufacturer
	fields["Model"] = c.Model
	fields["Owner"] = c.Owner
	fields["CivilCode"] = c.CivilCode
	fields["Address"] = c.Address
	fields["Port"] = c.Port
	fields["Parental"] = c.Parental
	fields["SafetyWay"] = c.SafetyWay
	fields["RegisterWay"] = c.RegisterWay
	fields["Secrecy"] = c.Secrecy
	fields["Status"] = c.Status

	// Fields declared directly on Channel.
	fields["Longitude"] = c.Longitude
	fields["Latitude"] = c.Latitude
	fields["GpsTime"] = c.GpsTime
	fields["LiveSubSP"] = c.LiveSubSP
	fields["LiveStatus"] = c.State.Load()

	return json.Marshal(fields)
}
// ChannelInfo holds the descriptive attributes of a channel. It is embedded
// in Channel; its fields are written to t_sdp_channel by UpdateChanelInfo and
// exposed by Channel.MarshalJSON.
type ChannelInfo struct {
DeviceID string // channel ID
ParentID string
Name string
Manufacturer string
Model string
Owner string
CivilCode string
Address string
Port int
Parental int
SafetyWay int
RegisterWay int
Secrecy int
Status ChannelStatus
}
// ChannelStatus is the status string of a channel.
type ChannelStatus string
// Status values compared against Channel.Status in this file. They are
// declared as untyped string constants.
const (
ChannelOnStatus = "ON"
ChannelOffStatus = "OFF"
)
// CreateRequst builds a new SIP request of the given method addressed to this
// channel.
//
// The owning device's SN is incremented and used as the CSeq number. The From
// and Contact headers describe the server (conf.Serial at the device's SipIP
// and conf.SipPort); the To header and request URI describe the channel. The
// request is sent over conf.SipNetwork to the device's NetAddr.
//
// The channel URI host is conf.Realm when the first nine characters of the
// channel ID equal the realm. Otherwise the device's network address is used,
// with its port replaced by channel.Port when that is non-zero. Channel IDs
// shorter than nine characters and device addresses without a port no longer
// cause a slice-bounds panic: such an ID is treated as belonging to another
// domain and such an address is used as the bare host.
func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
	d := channel.Device
	d.SN++

	callId := sip.CallID(utils.RandNumString(10))
	userAgent := sip.UserAgentHeader("Monibuca")
	maxForwards := sip.MaxForwards(70) // Max-Forwards set to its default value, 70
	cseq := sip.CSeq{
		SeqNo:      uint32(d.SN),
		MethodName: Method,
	}
	port := sip.Port(conf.SipPort)
	serverAddr := sip.Address{
		Uri: &sip.SipUri{
			FUser: sip.String{Str: conf.Serial},
			FHost: d.SipIP,
			FPort: &port,
		},
		Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
	}

	// A target outside our own domain has to be addressed with @host.
	host := conf.Realm
	if len(channel.DeviceID) < 9 || channel.DeviceID[:9] != host {
		host = d.NetAddr
		if channel.Port != 0 {
			deviceIp := d.NetAddr
			if i := strings.LastIndex(deviceIp, ":"); i >= 0 {
				deviceIp = deviceIp[:i]
			}
			host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
		}
	}
	channelAddr := sip.Address{
		Uri: &sip.SipUri{FUser: sip.String{Str: channel.DeviceID}, FHost: host},
	}

	req = sip.NewRequest(
		"",
		Method,
		channelAddr.Uri,
		"SIP/2.0",
		[]sip.Header{
			serverAddr.AsFromHeader(),
			channelAddr.AsToHeader(),
			&callId,
			&userAgent,
			&cseq,
			&maxForwards,
			serverAddr.AsContactHeader(),
		},
		"",
		nil,
	)
	req.SetTransport(conf.SipNetwork)
	req.SetDestination(d.NetAddr)
	return req
}
// QueryRecord asks the device for the recordings of this channel between
// startTime and endTime and returns the list delivered through
// RecordQueryLink.
//
// startTime and endTime are base-10 integers in string form; they are passed
// as int64 to BuildRecordInfoXML. A value that does not parse is reported as
// an error instead of being silently replaced by 0.
//
// Errors from sending the MESSAGE are wrapped with %w so callers can inspect
// them; a non-200 response is reported with its status code.
func (channel *Channel) QueryRecord(startTime, endTime string) ([]*Record, error) {
	start, err := strconv.ParseInt(startTime, 10, 0)
	if err != nil {
		return nil, fmt.Errorf("query error: invalid startTime %q: %w", startTime, err)
	}
	end, err := strconv.ParseInt(endTime, 10, 0)
	if err != nil {
		return nil, fmt.Errorf("query error: invalid endTime %q: %w", endTime, err)
	}

	d := channel.Device
	request := d.CreateRequest(sip.MESSAGE)
	contentType := sip.ContentType("Application/MANSCDP+xml")
	request.AppendHeader(&contentType)
	body := BuildRecordInfoXML(d.SN, channel.DeviceID, start, end)
	request.SetBody(body, true)

	// Register for the result before sending so a fast reply is not missed.
	resultCh := RecordQueryLink.WaitResult(d.ID, channel.DeviceID, d.SN, QUERY_RECORD_TIMEOUT)
	resp, err := d.SipRequestForResponse(request)
	if err != nil {
		return nil, fmt.Errorf("query error: %w", err)
	}
	if resp.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("query error, status=%d", resp.StatusCode())
	}

	// RecordQueryLink has its own timeout, so a result always arrives and no
	// additional timeout protection is needed here.
	r := <-resultCh
	return r.list, r.err
}
// Control sends a DeviceControl command carrying PTZCmd for this channel and
// returns the status code of the response, or 408 (request timeout) when the
// request fails.
//
// The request is declared as Application/MANSCDP+xml, so the body is a
// MANSCDP Control document with the CmdType, SN, DeviceID and PTZCmd
// elements. The body previously consisted of the bare values without any XML
// markup.
func (channel *Channel) Control(PTZCmd string) int {
	const controlFormat = `<?xml version="1.0"?>
<Control>
<CmdType>DeviceControl</CmdType>
<SN>%d</SN>
<DeviceID>%s</DeviceID>
<PTZCmd>%s</PTZCmd>
</Control>
`
	d := channel.Device
	request := d.CreateRequest(sip.MESSAGE)
	contentType := sip.ContentType("Application/MANSCDP+xml")
	request.AppendHeader(&contentType)
	// d.SN is read after CreateRequest, as before.
	request.SetBody(fmt.Sprintf(controlFormat, d.SN, channel.DeviceID, PTZCmd), true)

	resp, err := d.SipRequestForResponse(request)
	if err != nil {
		return http.StatusRequestTimeout
	}
	return int(resp.StatusCode())
}
// Invite sends an INVITE to the channel asking it to play, and on a 200
// response starts a PS publisher to receive the media.
//
// For live streams channel.State acts as the lock that keeps two INVITEs from
// being sent at the same time: the call only proceeds after switching State
// from 0 to 1, and returns (304, nil) otherwise. When the function returns,
// State is reset to 0 if an error occurred and set to 2 otherwise.
//
// Return values: (500, err) when no media port could be obtained or the
// INVITE request failed; otherwise the status code of the INVITE response
// together with any error from starting the publisher or sending the ACK.
//
// A malformed "m=" line in the SDP answer (fewer than three space-separated
// fields) is treated as "device does not support TCP" instead of causing an
// index-out-of-range panic.
//
// Reference for the SDP "f" field:
//
//	f = v/codec/resolution/frame rate/bitrate type/bitrate a/codec/bitrate/sample rate
//
// Meaning of the items:
//
//	v: the following parameters describe video; parameters are separated by "/".
//	  codec: decimal integer string
//	    1 - MPEG-4, 2 - H.264, 3 - SVAC, 4 - 3GP
//	  resolution: decimal integer string
//	    1 - QCIF, 2 - CIF, 3 - 4CIF, 4 - D1, 5 - 720P, 6 - 1080P/I
//	  frame rate: decimal integer string, 0~99
//	  bitrate type: decimal integer string
//	    1 - constant bitrate (CBR), 2 - variable bitrate (VBR)
//	  bitrate: decimal integer string, 0~100000 (e.g. 1 means 1 kbps)
//	a: the following parameters describe audio; parameters are separated by "/".
//	  codec: decimal integer string
//	    1 - G.711, 2 - G.723.1, 3 - G.729, 4 - G.722.1
//	  bitrate: decimal integer string
//	    1 - 5.3 kbps (used with G.723.1)
//	    2 - 6.3 kbps (used with G.723.1)
//	    3 - 8 kbps (used with G.729)
//	    4 - 16 kbps (used with G.722.1)
//	    5 - 24 kbps (used with G.722.1)
//	    6 - 32 kbps (used with G.722.1)
//	    7 - 48 kbps (used with G.722.1)
//	    8 - 64 kbps (used with G.711)
//	  sample rate: decimal integer string
//	    1 - 8 kHz (used with G.711 / G.723.1 / G.729)
//	    2 - 14 kHz (used with G.722.1)
//	    3 - 16 kHz (used with G.722.1)
//	    4 - 32 kHz (used with G.722.1)
//
// Note 1, strings: "decimal integer string" means a string of decimal digits
// between "0" and "4294967296".
//
// Note 2, separators: parameters are separated by "/" and the separator must
// not be omitted. An empty parameter between two separators (two "/" directly
// adjacent) means the parameter has no value.
//
// Note 3, structure: when the f field is used, the video and audio parts must
// always be structurally complete:
//
//	f = v/codec/resolution/frame rate/bitrate type/bitrate a/codec/bitrate/sample rate
//
// With video only, the audio items may be left empty but the "a///" structure
// must be kept:
//
//	f = v/codec/resolution/frame rate/bitrate type/bitrate a///
//
// With audio only, the video items may likewise be left empty but the "v/"
// structure must be kept:
//
//	f = v/a/codec/bitrate/sample rate
//
// No space is needed between the video and audio parts. The resolution
// parameter may be used to distinguish streams of different resolutions from
// the same device.
func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
	if opt.IsLive() {
		if !channel.State.CompareAndSwap(0, 1) {
			return 304, nil
		}
		channel.UpdateChanelState()
		defer func() {
			if err != nil {
				GB28181Plugin.Error("Invite", zap.Error(err), zap.String("streamPath", opt.StreamPath))
				channel.State.Store(0)
				// A retry after "Duplicate Publish" / "Busy" errors in auto
				// invite mode used to be scheduled here; it is disabled.
			} else {
				channel.State.Store(2)
				channel.UpdateChanelState()
			}
		}()
	}

	d := channel.Device
	streamPath := fmt.Sprintf("%s/%s", LiveRoom, channel.DeviceID)
	s := "Play"
	opt.CreateSSRC()
	if opt.Record() {
		s = "Playback"
		streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
	}
	if opt.StreamPath != "" {
		streamPath = opt.StreamPath
	} else if channel.DeviceID == "" {
		streamPath = "gb28181/" + d.ID
	} else {
		opt.StreamPath = streamPath
	}

	// Sign the stream path automatically when a publish key is configured.
	if EngineConfig.Publish.Key != "" {
		// The signature expires 315360000 seconds (3650 days) from now.
		expireTime := time.Now().Unix() + 315360000
		expireTime16 := strconv.FormatInt(expireTime, 16)
		m := md5.New()
		m.Write([]byte(EngineConfig.Publish.Key + streamPath + expireTime16))
		// Any existing query string is replaced by the signature arguments.
		basePath, _, _ := strings.Cut(streamPath, "?")
		streamPath = fmt.Sprintf("%s?%s=%s&%s=%s", basePath, EngineConfig.Publish.SecretArgName, hex.EncodeToString(m.Sum(nil)), EngineConfig.Publish.ExpireArgName, expireTime16)
		opt.StreamPath = streamPath
	}

	if opt.dump == "" {
		opt.dump = conf.DumpPath
	}

	// Choose the transport and, when a port pool is configured, a dedicated
	// media port. Without a pool the shared conf.MediaPort is reused.
	protocol := ""
	networkType := "udp"
	reusePort := true
	if conf.IsMediaNetworkTCP() {
		networkType = "tcp"
		protocol = "TCP/"
		if conf.tcpPorts.Valid {
			opt.MediaPort, err = conf.tcpPorts.GetUnUseTcpPort()
			opt.recyclePort = conf.tcpPorts.Recycle
			reusePort = false
		}
		channel.Info("media port", zap.Uint16("mediaPort", opt.MediaPort), zap.Bool("reusePort", reusePort), zap.Error(err))
	} else if conf.udpPorts.Valid {
		opt.MediaPort, err = conf.udpPorts.GetPort()
		opt.recyclePort = conf.udpPorts.Recycle
		reusePort = false
	}
	if err != nil {
		// TODO (from the original): an EOF is returned here.
		return http.StatusInternalServerError, err
	}
	if opt.MediaPort == 0 {
		opt.MediaPort = conf.MediaPort
	}

	sdpInfo := []string{
		"v=0",
		fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.MediaIP),
		"s=" + s,
		"u=" + channel.DeviceID + ":0",
		"c=IN IP4 " + d.MediaIP,
		opt.String(),
		fmt.Sprintf("m=video %d %sRTP/AVP 96", opt.MediaPort, protocol),
		"a=recvonly",
		"a=rtpmap:96 PS/90000",
		"y=" + opt.ssrc,
	}
	if conf.IsMediaNetworkTCP() {
		sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
	}

	invite := channel.CreateRequst(sip.INVITE)
	contentType := sip.ContentType("application/sdp")
	invite.AppendHeader(&contentType)
	invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true)
	subject := sip.GenericHeader{
		HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
	}
	invite.AppendHeader(&subject)

	inviteRes, err := d.SipRequestForResponse(invite)
	if err != nil {
		channel.Error("invite", zap.Error(err), zap.String("msg", invite.String()))
		// Give the media port back.
		if opt.recyclePort != nil {
			channel.Info("recycle port", zap.Uint16("mediaPort", opt.MediaPort))
			opt.recyclePort(opt.MediaPort)
		}
		// Pulling a live stream failed: when the device reports busy, ask
		// for it to be rebooted.
		if opt.IsLive() && strings.Contains(err.Error(), "Busy here Creat Media failed") {
			channel.Info("reboot device", zap.String("channelId", channel.DeviceID))
			go SendRebootMessage(channel.DeviceID)
		}
		return http.StatusInternalServerError, err
	}
	go DeleteRebootMessage(channel.DeviceID)

	code = int(inviteRes.StatusCode())
	channel.Info("invite response", zap.Int("status code", code))
	if code != http.StatusOK {
		return
	}

	// Read the SSRC ("y=") and the media transport ("m=") from the SDP answer.
	for _, line := range strings.Split(inviteRes.Body(), "\r\n") {
		kv := strings.Split(line, "=")
		if len(kv) <= 1 {
			continue
		}
		if kv[0] == "y" && len(kv[1]) > 0 {
			if ssrc, parseErr := strconv.ParseInt(kv[1], 10, 0); parseErr == nil {
				opt.SSRC = uint32(ssrc)
			} else {
				channel.Error("read invite response y ", zap.Error(parseErr))
			}
		}
		if kv[0] == "m" && len(kv[1]) > 0 {
			netinfo := strings.Split(kv[1], " ")
			if len(netinfo) > 2 && strings.ToUpper(netinfo[2]) == "TCP/RTP/AVP" {
				channel.Debug("Device support tcp")
			} else {
				channel.Debug("Device not support tcp")
				networkType = "udp"
			}
		}
	}

	var psPuber ps.PSPublisher
	err = psPuber.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, reusePort)
	if err != nil {
		channel.Error("psPuber.Receive", zap.Error(err))
		if opt.recyclePort != nil {
			channel.Info("recycle port", zap.Uint16("mediaPort", opt.MediaPort))
			opt.recyclePort(opt.MediaPort)
		}
		return
	}

	// Live streams of channels with recording enabled are closed after 10
	// seconds without data.
	isRecord := GetIsRecordByChannelId(channel.DeviceID)
	if isRecord == 1 && opt.IsLive() {
		psPuber.Stream.IdleTimeout = time.Second * 10
		psPuber.Stream.DelayCloseTimeout = time.Second * 10
	}
	// Playback streams never expire.
	if !opt.IsLive() {
		psPuber.Stream.IdleTimeout = 0
		psPuber.Stream.DelayCloseTimeout = 0
	}

	// Store the stream under its bare path, without the query string.
	streamPath, _, _ = strings.Cut(streamPath, "?")
	PullStreams.Store(streamPath, &PullStream{
		opt:       opt,
		channel:   channel,
		inviteRes: inviteRes,
	})

	// If the ACK fails the stream ends up with no track and is closed, which
	// recycles the port automatically.
	err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil))
	if err != nil {
		channel.Error("srv.Send", zap.Error(err))
	}
	return
}
// Bye stops the pull stream registered under streamPath: it removes the entry
// from PullStreams, sends a BYE through PullStream.Bye and closes the engine
// stream of the same path if one exists. It returns 200 when a pull stream
// was found and 404 otherwise.
//
// An empty streamPath selects the channel's default stream. Invite registers
// live streams under "<LiveRoom>/<channel ID>", so that path is tried first;
// the former default "<device ID>/<channel ID>" is still used when no stream
// is registered under the live path.
func (channel *Channel) Bye(streamPath string) int {
	channel.Info("bye", zap.String("streamPath", streamPath))

	if streamPath == "" {
		streamPath = fmt.Sprintf("%s/%s", LiveRoom, channel.DeviceID)
		if _, ok := PullStreams.Load(streamPath); !ok {
			streamPath = fmt.Sprintf("%s/%s", channel.Device.ID, channel.DeviceID)
		}
	}

	pulled, loaded := PullStreams.LoadAndDelete(streamPath)
	if !loaded {
		return http.StatusNotFound
	}
	// PullStream.Bye also resets the channel state for live streams.
	pulled.(*PullStream).Bye()
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Close()
	}
	return http.StatusOK
}
// Pause pauses the pull stream registered under streamPath: the device is
// told to pause first, then the engine stream of the same path (if any) is
// paused. It returns the device's status code, or 404 when no pull stream is
// registered under streamPath.
func (channel *Channel) Pause(streamPath string) int {
	pulled, found := PullStreams.Load(streamPath)
	if !found {
		return http.StatusNotFound
	}
	code := pulled.(*PullStream).Pause()
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Pause()
	}
	return code
}
// Resume resumes the pull stream registered under streamPath: the device is
// told to resume first, then the engine stream of the same path (if any) is
// resumed. It returns the device's status code, or 404 when no pull stream is
// registered under streamPath.
func (channel *Channel) Resume(streamPath string) int {
	pulled, found := PullStreams.Load(streamPath)
	if !found {
		return http.StatusNotFound
	}
	code := pulled.(*PullStream).Resume()
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Resume()
	}
	return code
}
// PlayAt seeks the pull stream registered under streamPath to the given
// second and then resumes the engine stream of the same path (if any). It
// returns the device's status code, or 404 when no pull stream is registered
// under streamPath.
func (channel *Channel) PlayAt(streamPath string, second uint) int {
	pulled, found := PullStreams.Load(streamPath)
	if !found {
		return http.StatusNotFound
	}
	code := pulled.(*PullStream).PlayAt(second)
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Resume()
	}
	return code
}
// PlayForward changes the playback speed of the pull stream registered under
// streamPath and resumes the engine stream of the same path (if any). It
// returns the device's status code, or 404 when no pull stream is registered
// under streamPath.
//
// The engine stream is now resumed after a successful speed change as well,
// matching PlayAt and Resume. Previously the resume was only reached when no
// pull stream was found; that case still resumes the stream as before.
func (channel *Channel) PlayForward(streamPath string, speed float32) int {
	code := http.StatusNotFound
	if pulled, loaded := PullStreams.Load(streamPath); loaded {
		code = pulled.(*PullStream).PlayForward(speed)
	}
	if stream := Streams.Get(streamPath); stream != nil {
		stream.Resume()
	}
	return code
}
// TryAutoInvite starts Invite in a new goroutine when the request is allowed:
// non-live requests are always allowed, live requests only when CanInvite
// reports true. The decision is logged at info level.
func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
	condition := true
	if opt.IsLive() {
		condition = channel.CanInvite()
	}
	// Logged at info level (was debug).
	channel.Info("TryAutoInvite", zap.Any("opt", opt), zap.Bool("condition", condition))
	if !condition {
		return
	}
	go channel.Invite(opt)
}
// CanInvite reports whether a live INVITE may be sent to this channel.
//
// The channel must be idle (State 0), have a 20-character ID and must not be
// OFF. When conf.InviteIDs is empty every such channel qualifies; otherwise
// the channel's type code has to match one of the comma-separated tokens,
// each being either a single type or an inclusive "start-end" range.
func (channel *Channel) CanInvite() bool {
	switch {
	case channel.State.Load() != 0, len(channel.DeviceID) != 20, channel.Status == ChannelOffStatus:
		return false
	case conf.InviteIDs == "":
		return true
	}

	// Digits 11~13 of the ID are the device type code.
	typeID := channel.DeviceID[10:13]
	// Format: start-end,type1,type2
	for _, tok := range strings.Split(conf.InviteIDs, ",") {
		low, high, isRange := strings.Cut(tok, "-")
		if isRange {
			if low <= typeID && typeID <= high {
				return true
			}
			continue
		}
		if typeID == low {
			return true
		}
	}
	return false
}
// getSipRespErrorCode maps an error from a SIP request to a status code: the
// Code of a *sip.RequestError, or 500 for any other error.
func getSipRespErrorCode(err error) int {
	reqErr, ok := err.(*sip.RequestError)
	if !ok {
		return http.StatusInternalServerError
	}
	return int(reqErr.Code)
}