client.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. // Portions copyright © 2018 TIBCO Software Inc.
  15. // Package mqtt provides an MQTT v3.1.1 client library.
  16. package mqtt
  17. import (
  18. "errors"
  19. "fmt"
  20. "net"
  21. "strings"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/eclipse/paho.mqtt.golang/packets"
  26. )
  27. const (
  28. disconnected uint32 = iota
  29. connecting
  30. reconnecting
  31. connected
  32. )
  33. // Client is the interface definition for a Client as used by this
  34. // library, the interface is primarily to allow mocking tests.
  35. //
  36. // It is an MQTT v3.1.1 client for communicating
  37. // with an MQTT server using non-blocking methods that allow work
  38. // to be done in the background.
  39. // An application may connect to an MQTT server using:
  40. // A plain TCP socket
  41. // A secure SSL/TLS socket
  42. // A websocket
  43. // To enable ensured message delivery at Quality of Service (QoS) levels
  44. // described in the MQTT spec, a message persistence mechanism must be
  45. // used. This is done by providing a type which implements the Store
  46. // interface. For convenience, FileStore and MemoryStore are provided
  47. // implementations that should be sufficient for most use cases. More
  48. // information can be found in their respective documentation.
  49. // Numerous connection options may be specified by configuring a
  50. // and then supplying a ClientOptions type.
  51. type Client interface {
  52. // IsConnected returns a bool signifying whether
  53. // the client is connected or not.
  54. IsConnected() bool
  55. // IsConnectionOpen return a bool signifying wether the client has an active
  56. // connection to mqtt broker, i.e not in disconnected or reconnect mode
  57. IsConnectionOpen() bool
  58. // Connect will create a connection to the message broker, by default
  59. // it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
  60. // fails
  61. Connect() Token
  62. // Disconnect will end the connection with the server, but not before waiting
  63. // the specified number of milliseconds to wait for existing work to be
  64. // completed.
  65. Disconnect(quiesce uint)
  66. // Publish will publish a message with the specified QoS and content
  67. // to the specified topic.
  68. // Returns a token to track delivery of the message to the broker
  69. Publish(topic string, qos byte, retained bool, payload interface{}) Token
  70. // Subscribe starts a new subscription. Provide a MessageHandler to be executed when
  71. // a message is published on the topic provided, or nil for the default handler
  72. Subscribe(topic string, qos byte, callback MessageHandler) Token
  73. // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
  74. // be executed when a message is published on one of the topics provided, or nil for the
  75. // default handler
  76. SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
  77. // Unsubscribe will end the subscription from each of the topics provided.
  78. // Messages published to those topics from other clients will no longer be
  79. // received.
  80. Unsubscribe(topics ...string) Token
  81. // AddRoute allows you to add a handler for messages on a specific topic
  82. // without making a subscription. For example having a different handler
  83. // for parts of a wildcard subscription
  84. AddRoute(topic string, callback MessageHandler)
  85. // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
  86. // in use by the client.
  87. OptionsReader() ClientOptionsReader
  88. }
  89. // client implements the Client interface
  90. type client struct {
  91. lastSent atomic.Value
  92. lastReceived atomic.Value
  93. pingOutstanding int32
  94. status uint32
  95. sync.RWMutex
  96. messageIds
  97. conn net.Conn
  98. ibound chan packets.ControlPacket
  99. obound chan *PacketAndToken
  100. oboundP chan *PacketAndToken
  101. msgRouter *router
  102. stopRouter chan bool
  103. incomingPubChan chan *packets.PublishPacket
  104. errors chan error
  105. stop chan struct{}
  106. persist Store
  107. options ClientOptions
  108. workers sync.WaitGroup
  109. }
  110. // NewClient will create an MQTT v3.1.1 client with all of the options specified
  111. // in the provided ClientOptions. The client must have the Connect method called
  112. // on it before it may be used. This is to make sure resources (such as a net
  113. // connection) are created before the application is actually ready.
  114. func NewClient(o *ClientOptions) Client {
  115. c := &client{}
  116. c.options = *o
  117. if c.options.Store == nil {
  118. c.options.Store = NewMemoryStore()
  119. }
  120. switch c.options.ProtocolVersion {
  121. case 3, 4:
  122. c.options.protocolVersionExplicit = true
  123. case 0x83, 0x84:
  124. c.options.protocolVersionExplicit = true
  125. default:
  126. c.options.ProtocolVersion = 4
  127. c.options.protocolVersionExplicit = false
  128. }
  129. c.persist = c.options.Store
  130. c.status = disconnected
  131. c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
  132. c.msgRouter, c.stopRouter = newRouter()
  133. c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
  134. if !c.options.AutoReconnect {
  135. c.options.MessageChannelDepth = 0
  136. }
  137. return c
  138. }
  139. // AddRoute allows you to add a handler for messages on a specific topic
  140. // without making a subscription. For example having a different handler
  141. // for parts of a wildcard subscription
  142. func (c *client) AddRoute(topic string, callback MessageHandler) {
  143. if callback != nil {
  144. c.msgRouter.addRoute(topic, callback)
  145. }
  146. }
  147. // IsConnected returns a bool signifying whether
  148. // the client is connected or not.
  149. func (c *client) IsConnected() bool {
  150. c.RLock()
  151. defer c.RUnlock()
  152. status := atomic.LoadUint32(&c.status)
  153. switch {
  154. case status == connected:
  155. return true
  156. case c.options.AutoReconnect && status > connecting:
  157. return true
  158. default:
  159. return false
  160. }
  161. }
  162. // IsConnectionOpen return a bool signifying whether the client has an active
  163. // connection to mqtt broker, i.e not in disconnected or reconnect mode
  164. func (c *client) IsConnectionOpen() bool {
  165. c.RLock()
  166. defer c.RUnlock()
  167. status := atomic.LoadUint32(&c.status)
  168. switch {
  169. case status == connected:
  170. return true
  171. default:
  172. return false
  173. }
  174. }
  175. func (c *client) connectionStatus() uint32 {
  176. c.RLock()
  177. defer c.RUnlock()
  178. status := atomic.LoadUint32(&c.status)
  179. return status
  180. }
  181. func (c *client) setConnected(status uint32) {
  182. c.Lock()
  183. defer c.Unlock()
  184. atomic.StoreUint32(&c.status, uint32(status))
  185. }
  186. //ErrNotConnected is the error returned from function calls that are
  187. //made when the client is not connected to a broker
  188. var ErrNotConnected = errors.New("Not Connected")
  189. // Connect will create a connection to the message broker, by default
  190. // it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
  191. // fails
  192. func (c *client) Connect() Token {
  193. var err error
  194. t := newToken(packets.Connect).(*ConnectToken)
  195. DEBUG.Println(CLI, "Connect()")
  196. c.obound = make(chan *PacketAndToken, c.options.MessageChannelDepth)
  197. c.oboundP = make(chan *PacketAndToken, c.options.MessageChannelDepth)
  198. c.ibound = make(chan packets.ControlPacket)
  199. go func() {
  200. c.persist.Open()
  201. c.setConnected(connecting)
  202. c.errors = make(chan error, 1)
  203. c.stop = make(chan struct{})
  204. var rc byte
  205. protocolVersion := c.options.ProtocolVersion
  206. if len(c.options.Servers) == 0 {
  207. t.setError(fmt.Errorf("No servers defined to connect to"))
  208. return
  209. }
  210. for _, broker := range c.options.Servers {
  211. cm := newConnectMsgFromOptions(&c.options, broker)
  212. c.options.ProtocolVersion = protocolVersion
  213. CONN:
  214. DEBUG.Println(CLI, "about to write new connect msg")
  215. c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
  216. if err == nil {
  217. DEBUG.Println(CLI, "socket connected to broker")
  218. switch c.options.ProtocolVersion {
  219. case 3:
  220. DEBUG.Println(CLI, "Using MQTT 3.1 protocol")
  221. cm.ProtocolName = "MQIsdp"
  222. cm.ProtocolVersion = 3
  223. case 0x83:
  224. DEBUG.Println(CLI, "Using MQTT 3.1b protocol")
  225. cm.ProtocolName = "MQIsdp"
  226. cm.ProtocolVersion = 0x83
  227. case 0x84:
  228. DEBUG.Println(CLI, "Using MQTT 3.1.1b protocol")
  229. cm.ProtocolName = "MQTT"
  230. cm.ProtocolVersion = 0x84
  231. default:
  232. DEBUG.Println(CLI, "Using MQTT 3.1.1 protocol")
  233. c.options.ProtocolVersion = 4
  234. cm.ProtocolName = "MQTT"
  235. cm.ProtocolVersion = 4
  236. }
  237. cm.Write(c.conn)
  238. rc, t.sessionPresent = c.connect()
  239. if rc != packets.Accepted {
  240. if c.conn != nil {
  241. c.conn.Close()
  242. c.conn = nil
  243. }
  244. //if the protocol version was explicitly set don't do any fallback
  245. if c.options.protocolVersionExplicit {
  246. ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
  247. continue
  248. }
  249. if c.options.ProtocolVersion == 4 {
  250. DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
  251. c.options.ProtocolVersion = 3
  252. goto CONN
  253. }
  254. }
  255. break
  256. } else {
  257. ERROR.Println(CLI, err.Error())
  258. WARN.Println(CLI, "failed to connect to broker, trying next")
  259. rc = packets.ErrNetworkError
  260. }
  261. }
  262. if c.conn == nil {
  263. ERROR.Println(CLI, "Failed to connect to a broker")
  264. c.setConnected(disconnected)
  265. c.persist.Close()
  266. t.returnCode = rc
  267. if rc != packets.ErrNetworkError {
  268. t.setError(packets.ConnErrors[rc])
  269. } else {
  270. t.setError(fmt.Errorf("%s : %s", packets.ConnErrors[rc], err))
  271. }
  272. return
  273. }
  274. c.options.protocolVersionExplicit = true
  275. if c.options.KeepAlive != 0 {
  276. atomic.StoreInt32(&c.pingOutstanding, 0)
  277. c.lastReceived.Store(time.Now())
  278. c.lastSent.Store(time.Now())
  279. c.workers.Add(1)
  280. go keepalive(c)
  281. }
  282. c.incomingPubChan = make(chan *packets.PublishPacket, c.options.MessageChannelDepth)
  283. c.msgRouter.matchAndDispatch(c.incomingPubChan, c.options.Order, c)
  284. c.setConnected(connected)
  285. DEBUG.Println(CLI, "client is connected")
  286. if c.options.OnConnect != nil {
  287. go c.options.OnConnect(c)
  288. }
  289. c.workers.Add(4)
  290. go errorWatch(c)
  291. go alllogic(c)
  292. go outgoing(c)
  293. go incoming(c)
  294. // Take care of any messages in the store
  295. if c.options.CleanSession == false {
  296. c.resume(c.options.ResumeSubs)
  297. } else {
  298. c.persist.Reset()
  299. }
  300. DEBUG.Println(CLI, "exit startClient")
  301. t.flowComplete()
  302. }()
  303. return t
  304. }
  305. // internal function used to reconnect the client when it loses its connection
  306. func (c *client) reconnect() {
  307. DEBUG.Println(CLI, "enter reconnect")
  308. var (
  309. err error
  310. rc = byte(1)
  311. sleep = time.Duration(1 * time.Second)
  312. )
  313. for rc != 0 && atomic.LoadUint32(&c.status) != disconnected {
  314. for _, broker := range c.options.Servers {
  315. cm := newConnectMsgFromOptions(&c.options, broker)
  316. DEBUG.Println(CLI, "about to write new connect msg")
  317. c.Lock()
  318. c.conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders)
  319. c.Unlock()
  320. if err == nil {
  321. DEBUG.Println(CLI, "socket connected to broker")
  322. switch c.options.ProtocolVersion {
  323. case 0x83:
  324. DEBUG.Println(CLI, "Using MQTT 3.1b protocol")
  325. cm.ProtocolName = "MQIsdp"
  326. cm.ProtocolVersion = 0x83
  327. case 0x84:
  328. DEBUG.Println(CLI, "Using MQTT 3.1.1b protocol")
  329. cm.ProtocolName = "MQTT"
  330. cm.ProtocolVersion = 0x84
  331. case 3:
  332. DEBUG.Println(CLI, "Using MQTT 3.1 protocol")
  333. cm.ProtocolName = "MQIsdp"
  334. cm.ProtocolVersion = 3
  335. default:
  336. DEBUG.Println(CLI, "Using MQTT 3.1.1 protocol")
  337. cm.ProtocolName = "MQTT"
  338. cm.ProtocolVersion = 4
  339. }
  340. cm.Write(c.conn)
  341. rc, _ = c.connect()
  342. if rc != packets.Accepted {
  343. c.conn.Close()
  344. c.conn = nil
  345. //if the protocol version was explicitly set don't do any fallback
  346. if c.options.protocolVersionExplicit {
  347. ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not Accepted, but rather", packets.ConnackReturnCodes[rc])
  348. continue
  349. }
  350. }
  351. break
  352. } else {
  353. ERROR.Println(CLI, err.Error())
  354. WARN.Println(CLI, "failed to connect to broker, trying next")
  355. rc = packets.ErrNetworkError
  356. }
  357. }
  358. if rc != 0 {
  359. DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds")
  360. time.Sleep(sleep)
  361. if sleep < c.options.MaxReconnectInterval {
  362. sleep *= 2
  363. }
  364. if sleep > c.options.MaxReconnectInterval {
  365. sleep = c.options.MaxReconnectInterval
  366. }
  367. }
  368. }
  369. // Disconnect() must have been called while we were trying to reconnect.
  370. if c.connectionStatus() == disconnected {
  371. DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
  372. return
  373. }
  374. c.stop = make(chan struct{})
  375. if c.options.KeepAlive != 0 {
  376. atomic.StoreInt32(&c.pingOutstanding, 0)
  377. c.lastReceived.Store(time.Now())
  378. c.lastSent.Store(time.Now())
  379. c.workers.Add(1)
  380. go keepalive(c)
  381. }
  382. c.setConnected(connected)
  383. DEBUG.Println(CLI, "client is reconnected")
  384. if c.options.OnConnect != nil {
  385. go c.options.OnConnect(c)
  386. }
  387. c.workers.Add(4)
  388. go errorWatch(c)
  389. go alllogic(c)
  390. go outgoing(c)
  391. go incoming(c)
  392. c.resume(false)
  393. }
  394. // This function is only used for receiving a connack
  395. // when the connection is first started.
  396. // This prevents receiving incoming data while resume
  397. // is in progress if clean session is false.
  398. func (c *client) connect() (byte, bool) {
  399. DEBUG.Println(NET, "connect started")
  400. ca, err := packets.ReadPacket(c.conn)
  401. if err != nil {
  402. ERROR.Println(NET, "connect got error", err)
  403. return packets.ErrNetworkError, false
  404. }
  405. if ca == nil {
  406. ERROR.Println(NET, "received nil packet")
  407. return packets.ErrNetworkError, false
  408. }
  409. msg, ok := ca.(*packets.ConnackPacket)
  410. if !ok {
  411. ERROR.Println(NET, "received msg that was not CONNACK")
  412. return packets.ErrNetworkError, false
  413. }
  414. DEBUG.Println(NET, "received connack")
  415. return msg.ReturnCode, msg.SessionPresent
  416. }
  417. // Disconnect will end the connection with the server, but not before waiting
  418. // the specified number of milliseconds to wait for existing work to be
  419. // completed.
  420. func (c *client) Disconnect(quiesce uint) {
  421. status := atomic.LoadUint32(&c.status)
  422. if status == connected {
  423. DEBUG.Println(CLI, "disconnecting")
  424. c.setConnected(disconnected)
  425. dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
  426. dt := newToken(packets.Disconnect)
  427. c.oboundP <- &PacketAndToken{p: dm, t: dt}
  428. // wait for work to finish, or quiesce time consumed
  429. dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
  430. } else {
  431. WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
  432. c.setConnected(disconnected)
  433. }
  434. c.disconnect()
  435. }
  436. // ForceDisconnect will end the connection with the mqtt broker immediately.
  437. func (c *client) forceDisconnect() {
  438. if !c.IsConnected() {
  439. WARN.Println(CLI, "already disconnected")
  440. return
  441. }
  442. c.setConnected(disconnected)
  443. c.conn.Close()
  444. DEBUG.Println(CLI, "forcefully disconnecting")
  445. c.disconnect()
  446. }
  447. func (c *client) internalConnLost(err error) {
  448. // Only do anything if this was called and we are still "connected"
  449. // forceDisconnect can cause incoming/outgoing/alllogic to end with
  450. // error from closing the socket but state will be "disconnected"
  451. if c.IsConnected() {
  452. c.closeStop()
  453. c.conn.Close()
  454. c.workers.Wait()
  455. if c.options.CleanSession && !c.options.AutoReconnect {
  456. c.messageIds.cleanUp()
  457. }
  458. if c.options.AutoReconnect {
  459. c.setConnected(reconnecting)
  460. go c.reconnect()
  461. } else {
  462. c.setConnected(disconnected)
  463. }
  464. if c.options.OnConnectionLost != nil {
  465. go c.options.OnConnectionLost(c, err)
  466. }
  467. }
  468. }
  469. func (c *client) closeStop() {
  470. c.Lock()
  471. defer c.Unlock()
  472. select {
  473. case <-c.stop:
  474. DEBUG.Println("In disconnect and stop channel is already closed")
  475. default:
  476. if c.stop != nil {
  477. close(c.stop)
  478. }
  479. }
  480. }
  481. func (c *client) closeStopRouter() {
  482. c.Lock()
  483. defer c.Unlock()
  484. select {
  485. case <-c.stopRouter:
  486. DEBUG.Println("In disconnect and stop channel is already closed")
  487. default:
  488. if c.stopRouter != nil {
  489. close(c.stopRouter)
  490. }
  491. }
  492. }
  493. func (c *client) closeConn() {
  494. c.Lock()
  495. defer c.Unlock()
  496. if c.conn != nil {
  497. c.conn.Close()
  498. }
  499. }
  500. func (c *client) disconnect() {
  501. c.closeStop()
  502. c.closeConn()
  503. c.workers.Wait()
  504. c.messageIds.cleanUp()
  505. c.closeStopRouter()
  506. DEBUG.Println(CLI, "disconnected")
  507. c.persist.Close()
  508. }
  509. // Publish will publish a message with the specified QoS and content
  510. // to the specified topic.
  511. // Returns a token to track delivery of the message to the broker
  512. func (c *client) Publish(topic string, qos byte, retained bool, payload interface{}) Token {
  513. token := newToken(packets.Publish).(*PublishToken)
  514. DEBUG.Println(CLI, "enter Publish")
  515. switch {
  516. case !c.IsConnected():
  517. token.setError(ErrNotConnected)
  518. return token
  519. case c.connectionStatus() == reconnecting && qos == 0:
  520. token.flowComplete()
  521. return token
  522. }
  523. pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
  524. pub.Qos = qos
  525. pub.TopicName = topic
  526. pub.Retain = retained
  527. switch payload.(type) {
  528. case string:
  529. pub.Payload = []byte(payload.(string))
  530. case []byte:
  531. pub.Payload = payload.([]byte)
  532. default:
  533. token.setError(fmt.Errorf("Unknown payload type"))
  534. return token
  535. }
  536. if pub.Qos != 0 && pub.MessageID == 0 {
  537. pub.MessageID = c.getID(token)
  538. token.messageID = pub.MessageID
  539. }
  540. persistOutbound(c.persist, pub)
  541. if c.connectionStatus() == reconnecting {
  542. DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
  543. } else {
  544. DEBUG.Println(CLI, "sending publish message, topic:", topic)
  545. c.obound <- &PacketAndToken{p: pub, t: token}
  546. }
  547. return token
  548. }
  549. // Subscribe starts a new subscription. Provide a MessageHandler to be executed when
  550. // a message is published on the topic provided.
  551. func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
  552. token := newToken(packets.Subscribe).(*SubscribeToken)
  553. DEBUG.Println(CLI, "enter Subscribe")
  554. if !c.IsConnected() {
  555. token.setError(ErrNotConnected)
  556. return token
  557. }
  558. sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
  559. if err := validateTopicAndQos(topic, qos); err != nil {
  560. token.setError(err)
  561. return token
  562. }
  563. sub.Topics = append(sub.Topics, topic)
  564. sub.Qoss = append(sub.Qoss, qos)
  565. DEBUG.Println(CLI, sub.String())
  566. if strings.HasPrefix(topic, "$share") {
  567. topic = strings.Join(strings.Split(topic, "/")[2:], "/")
  568. }
  569. if callback != nil {
  570. c.msgRouter.addRoute(topic, callback)
  571. }
  572. token.subs = append(token.subs, topic)
  573. c.oboundP <- &PacketAndToken{p: sub, t: token}
  574. DEBUG.Println(CLI, "exit Subscribe")
  575. return token
  576. }
  577. // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
  578. // be executed when a message is published on one of the topics provided.
  579. func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
  580. var err error
  581. token := newToken(packets.Subscribe).(*SubscribeToken)
  582. DEBUG.Println(CLI, "enter SubscribeMultiple")
  583. if !c.IsConnected() {
  584. token.setError(ErrNotConnected)
  585. return token
  586. }
  587. sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
  588. if sub.Topics, sub.Qoss, err = validateSubscribeMap(filters); err != nil {
  589. token.setError(err)
  590. return token
  591. }
  592. if callback != nil {
  593. for topic := range filters {
  594. c.msgRouter.addRoute(topic, callback)
  595. }
  596. }
  597. token.subs = make([]string, len(sub.Topics))
  598. copy(token.subs, sub.Topics)
  599. c.oboundP <- &PacketAndToken{p: sub, t: token}
  600. DEBUG.Println(CLI, "exit SubscribeMultiple")
  601. return token
  602. }
  603. // Load all stored messages and resend them
  604. // Call this to ensure QOS > 1,2 even after an application crash
  605. func (c *client) resume(subscription bool) {
  606. storedKeys := c.persist.All()
  607. for _, key := range storedKeys {
  608. packet := c.persist.Get(key)
  609. if packet == nil {
  610. continue
  611. }
  612. details := packet.Details()
  613. if isKeyOutbound(key) {
  614. switch packet.(type) {
  615. case *packets.SubscribePacket:
  616. if subscription {
  617. DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
  618. token := newToken(packets.Subscribe).(*SubscribeToken)
  619. c.oboundP <- &PacketAndToken{p: packet, t: token}
  620. }
  621. case *packets.UnsubscribePacket:
  622. if subscription {
  623. DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
  624. token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
  625. c.oboundP <- &PacketAndToken{p: packet, t: token}
  626. }
  627. case *packets.PubrelPacket:
  628. DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
  629. select {
  630. case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
  631. case <-c.stop:
  632. }
  633. case *packets.PublishPacket:
  634. token := newToken(packets.Publish).(*PublishToken)
  635. token.messageID = details.MessageID
  636. c.claimID(token, details.MessageID)
  637. DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
  638. DEBUG.Println(STR, details)
  639. c.obound <- &PacketAndToken{p: packet, t: token}
  640. default:
  641. ERROR.Println(STR, "invalid message type in store (discarded)")
  642. c.persist.Del(key)
  643. }
  644. } else {
  645. switch packet.(type) {
  646. case *packets.PubrelPacket, *packets.PublishPacket:
  647. DEBUG.Println(STR, fmt.Sprintf("loaded pending incomming (%d)", details.MessageID))
  648. select {
  649. case c.ibound <- packet:
  650. case <-c.stop:
  651. }
  652. default:
  653. ERROR.Println(STR, "invalid message type in store (discarded)")
  654. c.persist.Del(key)
  655. }
  656. }
  657. }
  658. }
  659. // Unsubscribe will end the subscription from each of the topics provided.
  660. // Messages published to those topics from other clients will no longer be
  661. // received.
  662. func (c *client) Unsubscribe(topics ...string) Token {
  663. token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
  664. DEBUG.Println(CLI, "enter Unsubscribe")
  665. if !c.IsConnected() {
  666. token.setError(ErrNotConnected)
  667. return token
  668. }
  669. unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
  670. unsub.Topics = make([]string, len(topics))
  671. copy(unsub.Topics, topics)
  672. c.oboundP <- &PacketAndToken{p: unsub, t: token}
  673. for _, topic := range topics {
  674. c.msgRouter.deleteRoute(topic)
  675. }
  676. DEBUG.Println(CLI, "exit Unsubscribe")
  677. return token
  678. }
  679. // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
  680. // in use by the client.
  681. func (c *client) OptionsReader() ClientOptionsReader {
  682. r := ClientOptionsReader{options: &c.options}
  683. return r
  684. }
  685. //DefaultConnectionLostHandler is a definition of a function that simply
  686. //reports to the DEBUG log the reason for the client losing a connection.
  687. func DefaultConnectionLostHandler(client Client, reason error) {
  688. DEBUG.Println("Connection lost:", reason.Error())
  689. }