1
7 package mysql
8
9
10
11
12 import (
13 "database/sql"
14 "flag"
15 "fmt"
16 _ "github.com/go-sql-driver/mysql"
17 pp "golang.conradwood.net/go-easyops/profiling"
18 "golang.conradwood.net/go-easyops/prometheus"
19 "golang.conradwood.net/go-easyops/utils"
20 "golang.org/x/net/context"
21 "sync"
22 "time"
23 )
24
25 const (
26 DEFAULT_MAX_QUERY_MILLIS = 3000
27 )
28
29 var (
30
39 dbhost = flag.String("mysql_host", "localhost", "hostname of the postgres database rdbms")
40 dbdb = flag.String("mysql_db", "", "database to use")
41 dbuser = flag.String("mysql_user", "root", "username for the database to use")
42 dbpw = flag.String("mysql_pw", "pw", "password for the database to use")
43 sqldebug = flag.Bool("debug_mysql", false, "debug mysql stuff")
44 sqlTotalQueries = prometheus.NewCounterVec(
45 prometheus.CounterOpts{
46 Name: "mysql_queries_executed",
47 Help: "V=1 UNIT=ops DESC=total number of sql queries started",
48 },
49 []string{"database", "queryname"},
50 )
51 sqlPerformance = prometheus.NewSummaryVec(
52 prometheus.SummaryOpts{
53 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.015, 0.99: 0.001},
54 Name: "mysql_query_performance",
55 Help: "V=1 UNIT=durations DESC=timing information for sql performance in seconds",
56 },
57 []string{"database", "queryname"},
58 )
59 sqlFailedQueries = prometheus.NewCounterVec(
60 prometheus.CounterOpts{
61 Name: "mysql_queries_failed",
62 Help: "V=1 UNIT=ops DESC=total number of sql queries failed",
63 },
64 []string{"database", "queryname"},
65 )
66
75 metricsRegistered = false
76 metricsRegisterLock sync.Mutex
77 databases []*DB
78 opendblock sync.Mutex
79 )
80
81 type DB struct {
82 dbcon *sql.DB
83 dbname string
84 dbinfo string
85 MaxQueryTimeout int
86 }
87
88 func maxConnections() int {
89 return 5
90 }
91 func maxIdle() int {
92 return 4
93 }
94
95
96
97 func Open() (*DB, error) {
98
99 var err error
100 var now string
101 if *dbdb == "" {
102 return nil, fmt.Errorf("Please specify -mysql_db flag")
103 }
104 dbinfo := fmt.Sprintf("%s:%s@tcp(%s)/%s?parseTime=true", *dbuser, *dbpw, *dbhost, *dbdb)
105
106
107 for _, db := range databases {
108 if db.dbinfo == dbinfo {
109 return db, nil
110 }
111 }
112 opendblock.Lock()
113 defer opendblock.Unlock()
114
115 for _, db := range databases {
116 if db.dbinfo == dbinfo {
117 return db, nil
118 }
119 }
120
121 if !metricsRegistered {
122 metricsRegisterLock.Lock()
123 if !metricsRegistered {
124 prometheus.MustRegister(sqlPerformance, sqlTotalQueries, sqlFailedQueries)
125 metricsRegistered = true
126 }
127 metricsRegisterLock.Unlock()
128 }
129
130 dbcon, err := sql.Open("mysql", dbinfo)
131 if err != nil {
132 fmt.Printf("Failed to connect to %s on host \"%s\" as \"%s\"\n", *dbdb, *dbhost, *dbuser)
133 return nil, err
134 }
135 dbcon.SetMaxIdleConns(maxIdle())
136 dbcon.SetMaxOpenConns(maxConnections())
137 dbcon.SetConnMaxLifetime(time.Second * time.Duration(90))
138
139
140 err = dbcon.QueryRow("SELECT NOW() as now").Scan(&now)
141 if err != nil {
142 fmt.Printf("Failed to query db %s: %s\n", *dbdb, err)
143 return nil, err
144 }
145 c := &DB{dbcon: dbcon, dbname: *dbdb, dbinfo: dbinfo, MaxQueryTimeout: DEFAULT_MAX_QUERY_MILLIS}
146 databases = append(databases, c)
147 if len(databases) > 2 {
148 fmt.Printf("[go-easyops] WARNING OPENED %d databases\n", len(databases))
149 for i, d := range databases {
150 fmt.Printf("Opened database #%d: %s\n", i, d.dbinfo)
151 }
152 panic("too many databases")
153 }
154 return c, nil
155 }
156
157
160
161 func IsSQLSafe(txt string) bool {
162 return utils.IsOnlyChars(txt, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
163 }
164
165
168
169
170 func (d *DB) QueryContext(ctx context.Context, name string, query string, args ...interface{}) (*sql.Rows, error) {
171 pp.SqlEntered()
172 defer pp.SqlDone()
173 if *sqldebug {
174 fmt.Printf("[sql] Query %s (%v)\n", query, args)
175 }
176 l := prometheus.Labels{"database": d.dbname, "queryname": name}
177 sqlTotalQueries.With(l).Inc()
178 started := time.Now()
179 r, err := d.dbcon.QueryContext(ctx, query, args...)
180 sqlPerformance.With(l).Observe(time.Since(started).Seconds())
181
182 if err == nil && ctx.Err() != nil {
183 err = ctx.Err()
184 }
185 if err != nil {
186 if *sqldebug {
187 fmt.Printf("[sql] Query %s failed (%s)\n", query, err)
188 }
189 sqlFailedQueries.With(l).Inc()
190 }
191 return r, err
192 }
193
194
195 func (d *DB) ExecContext(ctx context.Context, name string, query string, args ...interface{}) (sql.Result, error) {
196 pp.SqlEntered()
197 defer pp.SqlDone()
198 l := prometheus.Labels{"database": d.dbname, "queryname": name}
199 if *sqldebug {
200 fmt.Printf("[sql] Exec %s (%v)\n", query, args)
201 }
202 sqlTotalQueries.With(l).Inc()
203 started := time.Now()
204 r, err := d.dbcon.ExecContext(ctx, query, args...)
205 sqlPerformance.With(l).Observe(time.Since(started).Seconds())
206
207 if err == nil && ctx.Err() != nil {
208 err = ctx.Err()
209 }
210 if err != nil {
211 if *sqldebug {
212 fmt.Printf("[sql] Query %s failed (%s)\n", query, err)
213 }
214 sqlFailedQueries.With(l).Inc()
215 }
216 return r, err
217 }
218
219
220
221 func (d *DB) QueryRowContext(ctx context.Context, name string, query string, args ...interface{}) *sql.Row {
222 pp.SqlEntered()
223 defer pp.SqlDone()
224 if *sqldebug {
225 fmt.Printf("[sql] QueryRow %s\n", query)
226 }
227 l := prometheus.Labels{"database": d.dbname, "queryname": name}
228 sqlTotalQueries.With(l).Inc()
229 return d.dbcon.QueryRowContext(ctx, query, args...)
230 }
231
View as plain text