tx.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package redis
  2. import (
  3. "github.com/go-redis/redis/internal/pool"
  4. "github.com/go-redis/redis/internal/proto"
  5. )
  6. // TxFailedErr transaction redis failed.
  7. const TxFailedErr = proto.RedisError("redis: transaction failed")
  8. // Tx implements Redis transactions as described in
  9. // http://redis.io/topics/transactions. It's NOT safe for concurrent use
  10. // by multiple goroutines, because Exec resets list of watched keys.
  11. // If you don't need WATCH it is better to use Pipeline.
  12. type Tx struct {
  13. statefulCmdable
  14. baseClient
  15. }
  16. func (c *Client) newTx() *Tx {
  17. tx := Tx{
  18. baseClient: baseClient{
  19. opt: c.opt,
  20. connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
  21. },
  22. }
  23. tx.baseClient.init()
  24. tx.statefulCmdable.setProcessor(tx.Process)
  25. return &tx
  26. }
  27. // Watch prepares a transaction and marks the keys to be watched
  28. // for conditional execution if there are any keys.
  29. //
  30. // The transaction is automatically closed when fn exits.
  31. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
  32. tx := c.newTx()
  33. if len(keys) > 0 {
  34. if err := tx.Watch(keys...).Err(); err != nil {
  35. _ = tx.Close()
  36. return err
  37. }
  38. }
  39. err := fn(tx)
  40. _ = tx.Close()
  41. return err
  42. }
  43. // Close closes the transaction, releasing any open resources.
  44. func (c *Tx) Close() error {
  45. _ = c.Unwatch().Err()
  46. return c.baseClient.Close()
  47. }
  48. // Watch marks the keys to be watched for conditional execution
  49. // of a transaction.
  50. func (c *Tx) Watch(keys ...string) *StatusCmd {
  51. args := make([]interface{}, 1+len(keys))
  52. args[0] = "watch"
  53. for i, key := range keys {
  54. args[1+i] = key
  55. }
  56. cmd := NewStatusCmd(args...)
  57. c.Process(cmd)
  58. return cmd
  59. }
  60. // Unwatch flushes all the previously watched keys for a transaction.
  61. func (c *Tx) Unwatch(keys ...string) *StatusCmd {
  62. args := make([]interface{}, 1+len(keys))
  63. args[0] = "unwatch"
  64. for i, key := range keys {
  65. args[1+i] = key
  66. }
  67. cmd := NewStatusCmd(args...)
  68. c.Process(cmd)
  69. return cmd
  70. }
  71. // Pipeline creates a new pipeline. It is more convenient to use Pipelined.
  72. func (c *Tx) Pipeline() Pipeliner {
  73. pipe := Pipeline{
  74. exec: c.processTxPipeline,
  75. }
  76. pipe.statefulCmdable.setProcessor(pipe.Process)
  77. return &pipe
  78. }
  79. // Pipelined executes commands queued in the fn in a transaction.
  80. //
  81. // When using WATCH, EXEC will execute commands only if the watched keys
  82. // were not modified, allowing for a check-and-set mechanism.
  83. //
  84. // Exec always returns list of commands. If transaction fails
  85. // TxFailedErr is returned. Otherwise Exec returns an error of the first
  86. // failed command or nil.
  87. func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  88. return c.Pipeline().Pipelined(fn)
  89. }
  90. // TxPipelined is an alias for Pipelined.
  91. func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  92. return c.Pipelined(fn)
  93. }
  94. // TxPipeline is an alias for Pipeline.
  95. func (c *Tx) TxPipeline() Pipeliner {
  96. return c.Pipeline()
  97. }