2018-07-19 19:38:56 +00:00
|
|
|
// Package mbigquery implements connecting to Google's BigQuery service and
|
|
|
|
// simplifying a number of interactions with it.
|
2018-07-19 17:43:18 +00:00
|
|
|
package mbigquery
|
2018-07-18 23:51:24 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2019-02-03 02:56:54 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mctx"
|
2018-07-19 17:43:18 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
2019-02-03 02:56:54 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/merr"
|
2018-07-18 23:51:24 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
2019-02-03 02:56:54 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mrun"
|
2018-07-18 23:51:24 +00:00
|
|
|
|
|
|
|
"cloud.google.com/go/bigquery"
|
|
|
|
"google.golang.org/api/googleapi"
|
|
|
|
)
|
|
|
|
|
|
|
|
// TODO this file needs tests
|
|
|
|
|
2018-07-19 17:43:18 +00:00
|
|
|
func isErrAlreadyExists(err error) bool {
|
2018-07-18 23:51:24 +00:00
|
|
|
if err == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == 409 {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
// BigQuery is a wrapper around a bigquery client providing more functionality.
|
|
|
|
type BigQuery struct {
|
|
|
|
*bigquery.Client
|
2018-07-19 17:43:18 +00:00
|
|
|
gce *mdb.GCE
|
2019-02-05 20:18:17 +00:00
|
|
|
ctx context.Context
|
2018-07-18 23:51:24 +00:00
|
|
|
|
|
|
|
// key is dataset/tableName
|
|
|
|
tablesL sync.Mutex
|
|
|
|
tables map[[2]string]*bigquery.Table
|
|
|
|
tableUploaders map[[2]string]*bigquery.Uploader
|
|
|
|
}
|
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
// WithBigQuery returns a BigQuery instance which will be initialized and
|
|
|
|
// configured when the start event is triggered on the returned (see
|
|
|
|
// mrun.Start). The BigQuery instance will have Close called on it when the stop
|
|
|
|
// event is triggered on the returned Context (see mrun.Stop).
|
2019-02-03 02:56:54 +00:00
|
|
|
//
|
|
|
|
// gce is optional and can be passed in if there's an existing gce object which
|
|
|
|
// should be used, otherwise a new one will be created with mdb.MGCE.
|
2019-02-09 19:08:30 +00:00
|
|
|
func WithBigQuery(parent context.Context, gce *mdb.GCE) (context.Context, *BigQuery) {
|
|
|
|
ctx := mctx.NewChild(parent, "mbigquery")
|
2019-02-03 02:56:54 +00:00
|
|
|
if gce == nil {
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx, gce = mdb.WithGCE(ctx, "")
|
2019-02-03 02:56:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bq := &BigQuery{
|
|
|
|
gce: gce,
|
2018-07-18 23:51:24 +00:00
|
|
|
tables: map[[2]string]*bigquery.Table{},
|
|
|
|
tableUploaders: map[[2]string]*bigquery.Uploader{},
|
|
|
|
}
|
2019-02-05 20:18:17 +00:00
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx = mrun.WithStartHook(ctx, func(innerCtx context.Context) error {
|
|
|
|
bq.ctx = mctx.MergeAnnotations(bq.ctx, bq.gce.Context())
|
|
|
|
mlog.Info("connecting to bigquery", bq.ctx)
|
2018-07-18 23:51:24 +00:00
|
|
|
var err error
|
2019-02-03 02:56:54 +00:00
|
|
|
bq.Client, err = bigquery.NewClient(innerCtx, bq.gce.Project, bq.gce.ClientOptions()...)
|
2019-02-27 18:05:51 +00:00
|
|
|
return merr.Wrap(err, bq.ctx)
|
2019-02-03 02:56:54 +00:00
|
|
|
})
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx = mrun.WithStopHook(ctx, func(context.Context) error {
|
2019-02-03 02:56:54 +00:00
|
|
|
return bq.Client.Close()
|
2018-07-18 23:51:24 +00:00
|
|
|
})
|
2019-02-09 19:08:30 +00:00
|
|
|
bq.ctx = ctx
|
|
|
|
return mctx.WithChild(parent, ctx), bq
|
2018-07-18 23:51:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Table initializes and returns the table instance with the given dataset and
|
|
|
|
// schema information. This method caches the Table/Uploader instances it
|
|
|
|
// returns, so multiple calls with the same dataset/tableName will only actually
|
|
|
|
// create those instances on the first call.
|
|
|
|
func (bq *BigQuery) Table(
|
|
|
|
ctx context.Context,
|
|
|
|
dataset, tableName string,
|
|
|
|
schemaObj interface{},
|
|
|
|
) (
|
|
|
|
*bigquery.Table, *bigquery.Uploader, error,
|
|
|
|
) {
|
|
|
|
bq.tablesL.Lock()
|
|
|
|
defer bq.tablesL.Unlock()
|
|
|
|
|
|
|
|
key := [2]string{dataset, tableName}
|
|
|
|
if table, ok := bq.tables[key]; ok {
|
|
|
|
return table, bq.tableUploaders[key], nil
|
|
|
|
}
|
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx = mctx.MergeAnnotations(ctx, bq.ctx)
|
|
|
|
ctx = mctx.Annotate(ctx, "dataset", dataset, "table", tableName)
|
2018-07-18 23:51:24 +00:00
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
mlog.Debug("creating/grabbing table", bq.ctx)
|
2018-07-18 23:51:24 +00:00
|
|
|
schema, err := bigquery.InferSchema(schemaObj)
|
|
|
|
if err != nil {
|
2019-02-27 18:05:51 +00:00
|
|
|
return nil, nil, merr.Wrap(err, ctx)
|
2018-07-18 23:51:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
ds := bq.Dataset(dataset)
|
2018-07-19 17:43:18 +00:00
|
|
|
if err := ds.Create(ctx, nil); err != nil && !isErrAlreadyExists(err) {
|
2019-02-27 18:05:51 +00:00
|
|
|
return nil, nil, merr.Wrap(err, ctx)
|
2018-07-18 23:51:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
table := ds.Table(tableName)
|
|
|
|
meta := &bigquery.TableMetadata{
|
|
|
|
Name: tableName,
|
|
|
|
Schema: schema,
|
|
|
|
}
|
2018-07-19 17:43:18 +00:00
|
|
|
if err := table.Create(ctx, meta); err != nil && !isErrAlreadyExists(err) {
|
2019-02-27 18:05:51 +00:00
|
|
|
return nil, nil, merr.Wrap(err, ctx)
|
2018-07-18 23:51:24 +00:00
|
|
|
}
|
|
|
|
uploader := table.Uploader()
|
|
|
|
|
|
|
|
bq.tables[key] = table
|
|
|
|
bq.tableUploaders[key] = uploader
|
|
|
|
return table, uploader, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
2018-07-19 17:43:18 +00:00
|
|
|
// timeFormat is the Go reference-time layout Time uses for its textual form,
// e.g. "2018-07-19 17:00:00 UTC".
const timeFormat = "2006-01-02 15:04:05 MST"

// Time wraps a time.Time object and provides marshaling/unmarshaling for
// bigquery's time format.
//
// Values are always encoded in UTC. The layout carries only a zone
// abbreviation, and time.Parse records an abbreviation it does not know with a
// zero offset. Encoding a non-UTC time verbatim would therefore not round-trip
// to the same instant.
type Time struct {
	time.Time
}

// MarshalText implements the encoding.TextMarshaler interface. The time is
// converted to UTC and then formatted with timeFormat.
func (t Time) MarshalText() ([]byte, error) {
	return []byte(t.Time.UTC().Format(timeFormat)), nil
}

// UnmarshalText implements the encoding.TextUnmarshaler interface by parsing b
// with timeFormat. On error t is left unmodified.
func (t *Time) UnmarshalText(b []byte) error {
	tt, err := time.Parse(timeFormat, string(b))
	if err != nil {
		return err
	}
	t.Time = tt
	return nil
}

// MarshalJSON implements the json.Marshaler interface. It encodes the
// MarshalText output as a JSON string.
//
// It uses a value receiver, like MarshalText. With a pointer receiver, plain
// Time values would not satisfy json.Marshaler, because the method would also
// shadow the embedded time.Time.MarshalJSON.
func (t Time) MarshalJSON() ([]byte, error) {
	b, err := t.MarshalText()
	if err != nil {
		return nil, err
	}
	return json.Marshal(string(b))
}

// UnmarshalJSON implements the json.Unmarshaler interface. The input must be a
// JSON string in timeFormat. A JSON null is a no-op, following the
// encoding/json convention for Unmarshalers.
func (t *Time) UnmarshalJSON(b []byte) error {
	if string(b) == "null" {
		return nil
	}
	var str string
	if err := json.Unmarshal(b, &str); err != nil {
		return err
	}
	return t.UnmarshalText([]byte(str))
}
|