123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- package gb28181
- import (
- "context"
- "fmt"
- "strconv"
- "strings"
- "time"
- "github.com/ghettovoice/gosip"
- "github.com/ghettovoice/gosip/log"
- "github.com/ghettovoice/gosip/sip"
- "github.com/logrusorgru/aurora/v4"
- "go.uber.org/zap"
- . "m7s.live/engine/v4"
- "m7s.live/plugin/gb28181/v4/utils"
- )
- var srv gosip.Server
- const MaxRegisterCount = 3
- func FindChannel(deviceId string, channelId string) (c *Channel) {
- //fmt.Println("find channel11111111111111111:",deviceId,channelId)
- if deviceId == LiveRoom && len(channelId) == 20 {
- // 表示为gb28181 来的实时流
- if true{
- deviceId = channelId
- }
-
- if false{
- deviceId = getDeviceIdByChannelId(channelId)
- fmt.Println("设备编号为:",deviceId)
- if deviceId == ""{
- deviceId = channelId
- }
- }
-
- }
- if v, ok := Devices.Load(deviceId); ok {
- d := v.(*Device)
- if v, ok := d.channelMap.Load(channelId); ok {
- return v.(*Channel)
- }else{
- fmt.Println("没找到通道:",channelId)
- }
- }else{
- fmt.Println("没找到设备:",deviceId)
- }
- return
- }
- var levelMap = map[string]log.Level{
- "trace": log.TraceLevel,
- "debug": log.DebugLevel,
- "info": log.InfoLevel,
- "warn": log.WarnLevel,
- "error": log.ErrorLevel,
- "fatal": log.FatalLevel,
- "panic": log.PanicLevel,
- }
- func GetSipServer(transport string) gosip.Server {
- return srv
- }
- var sn = 0
- func CreateRequest(exposedId string, Method sip.RequestMethod, recipient *sip.Address, netAddr string) (req sip.Request) {
- sn++
- callId := sip.CallID(utils.RandNumString(10))
- userAgent := sip.UserAgentHeader("Monibuca")
- cseq := sip.CSeq{
- SeqNo: uint32(sn),
- MethodName: Method,
- }
- port := sip.Port(conf.SipPort)
- serverAddr := sip.Address{
- //DisplayName: sip.String{Str: d.config.Serial},
- Uri: &sip.SipUri{
- FUser: sip.String{Str: exposedId},
- FHost: conf.SipIP,
- FPort: &port,
- },
- Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)}),
- }
- req = sip.NewRequest(
- "",
- Method,
- recipient.Uri,
- "SIP/2.0",
- []sip.Header{
- serverAddr.AsFromHeader(),
- recipient.AsToHeader(),
- &callId,
- &userAgent,
- &cseq,
- serverAddr.AsContactHeader(),
- },
- "",
- nil,
- )
- req.SetTransport(conf.SipNetwork)
- req.SetDestination(netAddr)
- //fmt.Printf("构建请求参数:%s", *&req)
- // requestMsg.DestAdd, err2 = d.ResolveAddress(requestMsg)
- // if err2 != nil {
- // return nil
- // }
- //intranet ip , let's resolve it with public ip
- // var deviceIp, deviceSourceIP net.IP
- // switch addr := requestMsg.DestAdd.(type) {
- // case *net.UDPAddr:
- // deviceIp = addr.IP
- // case *net.TCPAddr:
- // deviceIp = addr.IP
- // }
- // switch addr2 := d.SourceAddr.(type) {
- // case *net.UDPAddr:
- // deviceSourceIP = addr2.IP
- // case *net.TCPAddr:
- // deviceSourceIP = addr2.IP
- // }
- // if deviceIp.IsPrivate() && !deviceSourceIP.IsPrivate() {
- // requestMsg.DestAdd = d.SourceAddr
- // }
- return
- }
- func RequestForResponse(transport string, request sip.Request,
- options ...gosip.RequestWithContextOption) (sip.Response, error) {
- return (GetSipServer(transport)).RequestWithContext(context.Background(), request, options...)
- }
- func (c *GB28181Config) startServer() {
- //fmt.Println("START server :",*c)
- addr := c.ListenAddr + ":" + strconv.Itoa(int(c.SipPort))
- //fmt.Println("START server addr:",addr)
- logger := utils.NewZapLogger(GB28181Plugin.Logger, "GB SIP Server", nil)
- logger.SetLevel(uint32(levelMap[EngineConfig.LogLevel]))
- // logger := log.NewDefaultLogrusLogger().WithPrefix("GB SIP Server")
- srvConf := gosip.ServerConfig{}
- if c.SipIP != "" {
- srvConf.Host = c.SipIP
- }
- //fmt.Println("START server host:",srvConf.Host)
- srv = gosip.NewServer(srvConf, nil, nil, logger)
- srv.OnRequest(sip.REGISTER, c.OnRegister)
- srv.OnRequest(sip.MESSAGE, c.OnMessage)
- srv.OnRequest(sip.NOTIFY, c.OnNotify)
- srv.OnRequest(sip.BYE, c.OnBye)
- err := srv.Listen(strings.ToLower(c.SipNetwork), addr)
- if err != nil {
- GB28181Plugin.Logger.Error("gb28181 server listen", zap.Error(err))
- } else {
- GB28181Plugin.Info(fmt.Sprint(aurora.Green("Server gb28181 start at"), aurora.BrightBlue(addr)))
- }
- if c.MediaNetwork == "tcp" {
- c.tcpPorts.Init(c.MediaPortMin, c.MediaPortMax)
- } else {
- c.udpPorts.Init(c.MediaPortMin, c.MediaPortMax)
- }
- go c.startJob()
- }
- // func queryCatalog(config *transaction.Config) {
- // t := time.NewTicker(time.Duration(config.CatalogInterval) * time.Second)
- // for range t.C {
- // Devices.Range(func(key, value interface{}) bool {
- // device := value.(*Device)
- // if time.Since(device.UpdateTime) > time.Duration(config.RegisterValidity)*time.Second {
- // Devices.Delete(key)
- // } else if device.Channels != nil {
- // go device.Catalog()
- // }
- // return true
- // })
- // }
- // }
- // 定时任务
- func (c *GB28181Config) startJob() {
- statusTick := time.NewTicker(c.HeartbeatInterval / 2)
- banTick := time.NewTicker(c.RemoveBanInterval)
- linkTick := time.NewTicker(time.Millisecond * 100)
- GB28181Plugin.Debug("start job")
- for {
- select {
- case <-banTick.C:
- if c.Username != "" || c.Password != "" {
- c.removeBanDevice()
- }
- case <-statusTick.C:
- c.statusCheck()
- case <-linkTick.C:
- RecordQueryLink.cleanTimeout()
- }
- }
- }
- func (c *GB28181Config) removeBanDevice() {
- DeviceRegisterCount.Range(func(key, value interface{}) bool {
- if value.(int) > MaxRegisterCount {
- DeviceRegisterCount.Delete(key)
- }
- return true
- })
- }
- // statusCheck
- // - 当设备超过 3 倍心跳时间未发送过心跳(通过 UpdateTime 判断), 视为离线
- // - 当设备超过注册有效期内为发送过消息,则从设备列表中删除
- // UpdateTime 在设备发送心跳之外的消息也会被更新,相对于 LastKeepaliveAt 更能体现出设备最会一次活跃的时间
- func (c *GB28181Config) statusCheck() {
- Devices.Range(func(key, value any) bool {
- d := value.(*Device)
- if time.Since(d.UpdateTime) > c.RegisterValidity {
- // TODO 这里把注释掉了,不删除
- Devices.Delete(key)
- GB28181Plugin.Info("Device register timeout",
- zap.String("id", d.ID),
- zap.Time("registerTime", d.RegisterTime),
- zap.Time("updateTime", d.UpdateTime),
- )
- } else if time.Since(d.UpdateTime) > c.HeartbeatInterval*3 {
- preStatus := d.Status
- d.Status = DeviceOfflineStatus
- if preStatus != DeviceOfflineStatus{
- go d.UpdateDeviceStatus()
- }
-
- // TODO 更新为离线
- d.channelMap.Range(func(key, value any) bool {
- ch := value.(*Channel)
- preStatus := ch.Status
- ch.Status = ChannelOffStatus
- if preStatus != ChannelOffStatus{
- ch.UpdateChanelStatus()
- }
- return true
- })
- GB28181Plugin.Info("Device offline", zap.String("id", d.ID), zap.Time("updateTime", d.UpdateTime))
- }
- return true
- })
- }
|