mdb: add GCE type, refactor PubSub configuration a bit
This commit is contained in:
parent
6c17eaa62f
commit
91f56cb40c
51
mdb/mdb.go
51
mdb/mdb.go
@ -1,3 +1,54 @@
|
||||
// Package mdb contains a number of database wrappers for databases I commonly
|
||||
// use
|
||||
package mdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||
"google.golang.org/api/option"
|
||||
)
|
||||
|
||||
// DefaultGCEProject can be set before any of the Cfg* functions are called for
|
||||
// GCE services, and will be used as the default value for the "project"
|
||||
// configuration parameter for each service.
|
||||
var DefaultGCEProject string
|
||||
|
||||
// GCE wraps configuration parameters commonly used for interacting with GCE
|
||||
// services.
|
||||
type GCE struct {
|
||||
Project string
|
||||
CredFile string
|
||||
}
|
||||
|
||||
// CfgGCE configures and returns a GCE instance which will be usable once Run is
|
||||
// called on the passed in Cfg instance.
|
||||
func CfgGCE(cfg *mcfg.Cfg) *GCE {
|
||||
proj := cfg.ParamString("project", DefaultGCEProject, "name of GCE project")
|
||||
credFile := cfg.ParamString("cred-file", "", "path to GCE credientials json file, if any")
|
||||
var gce GCE
|
||||
cfg.Start.Then(func(ctx context.Context) error {
|
||||
gce.Project = *proj
|
||||
gce.CredFile = *credFile
|
||||
return nil
|
||||
})
|
||||
return &gce
|
||||
}
|
||||
|
||||
// ClientOptions generates and returns the ClientOption instances which can be
|
||||
// passed into most GCE client drivers.
|
||||
func (gce *GCE) ClientOptions() []option.ClientOption {
|
||||
var opts []option.ClientOption
|
||||
if gce.CredFile != "" {
|
||||
opts = append(opts, option.WithCredentialsFile(gce.CredFile))
|
||||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
// KV implements the mlog.KVer interface
|
||||
func (gce *GCE) KV() mlog.KV {
|
||||
return mlog.KV{
|
||||
"project": gce.Project,
|
||||
}
|
||||
}
|
||||
|
59
mdb/ps.go
59
mdb/ps.go
@ -11,7 +11,6 @@ import (
|
||||
"github.com/mediocregopher/mediocre-go-lib/mcfg"
|
||||
"github.com/mediocregopher/mediocre-go-lib/mlog"
|
||||
oldctx "golang.org/x/net/context"
|
||||
"google.golang.org/api/option"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@ -30,68 +29,34 @@ type Message = pubsub.Message
|
||||
// PubSub holds onto the info needed to communicate with PubSub and wraps a
|
||||
// *pubsub.Client
|
||||
type PubSub struct {
|
||||
proj, credFile string
|
||||
*pubsub.Client
|
||||
|
||||
gce *GCE
|
||||
log *mlog.Logger
|
||||
}
|
||||
|
||||
// NewPubSub initializes and returns PubSub instance for the given projectID and
|
||||
// using the credentials file (if given)
|
||||
func NewPubSub(ctx context.Context, projectID, credFile string) (*PubSub, error) {
|
||||
ps := &PubSub{
|
||||
proj: projectID,
|
||||
credFile: credFile,
|
||||
}
|
||||
ps.log = mlog.DefaultLogger.WithKV(ps.KV())
|
||||
var err error
|
||||
ps.Client, err = pubsub.NewClient(ctx, ps.proj, ps.clientOpts()...)
|
||||
return ps, mlog.ErrWithKV(err, ps)
|
||||
}
|
||||
|
||||
// CfgPubSub configures and returns a PubSub instance which will be usable once
|
||||
// Run is called on the passed in Cfg instance
|
||||
//
|
||||
// defaultProject is optional and can be given to indcate the default gce
|
||||
// project id for the configuration parameter which is created.
|
||||
func CfgPubSub(cfg *mcfg.Cfg, defaultProject string) *PubSub {
|
||||
func CfgPubSub(cfg *mcfg.Cfg) *PubSub {
|
||||
cfg = cfg.Child("pubsub")
|
||||
proj := cfg.ParamString("project", defaultProject, "name of the project in gce")
|
||||
credFile := cfg.ParamString("cred-file", "", "path to pubsub credientials json file, if any")
|
||||
|
||||
var ps PubSub
|
||||
ps.gce = CfgGCE(cfg)
|
||||
cfg.Start.Then(func(ctx context.Context) error {
|
||||
ps.proj = *proj
|
||||
ps.credFile = *credFile
|
||||
log := m.Log(cfg, ps.KV())
|
||||
log.Info("connecting to pubsub")
|
||||
psInner, err := NewPubSub(ctx, *proj, *credFile)
|
||||
ps.log = m.Log(cfg, ps.KV())
|
||||
ps.log.Info("connecting to pubsub")
|
||||
var err error
|
||||
ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...)
|
||||
if err != nil {
|
||||
return mlog.ErrWithKV(err, &ps)
|
||||
}
|
||||
ps = *psInner
|
||||
ps.log = log
|
||||
return nil
|
||||
})
|
||||
return &ps
|
||||
}
|
||||
|
||||
func (ps *PubSub) clientOpts() []option.ClientOption {
|
||||
var opts []option.ClientOption
|
||||
//opts := []option.ClientOption{
|
||||
// option.WithGRPCConnectionPool(ps.NumConns),
|
||||
//}
|
||||
if ps.credFile != "" {
|
||||
opts = append(opts, option.WithCredentialsFile(ps.credFile))
|
||||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
// KV implements the mlog.KVer interface
|
||||
func (ps *PubSub) KV() mlog.KV {
|
||||
return mlog.KV{
|
||||
"psProj": ps.proj,
|
||||
}
|
||||
return ps.gce.KV()
|
||||
}
|
||||
|
||||
// Topic provides methods around a particular topic in PubSub
|
||||
@ -130,9 +95,7 @@ func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic,
|
||||
|
||||
// KV implements the mlog.KVer interface
|
||||
func (t *Topic) KV() mlog.KV {
|
||||
kv := t.ps.KV()
|
||||
kv["topicName"] = t.name
|
||||
return kv
|
||||
return t.ps.KV().Set("topicName", t.name)
|
||||
}
|
||||
|
||||
// Publish publishes a message with the given data as its body to the Topic
|
||||
@ -186,9 +149,7 @@ func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Su
|
||||
|
||||
// KV implements the mlog.KVer interface
|
||||
func (s *Subscription) KV() mlog.KV {
|
||||
kv := s.topic.KV()
|
||||
kv["subName"] = s.name
|
||||
return kv
|
||||
return s.topic.KV().Set("subName", s.name)
|
||||
}
|
||||
|
||||
// ConsumerFunc is a function which messages being consumed will be passed. The
|
||||
|
@ -14,8 +14,9 @@ import (
|
||||
var testPS *PubSub
|
||||
|
||||
func init() {
|
||||
DefaultGCEProject = "test"
|
||||
cfg := mcfg.New()
|
||||
testPS = CfgPubSub(cfg, "test")
|
||||
testPS = CfgPubSub(cfg)
|
||||
cfg.StartTestRun()
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user