diff --git a/mdb/bq.go b/mdb/mbigquery/bigquery.go similarity index 77% rename from mdb/bq.go rename to mdb/mbigquery/bigquery.go index 39d35ec..397294a 100644 --- a/mdb/bq.go +++ b/mdb/mbigquery/bigquery.go @@ -1,4 +1,4 @@ -package mdb +package mbigquery import ( "context" @@ -8,6 +8,7 @@ import ( "github.com/mediocregopher/mediocre-go-lib/m" "github.com/mediocregopher/mediocre-go-lib/mcfg" + "github.com/mediocregopher/mediocre-go-lib/mdb" "github.com/mediocregopher/mediocre-go-lib/mlog" "cloud.google.com/go/bigquery" @@ -16,7 +17,7 @@ import ( // TODO this file needs tests -func bqIsErrAlreadyExists(err error) bool { +func isErrAlreadyExists(err error) bool { if err == nil { return false } @@ -29,7 +30,7 @@ func bqIsErrAlreadyExists(err error) bool { // BigQuery is a wrapper around a bigquery client providing more functionality. type BigQuery struct { *bigquery.Client - gce *GCE + gce *mdb.GCE log *mlog.Logger // key is dataset/tableName @@ -38,12 +39,12 @@ type BigQuery struct { tableUploaders map[[2]string]*bigquery.Uploader } -// CfgBigQuery configures and returns a BigQuery instance which will be usable -// once Run is called on the passed in Cfg instance. -func CfgBigQuery(cfg *mcfg.Cfg) *BigQuery { +// Cfg configures and returns a BigQuery instance which will be usable once Run +// is called on the passed in Cfg instance. +func Cfg(cfg *mcfg.Cfg) *BigQuery { cfg = cfg.Child("bigquery") bq := BigQuery{ - gce: CfgGCE(cfg), + gce: mdb.CfgGCE(cfg), tables: map[[2]string]*bigquery.Table{}, tableUploaders: map[[2]string]*bigquery.Uploader{}, } @@ -90,7 +91,7 @@ func (bq *BigQuery) Table( } ds := bq.Dataset(dataset) - if err := ds.Create(ctx, nil); err != nil && !bqIsErrAlreadyExists(err) { + if err := ds.Create(ctx, nil); err != nil && !isErrAlreadyExists(err) { return nil, nil, mlog.ErrWithKV(err, bq, kv) } @@ -99,7 +100,7 @@ func (bq *BigQuery) Table( Name: tableName, Schema: schema, } - if err := table.Create(ctx, meta); err != nil && !bqIsErrAlreadyExists(err) { + if err := table.Create(ctx, meta); err != nil && !isErrAlreadyExists(err) { return nil, nil, mlog.ErrWithKV(err, bq, kv) } uploader := table.Uploader() @@ -111,23 +112,23 @@ func (bq *BigQuery) Table( //////////////////////////////////////////////////////////////////////////////// -const bqTimeFormat = "2006-01-02 15:04:05 MST" +const timeFormat = "2006-01-02 15:04:05 MST" -// BigQueryTime wraps a time.Time object and provides marshaling/unmarshaling -// for bigquery's time format. -type BigQueryTime struct { +// Time wraps a time.Time object and provides marshaling/unmarshaling for +// bigquery's time format. +type Time struct { time.Time } // MarshalText implements the encoding.TextMarshaler interface. -func (t BigQueryTime) MarshalText() ([]byte, error) { - str := t.Time.Format(bqTimeFormat) +func (t Time) MarshalText() ([]byte, error) { + str := t.Time.Format(timeFormat) return []byte(str), nil } // UnmarshalText implements the encoding.TextUnmarshaler interface. -func (t *BigQueryTime) UnmarshalText(b []byte) error { - tt, err := time.Parse(bqTimeFormat, string(b)) +func (t *Time) UnmarshalText(b []byte) error { + tt, err := time.Parse(timeFormat, string(b)) if err != nil { return err } @@ -136,7 +137,7 @@ func (t *BigQueryTime) UnmarshalText(b []byte) error { } // MarshalJSON implements the json.Marshaler interface. -func (t *BigQueryTime) MarshalJSON() ([]byte, error) { +func (t *Time) MarshalJSON() ([]byte, error) { b, err := t.MarshalText() if err != nil { return nil, err @@ -145,7 +146,7 @@ func (t *BigQueryTime) MarshalJSON() ([]byte, error) { } // UnmarshalJSON implements the json.Unmarshaler interface. -func (t *BigQueryTime) UnmarshalJSON(b []byte) error { +func (t *Time) UnmarshalJSON(b []byte) error { var str string if err := json.Unmarshal(b, &str); err != nil { return err diff --git a/mdb/ps.go b/mdb/mpubsub/pubsub.go similarity index 95% rename from mdb/ps.go rename to mdb/mpubsub/pubsub.go index 484a35b..5f6d4d1 100644 --- a/mdb/ps.go +++ b/mdb/mpubsub/pubsub.go @@ -1,4 +1,4 @@ -package mdb +package mpubsub import ( "context" @@ -9,13 +9,14 @@ import ( "cloud.google.com/go/pubsub" "github.com/mediocregopher/mediocre-go-lib/m" "github.com/mediocregopher/mediocre-go-lib/mcfg" + "github.com/mediocregopher/mediocre-go-lib/mdb" "github.com/mediocregopher/mediocre-go-lib/mlog" oldctx "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func psIsErrAlreadyExists(err error) bool { +func isErrAlreadyExists(err error) bool { if err == nil { return false } @@ -30,18 +31,18 @@ type Message = pubsub.Message type PubSub struct { *pubsub.Client - gce *GCE + gce *mdb.GCE log *mlog.Logger } -// CfgPubSub configures and returns a PubSub instance which will be usable once -// Run is called on the passed in Cfg instance -func CfgPubSub(cfg *mcfg.Cfg) *PubSub { +// Cfg configures and returns a PubSub instance which will be usable once Run is +// called on the passed in Cfg instance +func Cfg(cfg *mcfg.Cfg) *PubSub { cfg = cfg.Child("pubsub") var ps PubSub - ps.gce = CfgGCE(cfg) + ps.gce = mdb.CfgGCE(cfg) + ps.log = m.Log(cfg, &ps) cfg.Start.Then(func(ctx context.Context) error { - ps.log = m.Log(cfg, ps.KV()) ps.log.Info("connecting to pubsub") var err error ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...) @@ -72,7 +73,7 @@ func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, var err error if create { t, err = ps.Client.CreateTopic(ctx, name) - if psIsErrAlreadyExists(err) { + if isErrAlreadyExists(err) { t = ps.Client.Topic(name) } else if err != nil { return nil, mlog.ErrWithKV(err, kv) @@ -130,7 +131,7 @@ func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Su s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{ Topic: t.topic, }) - if psIsErrAlreadyExists(err) { + if isErrAlreadyExists(err) { s = t.ps.Subscription(name) } else if err != nil { return nil, mlog.ErrWithKV(err, kv) diff --git a/mdb/ps_test.go b/mdb/mpubsub/pubsub_test.go similarity index 97% rename from mdb/ps_test.go rename to mdb/mpubsub/pubsub_test.go index aef3566..7867f7f 100644 --- a/mdb/ps_test.go +++ b/mdb/mpubsub/pubsub_test.go @@ -1,4 +1,4 @@ -package mdb +package mpubsub import ( "context" @@ -6,6 +6,7 @@ import ( "time" "github.com/mediocregopher/mediocre-go-lib/mcfg" + "github.com/mediocregopher/mediocre-go-lib/mdb" "github.com/mediocregopher/mediocre-go-lib/mrand" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -14,9 +15,9 @@ import ( var testPS *PubSub func init() { - DefaultGCEProject = "test" + mdb.DefaultGCEProject = "test" cfg := mcfg.New() - testPS = CfgPubSub(cfg) + testPS = Cfg(cfg) cfg.StartTestRun() }