package call import ( "bytes" "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "server/common" "server/config" "server/db" "server/natsClient" "server/pb" "server/util" "strconv" "strings" "sync" "time" "github.com/gogo/protobuf/proto" "github.com/liangdas/mqant/log" timewheel "github.com/liangdas/mqant/module/modules/timer" "github.com/mitchellh/mapstructure" "github.com/nats-io/nats.go" "github.com/olivere/elastic/v7" ) // 初始化预警监听 var ( warnMap map[int]*SysWarn warnLock sync.RWMutex ) func InitWarn(conn *nats.Conn) { warnMap = map[int]*SysWarn{} natsClient.NewCommonNatsImp(conn, natsClient.TopicInnerUpdateWarn, func(data []byte) { updateWarn(data) }) if err := initSysWarns(); err != nil { log.Error("err:%v", err) } } func initSysWarns() error { all := []SysWarn{} _, err := db.ES().QueryList(common.ESIndexBackWarn, 0, 5000, nil, &all) if err != nil { return err } warnLock.Lock() defer warnLock.Unlock() for _, v := range all { sysWarn, err := NewSysWarn(v.Channel, v.Type, v.Interval, v.Condition, v.WarnWay, v.WarnMember, v.WarnPhone, v.OtherPhone) if err != nil { log.Error("error:%v", err) continue } id := sysWarn.SysWarnInf.ID() if _, ok := warnMap[id]; ok { log.Error("重复添加预警:%v", id) continue } warnMap[id] = sysWarn sysWarn.Start() } return nil } func AddSysWarn(channel []int, t, interval int, condition map[string]interface{}, way int, member []int, phone []string, otherPhone []string) error { sysWarn, err := NewSysWarn(channel, t, interval, condition, way, member, phone, otherPhone) if err != nil { return err } id := sysWarn.SysWarnInf.ID() warnLock.Lock() defer warnLock.Unlock() if _, ok := warnMap[id]; ok { return errors.New("重复添加预警") } // ok err = db.ES().InsertToESByID(common.ESIndexBackWarn, strconv.Itoa(id), sysWarn) if err != nil { return err } warnMap[id] = sysWarn sysWarn.Start() return nil } func DelSysWarn(mid string) error { id, err := strconv.Atoi(mid) if err != nil { log.Error("error:%v", err) return err } warnLock.Lock() defer warnLock.Unlock() warn, ok := warnMap[id] if !ok { return errors.New("预警id不存在") } // ok err = db.ES().DeleteByID(common.ESIndexBackWarn, mid) if err != nil { log.Error("error:%v", err) return err } warn.Stop() delete(warnMap, id) return nil } func updateWarn(data []byte) { d := &pb.InnerUpdateWarn{} err := proto.Unmarshal(data, d) if err != nil { log.Error("err:%v", err) return } log.Debug("InnerUpdateWarn:%+v", *d) warnLock.RLock() warn, ok := warnMap[int(d.Type)] warnLock.RUnlock() if !ok { return } switch d.Type { case 1: // 更新 warn.Update(d.Amount) case 2: // 确认 warn.Check(d.Amount) } } // 预警类型 const ( WarnTypeMin = iota WarnTypeRecharge // 充值预警 WarnTypeWithdraw // 退出预警 WarnTypeError // 游戏报错预警 WarnTypeData // 数据异常 WarnTypeGold // 货币发放预警 WarnTypeWithdrawGold // 退出存量预警 WarnTypeExamineWithdraw // 退出审核预警 WarnTypeActivity // 活动预警 WarnTypeBroadcast // 广播预警 WarnTypeShare // 分享预警 WarnTypeOnline // 在线人数涨跌预警 WarnTypeRechargeOrder // 充值订单预警 WarnTypeProfit // 游戏场次利润预警 WarnTypePay // 支付预警 WarnTypeWithdrawPer // 退出成功率预警 WarnTypeAll ) // 预警方式 const ( WarnWayMin = iota WarnWayPhone WarnWayAll ) // 预警系统一些常量 const ( WarnCheckInterval = 10 * time.Minute ) func PublishWarn(t, opt int, data []int64) { Publish(natsClient.TopicInnerUpdateWarn, &pb.InnerUpdateWarn{Type: uint32(t), Opt: uint32(opt), Amount: data}) } // SysWarn 后台预警 // ID // Channel 生效的渠道 // Type 预警类型 // Condition 预警条件 // SysWarnInf 预警子类实现 // WarnWay 预警方式 1手机短信 // WarnMember 预警人员id // Time 创建时间 // WarnPhone 预警人员的手机号 // Interval 提醒间隔单位分钟 // LastWarn 上次预警时间 type SysWarn struct { ID string `json:"ID" Redis:"ID"` Channel []int `json:"Channel" Redis:"Channel"` Type int `json:"Type" Redis:"Type"` Condition map[string]interface{} `json:"Condition" Redis:"Condition"` SysWarnInf SysWarnInf `json:"-" Redis:"SysWarnInf"` WarnWay int `json:"WarnWay" Redis:"WarnWay"` WarnMember []int `json:"WarnMember" Redis:"WarnMember"` Time int64 `json:"Time" Redis:"Time"` WarnPhone []string `json:"WarnPhone" Redis:"WarnPhone"` OtherPhone []string `json:"OtherPhone" Redis:"OtherPhone"` Interval int `json:"Interval" Redis:"Interval"` LastWarn int64 `json:"LastWarn" Redis:"LastWarn"` ShouldStart bool `json:"-" Redis:"ShouldStart"` } // NewSysWarn 新增预警 func NewSysWarn(channel []int, t, interval int, condition map[string]interface{}, way int, member []int, phone []string, otherPhone []string) (*SysWarn, error) { if len(channel) == 0 { return nil, errors.New("生效渠道不能为空") } if t <= WarnTypeMin || t >= WarnTypeAll { return nil, errors.New("预警类型不合法") } if way <= WarnWayMin || way >= WarnWayAll { return nil, errors.New("预警方式不合法") } /*if len(member) == 0 || len(phone) == 0 { return nil, errors.New("预警人员不能为空") }*/ if interval < 1 { return nil, errors.New("预警时间间隔不合法") } s := &SysWarn{Channel: channel, Type: t, Condition: condition, WarnWay: way, WarnMember: member, WarnPhone: phone, Interval: interval, OtherPhone: otherPhone} var sInf SysWarnInf switch t { case WarnTypeRecharge: sInf = new(SysWarnRecharge) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeWithdraw: sInf = new(SysWarnWithdraw) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeError: err := errors.New("当前种类预警未实现") return nil, err case WarnTypeData: err := errors.New("当前种类预警未实现") return nil, err case WarnTypeWithdrawGold: s.ShouldStart = true sInf = new(SysWarnWithdrawStorage) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeExamineWithdraw: s.ShouldStart = true sInf = new(SysWarnWithdrawExamine) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeActivity: s.ShouldStart = true sInf = new(SysWarnActivity) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeOnline: s.ShouldStart = true sInf = new(SysWarnOnline) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeRechargeOrder: s.ShouldStart = true sInf = new(SysWarnRechargeOrder) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypePay: s.ShouldStart = true sInf = new(SysWarnPay) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeWithdrawPer: s.ShouldStart = true sInf = new(SysWarnTypeWithdrawPer) if err := sInf.Init(s); err != nil { log.Error("err:%v", err) return nil, err } case WarnTypeBroadcast: err := errors.New("当前种类预警未实现") return nil, err case WarnTypeShare: err := errors.New("当前种类预警未实现") return nil, err default: err := errors.New("未知预警种类") return nil, err } s.SysWarnInf = sInf s.Time = time.Now().Unix() return s, nil } func (s *SysWarn) Warn(content string) { if len(s.WarnPhone) == 0 && len(s.OtherPhone) == 0 { log.Error("warn:%+v phone invalid", *s) return } if time.Now().Unix()-s.LastWarn < int64(WarnCheckInterval.Seconds()) { log.Error("预警:%+v过于频繁", *s) return } res := make(map[string]string) res["userId"] = config.GetBase().Warn.ID res["account"] = config.GetBase().Warn.Account res["password"] = config.GetBase().Warn.Password allPhones := "" for _, v := range util.RemoveDuplication(append(s.WarnPhone, s.OtherPhone...)) { allPhones += v + "," } if len(allPhones) < 2 { return } log.Info("需要通知的手机号码 : [%s]", allPhones) res["mobile"] = allPhones[:len(allPhones)-1] res["content"] = config.GetBase().Warn.Sign + content res["sendTime"] = "" res["action"] = config.GetBase().Warn.Action res["checkcontent"] = "0" bytesData, err := json.Marshal(res) if err != nil { log.Error(err.Error()) return } req, err := http.NewRequest("POST", config.GetBase().Warn.URL, bytes.NewReader(bytesData)) if err != nil { log.Error("err:%v", err) return } req.Header.Set("Content-Type", "application/json;charset=UTF-8") client := &http.Client{ Timeout: 5 * time.Second, } log.Debug("Warn req:%+v", req) resp, err := client.Do(req) if err != nil { log.Error("Warn post err:%v", err) return } s.LastWarn = time.Now().Unix() defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) log.Debug("Warn response Body%v:", string(body)) } func (s *SysWarn) Start() { if !s.ShouldStart { return } timewheel.GetTimeWheel().AddTimerCustom(time.Duration(s.Interval*60)*time.Second, getTimeKey(s.SysWarnInf.ID()), nil, func(arge interface{}) { s.SysWarnInf.Start() s.Start() }) } func (s *SysWarn) Stop() { timewheel.GetTimeWheel().RemoveTimer(getTimeKey(s.SysWarnInf.ID())) } func (s *SysWarn) Check(data []int64) { if !util.SliceContain(s.Channel, int(data[0])) { return } s.SysWarnInf.Check(data) } func (s *SysWarn) Update(data []int64) { if !util.SliceContain(s.Channel, int(data[0])) { return } s.SysWarnInf.Update(data) } type SysWarnInf interface { Update([]int64) // 更新预警数据 Check([]int64) // 检查是否触发预警 第一个参数必须为渠道id Start() // 开始预警监控 Init(*SysWarn) error ID() int // 获取子预警id } func getTimeKey(id int) string { return fmt.Sprintf("warnKey:%v", id) } // ========================================================================= // SysWarnRecharge 充值预警实现 // Amount 异常次数 // CurrentAmount 当前异常次数 type SysWarnRecharge struct { S *SysWarn Amount int `json:"Amount" Redis:"Amount"` CurrentAmount int `json:"CurrentAmount" Redis:"CurrentAmount"` } func (s *SysWarnRecharge) Init(sys *SysWarn) error { if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } if s.Amount <= 0 { return errors.New("异常次数不合法") } s.S = sys return nil } func (s *SysWarnRecharge) Start() {} func (s *SysWarnRecharge) Update(data []int64) { s.CurrentAmount += int(data[1]) if s.CurrentAmount >= s.Amount { s.S.Warn(fmt.Sprintf("充值异常达到%v次", s.CurrentAmount)) s.CurrentAmount = 0 } } func (s *SysWarnRecharge) Check(data []int64) {} func (s *SysWarnRecharge) ID() int { return s.S.Type } // ========================================================================= // SysWarnWithdraw 退出预警实现 // MaxWithdraw 最大退出次数 // WithdrawLimit 单人当日退出额度 type SysWarnWithdraw struct { S *SysWarn MaxWithdraw int64 `json:"MaxWithdraw" Redis:"MaxWithdraw"` WithdrawLimit int64 `json:"WithdrawLimit" Redis:"WithdrawLimit"` } func (s *SysWarnWithdraw) Init(sys *SysWarn) error { if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } if s.MaxWithdraw <= 0 { return errors.New("最大退出次数不合法") } if s.WithdrawLimit <= 0 { return errors.New("单人当日退出额度不合法") } s.S = sys return nil } func (s *SysWarnWithdraw) Start() {} func (s *SysWarnWithdraw) Update(data []int64) {} // 约定第二个参数为uid func (s *SysWarnWithdraw) Check(data []int64) { info := new(common.RechargeInfo) info.UID = int(data[1]) if err := db.Mysql().Get(info); err != nil { log.Error("err:%v", err) return } if int64(info.WithdrawCount) > s.MaxWithdraw { s.S.Warn(fmt.Sprintf("玩家%v已退出%v次", info.UID, info.WithdrawCount)) } if info.TotalWithdraw > s.WithdrawLimit { s.S.Warn(fmt.Sprintf("玩家%v退出金额已达到%v", info.UID, info.TotalWithdraw)) } } func (s *SysWarnWithdraw) ID() int { return s.S.Type } // ========================================================================= // SysWarnWithdrawStorage 日均用户退出存量预警 // Storage 存量值 type SysWarnWithdrawStorage struct { S *SysWarn Storage int64 `json:"Storage" Redis:"Storage"` } func (s *SysWarnWithdrawStorage) Init(sys *SysWarn) error { if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } if s.Storage <= 0 { return errors.New("日均用户退出存量值设置不合法") } s.S = sys return nil } func (s *SysWarnWithdrawStorage) Start() { m := &common.RechargeOrder{} start := time.Now().Unix() end := time.Now().AddDate(0, 0, 1).Unix() con := fmt.Sprintf(`create_time > %d and create_time < %d and event = %v`, start, end, common.StatusROrderFinish) total := db.Mysql().Sum(m, con, "amount") if total > s.Storage { s.S.Warn(fmt.Sprintf("今日退出总额已超过%v", s.Storage)) } } func (s *SysWarnWithdrawStorage) Update(data []int64) {} func (s *SysWarnWithdrawStorage) Check(data []int64) {} func (s *SysWarnWithdrawStorage) ID() int { return s.S.Type } // ========================================================================= // SysWarnWithdrawExamine 退出审核预警 // Minute 分钟 // NoticeTime 通知时间间隔 // LastNoticeTime 上一次通知时间 ------- type SysWarnWithdrawExamine struct { S *SysWarn Minute int `json:"Minute" Redis:"Minute"` NoticeTime int64 `json:"NoticeTime" Redis:"NoticeTime"` LastNoticeTime int64 `json:"LastNoticeTime" Redis:"LastNoticeTime"` OrderId map[string]bool } func (s *SysWarnWithdrawExamine) Init(sys *SysWarn) error { if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } if s.Minute < 0 { return errors.New("审批退出时间设置不合法") } s.S = sys s.OrderId = make(map[string]bool) return nil } func (s *SysWarnWithdrawExamine) Start() { datetime := time.Now().Unix() - int64(s.Minute*60) sqlList := fmt.Sprintf("SELECT * FROM recharge_order WHERE create_time < %d AND event = %v AND status = %v", datetime, common.CurrencyEventWithDraw, common.StatusROrderCreate) sqlCount := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE create_time < %d AND event = %v AND status = %v", datetime, common.CurrencyEventWithDraw, common.StatusROrderCreate) var count int64 err := db.Mysql().QueryBySql(sqlCount, &count) if err != nil { log.Error(err.Error()) return } if count == 0 { return } var order []common.RechargeOrder err = db.Mysql().QueryBySql(sqlList, &order) if err != nil { log.Error(err.Error()) return } var flag bool temp := make(map[string]bool) for i := 0; i < len(order); i++ { if _, ok := s.OrderId[order[i].OrderID]; ok { // 历史退出订单未被处理 flag = true } temp[order[i].OrderID] = true } s.OrderId = temp if flag { // 历史退出订单未被处理 now := time.Now().Unix() if s.LastNoticeTime+s.NoticeTime*60 <= now { s.S.Warn(fmt.Sprintf("当前有需要审核的订单数量 : [%d],请及时处理", count)) s.LastNoticeTime = time.Now().Unix() } } else { // 历史退出订单已被处理 短信通知 s.S.Warn(fmt.Sprintf("当前有需要审核的订单数量 : [%d],请及时处理", count)) s.LastNoticeTime = time.Now().Unix() } } func (s *SysWarnWithdrawExamine) Update(data []int64) {} func (s *SysWarnWithdrawExamine) Check(data []int64) {} func (s *SysWarnWithdrawExamine) ID() int { return s.S.Type } // ========================================================================= // SysWarnActivity 活动预警 // ActID 活动id // WarnHour 过期前多久提醒,单位小时 // RewardAmount 发放总量 type SysWarnActivity struct { S *SysWarn ActID int `json:"ActID" Redis:"ActID"` WarnHour int `json:"WarnHour" Redis:"WarnHour"` RewardAmount int64 `json:"RewardAmount" Redis:"RewardAmount"` } func (s *SysWarnActivity) Init(sys *SysWarn) error { if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } if s.ActID <= 10000 { return errors.New("活动id不合法") } if s.WarnHour <= 0 { return errors.New("提醒小时不合法") } if s.RewardAmount <= 0 { return errors.New("发放总量不合法") } s.S = sys return nil } func (s *SysWarnActivity) Start() { m := &common.CurrencyBalance{} con := fmt.Sprintf(`extern = %v`, s.ActID) total := db.Mysql().Sum(m, con, "value") if total > 0 { s.S.Warn(fmt.Sprintf("活动%v发放金币大于%v,请注意", s.ActID, total)) } act := GetConfigActivityByID(s.ActID) if act.End-time.Now().Unix() < int64(s.WarnHour)*3600 { s.S.Warn(fmt.Sprintf("活动%v即将过期,请注意", s.ActID)) } } func (s *SysWarnActivity) Update(data []int64) {} func (s *SysWarnActivity) Check(data []int64) {} func (s *SysWarnActivity) ID() int { return s.ActID } // ========================================================================= // OnlineWarn 在线预警实现 // OldOnlineTotal 上一个时间间隔的在线人数纪录 // OnlinePer 在线人数涨跌 百分比 type SysWarnOnline struct { S *SysWarn OldOnlineTotal int64 `json:"OldOnlineTotal" Redis:"OldOnlineTotal"` OnlinePer int64 `json:"OnlinePer" Redis:"OldOnlineTotal"` } func (s *SysWarnOnline) Init(sys *SysWarn) error { log.Info("初始化在线预警 ...") if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } if s.OnlinePer <= 0 { return errors.New("在线人数涨跌值设置不合法") } s.S = sys return nil } func (s *SysWarnOnline) Start() { // var OnlineData struct { // Total int64 `redis:"Total"` // New int64 `redis:"New"` // } // err := db.Redis().HGetAll("online:Total", &OnlineData) // if err != nil { // log.Error(err.Error()) // } // onlineTotal := OnlineData.Total // defer func() { // s.OldOnlineTotal = onlineTotal // }() // 在线人数为0 预警 // if onlineTotal == 0 { // s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal)) // return // } // // 上一次在线人数为0 预警 // if s.OldOnlineTotal == 0 { // s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal)) // return // } // // 跌 预警 // if onlineTotal < s.OldOnlineTotal { // if (s.OldOnlineTotal - (s.OnlinePer*s.OldOnlineTotal)/100) >= onlineTotal { // s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal)) // return // } // } else { // // 涨 预警 // if (s.OldOnlineTotal + (s.OnlinePer*s.OldOnlineTotal)/100) <= onlineTotal { // s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", s.OldOnlineTotal, onlineTotal, onlineTotal-s.OldOnlineTotal)) // } // } q := elastic.NewBoolQuery() q.Must(elastic.NewMatchQuery("GameID", 0)) q.Filter(elastic.NewRangeQuery("Time").Gte(time.Now().Unix() - 10*60)) type GroupSumBuckets struct { Buckets []struct { Key interface{} Doc_count int Total struct { Value float64 } } } ret := GroupSumBuckets{} db.ES().GroupSumBy(common.ESIndexBackPlayerOnline, "Time", q, &ret, "", false, 0, "Total") log.Debug("online:%v", ret) if len(ret.Buckets) < 2 { return } total0 := int64(ret.Buckets[0].Total.Value) if total0 == 0 { total0 = 1 } total1 := int64(ret.Buckets[1].Total.Value) diff := util.Abs(total0 - total1) if diff*100/total0 >= s.OnlinePer { s.S.Warn(fmt.Sprintf("上一次记录在线人数:[%d], 当前在线人数:[%d], 在线人数差:[%d]", total0, total1, diff)) } } func (s *SysWarnOnline) Update(data []int64) {} func (s *SysWarnOnline) Check(data []int64) {} func (s *SysWarnOnline) ID() int { return s.S.Type } // SysWarnRechargeOrder // ========================================================================= // 最近订单数量 // 最近充值成功率 // 通知时间间隔 type SysWarnRechargeOrder struct { S *SysWarn RecentOrder int64 // 最近订单数量 RechargePer int64 // 最近充值成功率 NoticeTime int64 // 通知时间间隔 LastWarnTime int64 // 上一次预警时间 } func (s *SysWarnRechargeOrder) Init(sys *SysWarn) error { if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } if s.RecentOrder <= 0 { return errors.New("最近订单数量不合法") } if s.RechargePer <= 0 { return errors.New("最近订单成功率不合法") } s.S = sys return nil } func (s *SysWarnRechargeOrder) Start() { var order []common.RechargeOrder _, err := db.Mysql().QueryList(int(s.RecentOrder), 1, fmt.Sprintf("event = %v", common.CurrencyEventReCharge), "create_time desc", &common.RechargeOrder{}, &order) if err != nil { log.Error(err.Error()) return } successCount := db.Mysql().Count(&common.RechargeOrder{}, fmt.Sprintf("event = %v and status = %v and create_time >= %v", common.CurrencyEventReCharge, common.StatusROrderPay, order[0].CreateTime)) if s.RecentOrder*s.RechargePer >= successCount*100 { if time.Now().Unix()-s.LastWarnTime >= s.NoticeTime { s.LastWarnTime = time.Now().Unix() s.S.Warn(fmt.Sprintf("当前最后%d条充值订单成功%d单, 低于预计值%d单", s.RecentOrder, successCount, s.RechargePer)) } } } func (s *SysWarnRechargeOrder) Update(data []int64) {} func (s *SysWarnRechargeOrder) Check(data []int64) {} func (s *SysWarnRechargeOrder) ID() int { return s.S.Type } // SysWarnPay 支付预警 // ========================================================================= // PayPer 充值成功率 // PayPerBase 充值成功率基数,订单达到改基数时才预警 // PayOrderCount 支付订单数 // PayOrderCountBase 充值成功订单数基数,订单达到改基数时才预警 // OneUserPayFailCount 单一用户连续充值失败笔数 type SysWarnPay struct { S *SysWarn NoticeTime int64 // 通知时间间隔 PayPer int64 // 充值成功率 PayPerBase int64 // 充值成功率基数,订单达到改基数时才预警 PayOrderCount int64 // 充值成功订单数 跟前一天同一时刻比较 PayOrderCountBase int64 // 充值成功订单数基数,订单达到改基数时才预警 OneUserPayFailCount int64 // 单一用户充值失败笔数 LastWarnData []string // 上一次预警数据 LastWarnTime int64 // 上一次预警时间 } func (s *SysWarnPay) Init(sys *SysWarn) error { log.Info("初始化支付预警 ...") if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } s.S = sys s.LastWarnData = make([]string, 3) s.LastWarnData[0] = "-1" // 初始化值 s.LastWarnData[1] = "-1" // 初始化值 s.LastWarnData[2] = "-1" // 初始化值 return nil } func (s *SysWarnPay) Start() { // 支付订单数预警 nowPayPer := s.warnPayPer() // 单一用户充值失败笔数预警 nowPayOrderCount := s.warnPayOrderCount() // 单一用户充值失败笔数预警 nowOneUserPayFailCount := s.warnOneUserPayFailCount() if nowPayPer || nowPayOrderCount || nowOneUserPayFailCount { var content string if nowPayPer { content += fmt.Sprintf(" 当前充值成功率百分之%s ", strings.Split(s.LastWarnData[0], "_")[0]) } if nowPayOrderCount { content += fmt.Sprintf(" 当前支付订单数预警%s ", strings.Split(s.LastWarnData[1], "_")[0]) } if nowOneUserPayFailCount { content += fmt.Sprintf(" 当前单一用户连续充值失败笔数预警%s ", strings.Split(s.LastWarnData[2], "_")[0]) } s.LastWarnTime = time.Now().Unix() s.S.Warn(content) } } // 充值成功率预警 func (s *SysWarnPay) warnPayPer() bool { var oneDay int64 = 24 * 60 * 60 st, _ := time.ParseInLocation("2006-01-02", time.Now().Format("2006-01-02"), time.Local) su := st.Unix() su2 := time.Now().Unix() eu2 := time.Now().AddDate(0, 0, 1).Unix() var successOrder int64 successStr := fmt.Sprintf("SELECT COUNT(*) AS successOrder FROM recharge_order WHERE event = %v AND status = %v AND callback_time >= %d AND callback_time < %d", common.CurrencyEventReCharge, common.StatusROrderPay, su, su+oneDay) err := db.Mysql().QueryBySql(successStr, &successOrder) if err != nil { log.Error(err.Error()) return false } var allOrder int64 allOrderStr := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE event = %v AND create_time >= %d AND create_time < %d", common.CurrencyEventReCharge, su2, eu2) err = db.Mysql().QueryBySql(allOrderStr, &allOrder) if err != nil { log.Error(err.Error()) return false } if allOrder < s.PayPerBase { return false } // 当前成功率 nowPayPer := (successOrder / allOrder) * 10000 nowPayPerStr := util.FormatFloat(float64(successOrder*100)/float64(allOrder), 2) if s.LastWarnData[0] == fmt.Sprintf("%s_%s", nowPayPerStr, su2) { if nowPayPer < (s.PayPer*100) && (time.Now().Unix()-s.LastWarnTime) > s.NoticeTime { s.LastWarnData[0] = fmt.Sprintf("%s_%s", nowPayPerStr, su2) return true } } else { if nowPayPer < (s.PayPer * 100) { s.LastWarnData[0] = fmt.Sprintf("%s_%s", nowPayPerStr, su2) return true } } return false } // 支付成功订单数预警 func (s *SysWarnPay) warnPayOrderCount() bool { var oneDay int64 = 24 * 60 * 60 now := time.Now().Unix() st, _ := time.ParseInLocation("2006-01-02", time.Now().Format("2006-01-02"), time.Local) su := st.Unix() su2 := time.Now().Format("2006-01-02") var lastOrder int64 lastOrderStr := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE event = %v AND status = %v AND callback_time >= %d AND callback_time < %d", common.CurrencyEventReCharge, common.StatusROrderPay, su-oneDay, now-oneDay) err := db.Mysql().QueryBySql(lastOrderStr, &lastOrder) if err != nil { log.Error(err.Error()) } var nowOrder int64 nowOrderStr := fmt.Sprintf("SELECT COUNT(*) FROM recharge_order WHERE event = %v AND status = %v AND callback_time >= %d AND callback_time < %d", common.CurrencyEventReCharge, common.StatusROrderPay, su, su+oneDay) err = db.Mysql().QueryBySql(nowOrderStr, &nowOrder) if err != nil { log.Error(err.Error()) } if nowOrder < s.PayOrderCountBase { return false } // 相差订单数 var temp int64 if lastOrder > nowOrder { temp = lastOrder - nowOrder } else { temp = nowOrder - lastOrder } if s.LastWarnData[1] == fmt.Sprintf("%d_%s", temp, su2) { if temp < s.PayOrderCount && (time.Now().Unix()-s.LastWarnTime) > s.NoticeTime { s.LastWarnData[1] = fmt.Sprintf("%d_%s", temp, su2) return true } } else { if temp < s.PayOrderCount { s.LastWarnData[1] = fmt.Sprintf("%d_%s", temp, su2) return true } } return false } // 单一用户充值连续失败笔数预警 func (s *SysWarnPay) warnOneUserPayFailCount() bool { su := time.Now().Unix() eu := time.Now().AddDate(0, 0, 1).Unix() var uidArr []int64 str := fmt.Sprintf("Select DISTINCT(uid) From recharge_order Where (uid, `event`) In (Select uid, `event` From recharge_order WHERE `event` = %d Group By uid,`event` Having Count(*)>%d) AND `status` != %d AND create_time >= %d AND create_time < %d ", common.CurrencyEventReCharge, common.StatusROrderPay, s.OneUserPayFailCount, su, eu, ) err := db.Mysql().QueryBySql(str, &uidArr) if err != nil { log.Error(err.Error()) return false } var count int64 for i := 0; i < len(uidArr); i++ { var order []common.RechargeOrder queryStr := fmt.Sprintf(" SELECT * FROM recharge_order WHERE uid = %d AND event = %d AND status != %d AND create_time >= %d AND create_time < %d ", uidArr[i], common.CurrencyEventReCharge, common.StatusROrderPay, su, eu) err = db.Mysql().QueryBySql(queryStr, &order) if err != nil { log.Error(err.Error()) continue } var temp int64 for j := 0; j < len(order); j++ { if order[j].Status == common.StatusROrderPay { temp = 0 } else { temp++ } if temp >= s.OneUserPayFailCount { count++ break } } } if s.LastWarnData[2] == fmt.Sprintf("%d_%s", count, su) { if count > s.OneUserPayFailCount && (time.Now().Unix()-s.LastWarnTime) > s.NoticeTime { s.LastWarnData[2] = fmt.Sprintf("%d_%s", count, su) return true } } else { if count > 0 { s.LastWarnData[2] = fmt.Sprintf("%d_%s", count, su) return true } } return false } func (s *SysWarnPay) Update(data []int64) {} func (s *SysWarnPay) Check(data []int64) {} func (s *SysWarnPay) ID() int { return s.S.Type } // SysWarnTypeWithdrawPer 退出成功率预警 type SysWarnTypeWithdrawPer struct { S *SysWarn OrderGte int64 `json:"OrderGte" Redis:"OrderGte"` // 订单数量区间下限 WithdrawSuccessPer int64 `json:"WithdrawSuccessPer" Redis:"WithdrawSuccessPer"` // 退出成功率 NoticeTime int64 `json:"NoticeTime" Redis:"NoticeTime"` // 通知时间间隔 LastWarnTime int64 `json:"LastWarnTime" Redis:"LastWarnTime"` // 上一次预警时间 } func (s *SysWarnTypeWithdrawPer) Init(sys *SysWarn) error { log.Info("初始化退出成功率预警 ...") if err := mapstructure.Decode(sys.Condition, s); err != nil { log.Error("err:%v", err) return errors.New("解析出错") } s.S = sys log.Info("OrderGte: %v, WithdrawSuccessPer: %v, NoticeTime: %v", s.OrderGte, s.WithdrawSuccessPer, s.NoticeTime) return nil } func (s *SysWarnTypeWithdrawPer) Start() { su := time.Now().Unix() eu := time.Now().AddDate(0, 0, 1).Unix() withdrawTotalCount := db.Mysql().Count(&common.RechargeOrder{}, fmt.Sprintf("event = %v and create_time >= %d and create_time < %d", common.CurrencyEventWithDraw, su, eu)) if withdrawTotalCount < s.OrderGte { return } WithdrawSuccessCount := db.Mysql().Count(&common.RechargeOrder{}, fmt.Sprintf("event = %v and status = %v and create_time >= %d and create_time < %d", common.CurrencyEventWithDraw, common.StatusROrderFinish, su, eu)) if int64((float64(WithdrawSuccessCount)/float64(withdrawTotalCount))*100) <= s.WithdrawSuccessPer { if time.Now().Unix()-s.LastWarnTime >= s.NoticeTime { s.LastWarnTime = time.Now().Unix() s.S.Warn(fmt.Sprintf("当前退出成功率:%v", int64((float64(WithdrawSuccessCount)/float64(withdrawTotalCount))*100))) } } } func (s *SysWarnTypeWithdrawPer) Update(data []int64) {} func (s *SysWarnTypeWithdrawPer) Check(data []int64) {} func (s *SysWarnTypeWithdrawPer) ID() int { return s.S.Type }