mdb/mbigquery: refactor to use new mctx/mlog/merr stuff

This commit is contained in:
Brian Picciano 2019-02-02 21:56:54 -05:00
parent 608eb90274
commit a5446b0833

View File

@ -8,10 +8,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/mediocregopher/mediocre-go-lib/m" "github.com/mediocregopher/mediocre-go-lib/mctx"
"github.com/mediocregopher/mediocre-go-lib/mcfg"
"github.com/mediocregopher/mediocre-go-lib/mdb" "github.com/mediocregopher/mediocre-go-lib/mdb"
"github.com/mediocregopher/mediocre-go-lib/merr"
"github.com/mediocregopher/mediocre-go-lib/mlog" "github.com/mediocregopher/mediocre-go-lib/mlog"
"github.com/mediocregopher/mediocre-go-lib/mrun"
"cloud.google.com/go/bigquery" "cloud.google.com/go/bigquery"
"google.golang.org/api/googleapi" "google.golang.org/api/googleapi"
@ -41,27 +42,40 @@ type BigQuery struct {
tableUploaders map[[2]string]*bigquery.Uploader tableUploaders map[[2]string]*bigquery.Uploader
} }
// Cfg configures and returns a BigQuery instance which will be usable once Run // MNew returns a BigQuery instance which will be initialized and configured
// is called on the passed in Cfg instance. // when the start event is triggered on ctx (see mrun.Start). The BigQuery
func Cfg(cfg *mcfg.Cfg) *BigQuery { // instance will have Close called on it when the stop event is triggered on ctx
cfg = cfg.Child("bigquery") // (see mrun.Stop).
bq := BigQuery{ //
gce: mdb.CfgGCE(cfg), // gce is optional and can be passed in if there's an existing gce object which
// should be used, otherwise a new one will be created with mdb.MGCE.
func MNew(ctx mctx.Context, gce *mdb.GCE) *BigQuery {
if gce == nil {
gce = mdb.MGCE(ctx, "")
}
ctx = mctx.ChildOf(ctx, "bigquery")
bq := &BigQuery{
gce: gce,
tables: map[[2]string]*bigquery.Table{}, tables: map[[2]string]*bigquery.Table{},
tableUploaders: map[[2]string]*bigquery.Uploader{}, tableUploaders: map[[2]string]*bigquery.Uploader{},
log: mlog.From(ctx),
} }
bq.log = m.Log(cfg, &bq) bq.log.SetKV(bq)
cfg.Start.Then(func(ctx context.Context) error { mrun.OnStart(ctx, func(innerCtx mctx.Context) error {
bq.log.Info("connecting to bigquery") bq.log.Info("connecting to bigquery")
var err error var err error
bq.Client, err = bigquery.NewClient(ctx, bq.gce.Project, bq.gce.ClientOptions()...) bq.Client, err = bigquery.NewClient(innerCtx, bq.gce.Project, bq.gce.ClientOptions()...)
return mlog.ErrWithKV(err, &bq) return merr.WithKV(err, bq.KV())
}) })
return &bq mrun.OnStop(ctx, func(mctx.Context) error {
return bq.Client.Close()
})
return bq
} }
// KV implements the mlog.KVer interface. // KV implements the mlog.KVer interface.
func (bq *BigQuery) KV() mlog.KV { func (bq *BigQuery) KV() map[string]interface{} {
return bq.gce.KV() return bq.gce.KV()
} }
@ -84,17 +98,17 @@ func (bq *BigQuery) Table(
return table, bq.tableUploaders[key], nil return table, bq.tableUploaders[key], nil
} }
kv := mlog.KV{"dataset": dataset, "table": tableName} kv := mlog.KV{"bigQueryDataset": dataset, "bigQueryTable": tableName}
bq.log.Debug("creating/grabbing table", kv) bq.log.Debug("creating/grabbing table", kv)
schema, err := bigquery.InferSchema(schemaObj) schema, err := bigquery.InferSchema(schemaObj)
if err != nil { if err != nil {
return nil, nil, mlog.ErrWithKV(err, bq, kv) return nil, nil, merr.WithKV(err, bq.KV(), kv.KV())
} }
ds := bq.Dataset(dataset) ds := bq.Dataset(dataset)
if err := ds.Create(ctx, nil); err != nil && !isErrAlreadyExists(err) { if err := ds.Create(ctx, nil); err != nil && !isErrAlreadyExists(err) {
return nil, nil, mlog.ErrWithKV(err, bq, kv) return nil, nil, merr.WithKV(err, bq.KV(), kv.KV())
} }
table := ds.Table(tableName) table := ds.Table(tableName)
@ -103,7 +117,7 @@ func (bq *BigQuery) Table(
Schema: schema, Schema: schema,
} }
if err := table.Create(ctx, meta); err != nil && !isErrAlreadyExists(err) { if err := table.Create(ctx, meta); err != nil && !isErrAlreadyExists(err) {
return nil, nil, mlog.ErrWithKV(err, bq, kv) return nil, nil, merr.WithKV(err, bq.KV(), kv.KV())
} }
uploader := table.Uploader() uploader := table.Uploader()