mediocre-go-lib/mdb/mbigquery/bigquery.go

169 lines
4.6 KiB
Go
Raw Permalink Normal View History

2018-07-19 19:38:56 +00:00
// Package mbigquery implements connecting to Google's BigQuery service and
// simplifying a number of interactions with it.
package mbigquery
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/mediocregopher/mediocre-go-lib/mctx"
"github.com/mediocregopher/mediocre-go-lib/mdb"
"github.com/mediocregopher/mediocre-go-lib/merr"
"github.com/mediocregopher/mediocre-go-lib/mlog"
"github.com/mediocregopher/mediocre-go-lib/mrun"
"cloud.google.com/go/bigquery"
"google.golang.org/api/googleapi"
)
// TODO this file needs tests
func isErrAlreadyExists(err error) bool {
if err == nil {
return false
}
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == 409 {
return true
}
return false
}
// BigQuery is a wrapper around a bigquery client providing more functionality.
type BigQuery struct {
*bigquery.Client
gce *mdb.GCE
ctx context.Context
// key is dataset/tableName
tablesL sync.Mutex
tables map[[2]string]*bigquery.Table
tableUploaders map[[2]string]*bigquery.Uploader
}
// WithBigQuery returns a BigQuery instance which will be initialized and
// configured when the start event is triggered on the returned (see
// mrun.Start). The BigQuery instance will have Close called on it when the stop
// event is triggered on the returned Context (see mrun.Stop).
//
// gce is optional and can be passed in if there's an existing gce object which
// should be used, otherwise a new one will be created with mdb.MGCE.
func WithBigQuery(parent context.Context, gce *mdb.GCE) (context.Context, *BigQuery) {
ctx := mctx.NewChild(parent, "mbigquery")
if gce == nil {
ctx, gce = mdb.WithGCE(ctx, "")
}
bq := &BigQuery{
gce: gce,
tables: map[[2]string]*bigquery.Table{},
tableUploaders: map[[2]string]*bigquery.Uploader{},
}
ctx = mrun.WithStartHook(ctx, func(innerCtx context.Context) error {
bq.ctx = mctx.MergeAnnotations(bq.ctx, bq.gce.Context())
mlog.Info("connecting to bigquery", bq.ctx)
var err error
bq.Client, err = bigquery.NewClient(innerCtx, bq.gce.Project, bq.gce.ClientOptions()...)
return merr.Wrap(err, bq.ctx)
})
ctx = mrun.WithStopHook(ctx, func(context.Context) error {
return bq.Client.Close()
})
bq.ctx = ctx
return mctx.WithChild(parent, ctx), bq
}
// Table initializes and returns the table instance with the given dataset and
// schema information. This method caches the Table/Uploader instances it
// returns, so multiple calls with the same dataset/tableName will only actually
// create those instances on the first call.
func (bq *BigQuery) Table(
ctx context.Context,
dataset, tableName string,
schemaObj interface{},
) (
*bigquery.Table, *bigquery.Uploader, error,
) {
bq.tablesL.Lock()
defer bq.tablesL.Unlock()
key := [2]string{dataset, tableName}
if table, ok := bq.tables[key]; ok {
return table, bq.tableUploaders[key], nil
}
ctx = mctx.MergeAnnotations(ctx, bq.ctx)
ctx = mctx.Annotate(ctx, "dataset", dataset, "table", tableName)
mlog.Debug("creating/grabbing table", bq.ctx)
schema, err := bigquery.InferSchema(schemaObj)
if err != nil {
return nil, nil, merr.Wrap(err, ctx)
}
ds := bq.Dataset(dataset)
if err := ds.Create(ctx, nil); err != nil && !isErrAlreadyExists(err) {
return nil, nil, merr.Wrap(err, ctx)
}
table := ds.Table(tableName)
meta := &bigquery.TableMetadata{
Name: tableName,
Schema: schema,
}
if err := table.Create(ctx, meta); err != nil && !isErrAlreadyExists(err) {
return nil, nil, merr.Wrap(err, ctx)
}
uploader := table.Uploader()
bq.tables[key] = table
bq.tableUploaders[key] = uploader
return table, uploader, nil
}
////////////////////////////////////////////////////////////////////////////////
const timeFormat = "2006-01-02 15:04:05 MST"
// Time wraps a time.Time object and provides marshaling/unmarshaling for
// bigquery's time format.
type Time struct {
time.Time
}
// MarshalText implements the encoding.TextMarshaler interface.
func (t Time) MarshalText() ([]byte, error) {
str := t.Time.Format(timeFormat)
return []byte(str), nil
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
func (t *Time) UnmarshalText(b []byte) error {
tt, err := time.Parse(timeFormat, string(b))
if err != nil {
return err
}
t.Time = tt
return nil
}
// MarshalJSON implements the json.Marshaler interface.
func (t *Time) MarshalJSON() ([]byte, error) {
b, err := t.MarshalText()
if err != nil {
return nil, err
}
return json.Marshal(string(b))
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (t *Time) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err != nil {
return err
}
return t.UnmarshalText([]byte(str))
}