You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
80 lines
1.6 KiB
80 lines
1.6 KiB
|
1 year ago
|
package natsClient
|
||
|
|
|
||
|
|
import (
|
||
|
|
"errors"
|
||
|
|
"runtime"
|
||
|
|
"server/pb"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/gogo/protobuf/proto"
|
||
|
|
"github.com/liangdas/mqant/log"
|
||
|
|
"github.com/nats-io/nats.go"
|
||
|
|
)
|
||
|
|
|
||
|
|
// CommonNatsImp 通用回调natsClient
|
||
|
|
type CommonNatsImp struct {
|
||
|
|
f func(data []byte)
|
||
|
|
}
|
||
|
|
|
||
|
|
// g 是否用协程处理
|
||
|
|
func NewCommonNatsImp(conn *nats.Conn, topic string, f func(data []byte), queue ...string) {
|
||
|
|
base := NewNatsClient()
|
||
|
|
base.Topic = topic
|
||
|
|
r := &CommonNatsImp{f: f}
|
||
|
|
base.NatsImp = r
|
||
|
|
base.Conn = conn
|
||
|
|
base.GO = true
|
||
|
|
if len(queue) > 0 {
|
||
|
|
base.Queue = queue[0]
|
||
|
|
}
|
||
|
|
go func() {
|
||
|
|
err := base.OnRequestHandle()
|
||
|
|
if err != nil {
|
||
|
|
log.Error("newClientDissconnectNatsImp OnRequestHandle error:%v", err)
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *CommonNatsImp) OnMsgCallBack(m *nats.Msg) {
|
||
|
|
r.f(m.Data)
|
||
|
|
}
|
||
|
|
|
||
|
|
// NewReplyNatsImp 回复监听
|
||
|
|
func NewReplyNatsImp(conn *nats.Conn, topic string) error {
|
||
|
|
defer func() {
|
||
|
|
if r := recover(); r != nil {
|
||
|
|
var rn = ""
|
||
|
|
switch r := r.(type) {
|
||
|
|
case string:
|
||
|
|
rn = r
|
||
|
|
case error:
|
||
|
|
rn = r.Error()
|
||
|
|
}
|
||
|
|
buf := make([]byte, 1024)
|
||
|
|
l := runtime.Stack(buf, false)
|
||
|
|
errstr := string(buf[:l])
|
||
|
|
log.Error("%s\n ----Stack----\n%s", rn, errstr)
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
subs, err := conn.SubscribeSync(topic)
|
||
|
|
if err != nil {
|
||
|
|
log.Error("err:%v", err)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
defer subs.Unsubscribe()
|
||
|
|
m, err := subs.NextMsg(5 * time.Second)
|
||
|
|
if err != nil {
|
||
|
|
log.Error("err:%v", err)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
one := &pb.InnerReply{}
|
||
|
|
if err := proto.Unmarshal(m.Data, one); err != nil {
|
||
|
|
log.Error("err:%v", err)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if one.Code == 0 {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return errors.New(one.Msg)
|
||
|
|
}
|