1
4 package sql
5
6
7
8
9 import (
10 "context"
11 "database/sql"
12 "flag"
13 "fmt"
14 "os"
15 "strings"
16 "sync"
17 "time"
18
19 pq "github.com/lib/pq"
20 pb "golang.conradwood.net/apis/goeasyops"
21 "golang.conradwood.net/go-easyops/cmdline"
22 "golang.conradwood.net/go-easyops/errors"
23 pp "golang.conradwood.net/go-easyops/profiling"
24 "golang.conradwood.net/go-easyops/prometheus"
25 "golang.conradwood.net/go-easyops/utils"
26 )
27
// DEFAULT_MAX_QUERY_MILLIS is the value every newly opened DB gets assigned
// to its MaxQueryTimeout field.
const DEFAULT_MAX_QUERY_MILLIS = 3000
31
32 var (
33 e_dbhost = cmdline.ENV("GE_POSTGRES_HOST", "the postgresql hostname to connect to")
34 e_dbdb = cmdline.ENV("GE_POSTGRES_DB", "the postgresql database to connect to")
35 e_dbuser = cmdline.ENV("GE_POSTGRES_USER", "the postgresql user to connect with")
36 e_dbpw = cmdline.ENV("GE_POSTGRES_PW", "the postgresql password to connect with")
37 e_dbproto = cmdline.ENV("GE_POSTGRES_PROTO", "the postgresql connection details as base64 encoded proto goeasyops.PostgresConfig")
38
39 failure_action = flag.String("ge_sql_failure_action", "report", "one of [report|quit|retry], report means to report it to the application (this is the default), quit means to quit the process, retry means to block until the connection is open")
40
49 f_dbhost = flag.String("dbhost", "localhost", "hostname of the postgres database rdbms")
50 f_dbdb = flag.String("dbdb", "", "database to use")
51 f_dbuser = flag.String("dbuser", "root", "username for the database to use")
52 f_dbpw = flag.String("dbpw", "pw", "password for the database to use")
53 sqldebug = flag.Bool("ge_debug_sql", false, "debug sql stuff")
54 print_errors = flag.Bool("ge_print_sql_errors", true, "print all sql errors (all failed queries)")
55 sqlTotalTime = prometheus.NewCounterVec(
56 prometheus.CounterOpts{
57 Name: "sql_total_time",
58 Help: "V=1 UNIT=durationms DESC=total time spent in a query",
59 },
60 []string{"dbhost", "database", "queryname"},
61 )
62 sqlTotalQueries = prometheus.NewCounterVec(
63 prometheus.CounterOpts{
64 Name: "sql_queries_executed",
65 Help: "V=1 UNIT=ops DESC=total number of sql queries started",
66 },
67 []string{"dbhost", "database", "queryname"},
68 )
69 sqlPerformance = prometheus.NewSummaryVec(
70 prometheus.SummaryOpts{
71 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.015, 0.99: 0.001},
72 Name: "sql_query_performance",
73 Help: "V=1 UNIT=durations DESC=timing information for sql performance in seconds",
74 },
75 []string{"dbhost", "database", "queryname"},
76 )
77 sqlFailedQueries = prometheus.NewCounterVec(
78 prometheus.CounterOpts{
79 Name: "sql_queries_failed",
80 Help: "V=1 UNIT=ops DESC=total number of sql queries failed",
81 },
82 []string{"dbhost", "database", "queryname"},
83 )
84 metricsRegistered = false
85 metricsRegisterLock sync.Mutex
86 databases []*DB
87 opendblock sync.Mutex
88 )
89
90 type DB struct {
91 dbcon *sql.DB
92 dbname string
93 dbinfo string
94 dbhost string
95 dbshorthost string
96 MaxQueryTimeout int
97 failurectr *utils.SlidingAverage
98 lastReconnect time.Time
99 reconnectLock sync.RWMutex
100 }
101
// maxConnections returns the limit on open connections that OpenWithInfo
// applies to each pool.
func maxConnections() int {
	const openLimit = 5
	return openLimit
}
// maxIdle returns the limit on idle connections that OpenWithInfo applies to
// each pool.
func maxIdle() int {
	const idleLimit = 4
	return idleLimit
}
108
109
110
// init currently performs no package initialisation work.
func init() {}
123
124
125
126 func Open() (*DB, error) {
127 host := *f_dbhost
128 db := *f_dbdb
129 user := *f_dbuser
130 pw := *f_dbpw
131 if host == "" {
132 host = e_dbhost.Value()
133 }
134 if db == "" {
135 db = e_dbdb.Value()
136 }
137 if user == "" {
138 user = e_dbuser.Value()
139 }
140 if pw == "" {
141 pw = e_dbpw.Value()
142 }
143 if e_dbproto.Value() != "" {
144 pp := &pb.PostgresConfig{}
145 err := utils.Unmarshal(e_dbproto.Value(), pp)
146 utils.Bail("invalid configuration in "+e_dbproto.Name(), err)
147 if host == "" {
148 host = pp.Host
149 }
150 if db == "" {
151 db = pp.DB
152 }
153 if user == "" {
154 user = pp.User
155 }
156 if pw == "" {
157 pw = pp.PW
158 }
159 }
160 return OpenWithInfo(host, db, user, pw)
161 }
162 func OpenWithInfo(dbhost, dbdb, dbuser, dbpw string) (*DB, error) {
163 var err error
164 var now string
165 if dbdb == "" {
166 utils.PrintStack("database parameters missing")
167 return nil, errors.Errorf("Please specify -dbdb flag")
168 }
169 dbinfo := fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=require", dbhost, dbuser, dbpw, dbdb)
170
171
172 for _, db := range databases {
173 if db.dbinfo == dbinfo {
174 return db, nil
175 }
176 }
177
178 opendblock.Lock()
179 defer opendblock.Unlock()
180
181 for _, db := range databases {
182 if db.dbinfo == dbinfo {
183 return db, nil
184 }
185 }
186
187 if !metricsRegistered {
188 metricsRegisterLock.Lock()
189 if !metricsRegistered {
190 prometheus.MustRegister(sqlTotalTime, sqlPerformance, sqlTotalQueries, sqlFailedQueries, NewPoolSizeCollector())
191 metricsRegistered = true
192 }
193 metricsRegisterLock.Unlock()
194 }
195
196 var dbcon *sql.DB
197 for {
198 dbcon, err = sql.Open("postgres", dbinfo)
199 if err == nil {
200 break
201 }
202 fmt.Printf("[go-easyops] Failed to connect to %s on host \"%s\" as \"%s\"\n", dbdb, dbhost, dbuser)
203 if *failure_action == "quit" {
204 os.Exit(10)
205 } else if *failure_action == "report" {
206 return nil, errors.Errorf("failed to open database \"%s\" on host \"%s\" as \"%s\": %w", dbdb, dbhost, dbuser, err)
207 } else if *failure_action == "retry" {
208 time.Sleep(time.Duration(2) * time.Second)
209 } else {
210 fmt.Printf("[go-easyops] ge_sql_failure_action must be one of [report|retry|quit], not \"%s\"\n", *failure_action)
211 os.Exit(10)
212 }
213 }
214
215 dbcon.SetMaxIdleConns(maxIdle())
216 dbcon.SetMaxOpenConns(maxConnections())
217
218 err = dbcon.QueryRow("SELECT NOW() as now").Scan(&now)
219 if err != nil {
220 fmt.Printf("[go-easyops] Failed to query db %s: %s\n", dbdb, err)
221 return nil, errors.Errorf("failed to open database \"%s\" on host \"%s\" as \"%s\": %w", dbdb, dbhost, dbuser, err)
222 }
223 if *sqldebug {
224 fmt.Printf("[go-easyops] sql now query returned: \"%s\"\n", now)
225 }
226 names := strings.Split(dbhost, ".")
227 dbshort := dbhost
228 if len(names) > 0 {
229 dbshort = names[0]
230 }
231 c := &DB{dbcon: dbcon, dbname: dbdb, dbinfo: dbinfo, MaxQueryTimeout: DEFAULT_MAX_QUERY_MILLIS, dbhost: dbhost, dbshorthost: dbshort}
232 c.failurectr = utils.NewSlidingAverage()
233 c.failurectr.MinSamples = 10
234 c.failurectr.MinAge = time.Duration(60) * time.Second
235 databases = append(databases, c)
236 if len(databases) > 5 {
237 fmt.Printf("[go-easyops] WARNING OPENED %d databases\n", len(databases))
238 for i, d := range databases {
239 fmt.Printf("[go-easyops] Opened database #%d: %s\n", i, d.dbinfo)
240 }
241 panic("too many databases")
242 }
243 return c, nil
244 }
// reopen currently does nothing.
func reopen() {}
247
248
251
252 func IsSQLSafe(txt string) bool {
253 return utils.IsOnlyChars(txt, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
254 }
255 func (d *DB) GetDatabaseName() string {
256 return d.dbname
257 }
258 func (d *DB) GetFailureCounter() *utils.SlidingAverage {
259 return d.failurectr
260 }
261
262 func (d *DB) reconnect_if_required() {
263 if !must_reconnect(d.GetFailureCounter()) {
264 return
265 }
266 if time.Since(d.lastReconnect) < time.Duration(60)*time.Second {
267
268 return
269 }
270 if *sqldebug {
271 sa := d.GetFailureCounter()
272 fmt.Printf("[go-easyops] sql counters: 0=%d, 1=%d\n", sa.GetCounter(0), sa.GetCounter(1))
273 }
274 fmt.Printf("[go-easyops] sql reconnect required.\n")
275 d.reconnectLock.Lock()
276 defer d.reconnectLock.Unlock()
277 d.lastReconnect = time.Now()
278 fmt.Printf("[go-easyops] sql reconnecting...\n")
279 d.dbcon.Close()
280 dc, err := sql.Open("postgres", d.dbinfo)
281 if err != nil {
282 fmt.Printf("[go-easyops] sql failed to reconnect: %s\n", err)
283 return
284 }
285 d.dbcon = dc
286
287 }
288
289 func must_reconnect(sa *utils.SlidingAverage) bool {
290 if sa.GetCounter(0) != 0 {
291
292 return false
293 }
294 if sa.GetCounter(1) == 0 {
295
296 return false
297 }
298 if sa.GetCounter(1) != sa.GetCounts(1) {
299
300 return false
301 }
302 return true
303 }
304
305 func query_error(ctx context.Context, typ string, name string, query string, err error) {
306 if *sqldebug || *print_errors {
307 fmt.Printf("[go-easyops] [sql] %s %s (%s) failed (%s)\n", typ, name, query, err)
308 }
309 if *sqldebug {
310 utils.PrintStack("query failed")
311 }
312 }
313
314
317
318
319 func (d *DB) QueryContext(ctx context.Context, name string, query string, args ...interface{}) (*sql.Rows, error) {
320 d.reconnect_if_required()
321 d.reconnectLock.RLock()
322 defer d.reconnectLock.RUnlock()
323 pp.SqlEntered()
324 defer pp.SqlDone()
325 if *sqldebug {
326 fmt.Printf("[go-easyops] [sql] Query %s (%v)\n", query, args)
327 }
328 l := prometheus.Labels{"dbhost": d.dbshorthost, "database": d.dbname, "queryname": name}
329 sqlTotalQueries.With(l).Inc()
330 started := time.Now()
331 r, err := d.dbcon.QueryContext(ctx, query, args...)
332 duration := time.Since(started).Seconds()
333 sqlPerformance.With(l).Observe(duration)
334
335 if err == nil && ctx.Err() != nil {
336 err = ctx.Err()
337 }
338 if err != nil {
339 d.failurectr.Add(1, 1)
340 query_error(ctx, "select", name, query, err)
341 sqlFailedQueries.With(l).Inc()
342 err = errors.Wrap(err)
343 } else {
344 d.failurectr.Add(0, 1)
345 }
346 sqlTotalTime.With(l).Add(duration)
347 pqtime(name, duration)
348 return r, err
349 }
350
351
352 func (d *DB) ExecContext(ctx context.Context, name string, query string, args ...interface{}) (sql.Result, error) {
353 rep := *sqldebug || *print_errors
354 return d.execContext(ctx, rep, name, query, args...)
355 }
356 func (d *DB) ExecContextQuiet(ctx context.Context, name string, query string, args ...interface{}) (sql.Result, error) {
357 rep := false
358 return d.execContext(ctx, rep, name, query, args...)
359 }
360 func (d *DB) execContext(ctx context.Context, report_failure bool, name string, query string, args ...interface{}) (sql.Result, error) {
361 d.reconnect_if_required()
362 d.reconnectLock.RLock()
363 defer d.reconnectLock.RUnlock()
364 pp.SqlEntered()
365 defer pp.SqlDone()
366 l := prometheus.Labels{"dbhost": d.dbshorthost, "database": d.dbname, "queryname": name}
367 if *sqldebug {
368 fmt.Printf("[go-easyops] [sql] Exec %s (%v)\n", query, args)
369 }
370 sqlTotalQueries.With(l).Inc()
371 started := time.Now()
372 r, err := d.dbcon.ExecContext(ctx, query, args...)
373 duration := time.Since(started).Seconds()
374 sqlPerformance.With(l).Observe(duration)
375
376 if err == nil && ctx.Err() != nil {
377 err = ctx.Err()
378 }
379 if err != nil {
380 d.failurectr.Add(1, 1)
381 query_error(ctx, "exec", name, query, err)
382 sqlFailedQueries.With(l).Inc()
383 err = errors.Wrap(err)
384 } else {
385 d.failurectr.Add(0, 1)
386 }
387 sqlTotalTime.With(l).Add(duration)
388 pqtime(name, duration)
389 return r, err
390 }
391
392
393
394 func (d *DB) QueryRowContext(ctx context.Context, name string, query string, args ...interface{}) *sql.Row {
395 d.reconnect_if_required()
396 d.reconnectLock.RLock()
397 defer d.reconnectLock.RUnlock()
398 pp.SqlEntered()
399 defer pp.SqlDone()
400 if *sqldebug {
401 fmt.Printf("[go-easyops] [sql] QueryRow %s\n", query)
402 }
403 l := prometheus.Labels{"dbhost": d.dbshorthost, "database": d.dbname, "queryname": name}
404 sqlTotalQueries.With(l).Inc()
405 r := d.dbcon.QueryRowContext(ctx, query, args...)
406 return r
407 }
408
409 func (d *DB) Conn(ctx context.Context) (*sql.Conn, error) {
410 return d.dbcon.Conn(ctx)
411 }
412
413 func (d *DB) CheckDuplicateRowError(err error) bool {
414 if err, ok := err.(*pq.Error); ok {
415 if err.Code == "23505" {
416 return true
417 }
418 }
419
420 return false
421 }
422
423 func pqtime(name string, dur float64) {
424 if !*sqldebug {
425 return
426 }
427 fmt.Printf("[go-easyops] Query \"%s\" completed in %0.2f seconds\n", name, dur)
428 }
429
View as plain text