mdb: update PubSubs's error logging
This commit is contained in:
parent
91f56cb40c
commit
789555eb3a
35
mdb/ps.go
35
mdb/ps.go
@ -15,7 +15,7 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func isErrAlreadyExists(err error) bool {
|
||||
func psIsErrAlreadyExists(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
@ -26,8 +26,7 @@ func isErrAlreadyExists(err error) bool {
|
||||
// Message aliases the type in the official driver
|
||||
type Message = pubsub.Message
|
||||
|
||||
// PubSub holds onto the info needed to communicate with PubSub and wraps a
|
||||
// *pubsub.Client
|
||||
// PubSub is a wrapper around a pubsub client providing more functionality.
|
||||
type PubSub struct {
|
||||
*pubsub.Client
|
||||
|
||||
@ -46,10 +45,7 @@ func CfgPubSub(cfg *mcfg.Cfg) *PubSub {
|
||||
ps.log.Info("connecting to pubsub")
|
||||
var err error
|
||||
ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...)
|
||||
if err != nil {
|
||||
return mlog.ErrWithKV(err, &ps)
|
||||
}
|
||||
return nil
|
||||
return mlog.ErrWithKV(err, &ps)
|
||||
})
|
||||
return &ps
|
||||
}
|
||||
@ -68,22 +64,25 @@ type Topic struct {
|
||||
|
||||
// Topic returns, after potentially creating, a topic of the given name
|
||||
func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) {
|
||||
kv := mlog.KVerFunc(func() mlog.KV {
|
||||
return ps.KV().Set("topicName", name)
|
||||
})
|
||||
|
||||
var t *pubsub.Topic
|
||||
var err error
|
||||
if create {
|
||||
t, err = ps.Client.CreateTopic(ctx, name)
|
||||
if isErrAlreadyExists(err) {
|
||||
if psIsErrAlreadyExists(err) {
|
||||
t = ps.Client.Topic(name)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
return nil, mlog.ErrWithKV(err, kv)
|
||||
}
|
||||
} else {
|
||||
t = ps.Client.Topic(name)
|
||||
if exists, err := t.Exists(ctx); err != nil {
|
||||
panic(err)
|
||||
return nil, err
|
||||
return nil, mlog.ErrWithKV(err, kv)
|
||||
} else if !exists {
|
||||
return nil, mlog.ErrWithKV(errors.New("topic dne"), ps.KV().Set("topicName", name))
|
||||
return nil, mlog.ErrWithKV(errors.New("topic dne"), kv)
|
||||
}
|
||||
}
|
||||
return &Topic{
|
||||
@ -121,23 +120,27 @@ type Subscription struct {
|
||||
// for the Topic
|
||||
func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) {
|
||||
name = t.name + "_" + name
|
||||
kv := mlog.KVerFunc(func() mlog.KV {
|
||||
return t.KV().Set("subName", name)
|
||||
})
|
||||
|
||||
var s *pubsub.Subscription
|
||||
var err error
|
||||
if create {
|
||||
s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{
|
||||
Topic: t.topic,
|
||||
})
|
||||
if isErrAlreadyExists(err) {
|
||||
if psIsErrAlreadyExists(err) {
|
||||
s = t.ps.Subscription(name)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
return nil, mlog.ErrWithKV(err, kv)
|
||||
}
|
||||
} else {
|
||||
s = t.ps.Subscription(name)
|
||||
if exists, err := s.Exists(ctx); err != nil {
|
||||
return nil, err
|
||||
return nil, mlog.ErrWithKV(err, kv)
|
||||
} else if !exists {
|
||||
return nil, mlog.ErrWithKV(errors.New("sub dne"), t.KV().Set("subName", name))
|
||||
return nil, mlog.ErrWithKV(errors.New("sub dne"), kv)
|
||||
}
|
||||
}
|
||||
return &Subscription{
|
||||
|
Loading…
Reference in New Issue
Block a user