handle.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. package gb28181
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "encoding/xml"
  6. "fmt"
  7. "strconv"
  8. "github.com/ghettovoice/gosip/sip"
  9. "go.uber.org/zap"
  10. . "m7s.live/engine/v4"
  11. "m7s.live/plugin/gb28181/v4/utils"
  12. "net/http"
  13. "time"
  14. "golang.org/x/net/html/charset"
  15. )
  16. type Authorization struct {
  17. *sip.Authorization
  18. }
  19. func (a *Authorization) Verify(username, passwd, realm, nonce string) bool {
  20. //1、将 username,realm,password 依次组合获取 1 个字符串,并用算法加密的到密文 r1
  21. s1 := fmt.Sprintf("%s:%s:%s", username, realm, passwd)
  22. r1 := a.getDigest(s1)
  23. //2、将 method,即REGISTER ,uri 依次组合获取 1 个字符串,并对这个字符串使用算法 加密得到密文 r2
  24. s2 := fmt.Sprintf("REGISTER:%s", a.Uri())
  25. r2 := a.getDigest(s2)
  26. if r1 == "" || r2 == "" {
  27. GB28181Plugin.Error("Authorization algorithm wrong")
  28. return false
  29. }
  30. //3、将密文 1,nonce 和密文 2 依次组合获取 1 个字符串,并对这个字符串使用算法加密,获得密文 r3,即Response
  31. s3 := fmt.Sprintf("%s:%s:%s", r1, nonce, r2)
  32. r3 := a.getDigest(s3)
  33. //4、计算服务端和客户端上报的是否相等
  34. return r3 == a.Response()
  35. }
  36. func (a *Authorization) getDigest(raw string) string {
  37. switch a.Algorithm() {
  38. case "MD5":
  39. return fmt.Sprintf("%x", md5.Sum([]byte(raw)))
  40. default: //如果没有算法,默认使用MD5
  41. return fmt.Sprintf("%x", md5.Sum([]byte(raw)))
  42. }
  43. }
  44. func (c *GB28181Config) OnRegister(req sip.Request, tx sip.ServerTransaction) {
  45. from, ok := req.From()
  46. if !ok || from.Address == nil || from.Address.User() == nil {
  47. GB28181Plugin.Error("OnMessage", zap.String("error", "no id"))
  48. return
  49. }
  50. id := from.Address.User().String()
  51. GB28181Plugin.Debug("SIP<-OnMessage", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
  52. isUnregister := false
  53. if exps := req.GetHeaders("Expires"); len(exps) > 0 {
  54. exp := exps[0]
  55. expSec, err := strconv.ParseInt(exp.Value(), 10, 32)
  56. if err != nil {
  57. GB28181Plugin.Info("OnRegister",
  58. zap.String("error", fmt.Sprintf("wrong expire header value %q", exp)),
  59. zap.String("id", id),
  60. zap.String("source", req.Source()),
  61. zap.String("destination", req.Destination()))
  62. return
  63. }
  64. if expSec == 0 {
  65. isUnregister = true
  66. }
  67. } else {
  68. GB28181Plugin.Info("OnRegister",
  69. zap.String("error", "has no expire header"),
  70. zap.String("id", id),
  71. zap.String("source", req.Source()),
  72. zap.String("destination", req.Destination()))
  73. return
  74. }
  75. GB28181Plugin.Info("OnRegister",
  76. zap.Bool("isUnregister", isUnregister),
  77. zap.String("id", id),
  78. zap.String("source", req.Source()),
  79. zap.String("destination", req.Destination()))
  80. if len(id) != 20 {
  81. GB28181Plugin.Info("Wrong GB-28181", zap.String("id", id))
  82. return
  83. }
  84. passAuth := false
  85. // 不需要密码情况
  86. if c.Username == "" && c.Password == "" {
  87. passAuth = true
  88. } else {
  89. // 需要密码情况 设备第一次上报,返回401和加密算法
  90. if hdrs := req.GetHeaders("Authorization"); len(hdrs) > 0 {
  91. authenticateHeader := hdrs[0].(*sip.GenericHeader)
  92. auth := &Authorization{sip.AuthFromValue(authenticateHeader.Contents)}
  93. // 有些摄像头没有配置用户名的地方,用户名就是摄像头自己的国标id
  94. var username string
  95. if auth.Username() == id {
  96. username = id
  97. } else {
  98. username = c.Username
  99. }
  100. if dc, ok := DeviceRegisterCount.LoadOrStore(id, 1); ok && dc.(int) > MaxRegisterCount {
  101. response := sip.NewResponseFromRequest("", req, http.StatusForbidden, "Forbidden", "")
  102. tx.Respond(response)
  103. return
  104. } else {
  105. // 设备第二次上报,校验
  106. _nonce, loaded := DeviceNonce.Load(id)
  107. if loaded && auth.Verify(username, c.Password, c.Realm, _nonce.(string)) {
  108. passAuth = true
  109. } else {
  110. DeviceRegisterCount.Store(id, dc.(int)+1)
  111. }
  112. }
  113. }
  114. }
  115. if passAuth {
  116. var d *Device
  117. if isUnregister {
  118. tmpd, ok := Devices.LoadAndDelete(id)
  119. if ok {
  120. GB28181Plugin.Info("Unregister Device", zap.String("id", id))
  121. d = tmpd.(*Device)
  122. } else {
  123. return
  124. }
  125. } else {
  126. if v, ok := Devices.Load(id); ok {
  127. d = v.(*Device)
  128. c.RecoverDevice(d, req)
  129. } else {
  130. d = c.StoreDevice(id, req)
  131. }
  132. }
  133. DeviceNonce.Delete(id)
  134. DeviceRegisterCount.Delete(id)
  135. resp := sip.NewResponseFromRequest("", req, http.StatusOK, "OK", "")
  136. to, _ := resp.To()
  137. resp.ReplaceHeaders("To", []sip.Header{&sip.ToHeader{Address: to.Address, Params: sip.NewParams().Add("tag", sip.String{Str: utils.RandNumString(9)})}})
  138. resp.RemoveHeader("Allow")
  139. expires := sip.Expires(3600)
  140. resp.AppendHeader(&expires)
  141. resp.AppendHeader(&sip.GenericHeader{
  142. HeaderName: "Date",
  143. Contents: time.Now().Format(TIME_LAYOUT),
  144. })
  145. _ = tx.Respond(resp)
  146. if !isUnregister {
  147. //订阅设备更新
  148. go d.syncChannels()
  149. }
  150. } else {
  151. GB28181Plugin.Info("OnRegister unauthorized", zap.String("id", id), zap.String("source", req.Source()),
  152. zap.String("destination", req.Destination()))
  153. response := sip.NewResponseFromRequest("", req, http.StatusUnauthorized, "Unauthorized", "")
  154. _nonce, _ := DeviceNonce.LoadOrStore(id, utils.RandNumString(32))
  155. auth := fmt.Sprintf(
  156. `Digest realm="%s",algorithm=%s,nonce="%s"`,
  157. c.Realm,
  158. "MD5",
  159. _nonce.(string),
  160. )
  161. response.AppendHeader(&sip.GenericHeader{
  162. HeaderName: "WWW-Authenticate",
  163. Contents: auth,
  164. })
  165. _ = tx.Respond(response)
  166. }
  167. }
  168. // syncChannels
  169. // 同步设备信息、下属通道信息,包括主动查询通道信息,订阅通道变化情况
  170. func (d *Device) syncChannels() {
  171. if time.Since(d.lastSyncTime) > 2*conf.HeartbeatInterval {
  172. d.lastSyncTime = time.Now()
  173. d.Catalog()
  174. d.Subscribe()
  175. d.QueryDeviceInfo()
  176. }
  177. }
  178. type MessageEvent struct {
  179. Type string
  180. Device *Device
  181. }
  182. func (c *GB28181Config) OnMessage(req sip.Request, tx sip.ServerTransaction) {
  183. from, ok := req.From()
  184. if !ok || from.Address == nil || from.Address.User() == nil {
  185. GB28181Plugin.Error("OnMessage", zap.String("error", "no id"))
  186. return
  187. }
  188. id := from.Address.User().String()
  189. GB28181Plugin.Debug("SIP<-OnMessage", zap.String("id", id), zap.String("source", req.Source()), zap.String("req", req.String()))
  190. if v, ok := Devices.Load(id); ok {
  191. d := v.(*Device)
  192. switch d.Status {
  193. case DeviceOfflineStatus, DeviceRecoverStatus:
  194. c.RecoverDevice(d, req)
  195. go d.syncChannels()
  196. case DeviceRegisterStatus:
  197. d.Status = DeviceOnlineStatus
  198. }
  199. d.UpdateTime = time.Now()
  200. temp := &struct {
  201. XMLName xml.Name
  202. CmdType string
  203. SN int // 请求序列号,一般用于对应 request 和 response
  204. DeviceID string
  205. DeviceName string
  206. Manufacturer string
  207. Model string
  208. Channel string
  209. DeviceList []ChannelInfo `xml:"DeviceList>Item"`
  210. RecordList []*Record `xml:"RecordList>Item"`
  211. SumNum int // 录像结果的总数 SumNum,录像结果会按照多条消息返回,可用于判断是否全部返回
  212. }{}
  213. decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
  214. decoder.CharsetReader = charset.NewReaderLabel
  215. err := decoder.Decode(temp)
  216. if err != nil {
  217. err = utils.DecodeGbk(temp, []byte(req.Body()))
  218. if err != nil {
  219. GB28181Plugin.Error("decode catelog err", zap.Error(err))
  220. }
  221. }
  222. var body string
  223. switch temp.CmdType {
  224. case "Keepalive":
  225. d.LastKeepaliveAt = time.Now()
  226. //callID !="" 说明是订阅的事件类型信息
  227. if d.lastSyncTime.IsZero() {
  228. go d.syncChannels()
  229. } else {
  230. d.channelMap.Range(func(key, value interface{}) bool {
  231. if conf.InviteMode == INVIDE_MODE_AUTO {
  232. value.(*Channel).TryAutoInvite(&InviteOptions{})
  233. }
  234. return true
  235. })
  236. }
  237. //在KeepLive 进行位置订阅的处理,如果开启了自动订阅位置,则去订阅位置
  238. if c.Position.AutosubPosition && time.Since(d.GpsTime) > c.Position.Interval*2 {
  239. d.MobilePositionSubscribe(d.ID, c.Position.Expires, c.Position.Interval)
  240. GB28181Plugin.Debug("Mobile Position Subscribe", zap.String("deviceID", d.ID))
  241. }
  242. case "Catalog":
  243. d.UpdateChannels(temp.DeviceList...)
  244. case "RecordInfo":
  245. RecordQueryLink.Put(d.ID, temp.DeviceID, temp.SN, temp.SumNum, temp.RecordList)
  246. case "DeviceInfo":
  247. // 主设备信息
  248. d.Name = temp.DeviceName
  249. d.Manufacturer = temp.Manufacturer
  250. d.Model = temp.Model
  251. case "Alarm":
  252. d.Status = DeviceAlarmedStatus
  253. body = BuildAlarmResponseXML(d.ID)
  254. case "Broadcast":
  255. GB28181Plugin.Info("broadcast message", zap.String("body", req.Body()))
  256. default:
  257. d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
  258. response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
  259. tx.Respond(response)
  260. return
  261. }
  262. EmitEvent(MessageEvent{
  263. Type: temp.CmdType,
  264. Device: d,
  265. })
  266. tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
  267. } else {
  268. GB28181Plugin.Debug("Unauthorized message, device not found", zap.String("id", id))
  269. }
  270. }
  271. func (c *GB28181Config) OnBye(req sip.Request, tx sip.ServerTransaction) {
  272. tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", ""))
  273. }
  274. type NotifyEvent MessageEvent
  275. // OnNotify 订阅通知处理
  276. func (c *GB28181Config) OnNotify(req sip.Request, tx sip.ServerTransaction) {
  277. from, ok := req.From()
  278. if !ok || from.Address == nil || from.Address.User() == nil {
  279. GB28181Plugin.Error("OnMessage", zap.String("error", "no id"))
  280. return
  281. }
  282. id := from.Address.User().String()
  283. if v, ok := Devices.Load(id); ok {
  284. d := v.(*Device)
  285. d.UpdateTime = time.Now()
  286. temp := &struct {
  287. XMLName xml.Name
  288. CmdType string
  289. DeviceID string
  290. Time string //位置订阅-GPS时间
  291. Longitude string //位置订阅-经度
  292. Latitude string //位置订阅-维度
  293. // Speed string //位置订阅-速度(km/h)(可选)
  294. // Direction string //位置订阅-方向(取值为当前摄像头方向与正北方的顺时针夹角,取值范围0°~360°,单位:°)(可选)
  295. // Altitude string //位置订阅-海拔高度,单位:m(可选)
  296. DeviceList []*notifyMessage `xml:"DeviceList>Item"` //目录订阅
  297. }{}
  298. decoder := xml.NewDecoder(bytes.NewReader([]byte(req.Body())))
  299. decoder.CharsetReader = charset.NewReaderLabel
  300. err := decoder.Decode(temp)
  301. if err != nil {
  302. err = utils.DecodeGbk(temp, []byte(req.Body()))
  303. if err != nil {
  304. GB28181Plugin.Error("decode catelog err", zap.Error(err))
  305. }
  306. }
  307. var body string
  308. switch temp.CmdType {
  309. case "Catalog":
  310. //目录状态
  311. d.UpdateChannelStatus(temp.DeviceList)
  312. case "MobilePosition":
  313. //更新channel的坐标
  314. d.UpdateChannelPosition(temp.DeviceID, temp.Time, temp.Longitude, temp.Latitude)
  315. case "Alarm":
  316. d.Status = DeviceAlarmedStatus
  317. default:
  318. d.Warn("Not supported CmdType", zap.String("CmdType", temp.CmdType), zap.String("body", req.Body()))
  319. response := sip.NewResponseFromRequest("", req, http.StatusBadRequest, "", "")
  320. tx.Respond(response)
  321. return
  322. }
  323. EmitEvent(NotifyEvent{
  324. Type: temp.CmdType,
  325. Device: d})
  326. tx.Respond(sip.NewResponseFromRequest("", req, http.StatusOK, "OK", body))
  327. }
  328. }
  329. type notifyMessage struct {
  330. DeviceID string
  331. ParentID string
  332. Name string
  333. Manufacturer string
  334. Model string
  335. Owner string
  336. CivilCode string
  337. Address string
  338. Port int
  339. Parental int
  340. SafetyWay int
  341. RegisterWay int
  342. Secrecy int
  343. Status string
  344. //状态改变事件 ON:上线,OFF:离线,VLOST:视频丢失,DEFECT:故障,ADD:增加,DEL:删除,UPDATE:更新(必选)
  345. Event string
  346. }