channel.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952
  1. package gb28181
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strconv"
  6. //"net/url"
  7. "strings"
  8. "time"
  9. "gorm.io/gorm"
  10. "sync/atomic"
  11. "crypto/md5"
  12. "encoding/hex"
  13. "github.com/ghettovoice/gosip/sip"
  14. "github.com/goccy/go-json"
  15. "go.uber.org/zap"
  16. "m7s.live/engine/v4/db"
  17. . "m7s.live/engine/v4"
  18. "m7s.live/engine/v4/log"
  19. "m7s.live/plugin/gb28181/v4/utils"
  20. "m7s.live/plugin/ps/v4"
  21. "sync"
  22. )
  23. var QUERY_RECORD_TIMEOUT = time.Second * 5
  24. var (
  25. ChannelParentMap sync.Map
  26. )
  27. type PullStream struct {
  28. opt *InviteOptions
  29. channel *Channel
  30. inviteRes sip.Response
  31. }
  32. func (p *PullStream) CreateRequest(method sip.RequestMethod) (req sip.Request) {
  33. res := p.inviteRes
  34. req = p.channel.CreateRequst(method)
  35. from, _ := res.From()
  36. to, _ := res.To()
  37. callId, _ := res.CallID()
  38. req.ReplaceHeaders(from.Name(), []sip.Header{from})
  39. req.ReplaceHeaders(to.Name(), []sip.Header{to})
  40. req.ReplaceHeaders(callId.Name(), []sip.Header{callId})
  41. return
  42. }
  43. func (p *PullStream) Bye() int {
  44. req := p.CreateRequest(sip.BYE)
  45. resp, err := p.channel.Device.SipRequestForResponse(req)
  46. if p.opt.IsLive() {
  47. p.channel.State.Store(0)
  48. // 更新通道状态
  49. p.channel.UpdateChanelState()
  50. }
  51. if p.opt.recyclePort != nil {
  52. //fmt.Println("回收端口 1111111111111111111111111111111111111111",p.opt.MediaPort)
  53. p.opt.recyclePort(p.opt.MediaPort)
  54. }
  55. if err != nil {
  56. return http.StatusInternalServerError
  57. }
  58. return int(resp.StatusCode())
  59. }
  60. func (p *PullStream) info(body string) int {
  61. d := p.channel.Device
  62. req := p.CreateRequest(sip.INFO)
  63. contentType := sip.ContentType("Application/MANSRTSP")
  64. req.AppendHeader(&contentType)
  65. req.SetBody(body, true)
  66. resp, err := d.SipRequestForResponse(req)
  67. if err != nil {
  68. log.Warnf("Send info to stream error: %v, stream=%s, body=%s", err, p.opt.StreamPath, body)
  69. return getSipRespErrorCode(err)
  70. }
  71. return int(resp.StatusCode())
  72. }
  73. // 暂停播放
  74. func (p *PullStream) Pause() int {
  75. body := fmt.Sprintf(`PAUSE RTSP/1.0
  76. CSeq: %d
  77. PauseTime: now
  78. `, p.channel.Device.SN)
  79. return p.info(body)
  80. }
  81. // 恢复播放
  82. func (p *PullStream) Resume() int {
  83. d := p.channel.Device
  84. body := fmt.Sprintf(`PLAY RTSP/1.0
  85. CSeq: %d
  86. Range: npt=now-
  87. `, d.SN)
  88. return p.info(body)
  89. }
  90. // 跳转到播放时间
  91. // second: 相对于起始点调整到第 sec 秒播放
  92. func (p *PullStream) PlayAt(second uint) int {
  93. d := p.channel.Device
  94. body := fmt.Sprintf(`PLAY RTSP/1.0
  95. CSeq: %d
  96. Range: npt=%d-
  97. `, d.SN, second)
  98. return p.info(body)
  99. }
  100. // 快进/快退播放
  101. // speed 取值: 0.25 0.5 1 2 4 或者其对应的负数表示倒放
  102. func (p *PullStream) PlayForward(speed float32) int {
  103. d := p.channel.Device
  104. body := fmt.Sprintf(`PLAY RTSP/1.0
  105. CSeq: %d
  106. Scale: %0.6f
  107. `, d.SN, speed)
  108. return p.info(body)
  109. }
  110. type Channel struct {
  111. Device *Device `json:"-" yaml:"-"` // 所属设备
  112. State atomic.Int32 `json:"-" yaml:"-"` // 通道状态,0:空闲,1:正在invite,2:正在播放/对讲
  113. LiveSubSP string // 实时子码流,通过rtsp
  114. GpsTime time.Time // gps时间
  115. Longitude string // 经度
  116. Latitude string // 纬度
  117. *log.Logger `json:"-" yaml:"-"`
  118. ChannelInfo
  119. }
  120. type DeviceIdInfo struct{
  121. DeviceId string `json:"device_id"`
  122. ParentId int64 `json:"parent_id"`
  123. }
  124. const LiveRoom = "live"
  125. // 通过通道id找设备id
  126. func getDeviceIdByChannelId(channelId string) string{
  127. database := db.DB()
  128. deviceId := ""
  129. err := database.Raw("select device_id from t_sdp_delivery where channel_id=?",channelId).Find(&deviceId).Error
  130. if err != nil{
  131. fmt.Println("通过通道编号没有找到父设备编号:",err)
  132. return ""
  133. }
  134. /*d := DeviceIdInfo{}
  135. err := database.Raw("select device_id,parent_id from t_sdp_channel where channel_id=?",channelId).Find(&d).Error
  136. if err != nil{
  137. fmt.Println("通过通道编号没有找到设备编号:",err)
  138. return ""
  139. }
  140. if d.ParentId == 0 {
  141. return d.DeviceId
  142. }else{
  143. err = database.Raw("select device_id from t_sdp_delivery where id=?",d.ParentId).Find(&deviceId).Error
  144. if err != nil{
  145. fmt.Println("通过通道编号没有找到父设备编号:",err)
  146. return ""
  147. }
  148. }*/
  149. return deviceId
  150. }
  151. // 更新通道state
  152. func(c *Channel) UpdateChanelState(){
  153. return
  154. if c.Model == "AudioOut"{
  155. return
  156. }
  157. GB28181Plugin.Info("更新通道state", zap.String("channelId", c.DeviceID), zap.Int32("state",c.State.Load()))
  158. //fmt.Println("更新通道state:",c.DeviceID,c.ParentID,c.Status)
  159. database := db.DB()
  160. updateMap := map[string]interface{}{}
  161. updateMap["live_status"] = c.State.Load()
  162. err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(updateMap).Error
  163. //err := database.Exec(`update t_sdp_channel set live_status=? where channel_id=?`,info.DeviceID,d.ID).Error
  164. if err != nil {
  165. GB28181Plugin.Error("更新通道state失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
  166. //fmt.Println("更新通道state失败:",err)
  167. }
  168. }
  169. // 更新通道LiveSubSp
  170. func(c *Channel) UpdateChanelLiveSubSp(){
  171. return
  172. if c.Model == "AudioOut"{
  173. return
  174. }
  175. GB28181Plugin.Info("更新通道LiveSubSp", zap.String("channelId", c.DeviceID), zap.Int32("state",c.State.Load()))
  176. //fmt.Println("更新通道state:",c.DeviceID,c.ParentID,c.Status)
  177. database := db.DB()
  178. updateMap := map[string]interface{}{}
  179. updateMap["live_sub_sp"] = c.LiveSubSP
  180. err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(updateMap).Error
  181. //err := database.Exec(`update t_sdp_channel set live_status=? where channel_id=?`,info.DeviceID,d.ID).Error
  182. if err != nil {
  183. GB28181Plugin.Error("更新通道LiveSubSp失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
  184. //fmt.Println("更新通道state失败:",err)
  185. }
  186. }
  187. type ChannelDevice struct {
  188. Id int64 `json:"id"`
  189. DeviceId string `json:"device_id"`
  190. ParentId int64 `json:"parent_id"`
  191. }
  192. // 更新通道信息
  193. func(c *Channel) UpdateChanelInfo(){
  194. if c.Model == "AudioOut"{
  195. return
  196. }
  197. //c.ParentID = "34020000001320000001"
  198. GB28181Plugin.Info("更新通道信息", zap.String("channelId", c.DeviceID), zap.Any("info", *c))
  199. database := db.DB()
  200. // 通过通道号查询设备号和父id
  201. channelParent := &ChannelDevice{}
  202. err := database.Raw("select id, device_id,parent_id from t_sdp_channel where channel_id=?",c.DeviceID).Find(channelParent).Error
  203. if err != nil{
  204. GB28181Plugin.Info("查询通道失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
  205. return
  206. }
  207. updateMap := map[string]interface{}{}
  208. // 更新父设备id
  209. if c.ParentID != "" && c.ParentID != channelParent.DeviceId{
  210. // 父设备id和本设备id不同,表示是nvr下设备
  211. parenetId := int64(0)
  212. err := database.Raw("select id from t_sdp_delivery where device_id = ?",c.ParentID).Find(&parenetId).Error
  213. if err == nil {
  214. updateMap["parent_id"] = parenetId
  215. }else{
  216. GB28181Plugin.Error("查询nvr设备失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
  217. }
  218. channelParent.ParentId = parenetId
  219. ChannelParentMap.Store(c.DeviceID,*channelParent)
  220. if parenetId != 0 {
  221. // 更新nvr设备下摄像头状态
  222. // 父设备不为0 ,表示是nvr下面的摄像头,更新摄像头状态
  223. GB28181Plugin.Info("更新nvr下摄像头设备状态", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
  224. status := 0
  225. updateMap := map[string]interface{}{}
  226. if c.Status == ChannelOnStatus{
  227. updateMap["state"] = 1
  228. status = 1
  229. }else if c.Status == ChannelOffStatus{
  230. updateMap["state"] = 0
  231. }else{
  232. return
  233. }
  234. err = database.Table("t_sdp_delivery").Where("device_id=?",channelParent.DeviceId).Updates(updateMap).Error
  235. if err != nil {
  236. GB28181Plugin.Error("更新nvr下摄像头设备状态:",zap.String("id", c.DeviceID),zap.String("error", err.Error()))
  237. }
  238. SendMessageToRedis(channelParent.DeviceId,status)
  239. }
  240. }else{
  241. ChannelParentMap.Store(c.DeviceID,*channelParent)
  242. }
  243. updateMap["live_status"] = c.State.Load()
  244. updateMap["status"] = c.ChannelInfo.Status
  245. updateMap["address"] = c.Address
  246. updateMap["civil_code"] = c.CivilCode
  247. updateMap["gps_time"] = c.GpsTime
  248. updateMap["longitude"] = c.Longitude
  249. updateMap["latitude"] = c.Latitude
  250. updateMap["live_sub_sp"] = c.LiveSubSP
  251. updateMap["manufacturer"] = c.Manufacturer
  252. updateMap["model"] = c.Model
  253. updateMap["name"] = c.Name
  254. updateMap["owner"] = c.Owner
  255. updateMap["parental"] = c.Parental
  256. updateMap["port"] = c.Port
  257. updateMap["safety_way"] = c.SafetyWay
  258. updateMap["register_way"] = c.RegisterWay
  259. updateMap["secrecy"] = c.Secrecy
  260. err = database.Table("t_sdp_channel").Where("id=?",channelParent.Id).Updates(updateMap).Error
  261. if err != nil {
  262. GB28181Plugin.Error("更新通道失败", zap.String("channelId", c.DeviceID), zap.String("error", err.Error()))
  263. }
  264. //fmt.Println("111111111111111111111111111111111111111111")
  265. //c.UpdateChanelStatus()
  266. }
  267. // 更新通道状态
  268. func(c *Channel) UpdateChanelStatus(){
  269. if c.Model == "AudioOut"{
  270. return
  271. }
  272. //GB28181Plugin.Info("更新通道status", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
  273. GB28181Plugin.Info("更新通道status", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
  274. database := db.DB()
  275. //updateMap := map[string]interface{}{}
  276. //updateMap["status"] = c.ChannelInfo.Status
  277. /*err := database.Table("t_sdp_channel").Where("channel_id=?",c.DeviceID).Updates(map[string]interface{}{"status":c.ChannelInfo.Status}).Error
  278. if err != nil {
  279. GB28181Plugin.Error("更新更新通道status失败", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
  280. }*/
  281. //TODO 缓存
  282. channelParent := ChannelDevice{}
  283. if v,ok := ChannelParentMap.Load(c.DeviceID);ok{
  284. channelParent = v.(ChannelDevice)
  285. GB28181Plugin.Info("找到设备信息", zap.String("channelId", c.DeviceID))
  286. }else{
  287. GB28181Plugin.Info("没找到设备信息", zap.String("channelId", c.DeviceID))
  288. err := database.Raw("select id,device_id,parent_id from t_sdp_channel where channel_id=?",c.DeviceID).Find(&channelParent).Error
  289. if err != nil{
  290. GB28181Plugin.Info("通道查找父设备", zap.String("channelId", c.DeviceID), zap.String("error",err.Error()))
  291. return
  292. }
  293. }
  294. if channelParent.ParentId != 0 {
  295. // 父设备不为0 ,表示是nvr下面的摄像头,更新摄像头状态
  296. GB28181Plugin.Info("更新nvr下摄像头设备状态", zap.String("channelId", c.DeviceID), zap.Any("status",c.Status))
  297. status := 0
  298. updateMap := map[string]interface{}{}
  299. if c.Status == ChannelOnStatus{
  300. updateMap["state"] = 1
  301. status = 1
  302. }else if c.Status == ChannelOffStatus{
  303. updateMap["state"] = 0
  304. }else{
  305. return
  306. }
  307. err := database.Table("t_sdp_delivery").Where("device_id=?",channelParent.DeviceId).Updates(updateMap).Error
  308. if err != nil {
  309. GB28181Plugin.Error("更新nvr下摄像头设备状态:",zap.String("id", c.DeviceID),zap.String("error", err.Error()))
  310. }
  311. SendMessageToRedis(channelParent.DeviceId,status)
  312. }
  313. }
  314. // 获取是否需要录像
  315. func GetIsRecordByChannelId(channelId string) int {
  316. // TODO 从内存中获取,修改后调用invist接口删除
  317. if value,ok := DeviceRecordState.Load(channelId);ok{
  318. return value.(int)
  319. }
  320. database := db.DB()
  321. isRecord := int(0)
  322. err := database.Raw("select is_record from t_sdp_delivery where channel_id=?",channelId).Find(&isRecord).Error
  323. if err != nil{
  324. if err == gorm.ErrRecordNotFound{
  325. DeviceRecordState.Store(channelId,isRecord)
  326. return isRecord
  327. }
  328. return int(1)
  329. }
  330. DeviceRecordState.Store(channelId,isRecord)
  331. return isRecord
  332. }
  333. // 根据是否录像标识获取invist模式
  334. func GetInvistMode(invistMode int,channelMode string,channelId string) int {
  335. if channelMode == "AudioOut"{
  336. return INVIDE_MODE_ONSUBSCRIBE
  337. }
  338. if invistMode == INVIDE_MODE_AUTO{
  339. isRecord := GetIsRecordByChannelId(channelId)
  340. if isRecord == 1 {
  341. return INVIDE_MODE_AUTO
  342. }else {
  343. return INVIDE_MODE_ONSUBSCRIBE
  344. }
  345. }
  346. return invistMode
  347. }
  348. func SetAllDeviceOffline(){
  349. database := db.DB()
  350. err := database.Exec("update t_sdp_delivery set state=0 where sip_id=? and type=2",conf.SipId).Error
  351. if err != nil{
  352. fmt.Println("更新所有设备离线失败")
  353. }else{
  354. fmt.Println("成功更新所有设备离线")
  355. }
  356. }
  357. func (c *Channel) MarshalJSON() ([]byte, error) {
  358. m := map[string]any{
  359. "DeviceID": c.DeviceID,
  360. "ParentID": c.ParentID,
  361. "Name": c.Name,
  362. "Manufacturer": c.Manufacturer,
  363. "Model": c.Model,
  364. "Owner": c.Owner,
  365. "CivilCode": c.CivilCode,
  366. "Address": c.Address,
  367. "Port": c.Port,
  368. "Parental": c.Parental,
  369. "SafetyWay": c.SafetyWay,
  370. "RegisterWay": c.RegisterWay,
  371. "Secrecy": c.Secrecy,
  372. "Status": c.Status,
  373. "Longitude": c.Longitude,
  374. "Latitude": c.Latitude,
  375. "GpsTime": c.GpsTime,
  376. "LiveSubSP": c.LiveSubSP,
  377. "LiveStatus": c.State.Load(),
  378. }
  379. return json.Marshal(m)
  380. }
  381. // Channel 通道
  382. type ChannelInfo struct {
  383. DeviceID string // 通道ID
  384. ParentID string
  385. Name string
  386. Manufacturer string
  387. Model string
  388. Owner string
  389. CivilCode string
  390. Address string
  391. Port int
  392. Parental int
  393. SafetyWay int
  394. RegisterWay int
  395. Secrecy int
  396. Status ChannelStatus
  397. }
  398. type ChannelStatus string
  399. const (
  400. ChannelOnStatus = "ON"
  401. ChannelOffStatus = "OFF"
  402. )
  403. func (channel *Channel) CreateRequst(Method sip.RequestMethod) (req sip.Request) {
  404. d := channel.Device
  405. d.SN++
  406. callId := sip.CallID(utils.RandNumString(10))
  407. userAgent := sip.UserAgentHeader("Monibuca")
  408. maxForwards := sip.MaxForwards(70) //增加max-forwards为默认值 70
  409. cseq := sip.CSeq{
  410. SeqNo: uint32(d.SN),
  411. MethodName: Method,
  412. }
  413. port := sip.Port(conf.SipPort)
  414. serverAddr := sip.Address{
  415. //DisplayName: sip.String{Str: d.serverConfig.Serial},
  416. Uri: &sip.SipUri{
  417. FUser: sip.String{Str: conf.Serial},
  418. FHost: d.SipIP,
  419. FPort: &port,
  420. },
  421. Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
  422. }
  423. //非同一域的目标地址需要使用@host
  424. host := conf.Realm
  425. if channel.DeviceID[0:9] != host {
  426. if channel.Port != 0 {
  427. deviceIp := d.NetAddr
  428. deviceIp = deviceIp[0:strings.LastIndex(deviceIp, ":")]
  429. host = fmt.Sprintf("%s:%d", deviceIp, channel.Port)
  430. } else {
  431. host = d.NetAddr
  432. }
  433. }
  434. channelAddr := sip.Address{
  435. //DisplayName: sip.String{Str: d.serverConfig.Serial},
  436. Uri: &sip.SipUri{FUser: sip.String{Str: channel.DeviceID}, FHost: host},
  437. }
  438. req = sip.NewRequest(
  439. "",
  440. Method,
  441. channelAddr.Uri,
  442. "SIP/2.0",
  443. []sip.Header{
  444. serverAddr.AsFromHeader(),
  445. channelAddr.AsToHeader(),
  446. &callId,
  447. &userAgent,
  448. &cseq,
  449. &maxForwards,
  450. serverAddr.AsContactHeader(),
  451. },
  452. "",
  453. nil,
  454. )
  455. req.SetTransport(conf.SipNetwork)
  456. req.SetDestination(d.NetAddr)
  457. return req
  458. }
  459. func (channel *Channel) QueryRecord(startTime, endTime string) ([]*Record, error) {
  460. d := channel.Device
  461. request := d.CreateRequest(sip.MESSAGE)
  462. contentType := sip.ContentType("Application/MANSCDP+xml")
  463. request.AppendHeader(&contentType)
  464. // body := fmt.Sprintf(`<?xml version="1.0"?>
  465. // <Query>
  466. // <CmdType>RecordInfo</CmdType>
  467. // <SN>%d</SN>
  468. // <DeviceID>%s</DeviceID>
  469. // <StartTime>%s</StartTime>
  470. // <EndTime>%s</EndTime>
  471. // <Secrecy>0</Secrecy>
  472. // <Type>all</Type>
  473. // </Query>`, d.sn, channel.DeviceID, startTime, endTime)
  474. start, _ := strconv.ParseInt(startTime, 10, 0)
  475. end, _ := strconv.ParseInt(endTime, 10, 0)
  476. body := BuildRecordInfoXML(d.SN, channel.DeviceID, start, end)
  477. request.SetBody(body, true)
  478. resultCh := RecordQueryLink.WaitResult(d.ID, channel.DeviceID, d.SN, QUERY_RECORD_TIMEOUT)
  479. resp, err := d.SipRequestForResponse(request)
  480. if err != nil {
  481. return nil, fmt.Errorf("query error: %s", err)
  482. }
  483. if resp.StatusCode() != http.StatusOK {
  484. return nil, fmt.Errorf("query error, status=%d", resp.StatusCode())
  485. }
  486. // RecordQueryLink 中加了超时机制,该结果一定会返回
  487. // 所以此处不用再增加超时等保护机制
  488. r := <-resultCh
  489. return r.list, r.err
  490. }
  491. func (channel *Channel) Control(PTZCmd string) int {
  492. d := channel.Device
  493. request := d.CreateRequest(sip.MESSAGE)
  494. contentType := sip.ContentType("Application/MANSCDP+xml")
  495. request.AppendHeader(&contentType)
  496. body := fmt.Sprintf(`<?xml version="1.0"?>
  497. <Control>
  498. <CmdType>DeviceControl</CmdType>
  499. <SN>%d</SN>
  500. <DeviceID>%s</DeviceID>
  501. <PTZCmd>%s</PTZCmd>
  502. </Control>`, d.SN, channel.DeviceID, PTZCmd)
  503. request.SetBody(body, true)
  504. resp, err := d.SipRequestForResponse(request)
  505. if err != nil {
  506. return http.StatusRequestTimeout
  507. }
  508. return int(resp.StatusCode())
  509. }
  510. // Invite 发送Invite报文 invites a channel to play
  511. // 注意里面的锁保证不同时发送invite报文,该锁由channel持有
  512. /***
  513. f字段: f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
  514. 各项具体含义:
  515. v:后续参数为视频的参数;各参数间以 “/”分割;
  516. 编码格式:十进制整数字符串表示
  517. 1 –MPEG-4 2 –H.264 3 – SVAC 4 –3GP
  518. 分辨率:十进制整数字符串表示
  519. 1 – QCIF 2 – CIF 3 – 4CIF 4 – D1 5 –720P 6 –1080P/I
  520. 帧率:十进制整数字符串表示 0~99
  521. 码率类型:十进制整数字符串表示
  522. 1 – 固定码率(CBR) 2 – 可变码率(VBR)
  523. 码率大小:十进制整数字符串表示 0~100000(如 1表示1kbps)
  524. a:后续参数为音频的参数;各参数间以 “/”分割;
  525. 编码格式:十进制整数字符串表示
  526. 1 – G.711 2 – G.723.1 3 – G.729 4 – G.722.1
  527. 码率大小:十进制整数字符串
  528. 音频编码码率: 1 — 5.3 kbps (注:G.723.1中使用)
  529. 2 — 6.3 kbps (注:G.723.1中使用)
  530. 3 — 8 kbps (注:G.729中使用)
  531. 4 — 16 kbps (注:G.722.1中使用)
  532. 5 — 24 kbps (注:G.722.1中使用)
  533. 6 — 32 kbps (注:G.722.1中使用)
  534. 7 — 48 kbps (注:G.722.1中使用)
  535. 8 — 64 kbps(注:G.711中使用)
  536. 采样率:十进制整数字符串表示
  537. 1 — 8 kHz(注:G.711/ G.723.1/ G.729中使用)
  538. 2—14 kHz(注:G.722.1中使用)
  539. 3—16 kHz(注:G.722.1中使用)
  540. 4—32 kHz(注:G.722.1中使用)
  541. 注1:字符串说明
  542. 本节中使用的“十进制整数字符串”的含义为“0”~“4294967296” 之间的十进制数字字符串。
  543. 注2:参数分割标识
  544. 各参数间以“/”分割,参数间的分割符“/”不能省略;
  545. 若两个分割符 “/”间的某参数为空时(即两个分割符 “/”直接将相连时)表示无该参数值;
  546. 注3:f字段说明
  547. 使用f字段时,应保证视频和音频参数的结构完整性,即在任何时候,f字段的结构都应是完整的结构:
  548. f = v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
  549. 若只有视频时,音频中的各参数项可以不填写,但应保持 “a///”的结构:
  550. f = v/编码格式/分辨率/帧率/码率类型/码率大小a///
  551. 若只有音频时也类似处理,视频中的各参数项可以不填写,但应保持 “v/”的结构:
  552. f = v/a/编码格式/码率大小/采样率
  553. f字段中视、音频参数段之间不需空格分割。
  554. 可使用f字段中的分辨率参数标识同一设备不同分辨率的码流。
  555. */
  556. func (channel *Channel) Invite(opt *InviteOptions) (code int, err error) {
  557. if opt.IsLive() {
  558. if !channel.State.CompareAndSwap(0, 1) {
  559. return 304, nil
  560. }else{
  561. channel.UpdateChanelState()
  562. }
  563. defer func() {
  564. if err != nil {
  565. GB28181Plugin.Error("Invite", zap.Error(err), zap.String("streamPath",opt.StreamPath ))
  566. channel.State.Store(0)
  567. // 重复发布和设备busy不重新拉流
  568. /*if strings.Contains(err.Error(),"Duplicate Publish") || strings.Contains(err.Error(),"Busy"){
  569. return
  570. }
  571. if GetInvistMode(conf.InviteMode,channel.Model,channel.DeviceID) == INVIDE_MODE_AUTO{
  572. //if conf.InviteMode == INVIDE_MODE_AUTO {
  573. time.AfterFunc(time.Second*10, func() {
  574. // 5秒后重试
  575. channel.Invite(opt)
  576. })
  577. }*/
  578. } else {
  579. channel.State.Store(2)
  580. channel.UpdateChanelState()
  581. }
  582. }()
  583. }
  584. d := channel.Device
  585. //streamPath := fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
  586. streamPath := fmt.Sprintf("%s/%s",LiveRoom, channel.DeviceID)
  587. s := "Play"
  588. opt.CreateSSRC()
  589. if opt.Record() {
  590. s = "Playback"
  591. streamPath = fmt.Sprintf("%s/%s/%d-%d", d.ID, channel.DeviceID, opt.Start, opt.End)
  592. }
  593. if opt.StreamPath != "" {
  594. streamPath = opt.StreamPath
  595. } else if channel.DeviceID == "" {
  596. streamPath = "gb28181/" + d.ID
  597. } else {
  598. opt.StreamPath = streamPath
  599. }
  600. // 对steam path自动加上签名
  601. if EngineConfig.Publish.Key != ""{
  602. nowTime := time.Now().Unix()
  603. expireTime := nowTime + 315360000
  604. expireTime16 := strconv.FormatInt(expireTime, 16)
  605. m := md5.New()
  606. m.Write([]byte(EngineConfig.Publish.Key +streamPath + expireTime16))
  607. streamPathList := strings.Split(streamPath,"?")
  608. if len(streamPathList) > 0 {
  609. streamPath = fmt.Sprintf("%s?%s=%s&%s=%s",streamPathList[0],EngineConfig.Publish.SecretArgName,hex.EncodeToString(m.Sum(nil)),EngineConfig.Publish.ExpireArgName,expireTime16)
  610. opt.StreamPath = streamPath
  611. }
  612. }
  613. if opt.dump == "" {
  614. opt.dump = conf.DumpPath
  615. }
  616. protocol := ""
  617. networkType := "udp"
  618. reusePort := true
  619. if conf.IsMediaNetworkTCP() {
  620. networkType = "tcp"
  621. protocol = "TCP/"
  622. if conf.tcpPorts.Valid {
  623. opt.MediaPort, err = conf.tcpPorts.GetUnUseTcpPort()
  624. opt.recyclePort = conf.tcpPorts.Recycle
  625. reusePort = false
  626. }
  627. channel.Info("media port",zap.Uint16("mediaPort",opt.MediaPort),zap.Bool("reusePort",reusePort),zap.Error(err))
  628. } else {
  629. if conf.udpPorts.Valid {
  630. opt.MediaPort, err = conf.udpPorts.GetPort()
  631. opt.recyclePort = conf.udpPorts.Recycle
  632. reusePort = false
  633. }
  634. }
  635. if err != nil {
  636. // TODO 在这里返回的EOF
  637. return http.StatusInternalServerError, err
  638. }
  639. if opt.MediaPort == 0 {
  640. opt.MediaPort = conf.MediaPort
  641. }
  642. sdpInfo := []string{
  643. "v=0",
  644. fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.DeviceID, d.MediaIP),
  645. "s=" + s,
  646. "u=" + channel.DeviceID + ":0",
  647. "c=IN IP4 " + d.MediaIP,
  648. opt.String(),
  649. fmt.Sprintf("m=video %d %sRTP/AVP 96", opt.MediaPort, protocol),
  650. "a=recvonly",
  651. "a=rtpmap:96 PS/90000",
  652. "y=" + opt.ssrc,
  653. }
  654. if conf.IsMediaNetworkTCP() {
  655. sdpInfo = append(sdpInfo, "a=setup:passive", "a=connection:new")
  656. }
  657. invite := channel.CreateRequst(sip.INVITE)
  658. contentType := sip.ContentType("application/sdp")
  659. invite.AppendHeader(&contentType)
  660. invite.SetBody(strings.Join(sdpInfo, "\r\n")+"\r\n", true)
  661. subject := sip.GenericHeader{
  662. HeaderName: "Subject", Contents: fmt.Sprintf("%s:%s,%s:0", channel.DeviceID, opt.ssrc, conf.Serial),
  663. }
  664. invite.AppendHeader(&subject)
  665. inviteRes, err := d.SipRequestForResponse(invite)
  666. // 回收端口
  667. /*defer func(){
  668. if err != nil {
  669. if opt.recyclePort != nil {
  670. channel.Info("recycle port", zap.Uint16("mediaPort",opt.MediaPort))
  671. opt.recyclePort(opt.MediaPort)
  672. }
  673. }
  674. }()*/
  675. if err != nil {
  676. channel.Error("invite", zap.Error(err), zap.String("msg", invite.String()))
  677. if opt.recyclePort != nil {
  678. channel.Info("recycle port", zap.Uint16("mediaPort",opt.MediaPort))
  679. opt.recyclePort(opt.MediaPort)
  680. }
  681. // 拉直播流失败
  682. if opt.IsLive(){
  683. // 设备busy,重启设备
  684. if strings.Contains(err.Error(),"Busy here Creat Media failed"){
  685. channel.Info("reboot device", zap.String("channelId",channel.DeviceID))
  686. go SendRebootMessage(channel.DeviceID)
  687. }
  688. }
  689. return http.StatusInternalServerError, err
  690. }else {
  691. go DeleteRebootMessage(channel.DeviceID)
  692. }
  693. code = int(inviteRes.StatusCode())
  694. channel.Info("invite response", zap.Int("status code", code))
  695. if code == http.StatusOK {
  696. ds := strings.Split(inviteRes.Body(), "\r\n")
  697. for _, l := range ds {
  698. if ls := strings.Split(l, "="); len(ls) > 1 {
  699. if ls[0] == "y" && len(ls[1]) > 0 {
  700. if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
  701. opt.SSRC = uint32(_ssrc)
  702. } else {
  703. channel.Error("read invite response y ", zap.Error(err))
  704. }
  705. // break
  706. }
  707. if ls[0] == "m" && len(ls[1]) > 0 {
  708. netinfo := strings.Split(ls[1], " ")
  709. if strings.ToUpper(netinfo[2]) == "TCP/RTP/AVP" {
  710. channel.Debug("Device support tcp")
  711. } else {
  712. channel.Debug("Device not support tcp")
  713. networkType = "udp"
  714. }
  715. }
  716. }
  717. }
  718. var psPuber ps.PSPublisher
  719. err = psPuber.Receive(streamPath, opt.dump, fmt.Sprintf("%s:%d", networkType, opt.MediaPort), opt.SSRC, reusePort)
  720. if err == nil {
  721. // 检查是否开启录像,开启录像的IdleTimeout设置为 默认10s
  722. isRecord := GetIsRecordByChannelId(channel.DeviceID)
  723. if isRecord == 1 {
  724. // 10秒无数据关闭
  725. // 需要录像,并且是直播
  726. if opt.IsLive(){
  727. psPuber.Stream.IdleTimeout = time.Second * 10
  728. psPuber.Stream.DelayCloseTimeout = time.Second * 10
  729. }
  730. }
  731. // 录播不过期
  732. if !opt.IsLive(){
  733. psPuber.Stream.IdleTimeout = 0
  734. psPuber.Stream.DelayCloseTimeout = 0
  735. }
  736. /*if !opt.IsLive() {
  737. // 10秒无数据关闭
  738. if psPuber.Stream.DelayCloseTimeout == 0 {
  739. psPuber.Stream.DelayCloseTimeout = time.Second * 10
  740. }
  741. if psPuber.Stream.IdleTimeout == 0 {
  742. psPuber.Stream.IdleTimeout = time.Second * 10
  743. }
  744. }*/
  745. // 保存的时候截断query内容,只保留stream path
  746. streamPathList := strings.Split(streamPath,"?")
  747. streamPath = streamPathList[0]
  748. PullStreams.Store(streamPath, &PullStream{
  749. opt: opt,
  750. channel: channel,
  751. inviteRes: inviteRes,
  752. })
  753. // 这里出错,会产生no track,close掉stream,自动回收端口
  754. err = srv.Send(sip.NewAckRequest("", invite, inviteRes, "", nil))
  755. if err != nil{
  756. channel.Error("srv.Send", zap.Error(err))
  757. }
  758. }else{
  759. channel.Error("psPuber.Receive", zap.Error(err))
  760. if opt.recyclePort != nil {
  761. channel.Info("recycle port", zap.Uint16("mediaPort",opt.MediaPort))
  762. opt.recyclePort(opt.MediaPort)
  763. }
  764. }
  765. }
  766. return
  767. }
  768. func (channel *Channel) Bye(streamPath string) int {
  769. channel.Info("bye", zap.String("streamPath",streamPath))
  770. d := channel.Device
  771. if streamPath == "" {
  772. streamPath = fmt.Sprintf("%s/%s", d.ID, channel.DeviceID)
  773. }
  774. if s, loaded := PullStreams.LoadAndDelete(streamPath); loaded {
  775. s.(*PullStream).Bye()
  776. if s := Streams.Get(streamPath); s != nil {
  777. s.Close()
  778. }
  779. // 直播将pull strem 设为0
  780. /*if s.(*PullStream).opt.IsLive(){
  781. channel.State.Store(0)
  782. }*/
  783. return http.StatusOK
  784. }
  785. return http.StatusNotFound
  786. }
  787. func (channel *Channel) Pause(streamPath string) int {
  788. if s, loaded := PullStreams.Load(streamPath); loaded {
  789. r := s.(*PullStream).Pause()
  790. if s := Streams.Get(streamPath); s != nil {
  791. s.Pause()
  792. }
  793. return r
  794. }
  795. return http.StatusNotFound
  796. }
  797. func (channel *Channel) Resume(streamPath string) int {
  798. if s, loaded := PullStreams.Load(streamPath); loaded {
  799. r := s.(*PullStream).Resume()
  800. if s := Streams.Get(streamPath); s != nil {
  801. s.Resume()
  802. }
  803. return r
  804. }
  805. return http.StatusNotFound
  806. }
  807. func (channel *Channel) PlayAt(streamPath string, second uint) int {
  808. if s, loaded := PullStreams.Load(streamPath); loaded {
  809. r := s.(*PullStream).PlayAt(second)
  810. if s := Streams.Get(streamPath); s != nil {
  811. s.Resume()
  812. }
  813. return r
  814. }
  815. return http.StatusNotFound
  816. }
  817. func (channel *Channel) PlayForward(streamPath string, speed float32) int {
  818. if s, loaded := PullStreams.Load(streamPath); loaded {
  819. return s.(*PullStream).PlayForward(speed)
  820. }
  821. if s := Streams.Get(streamPath); s != nil {
  822. s.Resume()
  823. }
  824. return http.StatusNotFound
  825. }
  826. func (channel *Channel) TryAutoInvite(opt *InviteOptions) {
  827. condition := !opt.IsLive() || channel.CanInvite()
  828. // debug 改为了info
  829. channel.Info("TryAutoInvite", zap.Any("opt", opt), zap.Bool("condition", condition))
  830. if condition {
  831. go channel.Invite(opt)
  832. }
  833. }
  834. func (channel *Channel) CanInvite() bool {
  835. if channel.State.Load() != 0 || len(channel.DeviceID) != 20 || channel.Status == ChannelOffStatus {
  836. return false
  837. }
  838. if conf.InviteIDs == "" {
  839. return true
  840. }
  841. // 11~13位是设备类型编码
  842. typeID := channel.DeviceID[10:13]
  843. // format: start-end,type1,type2
  844. tokens := strings.Split(conf.InviteIDs, ",")
  845. for _, tok := range tokens {
  846. if first, second, ok := strings.Cut(tok, "-"); ok {
  847. if typeID >= first && typeID <= second {
  848. return true
  849. }
  850. } else {
  851. if typeID == first {
  852. return true
  853. }
  854. }
  855. }
  856. return false
  857. }
  858. func getSipRespErrorCode(err error) int {
  859. if re, ok := err.(*sip.RequestError); ok {
  860. return int(re.Code)
  861. } else {
  862. return http.StatusInternalServerError
  863. }
  864. }