diff --git a/mdb/ps.go b/mdb/ps.go index cc40c9a..484a35b 100644 --- a/mdb/ps.go +++ b/mdb/ps.go @@ -15,7 +15,7 @@ import ( "google.golang.org/grpc/status" ) -func isErrAlreadyExists(err error) bool { +func psIsErrAlreadyExists(err error) bool { if err == nil { return false } @@ -26,8 +26,7 @@ func isErrAlreadyExists(err error) bool { // Message aliases the type in the official driver type Message = pubsub.Message -// PubSub holds onto the info needed to communicate with PubSub and wraps a -// *pubsub.Client +// PubSub is a wrapper around a pubsub client providing more functionality. type PubSub struct { *pubsub.Client @@ -46,10 +45,7 @@ func CfgPubSub(cfg *mcfg.Cfg) *PubSub { ps.log.Info("connecting to pubsub") var err error ps.Client, err = pubsub.NewClient(ctx, ps.gce.Project, ps.gce.ClientOptions()...) - if err != nil { - return mlog.ErrWithKV(err, &ps) - } - return nil + return mlog.ErrWithKV(err, &ps) }) return &ps } @@ -68,22 +64,25 @@ type Topic struct { // Topic returns, after potentially creating, a topic of the given name func (ps *PubSub) Topic(ctx context.Context, name string, create bool) (*Topic, error) { + kv := mlog.KVerFunc(func() mlog.KV { + return ps.KV().Set("topicName", name) + }) + var t *pubsub.Topic var err error if create { t, err = ps.Client.CreateTopic(ctx, name) - if isErrAlreadyExists(err) { + if psIsErrAlreadyExists(err) { t = ps.Client.Topic(name) } else if err != nil { - return nil, err + return nil, mlog.ErrWithKV(err, kv) } } else { t = ps.Client.Topic(name) if exists, err := t.Exists(ctx); err != nil { - panic(err) - return nil, err + return nil, mlog.ErrWithKV(err, kv) } else if !exists { - return nil, mlog.ErrWithKV(errors.New("topic dne"), ps.KV().Set("topicName", name)) + return nil, mlog.ErrWithKV(errors.New("topic dne"), kv) } } return &Topic{ @@ -121,23 +120,27 @@ type Subscription struct { // for the Topic func (t *Topic) Subscription(ctx context.Context, name string, create bool) (*Subscription, error) { name = t.name + "_" + name + kv := mlog.KVerFunc(func() mlog.KV { + return t.KV().Set("subName", name) + }) + var s *pubsub.Subscription var err error if create { s, err = t.ps.CreateSubscription(ctx, name, pubsub.SubscriptionConfig{ Topic: t.topic, }) - if isErrAlreadyExists(err) { + if psIsErrAlreadyExists(err) { s = t.ps.Subscription(name) } else if err != nil { - return nil, err + return nil, mlog.ErrWithKV(err, kv) } } else { s = t.ps.Subscription(name) if exists, err := s.Exists(ctx); err != nil { - return nil, err + return nil, mlog.ErrWithKV(err, kv) } else if !exists { - return nil, mlog.ErrWithKV(errors.New("sub dne"), t.KV().Set("subName", name)) + return nil, mlog.ErrWithKV(errors.New("sub dne"), kv) } } return &Subscription{