...

Source file src/golang.conradwood.net/go-easyops/sql/db.go

Documentation: golang.conradwood.net/go-easyops/sql

     1  /*
     2  Package sql provides safe and managed access to (postgres) databases
     3  */
     4  package sql
     5  
// This package opens and maintains database connections
// to postgres and provides some metrics for us.
     8  
     9  import (
    10  	"context"
    11  	"database/sql"
    12  	"flag"
    13  	"fmt"
    14  	"os"
    15  	"strings"
    16  	"sync"
    17  	"time"
    18  
    19  	pq "github.com/lib/pq"
    20  	pb "golang.conradwood.net/apis/goeasyops"
    21  	"golang.conradwood.net/go-easyops/cmdline"
    22  	"golang.conradwood.net/go-easyops/errors"
    23  	pp "golang.conradwood.net/go-easyops/profiling"
    24  	"golang.conradwood.net/go-easyops/prometheus"
    25  	"golang.conradwood.net/go-easyops/utils"
    26  )
    27  
const (
	// DEFAULT_MAX_QUERY_MILLIS is the value OpenWithInfo stores in
	// DB.MaxQueryTimeout for every newly created DB. As the name says, it
	// is expressed in milliseconds.
	DEFAULT_MAX_QUERY_MILLIS = 3000
)
    31  
var (
	// Environment variables consulted by Open for any connection parameter
	// whose command line flag is empty.
	e_dbhost  = cmdline.ENV("GE_POSTGRES_HOST", "the postgresql hostname to connect to")
	e_dbdb    = cmdline.ENV("GE_POSTGRES_DB", "the postgresql database to connect to")
	e_dbuser  = cmdline.ENV("GE_POSTGRES_USER", "the postgresql user to connect with")
	e_dbpw    = cmdline.ENV("GE_POSTGRES_PW", "the postgresql password to connect with")
	// e_dbproto is the last fallback in Open: it supplies whatever is still
	// empty after the flags and the individual variables above were read.
	e_dbproto = cmdline.ENV("GE_POSTGRES_PROTO", "the postgresql connection details as base64 encoded proto goeasyops.PostgresConfig")

	// failure_action selects what OpenWithInfo does when sql.Open returns an
	// error.
	failure_action = flag.String("ge_sql_failure_action", "report", "one of [report|quit|retry], report means to report it to the application (this is the default), quit means to quit the process, retry means to block until the connection is open")
	/* eventually we'll look these up in the datacenter rather than passing
	these as command line parameters.
	this will increase security a little bit (at least obscure it a bit)
	-- Database URL vs Command line parameters: --
	we are not using a DB Url here because the syntax of the url is driver/vendor specific.
	The abstraction into these variables puts the burden of generating a valid url into the code
	rather than requiring the user to know the syntax of the url of the specific driver/version/vendor
	the binary was compiled with.
	*/
	// NOTE(review): f_dbhost, f_dbuser and f_dbpw have non-empty defaults, so
	// Open only falls back to the environment for them when the flag is
	// explicitly set to the empty string - confirm this is intended.
	f_dbhost     = flag.String("dbhost", "localhost", "hostname of the postgres database rdbms")
	f_dbdb       = flag.String("dbdb", "", "database to use")
	f_dbuser     = flag.String("dbuser", "root", "username for the database to use")
	f_dbpw       = flag.String("dbpw", "pw", "password for the database to use")
	// sqldebug enables verbose printing of queries, timings and stack traces.
	sqldebug     = flag.Bool("ge_debug_sql", false, "debug sql stuff")
	// print_errors makes query_error print every failed query.
	print_errors = flag.Bool("ge_print_sql_errors", true, "print all sql errors (all failed queries)")
	// sqlTotalTime accumulates the time spent in QueryContext/execContext.
	// NOTE(review): Help declares UNIT=durationms, but the callers in this
	// file add time.Since(...).Seconds() - confirm the intended unit.
	sqlTotalTime = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "sql_total_time",
			Help: "V=1 UNIT=durationms DESC=total time spent in a query",
		},
		[]string{"dbhost", "database", "queryname"},
	)
	// sqlTotalQueries counts every query/exec started through a DB.
	sqlTotalQueries = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "sql_queries_executed",
			Help: "V=1 UNIT=ops DESC=total number of sql queries started",
		},
		[]string{"dbhost", "database", "queryname"},
	)
	// sqlPerformance observes the duration (in seconds) of each
	// QueryContext/execContext call.
	sqlPerformance = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.015, 0.99: 0.001},
			Name:       "sql_query_performance",
			Help:       "V=1 UNIT=durations DESC=timing information for sql performance in seconds",
		},
		[]string{"dbhost", "database", "queryname"},
	)
	// sqlFailedQueries counts QueryContext/execContext calls that ended in
	// an error.
	sqlFailedQueries = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "sql_queries_failed",
			Help: "V=1 UNIT=ops DESC=total number of sql queries failed",
		},
		[]string{"dbhost", "database", "queryname"},
	)
	// metricsRegistered is set once OpenWithInfo has registered the
	// collectors above; it is written while holding metricsRegisterLock.
	metricsRegistered   = false
	metricsRegisterLock sync.Mutex
	// databases holds every DB created by OpenWithInfo, which appends to it
	// while holding opendblock.
	databases           []*DB
	opendblock          sync.Mutex
)
    89  
// DB wraps a database/sql connection pool to one postgres database and adds
// metrics, failure tracking and reconnect handling around it.
type DB struct {
	// dbcon is the underlying pool; reconnect_if_required replaces it while
	// holding reconnectLock for writing.
	dbcon           *sql.DB
	// dbname is the database name, used as the "database" metric label.
	dbname          string
	// dbinfo is the connection string handed to sql.Open. OpenWithInfo also
	// uses it to recognise an already opened DB. It contains the password.
	dbinfo          string
	dbhost          string // hostname as specified on commandline
	dbshorthost     string // hostname only (no domain)
	// MaxQueryTimeout is initialised to DEFAULT_MAX_QUERY_MILLIS; nothing in
	// this file reads it.
	MaxQueryTimeout int
	// failurectr records query outcomes: counter 0 for successes and
	// counter 1 for failures (see QueryContext and execContext).
	failurectr      *utils.SlidingAverage
	// lastReconnect is the time of the most recent reconnect attempt.
	lastReconnect   time.Time
	// reconnectLock is read-locked by the query methods and write-locked
	// while dbcon is being replaced.
	reconnectLock   sync.RWMutex
}
   101  
   102  func maxConnections() int {
   103  	return 5
   104  }
   105  func maxIdle() int {
   106  	return 4
   107  }
   108  
// init is intentionally a no-op.
//
// It used to open the database shortly after startup to catch missing
// database configuration early (fail-fast). That was disabled because it made
// programs fail even when they only include this package (import with _)
// without ever executing it. The old code is kept below for reference.
func init() {
	/*
		go func() {
			time.Sleep(time.Duration(3) * time.Second)
			_, err := Open()
			if err != nil {
				fmt.Printf("Application %s error\n", cmdline.SourceCodePath())
			}
			utils.Bail("database error", err)
		}()
	*/
}
   123  
   124  // call this once when you startup and cache the result
   125  // only if there is an error you'll need to retry
   126  func Open() (*DB, error) {
   127  	host := *f_dbhost
   128  	db := *f_dbdb
   129  	user := *f_dbuser
   130  	pw := *f_dbpw
   131  	if host == "" {
   132  		host = e_dbhost.Value()
   133  	}
   134  	if db == "" {
   135  		db = e_dbdb.Value()
   136  	}
   137  	if user == "" {
   138  		user = e_dbuser.Value()
   139  	}
   140  	if pw == "" {
   141  		pw = e_dbpw.Value()
   142  	}
   143  	if e_dbproto.Value() != "" {
   144  		pp := &pb.PostgresConfig{}
   145  		err := utils.Unmarshal(e_dbproto.Value(), pp)
   146  		utils.Bail("invalid configuration in "+e_dbproto.Name(), err)
   147  		if host == "" {
   148  			host = pp.Host
   149  		}
   150  		if db == "" {
   151  			db = pp.DB
   152  		}
   153  		if user == "" {
   154  			user = pp.User
   155  		}
   156  		if pw == "" {
   157  			pw = pp.PW
   158  		}
   159  	}
   160  	return OpenWithInfo(host, db, user, pw)
   161  }
   162  func OpenWithInfo(dbhost, dbdb, dbuser, dbpw string) (*DB, error) {
   163  	var err error
   164  	var now string
   165  	if dbdb == "" {
   166  		utils.PrintStack("database parameters missing")
   167  		return nil, errors.Errorf("Please specify -dbdb flag")
   168  	}
   169  	dbinfo := fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=require", dbhost, dbuser, dbpw, dbdb)
   170  
   171  	// check if we already have an sql object that matches, if so return it
   172  	for _, db := range databases {
   173  		if db.dbinfo == dbinfo {
   174  			return db, nil
   175  		}
   176  	}
   177  
   178  	opendblock.Lock()
   179  	defer opendblock.Unlock()
   180  	// check again, with lock
   181  	for _, db := range databases {
   182  		if db.dbinfo == dbinfo {
   183  			return db, nil
   184  		}
   185  	}
   186  
   187  	if !metricsRegistered {
   188  		metricsRegisterLock.Lock()
   189  		if !metricsRegistered {
   190  			prometheus.MustRegister(sqlTotalTime, sqlPerformance, sqlTotalQueries, sqlFailedQueries, NewPoolSizeCollector())
   191  			metricsRegistered = true
   192  		}
   193  		metricsRegisterLock.Unlock()
   194  	}
   195  
   196  	var dbcon *sql.DB
   197  	for {
   198  		dbcon, err = sql.Open("postgres", dbinfo)
   199  		if err == nil {
   200  			break
   201  		}
   202  		fmt.Printf("[go-easyops] Failed to connect to %s on host \"%s\" as \"%s\"\n", dbdb, dbhost, dbuser)
   203  		if *failure_action == "quit" {
   204  			os.Exit(10)
   205  		} else if *failure_action == "report" {
   206  			return nil, errors.Errorf("failed to open database \"%s\" on host \"%s\" as \"%s\": %w", dbdb, dbhost, dbuser, err)
   207  		} else if *failure_action == "retry" {
   208  			time.Sleep(time.Duration(2) * time.Second)
   209  		} else {
   210  			fmt.Printf("[go-easyops] ge_sql_failure_action must be one of [report|retry|quit], not \"%s\"\n", *failure_action)
   211  			os.Exit(10)
   212  		}
   213  	}
   214  
   215  	dbcon.SetMaxIdleConns(maxIdle())
   216  	dbcon.SetMaxOpenConns(maxConnections()) // max connections per instance by default
   217  	// force at least one connection to initialize
   218  	err = dbcon.QueryRow("SELECT NOW() as now").Scan(&now)
   219  	if err != nil {
   220  		fmt.Printf("[go-easyops] Failed to query db %s: %s\n", dbdb, err)
   221  		return nil, errors.Errorf("failed to open database \"%s\" on host \"%s\" as \"%s\": %w", dbdb, dbhost, dbuser, err)
   222  	}
   223  	if *sqldebug {
   224  		fmt.Printf("[go-easyops] sql now query returned: \"%s\"\n", now)
   225  	}
   226  	names := strings.Split(dbhost, ".")
   227  	dbshort := dbhost
   228  	if len(names) > 0 {
   229  		dbshort = names[0]
   230  	}
   231  	c := &DB{dbcon: dbcon, dbname: dbdb, dbinfo: dbinfo, MaxQueryTimeout: DEFAULT_MAX_QUERY_MILLIS, dbhost: dbhost, dbshorthost: dbshort}
   232  	c.failurectr = utils.NewSlidingAverage()
   233  	c.failurectr.MinSamples = 10
   234  	c.failurectr.MinAge = time.Duration(60) * time.Second
   235  	databases = append(databases, c)
   236  	if len(databases) > 5 {
   237  		fmt.Printf("[go-easyops] WARNING OPENED %d databases\n", len(databases))
   238  		for i, d := range databases {
   239  			fmt.Printf("[go-easyops] Opened database #%d: %s\n", i, d.dbinfo)
   240  		}
   241  		panic("too many databases")
   242  	}
   243  	return c, nil
   244  }
// reopen is an empty placeholder; it currently does nothing.
func reopen() {
}
   247  
   248  /*****
   249  // Helpers
   250  /**********/
   251  // returns true if this string is sql safe (no special characters
   252  func IsSQLSafe(txt string) bool {
   253  	return utils.IsOnlyChars(txt, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
   254  }
// GetDatabaseName returns the name of the database this DB was opened with.
func (d *DB) GetDatabaseName() string {
	return d.dbname
}
// GetFailureCounter returns the sliding counter in which the query methods
// record successes (counter 0) and failures (counter 1).
func (d *DB) GetFailureCounter() *utils.SlidingAverage {
	return d.failurectr
}
   261  
   262  func (d *DB) reconnect_if_required() {
   263  	if !must_reconnect(d.GetFailureCounter()) {
   264  		return
   265  	}
   266  	if time.Since(d.lastReconnect) < time.Duration(60)*time.Second {
   267  		// recently reconnected, ignore..
   268  		return
   269  	}
   270  	if *sqldebug {
   271  		sa := d.GetFailureCounter()
   272  		fmt.Printf("[go-easyops] sql counters: 0=%d, 1=%d\n", sa.GetCounter(0), sa.GetCounter(1))
   273  	}
   274  	fmt.Printf("[go-easyops] sql reconnect required.\n")
   275  	d.reconnectLock.Lock()
   276  	defer d.reconnectLock.Unlock()
   277  	d.lastReconnect = time.Now()
   278  	fmt.Printf("[go-easyops] sql reconnecting...\n")
   279  	d.dbcon.Close()
   280  	dc, err := sql.Open("postgres", d.dbinfo)
   281  	if err != nil {
   282  		fmt.Printf("[go-easyops] sql failed to reconnect: %s\n", err)
   283  		return
   284  	}
   285  	d.dbcon = dc
   286  
   287  }
   288  
   289  func must_reconnect(sa *utils.SlidingAverage) bool {
   290  	if sa.GetCounter(0) != 0 {
   291  		// at least one succeeded
   292  		return false
   293  	}
   294  	if sa.GetCounter(1) == 0 {
   295  		// no failures
   296  		return false
   297  	}
   298  	if sa.GetCounter(1) != sa.GetCounts(1) {
   299  		// some failure counts where not "1"?? so we counted a failure as 0 or 2 failures?
   300  		return false
   301  	}
   302  	return true
   303  }
   304  
   305  func query_error(ctx context.Context, typ string, name string, query string, err error) {
   306  	if *sqldebug || *print_errors {
   307  		fmt.Printf("[go-easyops] [sql] %s %s (%s) failed (%s)\n", typ, name, query, err)
   308  	}
   309  	if *sqldebug {
   310  		utils.PrintStack("query failed")
   311  	}
   312  }
   313  
   314  /*****
   315  // wrapping the calls
   316  /**********/
   317  
   318  // "name" will be used to provide timing information as prometheus metric.
   319  func (d *DB) QueryContext(ctx context.Context, name string, query string, args ...interface{}) (*sql.Rows, error) {
   320  	d.reconnect_if_required()
   321  	d.reconnectLock.RLock()
   322  	defer d.reconnectLock.RUnlock()
   323  	pp.SqlEntered()
   324  	defer pp.SqlDone()
   325  	if *sqldebug {
   326  		fmt.Printf("[go-easyops] [sql] Query %s (%v)\n", query, args)
   327  	}
   328  	l := prometheus.Labels{"dbhost": d.dbshorthost, "database": d.dbname, "queryname": name}
   329  	sqlTotalQueries.With(l).Inc()
   330  	started := time.Now()
   331  	r, err := d.dbcon.QueryContext(ctx, query, args...)
   332  	duration := time.Since(started).Seconds()
   333  	sqlPerformance.With(l).Observe(duration)
   334  	// return err if occured, or context-error if such occured
   335  	if err == nil && ctx.Err() != nil {
   336  		err = ctx.Err()
   337  	}
   338  	if err != nil {
   339  		d.failurectr.Add(1, 1)
   340  		query_error(ctx, "select", name, query, err)
   341  		sqlFailedQueries.With(l).Inc()
   342  		err = errors.Wrap(err)
   343  	} else {
   344  		d.failurectr.Add(0, 1)
   345  	}
   346  	sqlTotalTime.With(l).Add(duration)
   347  	pqtime(name, duration)
   348  	return r, err
   349  }
   350  
   351  // "name" will be used to provide timing information as prometheus metric.
   352  func (d *DB) ExecContext(ctx context.Context, name string, query string, args ...interface{}) (sql.Result, error) {
   353  	rep := *sqldebug || *print_errors
   354  	return d.execContext(ctx, rep, name, query, args...)
   355  }
   356  func (d *DB) ExecContextQuiet(ctx context.Context, name string, query string, args ...interface{}) (sql.Result, error) {
   357  	rep := false
   358  	return d.execContext(ctx, rep, name, query, args...)
   359  }
   360  func (d *DB) execContext(ctx context.Context, report_failure bool, name string, query string, args ...interface{}) (sql.Result, error) {
   361  	d.reconnect_if_required()
   362  	d.reconnectLock.RLock()
   363  	defer d.reconnectLock.RUnlock()
   364  	pp.SqlEntered()
   365  	defer pp.SqlDone()
   366  	l := prometheus.Labels{"dbhost": d.dbshorthost, "database": d.dbname, "queryname": name}
   367  	if *sqldebug {
   368  		fmt.Printf("[go-easyops] [sql] Exec %s (%v)\n", query, args)
   369  	}
   370  	sqlTotalQueries.With(l).Inc()
   371  	started := time.Now()
   372  	r, err := d.dbcon.ExecContext(ctx, query, args...)
   373  	duration := time.Since(started).Seconds()
   374  	sqlPerformance.With(l).Observe(duration)
   375  	// return err if occured, or context-error if such occured
   376  	if err == nil && ctx.Err() != nil {
   377  		err = ctx.Err()
   378  	}
   379  	if err != nil {
   380  		d.failurectr.Add(1, 1)
   381  		query_error(ctx, "exec", name, query, err)
   382  		sqlFailedQueries.With(l).Inc()
   383  		err = errors.Wrap(err)
   384  	} else {
   385  		d.failurectr.Add(0, 1)
   386  	}
   387  	sqlTotalTime.With(l).Add(duration)
   388  	pqtime(name, duration)
   389  	return r, err
   390  }
   391  
   392  // discouraged use. QueryRow() does not provide an error on the query, nor do we get a good timing
   393  // value. Use QueryContext() instead.
   394  func (d *DB) QueryRowContext(ctx context.Context, name string, query string, args ...interface{}) *sql.Row {
   395  	d.reconnect_if_required()
   396  	d.reconnectLock.RLock()
   397  	defer d.reconnectLock.RUnlock()
   398  	pp.SqlEntered()
   399  	defer pp.SqlDone()
   400  	if *sqldebug {
   401  		fmt.Printf("[go-easyops] [sql] QueryRow %s\n", query)
   402  	}
   403  	l := prometheus.Labels{"dbhost": d.dbshorthost, "database": d.dbname, "queryname": name}
   404  	sqlTotalQueries.With(l).Inc()
   405  	r := d.dbcon.QueryRowContext(ctx, query, args...)
   406  	return r
   407  }
   408  
   409  func (d *DB) Conn(ctx context.Context) (*sql.Conn, error) {
   410  	return d.dbcon.Conn(ctx)
   411  }
   412  
   413  func (d *DB) CheckDuplicateRowError(err error) bool {
   414  	if err, ok := err.(*pq.Error); ok {
   415  		if err.Code == "23505" {
   416  			return true
   417  		}
   418  	}
   419  
   420  	return false
   421  }
   422  
   423  func pqtime(name string, dur float64) {
   424  	if !*sqldebug {
   425  		return
   426  	}
   427  	fmt.Printf("[go-easyops] Query \"%s\" completed in %0.2f seconds\n", name, dur)
   428  }
   429  

View as plain text