印度包网
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

80 lines
1.6 KiB

1 year ago
package natsClient
import (
"errors"
"runtime"
"server/pb"
"time"
"github.com/gogo/protobuf/proto"
"github.com/liangdas/mqant/log"
"github.com/nats-io/nats.go"
)
// CommonNatsImp 通用回调natsClient
type CommonNatsImp struct {
f func(data []byte)
}
// g 是否用协程处理
func NewCommonNatsImp(conn *nats.Conn, topic string, f func(data []byte), queue ...string) {
base := NewNatsClient()
base.Topic = topic
r := &CommonNatsImp{f: f}
base.NatsImp = r
base.Conn = conn
base.GO = true
if len(queue) > 0 {
base.Queue = queue[0]
}
go func() {
err := base.OnRequestHandle()
if err != nil {
log.Error("newClientDissconnectNatsImp OnRequestHandle error:%v", err)
}
}()
}
func (r *CommonNatsImp) OnMsgCallBack(m *nats.Msg) {
r.f(m.Data)
}
// NewReplyNatsImp 回复监听
func NewReplyNatsImp(conn *nats.Conn, topic string) error {
defer func() {
if r := recover(); r != nil {
var rn = ""
switch r := r.(type) {
case string:
rn = r
case error:
rn = r.Error()
}
buf := make([]byte, 1024)
l := runtime.Stack(buf, false)
errstr := string(buf[:l])
log.Error("%s\n ----Stack----\n%s", rn, errstr)
}
}()
subs, err := conn.SubscribeSync(topic)
if err != nil {
log.Error("err:%v", err)
return err
}
defer subs.Unsubscribe()
m, err := subs.NextMsg(5 * time.Second)
if err != nil {
log.Error("err:%v", err)
return err
}
one := &pb.InnerReply{}
if err := proto.Unmarshal(m.Data, one); err != nil {
log.Error("err:%v", err)
return err
}
if one.Code == 0 {
return nil
}
return errors.New(one.Msg)
}