notify_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. package pq
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "os"
  7. "runtime"
  8. "sync"
  9. "testing"
  10. "time"
  11. )
  12. var errNilNotification = errors.New("nil notification")
  13. func expectNotification(t *testing.T, ch <-chan *Notification, relname string, extra string) error {
  14. select {
  15. case n := <-ch:
  16. if n == nil {
  17. return errNilNotification
  18. }
  19. if n.Channel != relname || n.Extra != extra {
  20. return fmt.Errorf("unexpected notification %v", n)
  21. }
  22. return nil
  23. case <-time.After(1500 * time.Millisecond):
  24. return fmt.Errorf("timeout")
  25. }
  26. }
  27. func expectNoNotification(t *testing.T, ch <-chan *Notification) error {
  28. select {
  29. case n := <-ch:
  30. return fmt.Errorf("unexpected notification %v", n)
  31. case <-time.After(100 * time.Millisecond):
  32. return nil
  33. }
  34. }
  35. func expectEvent(t *testing.T, eventch <-chan ListenerEventType, et ListenerEventType) error {
  36. select {
  37. case e := <-eventch:
  38. if e != et {
  39. return fmt.Errorf("unexpected event %v", e)
  40. }
  41. return nil
  42. case <-time.After(1500 * time.Millisecond):
  43. panic("expectEvent timeout")
  44. }
  45. }
  46. func expectNoEvent(t *testing.T, eventch <-chan ListenerEventType) error {
  47. select {
  48. case e := <-eventch:
  49. return fmt.Errorf("unexpected event %v", e)
  50. case <-time.After(100 * time.Millisecond):
  51. return nil
  52. }
  53. }
  54. func newTestListenerConn(t *testing.T) (*ListenerConn, <-chan *Notification) {
  55. datname := os.Getenv("PGDATABASE")
  56. sslmode := os.Getenv("PGSSLMODE")
  57. if datname == "" {
  58. os.Setenv("PGDATABASE", "pqgotest")
  59. }
  60. if sslmode == "" {
  61. os.Setenv("PGSSLMODE", "disable")
  62. }
  63. notificationChan := make(chan *Notification)
  64. l, err := NewListenerConn("", notificationChan)
  65. if err != nil {
  66. t.Fatal(err)
  67. }
  68. return l, notificationChan
  69. }
  70. func TestNewListenerConn(t *testing.T) {
  71. l, _ := newTestListenerConn(t)
  72. defer l.Close()
  73. }
  74. func TestConnListen(t *testing.T) {
  75. l, channel := newTestListenerConn(t)
  76. defer l.Close()
  77. db := openTestConn(t)
  78. defer db.Close()
  79. ok, err := l.Listen("notify_test")
  80. if !ok || err != nil {
  81. t.Fatal(err)
  82. }
  83. _, err = db.Exec("NOTIFY notify_test")
  84. if err != nil {
  85. t.Fatal(err)
  86. }
  87. err = expectNotification(t, channel, "notify_test", "")
  88. if err != nil {
  89. t.Fatal(err)
  90. }
  91. }
  92. func TestConnUnlisten(t *testing.T) {
  93. l, channel := newTestListenerConn(t)
  94. defer l.Close()
  95. db := openTestConn(t)
  96. defer db.Close()
  97. ok, err := l.Listen("notify_test")
  98. if !ok || err != nil {
  99. t.Fatal(err)
  100. }
  101. _, err = db.Exec("NOTIFY notify_test")
  102. if err != nil {
  103. t.Fatal(err)
  104. }
  105. err = expectNotification(t, channel, "notify_test", "")
  106. if err != nil {
  107. t.Fatal(err)
  108. }
  109. ok, err = l.Unlisten("notify_test")
  110. if !ok || err != nil {
  111. t.Fatal(err)
  112. }
  113. _, err = db.Exec("NOTIFY notify_test")
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. err = expectNoNotification(t, channel)
  118. if err != nil {
  119. t.Fatal(err)
  120. }
  121. }
  122. func TestConnUnlistenAll(t *testing.T) {
  123. l, channel := newTestListenerConn(t)
  124. defer l.Close()
  125. db := openTestConn(t)
  126. defer db.Close()
  127. ok, err := l.Listen("notify_test")
  128. if !ok || err != nil {
  129. t.Fatal(err)
  130. }
  131. _, err = db.Exec("NOTIFY notify_test")
  132. if err != nil {
  133. t.Fatal(err)
  134. }
  135. err = expectNotification(t, channel, "notify_test", "")
  136. if err != nil {
  137. t.Fatal(err)
  138. }
  139. ok, err = l.UnlistenAll()
  140. if !ok || err != nil {
  141. t.Fatal(err)
  142. }
  143. _, err = db.Exec("NOTIFY notify_test")
  144. if err != nil {
  145. t.Fatal(err)
  146. }
  147. err = expectNoNotification(t, channel)
  148. if err != nil {
  149. t.Fatal(err)
  150. }
  151. }
  152. func TestConnClose(t *testing.T) {
  153. l, _ := newTestListenerConn(t)
  154. defer l.Close()
  155. err := l.Close()
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. err = l.Close()
  160. if err != errListenerConnClosed {
  161. t.Fatalf("expected errListenerConnClosed; got %v", err)
  162. }
  163. }
  164. func TestConnPing(t *testing.T) {
  165. l, _ := newTestListenerConn(t)
  166. defer l.Close()
  167. err := l.Ping()
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. err = l.Close()
  172. if err != nil {
  173. t.Fatal(err)
  174. }
  175. err = l.Ping()
  176. if err != errListenerConnClosed {
  177. t.Fatalf("expected errListenerConnClosed; got %v", err)
  178. }
  179. }
  180. // Test for deadlock where a query fails while another one is queued
  181. func TestConnExecDeadlock(t *testing.T) {
  182. l, _ := newTestListenerConn(t)
  183. defer l.Close()
  184. var wg sync.WaitGroup
  185. wg.Add(2)
  186. go func() {
  187. l.ExecSimpleQuery("SELECT pg_sleep(60)")
  188. wg.Done()
  189. }()
  190. runtime.Gosched()
  191. go func() {
  192. l.ExecSimpleQuery("SELECT 1")
  193. wg.Done()
  194. }()
  195. // give the two goroutines some time to get into position
  196. runtime.Gosched()
  197. // calls Close on the net.Conn; equivalent to a network failure
  198. l.Close()
  199. defer time.AfterFunc(10*time.Second, func() {
  200. panic("timed out")
  201. }).Stop()
  202. wg.Wait()
  203. }
  204. // Test for ListenerConn being closed while a slow query is executing
  205. func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) {
  206. l, _ := newTestListenerConn(t)
  207. defer l.Close()
  208. var wg sync.WaitGroup
  209. wg.Add(1)
  210. go func() {
  211. sent, err := l.ExecSimpleQuery("SELECT pg_sleep(60)")
  212. if sent {
  213. panic("expected sent=false")
  214. }
  215. // could be any of a number of errors
  216. if err == nil {
  217. panic("expected error")
  218. }
  219. wg.Done()
  220. }()
  221. // give the above goroutine some time to get into position
  222. runtime.Gosched()
  223. err := l.Close()
  224. if err != nil {
  225. t.Fatal(err)
  226. }
  227. defer time.AfterFunc(10*time.Second, func() {
  228. panic("timed out")
  229. }).Stop()
  230. wg.Wait()
  231. }
  232. func TestNotifyExtra(t *testing.T) {
  233. db := openTestConn(t)
  234. defer db.Close()
  235. if getServerVersion(t, db) < 90000 {
  236. t.Skip("skipping NOTIFY payload test since the server does not appear to support it")
  237. }
  238. l, channel := newTestListenerConn(t)
  239. defer l.Close()
  240. ok, err := l.Listen("notify_test")
  241. if !ok || err != nil {
  242. t.Fatal(err)
  243. }
  244. _, err = db.Exec("NOTIFY notify_test, 'something'")
  245. if err != nil {
  246. t.Fatal(err)
  247. }
  248. err = expectNotification(t, channel, "notify_test", "something")
  249. if err != nil {
  250. t.Fatal(err)
  251. }
  252. }
  253. // create a new test listener and also set the timeouts
  254. func newTestListenerTimeout(t *testing.T, min time.Duration, max time.Duration) (*Listener, <-chan ListenerEventType) {
  255. datname := os.Getenv("PGDATABASE")
  256. sslmode := os.Getenv("PGSSLMODE")
  257. if datname == "" {
  258. os.Setenv("PGDATABASE", "pqgotest")
  259. }
  260. if sslmode == "" {
  261. os.Setenv("PGSSLMODE", "disable")
  262. }
  263. eventch := make(chan ListenerEventType, 16)
  264. l := NewListener("", min, max, func(t ListenerEventType, err error) { eventch <- t })
  265. err := expectEvent(t, eventch, ListenerEventConnected)
  266. if err != nil {
  267. t.Fatal(err)
  268. }
  269. return l, eventch
  270. }
  271. func newTestListener(t *testing.T) (*Listener, <-chan ListenerEventType) {
  272. return newTestListenerTimeout(t, time.Hour, time.Hour)
  273. }
  274. func TestListenerListen(t *testing.T) {
  275. l, _ := newTestListener(t)
  276. defer l.Close()
  277. db := openTestConn(t)
  278. defer db.Close()
  279. err := l.Listen("notify_listen_test")
  280. if err != nil {
  281. t.Fatal(err)
  282. }
  283. _, err = db.Exec("NOTIFY notify_listen_test")
  284. if err != nil {
  285. t.Fatal(err)
  286. }
  287. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  288. if err != nil {
  289. t.Fatal(err)
  290. }
  291. }
  292. func TestListenerUnlisten(t *testing.T) {
  293. l, _ := newTestListener(t)
  294. defer l.Close()
  295. db := openTestConn(t)
  296. defer db.Close()
  297. err := l.Listen("notify_listen_test")
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. _, err = db.Exec("NOTIFY notify_listen_test")
  302. if err != nil {
  303. t.Fatal(err)
  304. }
  305. err = l.Unlisten("notify_listen_test")
  306. if err != nil {
  307. t.Fatal(err)
  308. }
  309. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  310. if err != nil {
  311. t.Fatal(err)
  312. }
  313. _, err = db.Exec("NOTIFY notify_listen_test")
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. err = expectNoNotification(t, l.Notify)
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. }
  322. func TestListenerUnlistenAll(t *testing.T) {
  323. l, _ := newTestListener(t)
  324. defer l.Close()
  325. db := openTestConn(t)
  326. defer db.Close()
  327. err := l.Listen("notify_listen_test")
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. _, err = db.Exec("NOTIFY notify_listen_test")
  332. if err != nil {
  333. t.Fatal(err)
  334. }
  335. err = l.UnlistenAll()
  336. if err != nil {
  337. t.Fatal(err)
  338. }
  339. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  340. if err != nil {
  341. t.Fatal(err)
  342. }
  343. _, err = db.Exec("NOTIFY notify_listen_test")
  344. if err != nil {
  345. t.Fatal(err)
  346. }
  347. err = expectNoNotification(t, l.Notify)
  348. if err != nil {
  349. t.Fatal(err)
  350. }
  351. }
  352. func TestListenerFailedQuery(t *testing.T) {
  353. l, eventch := newTestListener(t)
  354. defer l.Close()
  355. db := openTestConn(t)
  356. defer db.Close()
  357. err := l.Listen("notify_listen_test")
  358. if err != nil {
  359. t.Fatal(err)
  360. }
  361. _, err = db.Exec("NOTIFY notify_listen_test")
  362. if err != nil {
  363. t.Fatal(err)
  364. }
  365. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  366. if err != nil {
  367. t.Fatal(err)
  368. }
  369. // shouldn't cause a disconnect
  370. ok, err := l.cn.ExecSimpleQuery("SELECT error")
  371. if !ok {
  372. t.Fatalf("could not send query to server: %v", err)
  373. }
  374. _, ok = err.(PGError)
  375. if !ok {
  376. t.Fatalf("unexpected error %v", err)
  377. }
  378. err = expectNoEvent(t, eventch)
  379. if err != nil {
  380. t.Fatal(err)
  381. }
  382. // should still work
  383. _, err = db.Exec("NOTIFY notify_listen_test")
  384. if err != nil {
  385. t.Fatal(err)
  386. }
  387. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  388. if err != nil {
  389. t.Fatal(err)
  390. }
  391. }
  392. func TestListenerReconnect(t *testing.T) {
  393. l, eventch := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
  394. defer l.Close()
  395. db := openTestConn(t)
  396. defer db.Close()
  397. err := l.Listen("notify_listen_test")
  398. if err != nil {
  399. t.Fatal(err)
  400. }
  401. _, err = db.Exec("NOTIFY notify_listen_test")
  402. if err != nil {
  403. t.Fatal(err)
  404. }
  405. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  406. if err != nil {
  407. t.Fatal(err)
  408. }
  409. // kill the connection and make sure it comes back up
  410. ok, err := l.cn.ExecSimpleQuery("SELECT pg_terminate_backend(pg_backend_pid())")
  411. if ok {
  412. t.Fatalf("could not kill the connection: %v", err)
  413. }
  414. if err != io.EOF {
  415. t.Fatalf("unexpected error %v", err)
  416. }
  417. err = expectEvent(t, eventch, ListenerEventDisconnected)
  418. if err != nil {
  419. t.Fatal(err)
  420. }
  421. err = expectEvent(t, eventch, ListenerEventReconnected)
  422. if err != nil {
  423. t.Fatal(err)
  424. }
  425. // should still work
  426. _, err = db.Exec("NOTIFY notify_listen_test")
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. // should get nil after Reconnected
  431. err = expectNotification(t, l.Notify, "", "")
  432. if err != errNilNotification {
  433. t.Fatal(err)
  434. }
  435. err = expectNotification(t, l.Notify, "notify_listen_test", "")
  436. if err != nil {
  437. t.Fatal(err)
  438. }
  439. }
  440. func TestListenerClose(t *testing.T) {
  441. l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
  442. defer l.Close()
  443. err := l.Close()
  444. if err != nil {
  445. t.Fatal(err)
  446. }
  447. err = l.Close()
  448. if err != errListenerClosed {
  449. t.Fatalf("expected errListenerClosed; got %v", err)
  450. }
  451. }
  452. func TestListenerPing(t *testing.T) {
  453. l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
  454. defer l.Close()
  455. err := l.Ping()
  456. if err != nil {
  457. t.Fatal(err)
  458. }
  459. err = l.Close()
  460. if err != nil {
  461. t.Fatal(err)
  462. }
  463. err = l.Ping()
  464. if err != errListenerClosed {
  465. t.Fatalf("expected errListenerClosed; got %v", err)
  466. }
  467. }