mdb: break each inner database into its own package
This commit is contained in:
parent
23d1a8fc91
commit
eaaf0b99cb
@ -1,4 +1,4 @@
|
|||||||
package mdb
|
package mbigquery
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/mediocregopher/mediocre-go-lib/m"
|
"github.com/mediocregopher/mediocre-go-lib/m"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||||
|
|
||||||
"cloud.google.com/go/bigquery"
|
"cloud.google.com/go/bigquery"
|
||||||
@ -16,7 +17,7 @@ import (
|
|||||||
|
|
||||||
// TODO this file needs tests
|
// TODO this file needs tests
|
||||||
|
|
||||||
func bqIsErrAlreadyExists(err error) bool {
|
func isErrAlreadyExists(err error) bool {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -29,7 +30,7 @@ func bqIsErrAlreadyExists(err error) bool {
|
|||||||
// BigQuery is a wrapper around a bigquery client providing more functionality.
|
// BigQuery is a wrapper around a bigquery client providing more functionality.
|
||||||
type BigQuery struct {
|
type BigQuery struct {
|
||||||
*bigquery.Client
|
*bigquery.Client
|
||||||
gce *GCE
|
gce *mdb.GCE
|
||||||
log *mlog.Logger
|
log *mlog.Logger
|
||||||
|
|
||||||
// key is dataset/tableName
|
// key is dataset/tableName
|
||||||
@ -38,12 +39,12 @@ type BigQuery struct {
|
|||||||
tableUploaders map[[2]string]*bigquery.Uploader
|
tableUploaders map[[2]string]*bigquery.Uploader
|
||||||
}
|
}
|
||||||
|
|
||||||
// CfgBigQuery configures and returns a BigQuery instance which will be usable
|
// Cfg configures and returns a BigQuery instance which will be usable once Run
|
||||||
// once Run is called on the passed in Cfg instance.
|
// is called on the passed in Cfg instance.
|
||||||
func CfgBigQuery(cfg *mcfg.Cfg) *BigQuery {
|
func Cfg(cfg *mcfg.Cfg) *BigQuery {
|
||||||
cfg = cfg.Child("bigquery")
|
cfg = cfg.Child("bigquery")
|
||||||
bq := BigQuery{
|
bq := BigQuery{
|
||||||
gce: CfgGCE(cfg),
|
gce: mdb.CfgGCE(cfg),
|
||||||
tables: map[[2]string]*bigquery.Table{},
|
tables: map[[2]string]*bigquery.Table{},
|
||||||
tableUploaders: map[[2]string]*bigquery.Uploader{},
|
tableUploaders: map[[2]string]*bigquery.Uploader{},
|
||||||
}
|
}
|
||||||
@ -90,7 +91,7 @@ func (bq *BigQuery) Table(
|
|||||||
}
|
}
|
||||||
|
|
||||||
ds := bq.Dataset(dataset)
|
ds := bq.Dataset(dataset)
|
||||||
if err := ds.Create(ctx, nil); err != nil && !bqIsErrAlreadyExists(err) {
|
if err := ds.Create(ctx, nil); err != nil && !isErrAlreadyExists(err) {
|
||||||
return nil, nil, mlog.ErrWithKV(err, bq, kv)
|
return nil, nil, mlog.ErrWithKV(err, bq, kv)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,7 +100,7 @@ func (bq *BigQuery) Table(
|
|||||||
Name: tableName,
|
Name: tableName,
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
}
|
}
|
||||||
if err := table.Create(ctx, meta); err != nil && !bqIsErrAlreadyExists(err) {
|
if err := table.Create(ctx, meta); err != nil && !isErrAlreadyExists(err) {
|
||||||
return nil, nil, mlog.ErrWithKV(err, bq, kv)
|
return nil, nil, mlog.ErrWithKV(err, bq, kv)
|
||||||
}
|
}
|
||||||
uploader := table.Uploader()
|
uploader := table.Uploader()
|
||||||
@ -111,23 +112,23 @@ func (bq *BigQuery) Table(
|
|||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
const bqTimeFormat = "2006-01-02 15:04:05 MST"
|
const timeFormat = "2006-01-02 15:04:05 MST"
|
||||||
|
|
||||||
// BigQueryTime wraps a time.Time object and provides marshaling/unmarshaling
|
// Time wraps a time.Time object and provides marshaling/unmarshaling for
|
||||||
// for bigquery's time format.
|
// bigquery's time format.
|
||||||
type BigQueryTime struct {
|
type Time struct {
|
||||||
time.Time
|
time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalText implements the encoding.TextMarshaler interface.
|
// MarshalText implements the encoding.TextMarshaler interface.
|
||||||
func (t BigQueryTime) MarshalText() ([]byte, error) {
|
func (t Time) MarshalText() ([]byte, error) {
|
||||||
str := t.Time.Format(bqTimeFormat)
|
str := t.Time.Format(timeFormat)
|
||||||
return []byte(str), nil
|
return []byte(str), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||||
func (t *BigQueryTime) UnmarshalText(b []byte) error {
|
func (t *Time) UnmarshalText(b []byte) error {
|
||||||
tt, err := time.Parse(bqTimeFormat, string(b))
|
tt, err := time.Parse(timeFormat, string(b))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -136,7 +137,7 @@ func (t *BigQueryTime) UnmarshalText(b []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MarshalJSON implements the json.Marshaler interface.
|
// MarshalJSON implements the json.Marshaler interface.
|
||||||
func (t *BigQueryTime) MarshalJSON() ([]byte, error) {
|
func (t *Time) MarshalJSON() ([]byte, error) {
|
||||||
b, err := t.MarshalText()
|
b, err := t.MarshalText()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -145,7 +146,7 @@ func (t *BigQueryTime) MarshalJSON() ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||||
func (t *BigQueryTime) UnmarshalJSON(b []byte) error {
|
func (t *Time) UnmarshalJSON(b []byte) error {
|
||||||
var str string
|
var str string
|
||||||
if err := json.Unmarshal(b, &str); err != nil {
|
if err := json.Unmarshal(b, &str); err != nil {
|
||||||
return err
|
return err
|
@ -1,4 +1,4 @@
|
|||||||
package mdb
|
package mpubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -9,13 +9,14 @@ import (
|
|||||||
"cloud.google.com/go/pubsub"
|
"cloud.google.com/go/pubsub"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/m"
|
"github.com/mediocregopher/mediocre-go-lib/m"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||||
oldctx "golang.org/x/net/context"
|
oldctx "golang.org/x/net/context"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
func psIsErrAlreadyExists(err error) bool {
|
func isErrAlreadyExists(err error) bool {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -30,18 +31,18 @@ type Message = pubsub.Message
|
|||||||
type PubSub struct {
|
type PubSub struct {
|
||||||
*pubsub.Client
|
*pubsub.Client
|
||||||
|
|
||||||
gce *GCE
|
gce *mdb.GCE
|
||||||
log *mlog.Logger
|
log *mlog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// CfgPubSub configures and returns a PubSub instance which will be usable once
|
// Cfg configures and returns a PubSub instance which will be usable once Run is
|
||||||
// Run is called on the passed in Cfg instance
|
// called on the passed in Cfg instance
|
||||||
func CfgPubSub(cfg *mcfg.Cfg) *PubSub {
|
func Cfg(cfg *mcfg.Cfg) *PubSub {
|
||||||
cfg = cfg.Child("pubsub")
|
cfg = cfg.Child("pubsub")
|
||||||
var ps PubSub
|
var ps PubSub
|
||||||
ps.gce = CfgGCE(cfg)
|
ps.gce = mdb.CfgGCE(cfg)
|
||||||
|
ps.log = m.Log(cfg, &ps)
|
||||||
cfg.Start.Then(func(ctx context.Context) error {
|
cfg.Start.Then(func(ctx context.Context) error {
|
||||||
ps.log = m.Log(cfg, ps.KV())
|
|
||||||
ps.log.Info("connecting to pubsub")
|
ps.log.Info("connecting to pubsub")
|
||||||
var err error
|
var err error
|
||||||
ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...)
|
ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...)
|
||||||
@ -72,7 +73,7 @@ func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic,
|
|||||||
var err error
|
var err error
|
||||||
if create {
|
if create {
|
||||||
t, err = ps.Client.CreateTopic(ctx, name)
|
t, err = ps.Client.CreateTopic(ctx, name)
|
||||||
if psIsErrAlreadyExists(err) {
|
if isErrAlreadyExists(err) {
|
||||||
t = ps.Client.Topic(name)
|
t = ps.Client.Topic(name)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, mlog.ErrWithKV(err, kv)
|
return nil, mlog.ErrWithKV(err, kv)
|
||||||
@ -130,7 +131,7 @@ func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Su
|
|||||||
s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
|
s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
|
||||||
Topic: t.topic,
|
Topic: t.topic,
|
||||||
})
|
})
|
||||||
if psIsErrAlreadyExists(err) {
|
if isErrAlreadyExists(err) {
|
||||||
s = t.ps.Subscription(name)
|
s = t.ps.Subscription(name)
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, mlog.ErrWithKV(err, kv)
|
return nil, mlog.ErrWithKV(err, kv)
|
@ -1,4 +1,4 @@
|
|||||||
package mdb
|
package mpubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||||
|
"github.com/mediocregopher/mediocre-go-lib/mdb"
|
||||||
"github.com/mediocregopher/mediocre-go-lib/mrand"
|
"github.com/mediocregopher/mediocre-go-lib/mrand"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -14,9 +15,9 @@ import (
|
|||||||
var testPS *PubSub
|
var testPS *PubSub
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
DefaultGCEProject = "test"
|
mdb.DefaultGCEProject = "test"
|
||||||
cfg := mcfg.New()
|
cfg := mcfg.New()
|
||||||
testPS = CfgPubSub(cfg)
|
testPS = Cfg(cfg)
|
||||||
cfg.StartTestRun()
|
cfg.StartTestRun()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user