印度包网
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1058 lines
31 KiB

package call
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"server/common"
"server/config"
"server/db"
"server/natsClient"
"server/pb"
"server/util"
"strconv"
"strings"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/liangdas/mqant/log"
timewheel "github.com/liangdas/mqant/module/modules/timer"
"github.com/mitchellh/mapstructure"
"github.com/nats-io/nats.go"
"github.com/olivere/elastic/v7"
)
// 初始化预警监听
var (
warnMap map[int]*SysWarn
warnLock sync.RWMutex
)
func InitWarn(conn *nats.Conn) {
warnMap = map[int]*SysWarn{}
natsClient.NewCommonNatsImp(conn, natsClient.TopicInnerUpdateWarn, func(data []byte) {
updateWarn(data)
})
if err := initSysWarns(); err != nil {
log.Error("err:%v", err)
}
}
func initSysWarns() error {
all := []SysWarn{}
_, err := db.ES().QueryList(common.ESIndexBackWarn, 0, 5000, nil, &all)
if err != nil {
return err
}
warnLock.Lock()
defer warnLock.Unlock()
for _, v := range all {
sysWarn, err := NewSysWarn(v.Channel, v.Type, v.Interval, v.Condition, v.WarnWay, v.WarnMember, v.WarnPhone, v.OtherPhone)
if err != nil {
log.Error("error:%v", err)
continue
}
id := sysWarn.SysWarnInf.ID()
if _, ok := warnMap[id]; ok {
log.Error("重复添加预警:%v", id)
continue
}
warnMap[id] = sysWarn
sysWarn.Start()
}
return nil
}
func AddSysWarn(channel []int, t, interval int, condition map[string]interface{}, way int, member []int, phone []string, otherPhone []string) error {
sysWarn, err := NewSysWarn(channel, t, interval, condition, way, member, phone, otherPhone)
if err != nil {
return err
}
id := sysWarn.SysWarnInf.ID()
warnLock.Lock()
defer warnLock.Unlock()
if _, ok := warnMap[id]; ok {
return errors.New("重复添加预警")
}
// ok
err = db.ES().InsertToESByID(common.ESIndexBackWarn, strconv.Itoa(id), sysWarn)
if err != nil {
return err
}
warnMap[id] = sysWarn
sysWarn.Start()
return nil
}
func DelSysWarn(mid string) error {
id, err := strconv.Atoi(mid)
if err != nil {
log.Error("error:%v", err)
return err
}
warnLock.Lock()
defer warnLock.Unlock()
warn, ok := warnMap[id]
if !ok {
return errors.New("预警id不存在")
}
// ok
err = db.ES().DeleteByID(common.ESIndexBackWarn, mid)
if err != nil {
log.Error("error:%v", err)
return err
}
warn.Stop()
delete(warnMap, id)
return nil
}
func updateWarn(data []byte) {
d := &pb.InnerUpdateWarn{}
err := proto.Unmarshal(data, d)
if err != nil {
log.Error("err:%v", err)
return
}
log.Debug("InnerUpdateWarn:%+v", *d)
warnLock.RLock()
warn, ok := warnMap[int(d.Type)]
warnLock.RUnlock()
if !ok {
return
}
switch d.Type {
case 1: // 更新
warn.Update(d.Amount)
case 2: // 确认
warn.Check(d.Amount)
}
}
// 预警类型
const (
WarnTypeMin = iota
WarnTypeRecharge // 充值预警
WarnTypeWithdraw // 退出预警
WarnTypeError // 游戏报错预警
WarnTypeData // 数据异常
WarnTypeGold // 货币发放预警
WarnTypeWithdrawGold // 退出存量预警
WarnTypeExamineWithdraw // 退出审核预警
WarnTypeActivity // 活动预警
WarnTypeBroadcast // 广播预警
WarnTypeShare // 分享预警
WarnTypeOnline // 在线人数涨跌预警
WarnTypeRechargeOrder // 充值订单预警
WarnTypeProfit // 游戏场次利润预警
WarnTypePay // 支付预警
WarnTypeWithdrawPer // 退出成功率预警
WarnTypeAll
)
// 预警方式
const (
WarnWayMin = iota
WarnWayPhone
WarnWayAll
)
// 预警系统一些常量
const (
WarnCheckInterval = 10 * time.Minute
)
func PublishWarn(t, opt int, data []int64) {
Publish(natsClient.TopicInnerUpdateWarn, &pb.InnerUpdateWarn{Type: uint32(t), Opt: uint32(opt), Amount: data})
}
// SysWarn 后台预警
// ID
// Channel 生效的渠道
// Type 预警类型
// Condition 预警条件
// SysWarnInf 预警子类实现
// WarnWay 预警方式 1手机短信
// WarnMember 预警人员id
// Time 创建时间
// WarnPhone 预警人员的手机号
// Interval 提醒间隔单位分钟
// LastWarn 上次预警时间
type SysWarn struct {
ID string `json:"ID" Redis:"ID"`
Channel []int `json:"Channel" Redis:"Channel"`
Type int `json:"Type" Redis:"Type"`
Condition map[string]interface{} `json:"Condition" Redis:"Condition"`
SysWarnInf SysWarnInf `json:"-" Redis:"SysWarnInf"`
WarnWay int `json:"WarnWay" Redis:"WarnWay"`
WarnMember []int `json:"WarnMember" Redis:"WarnMember"`
Time int64 `json:"Time" Redis:"Time"`
WarnPhone []string `json:"WarnPhone" Redis:"WarnPhone"`
OtherPhone []string `json:"OtherPhone" Redis:"OtherPhone"`
Interval int `json:"Interval" Redis:"Interval"`
LastWarn int64 `json:"LastWarn" Redis:"LastWarn"`
ShouldStart bool `json:"-" Redis:"ShouldStart"`
}
// NewSysWarn 新增预警
func NewSysWarn(channel []int, t, interval int, condition map[string]interface{}, way int, member []int, phone []string, otherPhone []string) (*SysWarn, error) {
if len(channel) == 0 {
return nil, errors.New("生效渠道不能为空")
}
if t <= WarnTypeMin || t >= WarnTypeAll {
return nil, errors.New("预警类型不合法")
}
if way <= WarnWayMin || way >= WarnWayAll {
return nil, errors.New("预警方式不合法")
}
/*if len(member) == 0 || len(phone) == 0 {
return nil, errors.New("预警人员不能为空")
}*/
if interval < 1 {
return nil, errors.New("预警时间间隔不合法")
}
s := &SysWarn{Channel: channel, Type: t, Condition: condition, WarnWay: way, WarnMember: member, WarnPhone: phone, Interval: interval, OtherPhone: otherPhone}
var sInf SysWarnInf
switch t {
case WarnTypeRecharge:
sInf = new(SysWarnRecharge)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeWithdraw:
sInf = new(SysWarnWithdraw)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeError:
err := errors.New("当前种类预警未实现")
return nil, err
case WarnTypeData:
err := errors.New("当前种类预警未实现")
return nil, err
case WarnTypeWithdrawGold:
s.ShouldStart = true
sInf = new(SysWarnWithdrawStorage)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeExamineWithdraw:
s.ShouldStart = true
sInf = new(SysWarnWithdrawExamine)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeActivity:
s.ShouldStart = true
sInf = new(SysWarnActivity)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeOnline:
s.ShouldStart = true
sInf = new(SysWarnOnline)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeRechargeOrder:
s.ShouldStart = true
sInf = new(SysWarnRechargeOrder)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypePay:
s.ShouldStart = true
sInf = new(SysWarnPay)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeWithdrawPer:
s.ShouldStart = true
sInf = new(SysWarnTypeWithdrawPer)
if err := sInf.Init(s); err != nil {
log.Error("err:%v", err)
return nil, err
}
case WarnTypeBroadcast:
err := errors.New("当前种类预警未实现")
return nil, err
case WarnTypeShare:
err := errors.New("当前种类预警未实现")
return nil, err
default:
err := errors.New("未知预警种类")
return nil, err
}
s.SysWarnInf = sInf
s.Time = time.Now().Unix()
return s, nil
}
func (s *SysWarn) Warn(content string) {
if len(s.WarnPhone) == 0 && len(s.OtherPhone) == 0 {
log.Error("warn:%+v phone invalid", *s)
return
}
if time.Now().Unix()-s.LastWarn < int64(WarnCheckInterval.Seconds()) {
log.Error("预警:%+v过于频繁", *s)
return
}
res := make(map[string]string)
res["userId"] = config.GetBase().Warn.ID
res["account"] = config.GetBase().Warn.Account
res["password"] = config.GetBase().Warn.Password
allPhones := ""
for _, v := range util.RemoveDuplication(append(s.WarnPhone, s.OtherPhone...)) {
allPhones += v + ","
}
if len(allPhones) < 2 {
return
}
log.Info("需要通知的手机号码 : [%s]", allPhones)
res["mobile"] = allPhones[:len(allPhones)-1]
res["content"] = config.GetBase().Warn.Sign + content
res["sendTime"] = ""
res["action"] = config.GetBase().Warn.Action
res["checkcontent"] = "0"
bytesData, err := json.Marshal(res)
if err != nil {
log.Error(err.Error())
return
}
req, err := http.NewRequest("POST", config.GetBase().Warn.URL, bytes.NewReader(bytesData))
if err != nil {
log.Error("err:%v", err)
return
}
req.Header.Set("Content-Type", "application/json;charset=UTF-8")
client := &http.Client{
Timeout: 5 * time.Second,
}
log.Debug("Warn req:%+v", req)
resp, err := client.Do(req)
if err != nil {
log.Error("Warn post err:%v", err)
return
}
s.LastWarn = time.Now().Unix()
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
log.Debug("Warn response Body%v:", string(body))
}
func (s *SysWarn) Start() {
if !s.ShouldStart {
return
}
timewheel.GetTimeWheel().AddTimerCustom(time.Duration(s.Interval*60)*time.Second, getTimeKey(s.SysWarnInf.ID()), nil, func(arge interface{}) {
s.SysWarnInf.Start()
s.Start()
})
}
func (s *SysWarn) Stop() {
timewheel.GetTimeWheel().RemoveTimer(getTimeKey(s.SysWarnInf.ID()))
}
func (s *SysWarn) Check(data []int64) {
if !util.SliceContain(s.Channel, int(data[0])) {
return
}
s.SysWarnInf.Check(data)
}
func (s *SysWarn) Update(data []int64) {
if !util.SliceContain(s.Channel, int(data[0])) {
return
}
s.SysWarnInf.Update(data)
}
type SysWarnInf interface {
Update([]int64) // 更新预警数据
Check([]int64) // 检查是否触发预警 第一个参数必须为渠道id
Start() // 开始预警监控
Init(*SysWarn) error
ID() int // 获取子预警id
}
func getTimeKey(id int) string {
return fmt.Sprintf("warnKey:%v", id)
}
// =========================================================================
// SysWarnRecharge 充值预警实现
// Amount 异常次数
// CurrentAmount 当前异常次数
type SysWarnRecharge struct {
S *SysWarn
Amount int `json:"Amount" Redis:"Amount"`
CurrentAmount int `json:"CurrentAmount" Redis:"CurrentAmount"`
}
func (s *SysWarnRecharge) Init(sys *SysWarn) error {
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
if s.Amount <= 0 {
return errors.New("异常次数不合法")
}
s.S = sys
return nil
}
func (s *SysWarnRecharge) Start() {}
func (s *SysWarnRecharge) Update(data []int64) {
s.CurrentAmount += int(data[1])
if s.CurrentAmount >= s.Amount {
s.S.Warn(fmt.Sprintf("充值异常达到%v次", s.CurrentAmount))
s.CurrentAmount = 0
}
}
func (s *SysWarnRecharge) Check(data []int64) {}
func (s *SysWarnRecharge) ID() int {
return s.S.Type
}
// =========================================================================
// SysWarnWithdraw 退出预警实现
// MaxWithdraw 最大退出次数
// WithdrawLimit 单人当日退出额度
type SysWarnWithdraw struct {
S *SysWarn
MaxWithdraw int64 `json:"MaxWithdraw" Redis:"MaxWithdraw"`
WithdrawLimit int64 `json:"WithdrawLimit" Redis:"WithdrawLimit"`
}
func (s *SysWarnWithdraw) Init(sys *SysWarn) error {
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
if s.MaxWithdraw <= 0 {
return errors.New("最大退出次数不合法")
}
if s.WithdrawLimit <= 0 {
return errors.New("单人当日退出额度不合法")
}
s.S = sys
return nil
}
func (s *SysWarnWithdraw) Start() {}
func (s *SysWarnWithdraw) Update(data []int64) {}
// 约定第二个参数为uid
func (s *SysWarnWithdraw) Check(data []int64) {
info := new(common.RechargeInfo)
info.UID = int(data[1])
if err := db.Mysql().Get(info); err != nil {
log.Error("err:%v", err)
return
}
if int64(info.WithdrawCount) > s.MaxWithdraw {
s.S.Warn(fmt.Sprintf("玩家%v已退出%v次", info.UID, info.WithdrawCount))
}
if info.TotalWithdraw > s.WithdrawLimit {
s.S.Warn(fmt.Sprintf("玩家%v退出金额已达到%v", info.UID, info.TotalWithdraw))
}
}
func (s *SysWarnWithdraw) ID() int {
return s.S.Type
}
// =========================================================================
// SysWarnWithdrawStorage 日均用户退出存量预警
// Storage 存量值
type SysWarnWithdrawStorage struct {
S *SysWarn
Storage int64 `json:"Storage" Redis:"Storage"`
}
func (s *SysWarnWithdrawStorage) Init(sys *SysWarn) error {
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
if s.Storage <= 0 {
return errors.New("日均用户退出存量值设置不合法")
}
s.S = sys
return nil
}
func (s *SysWarnWithdrawStorage) Start() {
m := &common.RechargeOrder{}
start := time.Now().Unix()
end := time.Now().AddDate(0, 0, 1).Unix()
con := fmt.Sprintf(`create_time > %d and create_time < %d and event = %v`, start, end, common.StatusROrderFinish)
total := db.Mysql().Sum(m, con, "amount")
if total > s.Storage {
s.S.Warn(fmt.Sprintf("今日退出总额已超过%v", s.Storage))
}
}
func (s *SysWarnWithdrawStorage) Update(data []int64) {}
func (s *SysWarnWithdrawStorage) Check(data []int64) {}
func (s *SysWarnWithdrawStorage) ID() int {
return s.S.Type
}
// =========================================================================
// SysWarnWithdrawExamine 退出审核预警
// Minute 分钟
// NoticeTime 通知时间间隔
// LastNoticeTime 上一次通知时间 -------
type SysWarnWithdrawExamine struct {
S *SysWarn
Minute int `json:"Minute" Redis:"Minute"`
NoticeTime int64 `json:"NoticeTime" Redis:"NoticeTime"`
LastNoticeTime int64 `json:"LastNoticeTime" Redis:"LastNoticeTime"`
OrderId map[string]bool
}
func (s *SysWarnWithdrawExamine) Init(sys *SysWarn) error {
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
if s.Minute < 0 {
return errors.New("审批退出时间设置不合法")
}
s.S = sys
s.OrderId = make(map[string]bool)
return nil
}
func (s *SysWarnWithdrawExamine) Start() {
datetime := time.Now().Unix() - int64(s.Minute*60)
sqlList := fmt.Sprintf("SELECT * FROM recharge_order WHERE create_time < %d AND event = %v AND status = %v",
datetime,
common.CurrencyEventWithDraw,
common.StatusROrderCreate)
sqlCount := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE create_time < %d AND event = %v AND status = %v",
datetime,
common.CurrencyEventWithDraw,
common.StatusROrderCreate)
var count int64
err := db.Mysql().QueryBySql(sqlCount, &count)
if err != nil {
log.Error(err.Error())
return
}
if count == 0 {
return
}
var order []common.RechargeOrder
err = db.Mysql().QueryBySql(sqlList, &order)
if err != nil {
log.Error(err.Error())
return
}
var flag bool
temp := make(map[string]bool)
for i := 0; i < len(order); i++ {
if _, ok := s.OrderId[order[i].OrderID]; ok {
// 历史退出订单未被处理
flag = true
}
temp[order[i].OrderID] = true
}
s.OrderId = temp
if flag {
// 历史退出订单未被处理
now := time.Now().Unix()
if s.LastNoticeTime+s.NoticeTime*60 <= now {
s.S.Warn(fmt.Sprintf("当前有需要审核的订单数量 : [%d],请及时处理", count))
s.LastNoticeTime = time.Now().Unix()
}
} else {
// 历史退出订单已被处理 短信通知
s.S.Warn(fmt.Sprintf("当前有需要审核的订单数量 : [%d],请及时处理", count))
s.LastNoticeTime = time.Now().Unix()
}
}
func (s *SysWarnWithdrawExamine) Update(data []int64) {}
func (s *SysWarnWithdrawExamine) Check(data []int64) {}
func (s *SysWarnWithdrawExamine) ID() int {
return s.S.Type
}
// =========================================================================
// SysWarnActivity 活动预警
// ActID 活动id
// WarnHour 过期前多久提醒,单位小时
// RewardAmount 发放总量
type SysWarnActivity struct {
S *SysWarn
ActID int `json:"ActID" Redis:"ActID"`
WarnHour int `json:"WarnHour" Redis:"WarnHour"`
RewardAmount int64 `json:"RewardAmount" Redis:"RewardAmount"`
}
func (s *SysWarnActivity) Init(sys *SysWarn) error {
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
if s.ActID <= 10000 {
return errors.New("活动id不合法")
}
if s.WarnHour <= 0 {
return errors.New("提醒小时不合法")
}
if s.RewardAmount <= 0 {
return errors.New("发放总量不合法")
}
s.S = sys
return nil
}
func (s *SysWarnActivity) Start() {
m := &common.CurrencyBalance{}
con := fmt.Sprintf(`extern = %v`, s.ActID)
total := db.Mysql().Sum(m, con, "value")
if total > 0 {
s.S.Warn(fmt.Sprintf("活动%v发放金币大于%v,请注意", s.ActID, total))
}
act := GetConfigActivityByID(s.ActID)
if act.End-time.Now().Unix() < int64(s.WarnHour)*3600 {
s.S.Warn(fmt.Sprintf("活动%v即将过期,请注意", s.ActID))
}
}
func (s *SysWarnActivity) Update(data []int64) {}
func (s *SysWarnActivity) Check(data []int64) {}
func (s *SysWarnActivity) ID() int {
return s.ActID
}
// =========================================================================
// OnlineWarn 在线预警实现
// OldOnlineTotal 上一个时间间隔的在线人数纪录
// OnlinePer 在线人数涨跌 百分比
type SysWarnOnline struct {
S *SysWarn
OldOnlineTotal int64 `json:"OldOnlineTotal" Redis:"OldOnlineTotal"`
OnlinePer int64 `json:"OnlinePer" Redis:"OldOnlineTotal"`
}
func (s *SysWarnOnline) Init(sys *SysWarn) error {
log.Info("初始化在线预警 ...")
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
if s.OnlinePer <= 0 {
return errors.New("在线人数涨跌值设置不合法")
}
s.S = sys
return nil
}
func (s *SysWarnOnline) Start() {
// var OnlineData struct {
// Total int64 `redis:"Total"`
// New int64 `redis:"New"`
// }
// err := db.Redis().HGetAll("online:Total", &OnlineData)
// if err != nil {
// log.Error(err.Error())
// }
// onlineTotal := OnlineData.Total
// defer func() {
// s.OldOnlineTotal = onlineTotal
// }()
// 在线人数为0 预警
// if onlineTotal == 0 {
// s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal))
// return
// }
// // 上一次在线人数为0 预警
// if s.OldOnlineTotal == 0 {
// s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal))
// return
// }
// // 跌 预警
// if onlineTotal < s.OldOnlineTotal {
// if (s.OldOnlineTotal - (s.OnlinePer*s.OldOnlineTotal)/100) >= onlineTotal {
// s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal))
// return
// }
// } else {
// // 涨 预警
// if (s.OldOnlineTotal + (s.OnlinePer*s.OldOnlineTotal)/100) <= onlineTotal {
// s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal))
// }
// }
q := elastic.NewBoolQuery()
q.Must(elastic.NewMatchQuery("GameID", 0))
q.Filter(elastic.NewRangeQuery("Time").Gte(time.Now().Unix() - 10*60))
type GroupSumBuckets struct {
Buckets []struct {
Key interface{}
Doc_count int
Total struct {
Value float64
}
}
}
ret := GroupSumBuckets{}
db.ES().GroupSumBy(common.ESIndexBackPlayerOnline, "Time", q, &ret, "", false, 0, "Total")
log.Debug("online:%v", ret)
if len(ret.Buckets) < 2 {
return
}
total0 := int64(ret.Buckets[0].Total.Value)
if total0 == 0 {
total0 = 1
}
total1 := int64(ret.Buckets[1].Total.Value)
diff := util.Abs(total0 - total1)
if diff*100/total0 >= s.OnlinePer {
s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", total0, total1, diff))
}
}
func (s *SysWarnOnline) Update(data []int64) {}
func (s *SysWarnOnline) Check(data []int64) {}
func (s *SysWarnOnline) ID() int {
return s.S.Type
}
// SysWarnRechargeOrder
// =========================================================================
// 最近订单数量
// 最近充值成功率
// 通知时间间隔
type SysWarnRechargeOrder struct {
S *SysWarn
RecentOrder int64 // 最近订单数量
RechargePer int64 // 最近充值成功率
NoticeTime int64 // 通知时间间隔
LastWarnTime int64 // 上一次预警时间
}
func (s *SysWarnRechargeOrder) Init(sys *SysWarn) error {
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
if s.RecentOrder <= 0 {
return errors.New("最近订单数量不合法")
}
if s.RechargePer <= 0 {
return errors.New("最近订单成功率不合法")
}
s.S = sys
return nil
}
func (s *SysWarnRechargeOrder) Start() {
var order []common.RechargeOrder
_, err := db.Mysql().QueryList(int(s.RecentOrder), 1, fmt.Sprintf("event = %v", common.CurrencyEventReCharge), "create_time desc", &common.RechargeOrder{}, &order)
if err != nil {
log.Error(err.Error())
return
}
successCount := db.Mysql().Count(&common.RechargeOrder{}, fmt.Sprintf("event = %v and status = %v and create_time >= %v", common.CurrencyEventReCharge, common.StatusROrderPay, order[0].CreateTime))
if s.RecentOrder*s.RechargePer >= successCount*100 {
if time.Now().Unix()-s.LastWarnTime >= s.NoticeTime {
s.LastWarnTime = time.Now().Unix()
s.S.Warn(fmt.Sprintf("当前最后%d条充值订单成功%d单, 低于预计值%d单", s.RecentOrder, successCount, s.RechargePer))
}
}
}
func (s *SysWarnRechargeOrder) Update(data []int64) {}
func (s *SysWarnRechargeOrder) Check(data []int64) {}
func (s *SysWarnRechargeOrder) ID() int {
return s.S.Type
}
// SysWarnPay 支付预警
// =========================================================================
// PayPer 充值成功率
// PayPerBase 充值成功率基数,订单达到改基数时才预警
// PayOrderCount 支付订单数
// PayOrderCountBase 充值成功订单数基数,订单达到改基数时才预警
// OneUserPayFailCount 单一用户连续充值失败笔数
type SysWarnPay struct {
S *SysWarn
NoticeTime int64 // 通知时间间隔
PayPer int64 // 充值成功率
PayPerBase int64 // 充值成功率基数,订单达到改基数时才预警
PayOrderCount int64 // 充值成功订单数 跟前一天同一时刻比较
PayOrderCountBase int64 // 充值成功订单数基数,订单达到改基数时才预警
OneUserPayFailCount int64 // 单一用户充值失败笔数
LastWarnData []string // 上一次预警数据
LastWarnTime int64 // 上一次预警时间
}
func (s *SysWarnPay) Init(sys *SysWarn) error {
log.Info("初始化支付预警 ...")
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
s.S = sys
s.LastWarnData = make([]string, 3)
s.LastWarnData[0] = "-1" // 初始化值
s.LastWarnData[1] = "-1" // 初始化值
s.LastWarnData[2] = "-1" // 初始化值
return nil
}
func (s *SysWarnPay) Start() {
// 支付订单数预警
nowPayPer := s.warnPayPer()
// 单一用户充值失败笔数预警
nowPayOrderCount := s.warnPayOrderCount()
// 单一用户充值失败笔数预警
nowOneUserPayFailCount := s.warnOneUserPayFailCount()
if nowPayPer || nowPayOrderCount || nowOneUserPayFailCount {
var content string
if nowPayPer {
content += fmt.Sprintf(" 当前充值成功率百分之%s ", strings.Split(s.LastWarnData[0], "_")[0])
}
if nowPayOrderCount {
content += fmt.Sprintf(" 当前支付订单数预警%s ", strings.Split(s.LastWarnData[1], "_")[0])
}
if nowOneUserPayFailCount {
content += fmt.Sprintf(" 当前单一用户连续充值失败笔数预警%s ", strings.Split(s.LastWarnData[2], "_")[0])
}
s.LastWarnTime = time.Now().Unix()
s.S.Warn(content)
}
}
// 充值成功率预警
func (s *SysWarnPay) warnPayPer() bool {
var oneDay int64 = 24 * 60 * 60
st, _ := time.ParseInLocation("2006-01-02", time.Now().Format("2006-01-02"), time.Local)
su := st.Unix()
su2 := time.Now().Unix()
eu2 := time.Now().AddDate(0, 0, 1).Unix()
var successOrder int64
successStr := fmt.Sprintf("SELECT COUNT(*) AS successOrder FROM recharge_order WHERE event = %v AND status = %v AND callback_time >= %d AND callback_time < %d",
common.CurrencyEventReCharge, common.StatusROrderPay, su, su+oneDay)
err := db.Mysql().QueryBySql(successStr, &successOrder)
if err != nil {
log.Error(err.Error())
return false
}
var allOrder int64
allOrderStr := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE event = %v AND create_time >= %d AND create_time < %d",
common.CurrencyEventReCharge, su2, eu2)
err = db.Mysql().QueryBySql(allOrderStr, &allOrder)
if err != nil {
log.Error(err.Error())
return false
}
if allOrder < s.PayPerBase {
return false
}
// 当前成功率
nowPayPer := (successOrder / allOrder) * 10000
nowPayPerStr := util.FormatFloat(float64(successOrder*100)/float64(allOrder), 2)
if s.LastWarnData[0] == fmt.Sprintf("%s_%s", nowPayPerStr, su2) {
if nowPayPer < (s.PayPer*100) && (time.Now().Unix()-s.LastWarnTime) > s.NoticeTime {
s.LastWarnData[0] = fmt.Sprintf("%s_%s", nowPayPerStr, su2)
return true
}
} else {
if nowPayPer < (s.PayPer * 100) {
s.LastWarnData[0] = fmt.Sprintf("%s_%s", nowPayPerStr, su2)
return true
}
}
return false
}
// 支付成功订单数预警
func (s *SysWarnPay) warnPayOrderCount() bool {
var oneDay int64 = 24 * 60 * 60
now := time.Now().Unix()
st, _ := time.ParseInLocation("2006-01-02", time.Now().Format("2006-01-02"), time.Local)
su := st.Unix()
su2 := time.Now().Format("2006-01-02")
var lastOrder int64
lastOrderStr := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE event = %v AND status = %v AND callback_time >= %d AND callback_time < %d",
common.CurrencyEventReCharge, common.StatusROrderPay, su-oneDay, now-oneDay)
err := db.Mysql().QueryBySql(lastOrderStr, &lastOrder)
if err != nil {
log.Error(err.Error())
}
var nowOrder int64
nowOrderStr := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE event = %v AND status = %v AND callback_time >= %d AND callback_time < %d",
common.CurrencyEventReCharge, common.StatusROrderPay, su, su+oneDay)
err = db.Mysql().QueryBySql(nowOrderStr, &nowOrder)
if err != nil {
log.Error(err.Error())
}
if nowOrder < s.PayOrderCountBase {
return false
}
// 相差订单数
var temp int64
if lastOrder > nowOrder {
temp = lastOrder - nowOrder
} else {
temp = nowOrder - lastOrder
}
if s.LastWarnData[1] == fmt.Sprintf("%d_%s", temp, su2) {
if temp < s.PayOrderCount && (time.Now().Unix()-s.LastWarnTime) > s.NoticeTime {
s.LastWarnData[1] = fmt.Sprintf("%d_%s", temp, su2)
return true
}
} else {
if temp < s.PayOrderCount {
s.LastWarnData[1] = fmt.Sprintf("%d_%s", temp, su2)
return true
}
}
return false
}
// 单一用户充值连续失败笔数预警
func (s *SysWarnPay) warnOneUserPayFailCount() bool {
su := time.Now().Unix()
eu := time.Now().AddDate(0, 0, 1).Unix()
var uidArr []int64
str := fmt.Sprintf("Select DISTINCT(uid) From recharge_order Where (uid, `event`) In (Select uid, `event` From recharge_order WHERE `event` = %d Group By uid,`event` Having Count(*)>%d) AND `status` != %d AND create_time >= %d AND create_time < %d ",
common.CurrencyEventReCharge,
common.StatusROrderPay,
s.OneUserPayFailCount,
su,
eu,
)
err := db.Mysql().QueryBySql(str, &uidArr)
if err != nil {
log.Error(err.Error())
return false
}
var count int64
for i := 0; i < len(uidArr); i++ {
var order []common.RechargeOrder
queryStr := fmt.Sprintf(" SELECT * FROM recharge_order WHERE uid = %d AND event = %d AND status != %d AND create_time >= %d AND create_time < %d ",
uidArr[i], common.CurrencyEventReCharge, common.StatusROrderPay, su, eu)
err = db.Mysql().QueryBySql(queryStr, &order)
if err != nil {
log.Error(err.Error())
continue
}
var temp int64
for j := 0; j < len(order); j++ {
if order[j].Status == common.StatusROrderPay {
temp = 0
} else {
temp++
}
if temp >= s.OneUserPayFailCount {
count++
break
}
}
}
if s.LastWarnData[2] == fmt.Sprintf("%d_%s", count, su) {
if count > s.OneUserPayFailCount && (time.Now().Unix()-s.LastWarnTime) > s.NoticeTime {
s.LastWarnData[2] = fmt.Sprintf("%d_%s", count, su)
return true
}
} else {
if count > 0 {
s.LastWarnData[2] = fmt.Sprintf("%d_%s", count, su)
return true
}
}
return false
}
func (s *SysWarnPay) Update(data []int64) {}
func (s *SysWarnPay) Check(data []int64) {}
func (s *SysWarnPay) ID() int {
return s.S.Type
}
// SysWarnTypeWithdrawPer 退出成功率预警
type SysWarnTypeWithdrawPer struct {
S *SysWarn
OrderGte int64 `json:"OrderGte" Redis:"OrderGte"` // 订单数量区间下限
WithdrawSuccessPer int64 `json:"WithdrawSuccessPer" Redis:"WithdrawSuccessPer"` // 退出成功率
NoticeTime int64 `json:"NoticeTime" Redis:"NoticeTime"` // 通知时间间隔
LastWarnTime int64 `json:"LastWarnTime" Redis:"LastWarnTime"` // 上一次预警时间
}
func (s *SysWarnTypeWithdrawPer) Init(sys *SysWarn) error {
log.Info("初始化退出成功率预警 ...")
if err := mapstructure.Decode(sys.Condition, s); err != nil {
log.Error("err:%v", err)
return errors.New("解析出错")
}
s.S = sys
log.Info("OrderGte: %v, WithdrawSuccessPer: %v, NoticeTime: %v", s.OrderGte, s.WithdrawSuccessPer, s.NoticeTime)
return nil
}
func (s *SysWarnTypeWithdrawPer) Start() {
su := time.Now().Unix()
eu := time.Now().AddDate(0, 0, 1).Unix()
withdrawTotalCount := db.Mysql().Count(&common.RechargeOrder{}, fmt.Sprintf("event = %v and create_time >= %d and create_time < %d", common.CurrencyEventWithDraw, su, eu))
if withdrawTotalCount < s.OrderGte {
return
}
WithdrawSuccessCount := db.Mysql().Count(&common.RechargeOrder{}, fmt.Sprintf("event = %v and status = %v and create_time >= %d and create_time < %d", common.CurrencyEventWithDraw, common.StatusROrderFinish, su, eu))
if int64((float64(WithdrawSuccessCount)/float64(withdrawTotalCount))*100) <= s.WithdrawSuccessPer {
if time.Now().Unix()-s.LastWarnTime >= s.NoticeTime {
s.LastWarnTime = time.Now().Unix()
s.S.Warn(fmt.Sprintf("当前退出成功率:%v", int64((float64(WithdrawSuccessCount)/float64(withdrawTotalCount))*100)))
}
}
}
func (s *SysWarnTypeWithdrawPer) Update(data []int64) {}
func (s *SysWarnTypeWithdrawPer) Check(data []int64) {}
func (s *SysWarnTypeWithdrawPer) ID() int {
return s.S.Type
}