From 23d1a8fc914371c5c6a2cf51c32b7b9895022612 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Wed, 18 Jul 2018 23:51:24 +0000 Subject: [PATCH] mdb: implement BigQuery type, needs tests --- mdb/bq.go | 154 +++++++++++++++++++++++++++++++++++++++++++++++++++ mlog/mlog.go | 6 +- 2 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 mdb/bq.go diff --git a/mdb/bq.go b/mdb/bq.go new file mode 100644 index 0000000..39d35ec --- /dev/null +++ b/mdb/bq.go @@ -0,0 +1,154 @@ +package mdb + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/mediocregopher/mediocre-go-lib/m" + "github.com/mediocregopher/mediocre-go-lib/mcfg" + "github.com/mediocregopher/mediocre-go-lib/mlog" + + "cloud.google.com/go/bigquery" + "google.golang.org/api/googleapi" +) + +// TODO this file needs tests + +func bqIsErrAlreadyExists(err error) bool { + if err == nil { + return false + } + if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == 409 { + return true + } + return false +} + +// BigQuery is a wrapper around a bigquery client providing more functionality. +type BigQuery struct { + *bigquery.Client + gce *GCE + log *mlog.Logger + + // key is dataset/tableName + tablesL sync.Mutex + tables map[[2]string]*bigquery.Table + tableUploaders map[[2]string]*bigquery.Uploader +} + +// CfgBigQuery configures and returns a BigQuery instance which will be usable +// once Run is called on the passed in Cfg instance. +func CfgBigQuery(cfg *mcfg.Cfg) *BigQuery { + cfg = cfg.Child("bigquery") + bq := BigQuery{ + gce: CfgGCE(cfg), + tables: map[[2]string]*bigquery.Table{}, + tableUploaders: map[[2]string]*bigquery.Uploader{}, + } + bq.log = m.Log(cfg, &bq) + cfg.Start.Then(func(ctx context.Context) error { + bq.log.Info("connecting to bigquery") + var err error + bq.Client, err = bigquery.NewClient(ctx, bq.gce.Project, bq.gce.ClientOptions()...) + return mlog.ErrWithKV(err, &bq) + }) + return &bq +} + +// KV implements the mlog.KVer interface. +func (bq *BigQuery) KV() mlog.KV { + return bq.gce.KV() +} + +// Table initializes and returns the table instance with the given dataset and +// schema information. This method caches the Table/Uploader instances it +// returns, so multiple calls with the same dataset/tableName will only actually +// create those instances on the first call. +func (bq *BigQuery) Table( + ctx context.Context, + dataset, tableName string, + schemaObj interface{}, +) ( + *bigquery.Table, *bigquery.Uploader, error, +) { + bq.tablesL.Lock() + defer bq.tablesL.Unlock() + + key := [2]string{dataset, tableName} + if table, ok := bq.tables[key]; ok { + return table, bq.tableUploaders[key], nil + } + + kv := mlog.KV{"dataset": dataset, "table": tableName} + bq.log.Debug("creating/grabbing table", kv) + + schema, err := bigquery.InferSchema(schemaObj) + if err != nil { + return nil, nil, mlog.ErrWithKV(err, bq, kv) + } + + ds := bq.Dataset(dataset) + if err := ds.Create(ctx, nil); err != nil && !bqIsErrAlreadyExists(err) { + return nil, nil, mlog.ErrWithKV(err, bq, kv) + } + + table := ds.Table(tableName) + meta := &bigquery.TableMetadata{ + Name: tableName, + Schema: schema, + } + if err := table.Create(ctx, meta); err != nil && !bqIsErrAlreadyExists(err) { + return nil, nil, mlog.ErrWithKV(err, bq, kv) + } + uploader := table.Uploader() + + bq.tables[key] = table + bq.tableUploaders[key] = uploader + return table, uploader, nil +} + +//////////////////////////////////////////////////////////////////////////////// + +const bqTimeFormat = "2006-01-02 15:04:05 MST" + +// BigQueryTime wraps a time.Time object and provides marshaling/unmarshaling +// for bigquery's time format. +type BigQueryTime struct { + time.Time +} + +// MarshalText implements the encoding.TextMarshaler interface. +func (t BigQueryTime) MarshalText() ([]byte, error) { + str := t.Time.Format(bqTimeFormat) + return []byte(str), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (t *BigQueryTime) UnmarshalText(b []byte) error { + tt, err := time.Parse(bqTimeFormat, string(b)) + if err != nil { + return err + } + t.Time = tt + return nil +} + +// MarshalJSON implements the json.Marshaler interface. +func (t *BigQueryTime) MarshalJSON() ([]byte, error) { + b, err := t.MarshalText() + if err != nil { + return nil, err + } + return json.Marshal(string(b)) +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (t *BigQueryTime) UnmarshalJSON(b []byte) error { + var str string + if err := json.Unmarshal(b, &str); err != nil { + return err + } + return t.UnmarshalText([]byte(str)) +} diff --git a/mlog/mlog.go b/mlog/mlog.go index da0ab79..d95bbdf 100644 --- a/mlog/mlog.go +++ b/mlog/mlog.go @@ -167,8 +167,12 @@ func merge(kvs ...KVer) KV { } // Merge takes in multiple KVers and returns a single KVer which is the union of -// all the passed in ones. Key/vals on the rightmost of the set take precedence +// all the passed in ones. Key/Vals on the rightmost of the set take precedence // over conflicting ones to the left. +// +// TODO currently this evaluates all of the KVers and generates a flat KV +// instance in the moment. It would be better if this actually called the KV +// methods of each KVer on every outer KV call. func Merge(kvs ...KVer) KVer { return merge(kvs...) }