handle.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. package gb28181
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/xml"
  6. "fmt"
  7. "strconv"
  8. "github.com/ghettovoice/gosip/sip"
  9. "go.uber.org/zap"
  10. . "m7s.live/engine/v4"
  11. "m7s.live/plugin/gb28181/v4/utils"
  12. "net/http"
  13. "time"
  14. "golang.org/x/net/html/charset"
  15. )
  16. type Authorization struct {
  17. *sip.Authorization
  18. }
  19. func (a *Authorization) Verify(username, passwd, realm, nonce string) bool {
  20. //1、将 username,realm,password 依次组合获取 1 个字符串,并用算法加密的到密文 r1
  21. s1 := fmt.Sprintf("%s:%s:%s", username, realm, passwd)
  22. r1 := a.getDigest(s1)
  23. //2、将 method,即REGISTER ,uri 依次组合获取 1 个字符串,并对这个字符串使用算法 加密得到密文 r2
  24. s2 := fmt.Sprintf("REGISTER:%s", a.Uri())
  25. r2 := a.getDigest(s2)
  26. if r1 == "" || r2 == "" {
  27. GB28181Plugin.Error("Authorization algorithm wrong")
  28. return false
  29. }
  30. //3、将密文 1,nonce 和密文 2 依次组合获取 1 个字符串,并对这个字符串使用算法加密,获得密文 r3,即Response
  31. s3 := fmt.Sprintf("%s:%s:%s", r1, nonce, r2)
  32. r3 := a.getDigest(s3)
  33. //4、计算服务端和客户端上报的是否相等
  34. return r3 == a.Response()
  35. }
  36. func (a *Authorization) getDigest(raw string) string {
  37. switch a.Algorithm() {
  38. case "MD5":
  39. return fmt.Sprintf("%x", md5.Sum([]byte(raw)))
  40. default: //如果没有算法,默认使用MD5
  41. return fmt.Sprintf("%x", md5.Sum([]byte(raw)))
  42. }
  43. }
  44. func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
  45. from, ok := req.From()
  46. if !ok || from.Address == nil || from.Address.User() == nil {
  47. GB28181Plugin.Error("OnMessage", zap.String("error", "no id"))
  48. return
  49. }
  50. id := from.Address.User().String()
  51. GB28181Plugin.Info("SIP<-OnRegister", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
  52. isUnregister := false
  53. if exps := req.GetHeaders("Expires"); len(exps) > 0 {
  54. exp := exps[0]
  55. expSec, err := strconv.ParseInt(exp.Value(), 10, 32)
  56. if err != nil {
  57. GB28181Plugin.Info("OnRegister",
  58. zap.String("error", fmt.Sprintf("wrong expire header value %q", exp)),
  59. zap.String("id", id),
  60. zap.String("source", req.Source()),
  61. zap.String("destination", req.Destination()))
  62. return
  63. }
  64. GB28181Plugin.Info("注册有效期:", zap.Int64("id", expSec))
  65. if expSec == 0 {
  66. isUnregister = true
  67. }
  68. } else {
  69. GB28181Plugin.Info("OnRegister",
  70. zap.String("error", "has no expire header"),
  71. zap.String("id", id),
  72. zap.String("source", req.Source()),
  73. zap.String("destination", req.Destination()))
  74. return
  75. }
  76. GB28181Plugin.Info("OnRegister",
  77. zap.Bool("isUnregister", isUnregister),
  78. zap.String("id", id),
  79. zap.String("source", req.Source()),
  80. zap.String("destination", req.Destination()))
  81. if len(id) != 20 {
  82. GB28181Plugin.Info("Wrong GB-28181", zap.String("id", id))
  83. return
  84. }
  85. // 用设备id检查是否出库
  86. isDelivery := CheckDeviceDelivery(id)
  87. GB28181Plugin.Info("校验设备是否出库:",zap.String("id", id),zap.Bool("isDelivery", isDelivery))
  88. if !isDelivery{
  89. // 未出库直接拒绝
  90. response := sip.NewResponseFromRequest("", req, http.StatusForbidden, "Forbidden", "")
  91. tx.Respond(response)
  92. return
  93. }
  94. passAuth := false
  95. // 不需要密码情况
  96. if c.Username == "" && c.Password == "" {
  97. passAuth = true
  98. } else {
  99. // 需要密码情况 设备第一次上报,返回401和加密算法
  100. if hdrs := req.GetHeaders("Authorization"); len(hdrs) > 0 {
  101. authenticateHeader := hdrs[0].(*sip.GenericHeader)
  102. auth := &Authorization{sip.AuthFromValue(authenticateHeader.Contents)}
  103. // 有些摄像头没有配置用户名的地方,用户名就是摄像头自己的国标id
  104. var username string
  105. if auth.Username() == id {
  106. username = id
  107. } else {
  108. username = c.Username
  109. }
  110. if dc, ok := DeviceRegisterCount.LoadOrStore(id, 1); ok && dc.(int) > MaxRegisterCount {
  111. response := sip.NewResponseFromRequest("", req, http.StatusForbidden, "Forbidden", "")
  112. tx.Respond(response)
  113. return
  114. } else {
  115. // 设备第二次上报,校验
  116. _nonce, loaded := DeviceNonce.Load(id)
  117. if loaded && auth.Verify(username, c.Password, c.Realm, _nonce.(string)) {
  118. passAuth = true
  119. } else {
  120. DeviceRegisterCount.Store(id, dc.(int)+1)
  121. }
  122. }
  123. }
  124. }
  125. if passAuth {
  126. var d *Device
  127. if isUnregister {
  128. GB28181Plugin.Info("设备注销:",zap.String("id", id))
  129. tmpd, ok := Devices.Load(id)
  130. //tmpd, ok := Devices.LoadAndDelete(id)
  131. if ok {
  132. GB28181Plugin.Info("Unregister Device", zap.String("id", id))
  133. d = tmpd.(*Device)
  134. // 删除前先停止掉所有的连接
  135. d.channelMap.Range(func(key, value any) bool {
  136. ch := value.(*Channel)
  137. if c := FindChannel(id, ch.DeviceID); c != nil {
  138. if c.Model != "AudioOut"{
  139. if ch.State.Load() != 0 {
  140. streamPath := LiveRoom+"/"+ch.DeviceID
  141. co := c.Bye(streamPath)
  142. fmt.Println("live by code:",co)
  143. }
  144. streamPath := "record"+"/"+ch.DeviceID
  145. co:= c.Bye(streamPath)
  146. fmt.Println("record by code:",co)
  147. }
  148. }
  149. return true
  150. })
  151. Devices.Delete(id)
  152. // 更新,两句都是自己加的
  153. //d.Status = DeviceOfflineStatus
  154. //d.UpdateDeviceInfo()
  155. } else {
  156. return
  157. }
  158. } else {
  159. if v, ok := Devices.Load(id); ok {
  160. d = v.(*Device)
  161. c.RecoverDevice(d, req)
  162. } else {
  163. d = c.StoreDevice(id, req)
  164. }
  165. }
  166. DeviceNonce.Delete(id)
  167. DeviceRegisterCount.Delete(id)
  168. resp := sip.NewResponseFromRequest("", req, http.StatusOK, "OK", "")
  169. to, _ := resp.To()
  170. resp.ReplaceHeaders("To", []sip.Header{&sip.ToHeader{Address: to.Address, Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)})}})
  171. resp.RemoveHeader("Allow")
  172. expires := sip.Expires(3600)
  173. resp.AppendHeader(&expires)
  174. resp.AppendHeader(&sip.GenericHeader{
  175. HeaderName: "Date",
  176. Contents: time.Now().Format(TIME_LAYOUT),
  177. })
  178. _ = tx.Respond(resp)
  179. if !isUnregister {
  180. //订阅设备更新
  181. go d.syncChannels()
  182. }
  183. } else {
  184. GB28181Plugin.Info("OnRegister unauthorized", zap.String("id", id), zap.String("source", req.Source()),
  185. zap.String("destination", req.Destination()))
  186. response := sip.NewResponseFromRequest("", req, http.StatusUnauthorized, "Unauthorized", "")
  187. _nonce, _ := DeviceNonce.LoadOrStore(id, utils.RandNumString(32))
  188. auth := fmt.Sprintf(
  189. `Digest realm="%s",algorithm=%s,nonce="%s"`,
  190. c.Realm,
  191. "MD5",
  192. _nonce.(string),
  193. )
  194. response.AppendHeader(&sip.GenericHeader{
  195. HeaderName: "WWW-Authenticate",
  196. Contents: auth,
  197. })
  198. _ = tx.Respond(response)
  199. }
  200. }
  201. // syncChannels
  202. // 同步设备信息、下属通道信息,包括主动查询通道信息,订阅通道变化情况
  203. func (d *Device) syncChannels() {
  204. if time.Since(d.lastSyncTime) > 2*conf.HeartbeatInterval {
  205. d.lastSyncTime = time.Now()
  206. d.Catalog()
  207. d.Subscribe()
  208. d.QueryDeviceInfo()
  209. }
  210. }
  211. type MessageEvent struct {
  212. Type string
  213. Device *Device
  214. }
  215. func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
  216. from, ok := req.From()
  217. if !ok || from.Address == nil || from.Address.User() == nil {
  218. GB28181Plugin.Error("OnMessage", zap.String("error", "no id"))
  219. return
  220. }
  221. id := from.Address.User().String()
  222. GB28181Plugin.Info("SIP<-OnMessage", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
  223. if v, ok := Devices.Load(id); ok {
  224. d := v.(*Device)
  225. //fmt.Println("DEVICE STATUS 111111111111111111111111",d.Status )
  226. switch d.Status {
  227. case DeviceOfflineStatus, DeviceRecoverStatus:
  228. c.RecoverDevice(d, req)
  229. go d.syncChannels()
  230. case DeviceRegisterStatus:
  231. d.Status = DeviceOnlineStatus
  232. go d.UpdateDeviceStatus()
  233. }
  234. d.UpdateTime = time.Now()
  235. temp := &struct {
  236. XMLName xml.Name
  237. CmdType string
  238. SN int // 请求序列号,一般用于对应 request 和 response
  239. DeviceID string
  240. DeviceName string
  241. Manufacturer string
  242. Model string
  243. Channel string
  244. DeviceList []ChannelInfo `xml:"DeviceList>Item"`
  245. RecordList []*Record `xml:"RecordList>Item"`
  246. SumNum int // 录像结果的总数 SumNum,录像结果会按照多条消息返回,可用于判断是否全部返回
  247. }{}
  248. decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
  249. decoder.CharsetReader = charset.NewReaderLabel
  250. err := decoder.Decode(temp)
  251. if err != nil {
  252. err = utils.DecodeGbk(temp, []byte(req.Body()))
  253. if err != nil {
  254. GB28181Plugin.Error("decode catelog err", zap.Error(err))
  255. }
  256. }
  257. var body string
  258. switch temp.CmdType {
  259. case "Keepalive":
  260. d.LastKeepaliveAt = time.Now()
  261. //callID !="" 说明是订阅的事件类型信息
  262. if d.lastSyncTime.IsZero() {
  263. go d.syncChannels()
  264. } else {
  265. d.channelMap.Range(func(key, value interface{}) bool {
  266. if GetInvistMode(conf.InviteMode,value.(*Channel).Model,value.(*Channel).DeviceID) == INVIDE_MODE_AUTO{
  267. //if conf.InviteMode == INVIDE_MODE_AUTO {
  268. value.(*Channel).TryAutoInvite(&InviteOptions{})
  269. }
  270. return true
  271. })
  272. }
  273. //在KeepLive 进行位置订阅的处理,如果开启了自动订阅位置,则去订阅位置
  274. if c.Position.AutosubPosition && time.Since(d.GpsTime) > c.Position.Interval*2 {
  275. d.MobilePositionSubscribe(d.ID, c.Position.Expires, c.Position.Interval)
  276. GB28181Plugin.Debug("Mobile Position Subscribe", zap.String("deviceID", d.ID))
  277. }
  278. case "Catalog":
  279. d.UpdateChannels(temp.DeviceList...)
  280. case "RecordInfo":
  281. RecordQueryLink.Put(d.ID, temp.DeviceID, temp.SN, temp.SumNum, temp.RecordList)
  282. case "DeviceInfo":
  283. // 主设备信息
  284. d.Name = temp.DeviceName
  285. d.Manufacturer = temp.Manufacturer
  286. d.Model = temp.Model
  287. case "Alarm":
  288. d.Status = DeviceAlarmedStatus
  289. body = BuildAlarmResponseXML(d.ID)
  290. case "Broadcast":
  291. GB28181Plugin.Info("broadcast message", zap.String("body", req.Body()))
  292. default:
  293. d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
  294. response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
  295. tx.Respond(response)
  296. return
  297. }
  298. EmitEvent(MessageEvent{
  299. Type: temp.CmdType,
  300. Device: d,
  301. })
  302. tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
  303. } else {
  304. GB28181Plugin.Debug("Unauthorized message, device not found", zap.String("id", id))
  305. }
  306. }
  307. func (c *GB28181Config) OnBye(req sip.Request, tx sip.ServerTransaction) {
  308. tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", ""))
  309. }
  310. type NotifyEvent MessageEvent
  311. // OnNotify 订阅通知处理
  312. func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
  313. from, ok := req.From()
  314. if !ok || from.Address == nil || from.Address.User() == nil {
  315. GB28181Plugin.Error("OnMessage", zap.String("error", "no id"))
  316. return
  317. }
  318. id := from.Address.User().String()
  319. GB28181Plugin.Info("SIP<-OnNotify", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
  320. if v, ok := Devices.Load(id); ok {
  321. d := v.(*Device)
  322. d.UpdateTime = time.Now()
  323. temp := &struct {
  324. XMLName xml.Name
  325. CmdType string
  326. DeviceID string
  327. Time string //位置订阅-GPS时间
  328. Longitude string //位置订阅-经度
  329. Latitude string //位置订阅-维度
  330. // Speed string //位置订阅-速度(km/h)(可选)
  331. // Direction string //位置订阅-方向(取值为当前摄像头方向与正北方的顺时针夹角,取值范围0°~360°,单位:°)(可选)
  332. // Altitude string //位置订阅-海拔高度,单位:m(可选)
  333. DeviceList []*notifyMessage `xml:"DeviceList>Item"` //目录订阅
  334. }{}
  335. decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
  336. decoder.CharsetReader = charset.NewReaderLabel
  337. err := decoder.Decode(temp)
  338. if err != nil {
  339. err = utils.DecodeGbk(temp, []byte(req.Body()))
  340. if err != nil {
  341. GB28181Plugin.Error("decode catelog err", zap.Error(err))
  342. }
  343. }
  344. var body string
  345. switch temp.CmdType {
  346. case "Catalog":
  347. //目录状态
  348. d.UpdateChannelStatus(temp.DeviceList)
  349. case "MobilePosition":
  350. //更新channel的坐标
  351. d.UpdateChannelPosition(temp.DeviceID, temp.Time, temp.Longitude, temp.Latitude)
  352. case "Alarm":
  353. d.Status = DeviceAlarmedStatus
  354. default:
  355. d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
  356. response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
  357. tx.Respond(response)
  358. return
  359. }
  360. EmitEvent(NotifyEvent{
  361. Type: temp.CmdType,
  362. Device: d})
  363. tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
  364. }
  365. }
  // notifyMessage is one DeviceList item of a catalog subscription
  // notification, as decoded by OnNotify.
  type notifyMessage struct {
  	DeviceID, ParentID, Name, Manufacturer, Model, Owner, CivilCode, Address string

  	Port, Parental, SafetyWay, RegisterWay, Secrecy int

  	Status string
  	// Event is the state-change event (mandatory). ON: online, OFF: offline,
  	// VLOST: video lost, DEFECT: fault, ADD: added, DEL: deleted,
  	// UPDATE: updated.
  	Event string
  }