You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
79 lines
1.6 KiB
79 lines
1.6 KiB
package natsClient |
|
|
|
import ( |
|
"errors" |
|
"runtime" |
|
"server/pb" |
|
"time" |
|
|
|
"github.com/gogo/protobuf/proto" |
|
"github.com/liangdas/mqant/log" |
|
"github.com/nats-io/nats.go" |
|
) |
|
|
|
// CommonNatsImp is a general-purpose NATS message handler: it wraps a plain
// callback so that callers only have to deal with raw payload bytes.
type CommonNatsImp struct {
	// f is invoked by OnMsgCallBack with the Data of each received message.
	f func(data []byte)
}
|
|
|
// g 是否用协程处理 |
|
func NewCommonNatsImp(conn *nats.Conn, topic string, f func(data []byte), queue ...string) { |
|
base := NewNatsClient() |
|
base.Topic = topic |
|
r := &CommonNatsImp{f: f} |
|
base.NatsImp = r |
|
base.Conn = conn |
|
base.GO = true |
|
if len(queue) > 0 { |
|
base.Queue = queue[0] |
|
} |
|
go func() { |
|
err := base.OnRequestHandle() |
|
if err != nil { |
|
log.Error("newClientDissconnectNatsImp OnRequestHandle error:%v", err) |
|
} |
|
}() |
|
} |
|
|
|
func (r *CommonNatsImp) OnMsgCallBack(m *nats.Msg) { |
|
r.f(m.Data) |
|
} |
|
|
|
// NewReplyNatsImp 回复监听 |
|
func NewReplyNatsImp(conn *nats.Conn, topic string) error { |
|
defer func() { |
|
if r := recover(); r != nil { |
|
var rn = "" |
|
switch r := r.(type) { |
|
case string: |
|
rn = r |
|
case error: |
|
rn = r.Error() |
|
} |
|
buf := make([]byte, 1024) |
|
l := runtime.Stack(buf, false) |
|
errstr := string(buf[:l]) |
|
log.Error("%s\n ----Stack----\n%s", rn, errstr) |
|
} |
|
}() |
|
subs, err := conn.SubscribeSync(topic) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
return err |
|
} |
|
defer subs.Unsubscribe() |
|
m, err := subs.NextMsg(5 * time.Second) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
return err |
|
} |
|
one := &pb.InnerReply{} |
|
if err := proto.Unmarshal(m.Data, one); err != nil { |
|
log.Error("err:%v", err) |
|
return err |
|
} |
|
if one.Code == 0 { |
|
return nil |
|
} |
|
return errors.New(one.Msg) |
|
}
|
|
|