123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- /*
- * Copyright (c) 2014 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Allan Stockdill-Mander
- */
- package mqtt
- import (
- "sync"
- "time"
- "github.com/eclipse/paho.mqtt.golang/packets"
- )
- // PacketAndToken is a struct that contains both a ControlPacket and a
- // Token. This struct is passed via channels between the client interface
- // code and the underlying code responsible for sending and receiving
- // MQTT messages.
- type PacketAndToken struct {
- p packets.ControlPacket
- t tokenCompletor
- }
// Token is the interface implemented by the tokens that report when an
// action has completed.
type Token interface {
	// Wait blocks until the action has completed.
	Wait() bool

	// WaitTimeout waits for completion for at most the given duration and
	// reports whether the action completed in time.
	WaitTimeout(time.Duration) bool

	// Error returns the error recorded on the token, if any.
	Error() error
}
// TokenErrorSetter is implemented by tokens that can have an error recorded
// on them. Its single method is unexported, so only types in this package
// can satisfy it.
type TokenErrorSetter interface {
	// setError records the given error on the token.
	setError(error)
}
- type tokenCompletor interface {
- Token
- TokenErrorSetter
- flowComplete()
- }
// baseToken is the common implementation embedded in every concrete token
// type. Completion is signalled by closing the complete channel; an optional
// error is stored in err.
type baseToken struct {
	// m guards err. Token types embedding baseToken also use it to guard
	// their own fields.
	m sync.RWMutex
	// complete is closed by flowComplete once the flow has finished. It must
	// be non-nil before the token is used.
	complete chan struct{}
	// err is the error recorded by setError, or nil.
	err error
}

// Wait blocks indefinitely until the Token has completed, e.g. until a
// Publish has been sent and its receipt confirmed by the broker. It always
// returns true.
func (b *baseToken) Wait() bool {
	<-b.complete
	return true
}

// WaitTimeout waits up to d for the flow associated with the Token to
// complete. It returns true if the flow has completed and false if the
// timeout elapsed first. A timeout does not set an error on the Token, so the
// caller may wait again.
//
// The token mutex is deliberately NOT held while waiting: setError needs the
// write lock to record a failure and close complete, so holding the lock here
// would stall completion (and block Error) until the timeout expired.
func (b *baseToken) WaitTimeout(d time.Duration) bool {
	timer := time.NewTimer(d)
	// The timer is local and never Reset, so stopping it is sufficient;
	// there is no need to drain timer.C.
	defer timer.Stop()

	select {
	case <-b.complete:
		return true
	case <-timer.C:
	}

	// When both cases were ready (for example d <= 0 on a finished token)
	// select chooses at random, so re-check completion before reporting a
	// timeout.
	select {
	case <-b.complete:
		return true
	default:
		return false
	}
}

// flowComplete marks the flow as finished by closing complete. If the
// channel is already closed the call does nothing.
//
// NOTE(review): the check and the close are not atomic, so two concurrent
// callers could both reach close. setError calls this under the write lock;
// confirm that other callers are serialized as well.
func (b *baseToken) flowComplete() {
	select {
	case <-b.complete:
		// Already closed; closing again would panic.
	default:
		close(b.complete)
	}
}

// Error returns the error recorded on the token, or nil if none was set.
func (b *baseToken) Error() error {
	b.m.RLock()
	defer b.m.RUnlock()
	return b.err
}

// setError records e on the token and then completes the flow, releasing
// any goroutine blocked in Wait or WaitTimeout.
func (b *baseToken) setError(e error) {
	b.m.Lock()
	defer b.m.Unlock()
	b.err = e
	b.flowComplete()
}
- func newToken(tType byte) tokenCompletor {
- switch tType {
- case packets.Connect:
- return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}}
- case packets.Subscribe:
- return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)}
- case packets.Publish:
- return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}}
- case packets.Unsubscribe:
- return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}}
- case packets.Disconnect:
- return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}}
- }
- return nil
- }
- // ConnectToken is an extension of Token containing the extra fields
- // required to provide information about calls to Connect()
- type ConnectToken struct {
- baseToken
- returnCode byte
- sessionPresent bool
- }
- // ReturnCode returns the acknowlegement code in the connack sent
- // in response to a Connect()
- func (c *ConnectToken) ReturnCode() byte {
- c.m.RLock()
- defer c.m.RUnlock()
- return c.returnCode
- }
- // SessionPresent returns a bool representing the value of the
- // session present field in the connack sent in response to a Connect()
- func (c *ConnectToken) SessionPresent() bool {
- c.m.RLock()
- defer c.m.RUnlock()
- return c.sessionPresent
- }
- // PublishToken is an extension of Token containing the extra fields
- // required to provide information about calls to Publish()
- type PublishToken struct {
- baseToken
- messageID uint16
- }
- // MessageID returns the MQTT message ID that was assigned to the
- // Publish packet when it was sent to the broker
- func (p *PublishToken) MessageID() uint16 {
- return p.messageID
- }
- // SubscribeToken is an extension of Token containing the extra fields
- // required to provide information about calls to Subscribe()
- type SubscribeToken struct {
- baseToken
- subs []string
- subResult map[string]byte
- }
- // Result returns a map of topics that were subscribed to along with
- // the matching return code from the broker. This is either the Qos
- // value of the subscription or an error code.
- func (s *SubscribeToken) Result() map[string]byte {
- s.m.RLock()
- defer s.m.RUnlock()
- return s.subResult
- }
- // UnsubscribeToken is an extension of Token containing the extra fields
- // required to provide information about calls to Unsubscribe()
- type UnsubscribeToken struct {
- baseToken
- }
- // DisconnectToken is an extension of Token containing the extra fields
- // required to provide information about calls to Disconnect()
- type DisconnectToken struct {
- baseToken
- }
|