diff --git a/mdb/mbigquery/bigquery.go b/mdb/mbigquery/bigquery.go index 33c292f..24eeaa6 100644 --- a/mdb/mbigquery/bigquery.go +++ b/mdb/mbigquery/bigquery.go @@ -8,10 +8,11 @@ import ( "sync" "time" - "github.com/mediocregopher/mediocre-go-lib/m" - "github.com/mediocregopher/mediocre-go-lib/mcfg" + "github.com/mediocregopher/mediocre-go-lib/mctx" "github.com/mediocregopher/mediocre-go-lib/mdb" + "github.com/mediocregopher/mediocre-go-lib/merr" "github.com/mediocregopher/mediocre-go-lib/mlog" + "github.com/mediocregopher/mediocre-go-lib/mrun" "cloud.google.com/go/bigquery" "google.golang.org/api/googleapi" @@ -41,27 +42,40 @@ type BigQuery struct { tableUploaders map[[2]string]*bigquery.Uploader } -// Cfg configures and returns a BigQuery instance which will be usable once Run -// is called on the passed in Cfg instance. -func Cfg(cfg *mcfg.Cfg) *BigQuery { - cfg = cfg.Child("bigquery") - bq := BigQuery{ - gce: mdb.CfgGCE(cfg), +// MNew returns a BigQuery instance which will be initialized and configured +// when the start event is triggered on ctx (see mrun.Start). The BigQuery +// instance will have Close called on it when the stop event is triggered on ctx +// (see mrun.Stop). +// +// gce is optional and can be passed in if there's an existing gce object which +// should be used, otherwise a new one will be created with mdb.MGCE. +func MNew(ctx mctx.Context, gce *mdb.GCE) *BigQuery { + if gce == nil { + gce = mdb.MGCE(ctx, "") + } + + ctx = mctx.ChildOf(ctx, "bigquery") + bq := &BigQuery{ + gce: gce, tables: map[[2]string]*bigquery.Table{}, tableUploaders: map[[2]string]*bigquery.Uploader{}, + log: mlog.From(ctx), } - bq.log = m.Log(cfg, &bq) - cfg.Start.Then(func(ctx context.Context) error { + bq.log.SetKV(bq) + mrun.OnStart(ctx, func(innerCtx mctx.Context) error { bq.log.Info("connecting to bigquery") var err error - bq.Client, err = bigquery.NewClient(ctx, bq.gce.Project, bq.gce.ClientOptions()...) - return mlog.ErrWithKV(err, &bq) + bq.Client, err = bigquery.NewClient(innerCtx, bq.gce.Project, bq.gce.ClientOptions()...) + return merr.WithKV(err, bq.KV()) }) - return &bq + mrun.OnStop(ctx, func(mctx.Context) error { + return bq.Client.Close() + }) + return bq } // KV implements the mlog.KVer interface. -func (bq *BigQuery) KV() mlog.KV { +func (bq *BigQuery) KV() map[string]interface{} { return bq.gce.KV() } @@ -84,17 +98,17 @@ func (bq *BigQuery) Table( return table, bq.tableUploaders[key], nil } - kv := mlog.KV{"dataset": dataset, "table": tableName} + kv := mlog.KV{"bigQueryDataset": dataset, "bigQueryTable": tableName} bq.log.Debug("creating/grabbing table", kv) schema, err := bigquery.InferSchema(schemaObj) if err != nil { - return nil, nil, mlog.ErrWithKV(err, bq, kv) + return nil, nil, merr.WithKV(err, bq.KV(), kv.KV()) } ds := bq.Dataset(dataset) if err := ds.Create(ctx, nil); err != nil && !isErrAlreadyExists(err) { - return nil, nil, mlog.ErrWithKV(err, bq, kv) + return nil, nil, merr.WithKV(err, bq.KV(), kv.KV()) } table := ds.Table(tableName) @@ -103,7 +117,7 @@ func (bq *BigQuery) Table( Schema: schema, } if err := table.Create(ctx, meta); err != nil && !isErrAlreadyExists(err) { - return nil, nil, mlog.ErrWithKV(err, bq, kv) + return nil, nil, merr.WithKV(err, bq.KV(), kv.KV()) } uploader := table.Uploader()