2018-07-21 19:56:40 +00:00
|
|
|
// Package mbigtable implements connecting to Google's Bigtable service and
|
|
|
|
// simplifying a number of interactions with it.
|
|
|
|
package mbigtable
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"cloud.google.com/go/bigtable"
|
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
2019-02-03 02:57:11 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mctx"
|
2018-07-21 19:56:40 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
2019-02-03 02:57:11 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/merr"
|
2018-07-21 19:56:40 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
2019-02-03 02:57:11 +00:00
|
|
|
"github.com/mediocregopher/mediocre-go-lib/mrun"
|
2018-07-21 19:56:40 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
func isErrAlreadyExists(err error) bool {
|
|
|
|
if err == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return strings.HasSuffix(err.Error(), " already exists")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Bigtable is a wrapper around a bigtable client providing more functionality.
|
|
|
|
type Bigtable struct {
|
|
|
|
*bigtable.Client
|
|
|
|
Instance string
|
|
|
|
|
|
|
|
gce *mdb.GCE
|
2019-02-05 20:18:17 +00:00
|
|
|
ctx context.Context
|
2018-07-21 19:56:40 +00:00
|
|
|
}
|
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
// WithBigTable returns a Bigtable instance which will be initialized and
|
|
|
|
// configured when the start event is triggered on the returned Context (see
|
|
|
|
// mrun.Start). The Bigtable instance will have Close called on it when the
|
|
|
|
// stop event is triggered on the returned Context (see mrun.Stop).
|
2019-02-03 02:57:11 +00:00
|
|
|
//
|
|
|
|
// gce is optional and can be passed in if there's an existing gce object which
|
|
|
|
// should be used, otherwise a new one will be created with mdb.MGCE.
|
2018-07-21 19:56:40 +00:00
|
|
|
//
|
|
|
|
// defaultInstance can be given as the instance name to use as the default
|
|
|
|
// parameter value. If empty the parameter will be required to be set.
|
2019-02-09 19:08:30 +00:00
|
|
|
func WithBigTable(parent context.Context, gce *mdb.GCE, defaultInstance string) (context.Context, *Bigtable) {
|
|
|
|
ctx := mctx.NewChild(parent, "bigtable")
|
2019-02-03 02:57:11 +00:00
|
|
|
if gce == nil {
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx, gce = mdb.WithGCE(ctx, "")
|
2019-02-03 02:57:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bt := &Bigtable{
|
|
|
|
gce: gce,
|
|
|
|
}
|
2019-02-05 20:18:17 +00:00
|
|
|
|
2018-07-21 19:56:40 +00:00
|
|
|
var inst *string
|
|
|
|
{
|
2019-02-03 02:57:11 +00:00
|
|
|
const name, descr = "instance", "name of the bigtable instance in the project to connect to"
|
2018-07-21 19:56:40 +00:00
|
|
|
if defaultInstance != "" {
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx, inst = mcfg.WithString(ctx, name, defaultInstance, descr)
|
2018-07-21 19:56:40 +00:00
|
|
|
} else {
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx, inst = mcfg.WithRequiredString(ctx, name, descr)
|
2018-07-21 19:56:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx = mrun.WithStartHook(ctx, func(innerCtx context.Context) error {
|
2018-07-21 19:56:40 +00:00
|
|
|
bt.Instance = *inst
|
2019-02-09 19:08:30 +00:00
|
|
|
|
|
|
|
bt.ctx = mctx.MergeAnnotations(bt.ctx, bt.gce.Context())
|
|
|
|
bt.ctx = mctx.Annotate(bt.ctx, "instance", bt.Instance)
|
|
|
|
|
|
|
|
mlog.Info("connecting to bigtable", bt.ctx)
|
2018-07-21 19:56:40 +00:00
|
|
|
var err error
|
|
|
|
bt.Client, err = bigtable.NewClient(
|
2019-02-03 02:57:11 +00:00
|
|
|
innerCtx,
|
2018-07-21 19:56:40 +00:00
|
|
|
bt.gce.Project, bt.Instance,
|
|
|
|
bt.gce.ClientOptions()...,
|
|
|
|
)
|
2019-02-09 19:08:30 +00:00
|
|
|
return merr.Wrap(bt.ctx, err)
|
2019-02-03 02:57:11 +00:00
|
|
|
})
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx = mrun.WithStopHook(ctx, func(context.Context) error {
|
2019-02-03 02:57:11 +00:00
|
|
|
return bt.Client.Close()
|
2018-07-21 19:56:40 +00:00
|
|
|
})
|
2019-02-09 19:08:30 +00:00
|
|
|
bt.ctx = ctx
|
|
|
|
return mctx.WithChild(parent, ctx), bt
|
2018-07-21 19:56:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// EnsureTable ensures that the given table exists and has (at least) the given
|
|
|
|
// column families.
|
|
|
|
//
|
|
|
|
// This method requires admin privileges on the bigtable instance.
|
|
|
|
func (bt *Bigtable) EnsureTable(ctx context.Context, name string, colFams ...string) error {
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx = mctx.MergeAnnotations(ctx, bt.ctx)
|
|
|
|
ctx = mctx.Annotate(ctx, "table", name)
|
|
|
|
mlog.Info("ensuring table", ctx)
|
2018-07-21 19:56:40 +00:00
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
mlog.Debug("creating admin client", ctx)
|
2018-07-21 19:56:40 +00:00
|
|
|
adminClient, err := bigtable.NewAdminClient(ctx, bt.gce.Project, bt.Instance)
|
|
|
|
if err != nil {
|
2019-02-09 19:08:30 +00:00
|
|
|
return merr.Wrap(ctx, err)
|
2018-07-21 19:56:40 +00:00
|
|
|
}
|
|
|
|
defer adminClient.Close()
|
|
|
|
|
2019-02-09 19:08:30 +00:00
|
|
|
mlog.Debug("creating bigtable table (if needed)", ctx)
|
2018-07-21 19:56:40 +00:00
|
|
|
err = adminClient.CreateTable(ctx, name)
|
|
|
|
if err != nil && !isErrAlreadyExists(err) {
|
2019-02-09 19:08:30 +00:00
|
|
|
return merr.Wrap(ctx, err)
|
2018-07-21 19:56:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, colFam := range colFams {
|
2019-02-09 19:08:30 +00:00
|
|
|
ctx := mctx.Annotate(ctx, "family", colFam)
|
|
|
|
mlog.Debug("creating bigtable column family (if needed)", ctx)
|
2018-07-21 19:56:40 +00:00
|
|
|
err := adminClient.CreateColumnFamily(ctx, name, colFam)
|
|
|
|
if err != nil && !isErrAlreadyExists(err) {
|
2019-02-09 19:08:30 +00:00
|
|
|
return merr.Wrap(ctx, err)
|
2018-07-21 19:56:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Table returns the bigtable.Table instance which can be used to write/query
|
|
|
|
// the given table.
|
|
|
|
func (bt *Bigtable) Table(tableName string) *bigtable.Table {
|
|
|
|
return bt.Open(tableName)
|
|
|
|
}
|