redis.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. package redis
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "os"
  7. "time"
  8. "github.com/go-redis/redis/internal"
  9. "github.com/go-redis/redis/internal/pool"
  10. "github.com/go-redis/redis/internal/proto"
  11. )
  12. // Nil reply Redis returns when key does not exist.
  13. const Nil = proto.Nil
  14. func init() {
  15. SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
  16. }
  17. func SetLogger(logger *log.Logger) {
  18. internal.Logger = logger
  19. }
  20. type baseClient struct {
  21. opt *Options
  22. connPool pool.Pooler
  23. limiter Limiter
  24. process func(Cmder) error
  25. processPipeline func([]Cmder) error
  26. processTxPipeline func([]Cmder) error
  27. onClose func() error // hook called when client is closed
  28. }
  29. func (c *baseClient) init() {
  30. c.process = c.defaultProcess
  31. c.processPipeline = c.defaultProcessPipeline
  32. c.processTxPipeline = c.defaultProcessTxPipeline
  33. }
  34. func (c *baseClient) String() string {
  35. return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
  36. }
  37. func (c *baseClient) newConn() (*pool.Conn, error) {
  38. cn, err := c.connPool.NewConn()
  39. if err != nil {
  40. return nil, err
  41. }
  42. if cn.InitedAt.IsZero() {
  43. if err := c.initConn(cn); err != nil {
  44. _ = c.connPool.CloseConn(cn)
  45. return nil, err
  46. }
  47. }
  48. return cn, nil
  49. }
  50. func (c *baseClient) getConn() (*pool.Conn, error) {
  51. if c.limiter != nil {
  52. err := c.limiter.Allow()
  53. if err != nil {
  54. return nil, err
  55. }
  56. }
  57. cn, err := c._getConn()
  58. if err != nil {
  59. if c.limiter != nil {
  60. c.limiter.ReportResult(err)
  61. }
  62. return nil, err
  63. }
  64. return cn, nil
  65. }
  66. func (c *baseClient) _getConn() (*pool.Conn, error) {
  67. cn, err := c.connPool.Get()
  68. if err != nil {
  69. return nil, err
  70. }
  71. if cn.InitedAt.IsZero() {
  72. err := c.initConn(cn)
  73. if err != nil {
  74. c.connPool.Remove(cn)
  75. return nil, err
  76. }
  77. }
  78. return cn, nil
  79. }
  80. func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
  81. if c.limiter != nil {
  82. c.limiter.ReportResult(err)
  83. }
  84. if internal.IsBadConn(err, false) {
  85. c.connPool.Remove(cn)
  86. } else {
  87. c.connPool.Put(cn)
  88. }
  89. }
  90. func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
  91. if c.limiter != nil {
  92. c.limiter.ReportResult(err)
  93. }
  94. if err == nil || internal.IsRedisError(err) {
  95. c.connPool.Put(cn)
  96. } else {
  97. c.connPool.Remove(cn)
  98. }
  99. }
  100. func (c *baseClient) initConn(cn *pool.Conn) error {
  101. cn.InitedAt = time.Now()
  102. if c.opt.Password == "" &&
  103. c.opt.DB == 0 &&
  104. !c.opt.readOnly &&
  105. c.opt.OnConnect == nil {
  106. return nil
  107. }
  108. conn := newConn(c.opt, cn)
  109. _, err := conn.Pipelined(func(pipe Pipeliner) error {
  110. if c.opt.Password != "" {
  111. pipe.Auth(c.opt.Password)
  112. }
  113. if c.opt.DB > 0 {
  114. pipe.Select(c.opt.DB)
  115. }
  116. if c.opt.readOnly {
  117. pipe.ReadOnly()
  118. }
  119. return nil
  120. })
  121. if err != nil {
  122. return err
  123. }
  124. if c.opt.OnConnect != nil {
  125. return c.opt.OnConnect(conn)
  126. }
  127. return nil
  128. }
  129. // Do creates a Cmd from the args and processes the cmd.
  130. func (c *baseClient) Do(args ...interface{}) *Cmd {
  131. cmd := NewCmd(args...)
  132. _ = c.Process(cmd)
  133. return cmd
  134. }
  135. // WrapProcess wraps function that processes Redis commands.
  136. func (c *baseClient) WrapProcess(
  137. fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
  138. ) {
  139. c.process = fn(c.process)
  140. }
  141. func (c *baseClient) Process(cmd Cmder) error {
  142. return c.process(cmd)
  143. }
  144. func (c *baseClient) defaultProcess(cmd Cmder) error {
  145. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  146. if attempt > 0 {
  147. time.Sleep(c.retryBackoff(attempt))
  148. }
  149. cn, err := c.getConn()
  150. if err != nil {
  151. cmd.setErr(err)
  152. if internal.IsRetryableError(err, true) {
  153. continue
  154. }
  155. return err
  156. }
  157. err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
  158. return writeCmd(wr, cmd)
  159. })
  160. if err != nil {
  161. c.releaseConn(cn, err)
  162. cmd.setErr(err)
  163. if internal.IsRetryableError(err, true) {
  164. continue
  165. }
  166. return err
  167. }
  168. err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
  169. return cmd.readReply(rd)
  170. })
  171. c.releaseConn(cn, err)
  172. if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
  173. continue
  174. }
  175. return err
  176. }
  177. return cmd.Err()
  178. }
  179. func (c *baseClient) retryBackoff(attempt int) time.Duration {
  180. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  181. }
  182. func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
  183. if timeout := cmd.readTimeout(); timeout != nil {
  184. t := *timeout
  185. if t == 0 {
  186. return 0
  187. }
  188. return t + 10*time.Second
  189. }
  190. return c.opt.ReadTimeout
  191. }
  192. // Close closes the client, releasing any open resources.
  193. //
  194. // It is rare to Close a Client, as the Client is meant to be
  195. // long-lived and shared between many goroutines.
  196. func (c *baseClient) Close() error {
  197. var firstErr error
  198. if c.onClose != nil {
  199. if err := c.onClose(); err != nil && firstErr == nil {
  200. firstErr = err
  201. }
  202. }
  203. if err := c.connPool.Close(); err != nil && firstErr == nil {
  204. firstErr = err
  205. }
  206. return firstErr
  207. }
  208. func (c *baseClient) getAddr() string {
  209. return c.opt.Addr
  210. }
  211. func (c *baseClient) WrapProcessPipeline(
  212. fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
  213. ) {
  214. c.processPipeline = fn(c.processPipeline)
  215. c.processTxPipeline = fn(c.processTxPipeline)
  216. }
  217. func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
  218. return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
  219. }
  220. func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
  221. return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
  222. }
  223. type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
  224. func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
  225. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  226. if attempt > 0 {
  227. time.Sleep(c.retryBackoff(attempt))
  228. }
  229. cn, err := c.getConn()
  230. if err != nil {
  231. setCmdsErr(cmds, err)
  232. return err
  233. }
  234. canRetry, err := p(cn, cmds)
  235. c.releaseConnStrict(cn, err)
  236. if !canRetry || !internal.IsRetryableError(err, true) {
  237. break
  238. }
  239. }
  240. return cmdsFirstErr(cmds)
  241. }
  242. func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
  243. err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
  244. return writeCmd(wr, cmds...)
  245. })
  246. if err != nil {
  247. setCmdsErr(cmds, err)
  248. return true, err
  249. }
  250. err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
  251. return pipelineReadCmds(rd, cmds)
  252. })
  253. return true, err
  254. }
  255. func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
  256. for _, cmd := range cmds {
  257. err := cmd.readReply(rd)
  258. if err != nil && !internal.IsRedisError(err) {
  259. return err
  260. }
  261. }
  262. return nil
  263. }
  264. func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
  265. err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
  266. return txPipelineWriteMulti(wr, cmds)
  267. })
  268. if err != nil {
  269. setCmdsErr(cmds, err)
  270. return true, err
  271. }
  272. err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
  273. err := txPipelineReadQueued(rd, cmds)
  274. if err != nil {
  275. setCmdsErr(cmds, err)
  276. return err
  277. }
  278. return pipelineReadCmds(rd, cmds)
  279. })
  280. return false, err
  281. }
  282. func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
  283. multiExec := make([]Cmder, 0, len(cmds)+2)
  284. multiExec = append(multiExec, NewStatusCmd("MULTI"))
  285. multiExec = append(multiExec, cmds...)
  286. multiExec = append(multiExec, NewSliceCmd("EXEC"))
  287. return writeCmd(wr, multiExec...)
  288. }
  289. func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
  290. // Parse queued replies.
  291. var statusCmd StatusCmd
  292. err := statusCmd.readReply(rd)
  293. if err != nil {
  294. return err
  295. }
  296. for range cmds {
  297. err = statusCmd.readReply(rd)
  298. if err != nil && !internal.IsRedisError(err) {
  299. return err
  300. }
  301. }
  302. // Parse number of replies.
  303. line, err := rd.ReadLine()
  304. if err != nil {
  305. if err == Nil {
  306. err = TxFailedErr
  307. }
  308. return err
  309. }
  310. switch line[0] {
  311. case proto.ErrorReply:
  312. return proto.ParseErrorReply(line)
  313. case proto.ArrayReply:
  314. // ok
  315. default:
  316. err := fmt.Errorf("redis: expected '*', but got line %q", line)
  317. return err
  318. }
  319. return nil
  320. }
  321. //------------------------------------------------------------------------------
  322. // Client is a Redis client representing a pool of zero or more
  323. // underlying connections. It's safe for concurrent use by multiple
  324. // goroutines.
  325. type Client struct {
  326. baseClient
  327. cmdable
  328. ctx context.Context
  329. }
  330. // NewClient returns a client to the Redis Server specified by Options.
  331. func NewClient(opt *Options) *Client {
  332. opt.init()
  333. c := Client{
  334. baseClient: baseClient{
  335. opt: opt,
  336. connPool: newConnPool(opt),
  337. },
  338. }
  339. c.baseClient.init()
  340. c.init()
  341. return &c
  342. }
  343. func (c *Client) init() {
  344. c.cmdable.setProcessor(c.Process)
  345. }
  346. func (c *Client) Context() context.Context {
  347. if c.ctx != nil {
  348. return c.ctx
  349. }
  350. return context.Background()
  351. }
  352. func (c *Client) WithContext(ctx context.Context) *Client {
  353. if ctx == nil {
  354. panic("nil context")
  355. }
  356. c2 := c.clone()
  357. c2.ctx = ctx
  358. return c2
  359. }
  360. func (c *Client) clone() *Client {
  361. cp := *c
  362. cp.init()
  363. return &cp
  364. }
  365. // Options returns read-only Options that were used to create the client.
  366. func (c *Client) Options() *Options {
  367. return c.opt
  368. }
  369. func (c *Client) SetLimiter(l Limiter) *Client {
  370. c.limiter = l
  371. return c
  372. }
  373. type PoolStats pool.Stats
  374. // PoolStats returns connection pool stats.
  375. func (c *Client) PoolStats() *PoolStats {
  376. stats := c.connPool.Stats()
  377. return (*PoolStats)(stats)
  378. }
  379. func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  380. return c.Pipeline().Pipelined(fn)
  381. }
  382. func (c *Client) Pipeline() Pipeliner {
  383. pipe := Pipeline{
  384. exec: c.processPipeline,
  385. }
  386. pipe.statefulCmdable.setProcessor(pipe.Process)
  387. return &pipe
  388. }
  389. func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  390. return c.TxPipeline().Pipelined(fn)
  391. }
  392. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  393. func (c *Client) TxPipeline() Pipeliner {
  394. pipe := Pipeline{
  395. exec: c.processTxPipeline,
  396. }
  397. pipe.statefulCmdable.setProcessor(pipe.Process)
  398. return &pipe
  399. }
  400. func (c *Client) pubSub() *PubSub {
  401. pubsub := &PubSub{
  402. opt: c.opt,
  403. newConn: func(channels []string) (*pool.Conn, error) {
  404. return c.newConn()
  405. },
  406. closeConn: c.connPool.CloseConn,
  407. }
  408. pubsub.init()
  409. return pubsub
  410. }
  411. // Subscribe subscribes the client to the specified channels.
  412. // Channels can be omitted to create empty subscription.
  413. // Note that this method does not wait on a response from Redis, so the
  414. // subscription may not be active immediately. To force the connection to wait,
  415. // you may call the Receive() method on the returned *PubSub like so:
  416. //
  417. // sub := client.Subscribe(queryResp)
  418. // iface, err := sub.Receive()
  419. // if err != nil {
  420. // // handle error
  421. // }
  422. //
  423. // // Should be *Subscription, but others are possible if other actions have been
  424. // // taken on sub since it was created.
  425. // switch iface.(type) {
  426. // case *Subscription:
  427. // // subscribe succeeded
  428. // case *Message:
  429. // // received first message
  430. // case *Pong:
  431. // // pong received
  432. // default:
  433. // // handle error
  434. // }
  435. //
  436. // ch := sub.Channel()
  437. func (c *Client) Subscribe(channels ...string) *PubSub {
  438. pubsub := c.pubSub()
  439. if len(channels) > 0 {
  440. _ = pubsub.Subscribe(channels...)
  441. }
  442. return pubsub
  443. }
  444. // PSubscribe subscribes the client to the given patterns.
  445. // Patterns can be omitted to create empty subscription.
  446. func (c *Client) PSubscribe(channels ...string) *PubSub {
  447. pubsub := c.pubSub()
  448. if len(channels) > 0 {
  449. _ = pubsub.PSubscribe(channels...)
  450. }
  451. return pubsub
  452. }
  453. //------------------------------------------------------------------------------
  454. // Conn is like Client, but its pool contains single connection.
  455. type Conn struct {
  456. baseClient
  457. statefulCmdable
  458. }
  459. func newConn(opt *Options, cn *pool.Conn) *Conn {
  460. c := Conn{
  461. baseClient: baseClient{
  462. opt: opt,
  463. connPool: pool.NewSingleConnPool(cn),
  464. },
  465. }
  466. c.baseClient.init()
  467. c.statefulCmdable.setProcessor(c.Process)
  468. return &c
  469. }
  470. func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  471. return c.Pipeline().Pipelined(fn)
  472. }
  473. func (c *Conn) Pipeline() Pipeliner {
  474. pipe := Pipeline{
  475. exec: c.processPipeline,
  476. }
  477. pipe.statefulCmdable.setProcessor(pipe.Process)
  478. return &pipe
  479. }
  480. func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  481. return c.TxPipeline().Pipelined(fn)
  482. }
  483. // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
  484. func (c *Conn) TxPipeline() Pipeliner {
  485. pipe := Pipeline{
  486. exec: c.processTxPipeline,
  487. }
  488. pipe.statefulCmdable.setProcessor(pipe.Process)
  489. return &pipe
  490. }