mdb/mbigtable: refactor to use new mctx/mlog/merr stuff
This commit is contained in:
parent
a5446b0833
commit
40794d83b8
@ -7,10 +7,12 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"cloud.google.com/go/bigtable"
|
"cloud.google.com/go/bigtable"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/m"
|
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mctx"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/merr"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mrun"
|
||||||
)
|
)
|
||||||
|
|
||||||
func isErrAlreadyExists(err error) bool {
|
func isErrAlreadyExists(err error) bool {
|
||||||
@ -29,44 +31,60 @@ type Bigtable struct {
|
|||||||
log *mlog.Logger
|
log *mlog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cfg configurs and returns a Bigtable instance which will be usable once
|
// MNew returns a Bigtable instance which will be initialized and configured
|
||||||
// StartRun is called on the passed in Cfg instance.
|
// when the start event is triggered on ctx (see mrun.Start). The Bigtable
|
||||||
|
// instance will have Close called on it when the stop event is triggered on ctx
|
||||||
|
// (see mrun.Stop).
|
||||||
|
//
|
||||||
|
// gce is optional and can be passed in if there's an existing gce object which
|
||||||
|
// should be used, otherwise a new one will be created with mdb.MGCE.
|
||||||
//
|
//
|
||||||
// defaultInstance can be given as the instance name to use as the default
|
// defaultInstance can be given as the instance name to use as the default
|
||||||
// parameter value. If empty the parameter will be required to be set.
|
// parameter value. If empty the parameter will be required to be set.
|
||||||
func Cfg(cfg *mcfg.Cfg, defaultInstance string) *Bigtable {
|
func MNew(ctx mctx.Context, gce *mdb.GCE, defaultInstance string) *Bigtable {
|
||||||
cfg = cfg.Child("bigtable")
|
if gce == nil {
|
||||||
var bt Bigtable
|
gce = mdb.MGCE(ctx, "")
|
||||||
bt.gce = mdb.CfgGCE(cfg)
|
}
|
||||||
bt.log = m.Log(cfg, &bt)
|
|
||||||
|
ctx = mctx.ChildOf(ctx, "bigtable")
|
||||||
|
bt := &Bigtable{
|
||||||
|
gce: gce,
|
||||||
|
log: mlog.From(ctx),
|
||||||
|
}
|
||||||
|
bt.log.SetKV(bt)
|
||||||
|
|
||||||
var inst *string
|
var inst *string
|
||||||
{
|
{
|
||||||
name, descr := "instance", "name of the bigtable instance in the project to connect to"
|
const name, descr = "instance", "name of the bigtable instance in the project to connect to"
|
||||||
if defaultInstance != "" {
|
if defaultInstance != "" {
|
||||||
inst = cfg.ParamString(name, defaultInstance, descr)
|
inst = mcfg.String(ctx, name, defaultInstance, descr)
|
||||||
} else {
|
} else {
|
||||||
inst = cfg.ParamRequiredString(name, descr)
|
inst = mcfg.RequiredString(ctx, name, descr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.Start.Then(func(ctx context.Context) error {
|
mrun.OnStart(ctx, func(innerCtx mctx.Context) error {
|
||||||
bt.Instance = *inst
|
bt.Instance = *inst
|
||||||
bt.log.Info("connecting to bigtable")
|
bt.log.Info("connecting to bigtable")
|
||||||
var err error
|
var err error
|
||||||
bt.Client, err = bigtable.NewClient(
|
bt.Client, err = bigtable.NewClient(
|
||||||
ctx,
|
innerCtx,
|
||||||
bt.gce.Project, bt.Instance,
|
bt.gce.Project, bt.Instance,
|
||||||
bt.gce.ClientOptions()...,
|
bt.gce.ClientOptions()...,
|
||||||
)
|
)
|
||||||
return mlog.ErrWithKV(err, &bt)
|
return merr.WithKV(err, bt.KV())
|
||||||
})
|
})
|
||||||
return &bt
|
mrun.OnStop(ctx, func(mctx.Context) error {
|
||||||
|
return bt.Client.Close()
|
||||||
|
})
|
||||||
|
return bt
|
||||||
}
|
}
|
||||||
|
|
||||||
// KV implements the mlog.KVer interface.
|
// KV implements the mlog.KVer interface.
|
||||||
func (bt *Bigtable) KV() mlog.KV {
|
func (bt *Bigtable) KV() map[string]interface{} {
|
||||||
return bt.gce.KV().Set("instance", bt.Instance)
|
kv := bt.gce.KV()
|
||||||
|
kv["bigtableInstance"] = bt.Instance
|
||||||
|
return kv
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnsureTable ensures that the given table exists and has (at least) the given
|
// EnsureTable ensures that the given table exists and has (at least) the given
|
||||||
@ -74,20 +92,20 @@ func (bt *Bigtable) KV() mlog.KV {
|
|||||||
//
|
//
|
||||||
// This method requires admin privileges on the bigtable instance.
|
// This method requires admin privileges on the bigtable instance.
|
||||||
func (bt *Bigtable) EnsureTable(ctx context.Context, name string, colFams ...string) error {
|
func (bt *Bigtable) EnsureTable(ctx context.Context, name string, colFams ...string) error {
|
||||||
kv := bt.KV().Set("table", name)
|
kv := mlog.KV{"bigtableTable": name}
|
||||||
bt.log.Info("ensuring table", kv)
|
bt.log.Info("ensuring table", kv)
|
||||||
|
|
||||||
bt.log.Debug("creating admin client", kv)
|
bt.log.Debug("creating admin client", kv)
|
||||||
adminClient, err := bigtable.NewAdminClient(ctx, bt.gce.Project, bt.Instance)
|
adminClient, err := bigtable.NewAdminClient(ctx, bt.gce.Project, bt.Instance)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mlog.ErrWithKV(err, kv)
|
return merr.WithKV(err, bt.KV(), kv.KV())
|
||||||
}
|
}
|
||||||
defer adminClient.Close()
|
defer adminClient.Close()
|
||||||
|
|
||||||
bt.log.Debug("creating bigtable table (if needed)", kv)
|
bt.log.Debug("creating bigtable table (if needed)", kv)
|
||||||
err = adminClient.CreateTable(ctx, name)
|
err = adminClient.CreateTable(ctx, name)
|
||||||
if err != nil && !isErrAlreadyExists(err) {
|
if err != nil && !isErrAlreadyExists(err) {
|
||||||
return mlog.ErrWithKV(err, kv)
|
return merr.WithKV(err, bt.KV(), kv.KV())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, colFam := range colFams {
|
for _, colFam := range colFams {
|
||||||
@ -95,7 +113,7 @@ func (bt *Bigtable) EnsureTable(ctx context.Context, name string, colFams ...str
|
|||||||
bt.log.Debug("creating bigtable column family (if needed)", kv)
|
bt.log.Debug("creating bigtable column family (if needed)", kv)
|
||||||
err := adminClient.CreateColumnFamily(ctx, name, colFam)
|
err := adminClient.CreateColumnFamily(ctx, name, colFam)
|
||||||
if err != nil && !isErrAlreadyExists(err) {
|
if err != nil && !isErrAlreadyExists(err) {
|
||||||
return mlog.ErrWithKV(err, kv)
|
return merr.WithKV(err, bt.KV(), kv.KV())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,52 +1,44 @@
|
|||||||
package mbigtable
|
package mbigtable
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
. "testing"
|
. "testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"cloud.google.com/go/bigtable"
|
"cloud.google.com/go/bigtable"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mrand"
|
"github.com/mediocregopher/mediocre-go-lib/mrand"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mtest"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mtest/massert"
|
"github.com/mediocregopher/mediocre-go-lib/mtest/massert"
|
||||||
)
|
)
|
||||||
|
|
||||||
var testBT *Bigtable
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
mdb.DefaultGCEProject = "test"
|
|
||||||
cfg := mcfg.New()
|
|
||||||
testBT = Cfg(cfg, "test")
|
|
||||||
cfg.StartTestRun()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBasic(t *T) {
|
func TestBasic(t *T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx := mtest.NewCtx()
|
||||||
defer cancel()
|
mtest.SetEnv(ctx, "GCE_PROJECT", "testProject")
|
||||||
|
bt := MNew(ctx, nil, "testInstance")
|
||||||
|
|
||||||
tableName := "test-" + mrand.Hex(8)
|
mtest.Run(ctx, t, func() {
|
||||||
colFam := "colFam-" + mrand.Hex(8)
|
tableName := "test-" + mrand.Hex(8)
|
||||||
if err := testBT.EnsureTable(ctx, tableName, colFam); err != nil {
|
colFam := "colFam-" + mrand.Hex(8)
|
||||||
t.Fatal(err)
|
if err := bt.EnsureTable(ctx, tableName, colFam); err != nil {
|
||||||
}
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
table := testBT.Table(tableName)
|
table := bt.Table(tableName)
|
||||||
row := "row-" + mrand.Hex(8)
|
row := "row-" + mrand.Hex(8)
|
||||||
mut := bigtable.NewMutation()
|
mut := bigtable.NewMutation()
|
||||||
mut.Set(colFam, "col", bigtable.Time(time.Now()), []byte("bar"))
|
mut.Set(colFam, "col", bigtable.Time(time.Now()), []byte("bar"))
|
||||||
if err := table.Apply(ctx, row, mut); err != nil {
|
if err := table.Apply(ctx, row, mut); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
readRow, err := table.ReadRow(ctx, row)
|
readRow, err := table.ReadRow(ctx, row)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
readColFam := readRow[colFam]
|
readColFam := readRow[colFam]
|
||||||
massert.Fatal(t, massert.All(
|
massert.Fatal(t, massert.All(
|
||||||
massert.Len(readColFam, 1),
|
massert.Len(readColFam, 1),
|
||||||
massert.Equal(colFam+":col", readColFam[0].Column),
|
massert.Equal(colFam+":col", readColFam[0].Column),
|
||||||
massert.Equal([]byte("bar"), readColFam[0].Value),
|
massert.Equal([]byte("bar"), readColFam[0].Value),
|
||||||
))
|
))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user