token.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. /*
  2. * Copyright (c) 2014 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Allan Stockdill-Mander
  11. */
  12. package mqtt
  13. import (
  14. "sync"
  15. "time"
  16. "github.com/eclipse/paho.mqtt.golang/packets"
  17. )
// PacketAndToken pairs a ControlPacket with a Token. Values of this type
// are passed via channels between the client interface code and the
// underlying code responsible for sending and receiving MQTT messages.
type PacketAndToken struct {
	// p is the MQTT control packet being handed over.
	p packets.ControlPacket
	// t is the token that travels with p. It is stored as a tokenCompletor
	// so the receiving code can complete it or record an error on it.
	t tokenCompletor
}
// Token defines the interface for the tokens used to indicate when
// actions have completed.
type Token interface {
	// Wait blocks until the action tracked by the token has completed.
	Wait() bool
	// WaitTimeout waits at most the given duration and reports whether the
	// action completed before the timeout expired.
	WaitTimeout(time.Duration) bool
	// Error returns the error recorded on the token, or nil if none was set.
	Error() error
}
// TokenErrorSetter is implemented by tokens that can have an error recorded
// on them. Its only method is unexported, so the interface can only be
// satisfied by types declared in this package (or types embedding them).
type TokenErrorSetter interface {
	// setError records the given error on the token.
	setError(error)
}
// tokenCompletor is the package-internal view of a token: it combines the
// public Token interface with the ability to record an error and to mark
// the associated flow as complete.
type tokenCompletor interface {
	Token
	TokenErrorSetter
	// flowComplete marks the flow tracked by the token as finished.
	flowComplete()
}
// baseToken holds the state shared by every concrete token type in this
// file; the concrete types embed it to obtain the Token methods.
type baseToken struct {
	// m guards err (and is also used by embedding types to guard the
	// fields read by their accessor methods).
	m sync.RWMutex
	// complete is closed by flowComplete to signal that the flow finished.
	complete chan struct{}
	// err is the error recorded via setError, if any.
	err error
}
  46. // Wait will wait indefinitely for the Token to complete, ie the Publish
  47. // to be sent and confirmed receipt from the broker
  48. func (b *baseToken) Wait() bool {
  49. <-b.complete
  50. return true
  51. }
  52. // WaitTimeout takes a time.Duration to wait for the flow associated with the
  53. // Token to complete, returns true if it returned before the timeout or
  54. // returns false if the timeout occurred. In the case of a timeout the Token
  55. // does not have an error set in case the caller wishes to wait again
  56. func (b *baseToken) WaitTimeout(d time.Duration) bool {
  57. b.m.Lock()
  58. defer b.m.Unlock()
  59. timer := time.NewTimer(d)
  60. select {
  61. case <-b.complete:
  62. if !timer.Stop() {
  63. <-timer.C
  64. }
  65. return true
  66. case <-timer.C:
  67. }
  68. return false
  69. }
  70. func (b *baseToken) flowComplete() {
  71. select {
  72. case <-b.complete:
  73. default:
  74. close(b.complete)
  75. }
  76. }
  77. func (b *baseToken) Error() error {
  78. b.m.RLock()
  79. defer b.m.RUnlock()
  80. return b.err
  81. }
  82. func (b *baseToken) setError(e error) {
  83. b.m.Lock()
  84. b.err = e
  85. b.flowComplete()
  86. b.m.Unlock()
  87. }
  88. func newToken(tType byte) tokenCompletor {
  89. switch tType {
  90. case packets.Connect:
  91. return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  92. case packets.Subscribe:
  93. return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
  94. case packets.Publish:
  95. return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
  96. case packets.Unsubscribe:
  97. return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
  98. case packets.Disconnect:
  99. return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
  100. }
  101. return nil
  102. }
// ConnectToken is an extension of Token containing the extra fields
// required to provide information about calls to Connect().
type ConnectToken struct {
	baseToken
	// returnCode is the value exposed by ReturnCode().
	returnCode byte
	// sessionPresent is the value exposed by SessionPresent().
	sessionPresent bool
}
  110. // ReturnCode returns the acknowlegement code in the connack sent
  111. // in response to a Connect()
  112. func (c *ConnectToken) ReturnCode() byte {
  113. c.m.RLock()
  114. defer c.m.RUnlock()
  115. return c.returnCode
  116. }
  117. // SessionPresent returns a bool representing the value of the
  118. // session present field in the connack sent in response to a Connect()
  119. func (c *ConnectToken) SessionPresent() bool {
  120. c.m.RLock()
  121. defer c.m.RUnlock()
  122. return c.sessionPresent
  123. }
// PublishToken is an extension of Token containing the extra fields
// required to provide information about calls to Publish().
type PublishToken struct {
	baseToken
	// messageID is the value exposed by MessageID().
	messageID uint16
}
  130. // MessageID returns the MQTT message ID that was assigned to the
  131. // Publish packet when it was sent to the broker
  132. func (p *PublishToken) MessageID() uint16 {
  133. return p.messageID
  134. }
// SubscribeToken is an extension of Token containing the extra fields
// required to provide information about calls to Subscribe().
type SubscribeToken struct {
	baseToken
	// subs presumably holds the topics that were requested in the
	// Subscribe() call; it is not populated in this file — confirm
	// against the caller.
	subs []string
	// subResult is the map returned by Result(), keyed by topic.
	subResult map[string]byte
}
  142. // Result returns a map of topics that were subscribed to along with
  143. // the matching return code from the broker. This is either the Qos
  144. // value of the subscription or an error code.
  145. func (s *SubscribeToken) Result() map[string]byte {
  146. s.m.RLock()
  147. defer s.m.RUnlock()
  148. return s.subResult
  149. }
// UnsubscribeToken is the Token used for calls to Unsubscribe(). It
// currently adds no fields beyond the embedded baseToken.
type UnsubscribeToken struct {
	baseToken
}
// DisconnectToken is the Token used for calls to Disconnect(). It
// currently adds no fields beyond the embedded baseToken.
type DisconnectToken struct {
	baseToken
}