123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570 |
- package pq
- import (
- "errors"
- "fmt"
- "io"
- "os"
- "runtime"
- "sync"
- "testing"
- "time"
- )
- var errNilNotification = errors.New("nil notification")
- func expectNotification(t *testing.T, ch <-chan *Notification, relname string, extra string) error {
- select {
- case n := <-ch:
- if n == nil {
- return errNilNotification
- }
- if n.Channel != relname || n.Extra != extra {
- return fmt.Errorf("unexpected notification %v", n)
- }
- return nil
- case <-time.After(1500 * time.Millisecond):
- return fmt.Errorf("timeout")
- }
- }
- func expectNoNotification(t *testing.T, ch <-chan *Notification) error {
- select {
- case n := <-ch:
- return fmt.Errorf("unexpected notification %v", n)
- case <-time.After(100 * time.Millisecond):
- return nil
- }
- }
- func expectEvent(t *testing.T, eventch <-chan ListenerEventType, et ListenerEventType) error {
- select {
- case e := <-eventch:
- if e != et {
- return fmt.Errorf("unexpected event %v", e)
- }
- return nil
- case <-time.After(1500 * time.Millisecond):
- panic("expectEvent timeout")
- }
- }
- func expectNoEvent(t *testing.T, eventch <-chan ListenerEventType) error {
- select {
- case e := <-eventch:
- return fmt.Errorf("unexpected event %v", e)
- case <-time.After(100 * time.Millisecond):
- return nil
- }
- }
- func newTestListenerConn(t *testing.T) (*ListenerConn, <-chan *Notification) {
- datname := os.Getenv("PGDATABASE")
- sslmode := os.Getenv("PGSSLMODE")
- if datname == "" {
- os.Setenv("PGDATABASE", "pqgotest")
- }
- if sslmode == "" {
- os.Setenv("PGSSLMODE", "disable")
- }
- notificationChan := make(chan *Notification)
- l, err := NewListenerConn("", notificationChan)
- if err != nil {
- t.Fatal(err)
- }
- return l, notificationChan
- }
- func TestNewListenerConn(t *testing.T) {
- l, _ := newTestListenerConn(t)
- defer l.Close()
- }
- func TestConnListen(t *testing.T) {
- l, channel := newTestListenerConn(t)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- ok, err := l.Listen("notify_test")
- if !ok || err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, channel, "notify_test", "")
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestConnUnlisten(t *testing.T) {
- l, channel := newTestListenerConn(t)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- ok, err := l.Listen("notify_test")
- if !ok || err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, channel, "notify_test", "")
- if err != nil {
- t.Fatal(err)
- }
- ok, err = l.Unlisten("notify_test")
- if !ok || err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNoNotification(t, channel)
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestConnUnlistenAll(t *testing.T) {
- l, channel := newTestListenerConn(t)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- ok, err := l.Listen("notify_test")
- if !ok || err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, channel, "notify_test", "")
- if err != nil {
- t.Fatal(err)
- }
- ok, err = l.UnlistenAll()
- if !ok || err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNoNotification(t, channel)
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestConnClose(t *testing.T) {
- l, _ := newTestListenerConn(t)
- defer l.Close()
- err := l.Close()
- if err != nil {
- t.Fatal(err)
- }
- err = l.Close()
- if err != errListenerConnClosed {
- t.Fatalf("expected errListenerConnClosed; got %v", err)
- }
- }
- func TestConnPing(t *testing.T) {
- l, _ := newTestListenerConn(t)
- defer l.Close()
- err := l.Ping()
- if err != nil {
- t.Fatal(err)
- }
- err = l.Close()
- if err != nil {
- t.Fatal(err)
- }
- err = l.Ping()
- if err != errListenerConnClosed {
- t.Fatalf("expected errListenerConnClosed; got %v", err)
- }
- }
- // Test for deadlock where a query fails while another one is queued
- func TestConnExecDeadlock(t *testing.T) {
- l, _ := newTestListenerConn(t)
- defer l.Close()
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- l.ExecSimpleQuery("SELECT pg_sleep(60)")
- wg.Done()
- }()
- runtime.Gosched()
- go func() {
- l.ExecSimpleQuery("SELECT 1")
- wg.Done()
- }()
- // give the two goroutines some time to get into position
- runtime.Gosched()
- // calls Close on the net.Conn; equivalent to a network failure
- l.Close()
- defer time.AfterFunc(10*time.Second, func() {
- panic("timed out")
- }).Stop()
- wg.Wait()
- }
- // Test for ListenerConn being closed while a slow query is executing
- func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) {
- l, _ := newTestListenerConn(t)
- defer l.Close()
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- sent, err := l.ExecSimpleQuery("SELECT pg_sleep(60)")
- if sent {
- panic("expected sent=false")
- }
- // could be any of a number of errors
- if err == nil {
- panic("expected error")
- }
- wg.Done()
- }()
- // give the above goroutine some time to get into position
- runtime.Gosched()
- err := l.Close()
- if err != nil {
- t.Fatal(err)
- }
- defer time.AfterFunc(10*time.Second, func() {
- panic("timed out")
- }).Stop()
- wg.Wait()
- }
- func TestNotifyExtra(t *testing.T) {
- db := openTestConn(t)
- defer db.Close()
- if getServerVersion(t, db) < 90000 {
- t.Skip("skipping NOTIFY payload test since the server does not appear to support it")
- }
- l, channel := newTestListenerConn(t)
- defer l.Close()
- ok, err := l.Listen("notify_test")
- if !ok || err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_test, 'something'")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, channel, "notify_test", "something")
- if err != nil {
- t.Fatal(err)
- }
- }
- // create a new test listener and also set the timeouts
- func newTestListenerTimeout(t *testing.T, min time.Duration, max time.Duration) (*Listener, <-chan ListenerEventType) {
- datname := os.Getenv("PGDATABASE")
- sslmode := os.Getenv("PGSSLMODE")
- if datname == "" {
- os.Setenv("PGDATABASE", "pqgotest")
- }
- if sslmode == "" {
- os.Setenv("PGSSLMODE", "disable")
- }
- eventch := make(chan ListenerEventType, 16)
- l := NewListener("", min, max, func(t ListenerEventType, err error) { eventch <- t })
- err := expectEvent(t, eventch, ListenerEventConnected)
- if err != nil {
- t.Fatal(err)
- }
- return l, eventch
- }
- func newTestListener(t *testing.T) (*Listener, <-chan ListenerEventType) {
- return newTestListenerTimeout(t, time.Hour, time.Hour)
- }
- func TestListenerListen(t *testing.T) {
- l, _ := newTestListener(t)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- err := l.Listen("notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, l.Notify, "notify_listen_test", "")
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestListenerUnlisten(t *testing.T) {
- l, _ := newTestListener(t)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- err := l.Listen("notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = l.Unlisten("notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, l.Notify, "notify_listen_test", "")
- if err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNoNotification(t, l.Notify)
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestListenerUnlistenAll(t *testing.T) {
- l, _ := newTestListener(t)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- err := l.Listen("notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = l.UnlistenAll()
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, l.Notify, "notify_listen_test", "")
- if err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNoNotification(t, l.Notify)
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestListenerFailedQuery(t *testing.T) {
- l, eventch := newTestListener(t)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- err := l.Listen("notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, l.Notify, "notify_listen_test", "")
- if err != nil {
- t.Fatal(err)
- }
- // shouldn't cause a disconnect
- ok, err := l.cn.ExecSimpleQuery("SELECT error")
- if !ok {
- t.Fatalf("could not send query to server: %v", err)
- }
- _, ok = err.(PGError)
- if !ok {
- t.Fatalf("unexpected error %v", err)
- }
- err = expectNoEvent(t, eventch)
- if err != nil {
- t.Fatal(err)
- }
- // should still work
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, l.Notify, "notify_listen_test", "")
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestListenerReconnect(t *testing.T) {
- l, eventch := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
- defer l.Close()
- db := openTestConn(t)
- defer db.Close()
- err := l.Listen("notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- err = expectNotification(t, l.Notify, "notify_listen_test", "")
- if err != nil {
- t.Fatal(err)
- }
- // kill the connection and make sure it comes back up
- ok, err := l.cn.ExecSimpleQuery("SELECT pg_terminate_backend(pg_backend_pid())")
- if ok {
- t.Fatalf("could not kill the connection: %v", err)
- }
- if err != io.EOF {
- t.Fatalf("unexpected error %v", err)
- }
- err = expectEvent(t, eventch, ListenerEventDisconnected)
- if err != nil {
- t.Fatal(err)
- }
- err = expectEvent(t, eventch, ListenerEventReconnected)
- if err != nil {
- t.Fatal(err)
- }
- // should still work
- _, err = db.Exec("NOTIFY notify_listen_test")
- if err != nil {
- t.Fatal(err)
- }
- // should get nil after Reconnected
- err = expectNotification(t, l.Notify, "", "")
- if err != errNilNotification {
- t.Fatal(err)
- }
- err = expectNotification(t, l.Notify, "notify_listen_test", "")
- if err != nil {
- t.Fatal(err)
- }
- }
- func TestListenerClose(t *testing.T) {
- l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
- defer l.Close()
- err := l.Close()
- if err != nil {
- t.Fatal(err)
- }
- err = l.Close()
- if err != errListenerClosed {
- t.Fatalf("expected errListenerClosed; got %v", err)
- }
- }
- func TestListenerPing(t *testing.T) {
- l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
- defer l.Close()
- err := l.Ping()
- if err != nil {
- t.Fatal(err)
- }
- err = l.Close()
- if err != nil {
- t.Fatal(err)
- }
- err = l.Ping()
- if err != errListenerClosed {
- t.Fatalf("expected errListenerClosed; got %v", err)
- }
- }
|