package call

import (
	"context"
	"encoding/json"
	"server/db"
	"server/natsClient"
	"server/pb"
	"server/util"
	"time"

	"github.com/liangdas/mqant/log"
	"github.com/olivere/elastic/v7"
)

// InsertToESGO queues a document for asynchronous insertion into the given
// Elasticsearch index.
//
// When GetTopicName reports "common" the document is handed to the local bulk
// queue via AddBulk; otherwise it is JSON-encoded and published on
// natsClient.TopicInnerESBulk. Only the first element of id, if any, is used
// as the document ID. The hand-off always happens on a separate goroutine, so
// this function does not block on the queue or on the publish call.
func InsertToESGO(index string, params interface{}, id ...string) {
	// len() rather than a nil check: an empty but non-nil variadic slice
	// (e.g. ids[:0]...) must not be indexed.
	var docID string
	if len(id) > 0 {
		docID = id[0]
	}

	if GetTopicName() == "common" {
		data := &WriteData{ID: docID, Index: index, Data: params}
		Go(func() { AddBulk(data) })
		return
	}

	byt, err := json.Marshal(params)
	if err != nil {
		// Publishing an empty payload would only produce a broken document
		// downstream, so report the failure and drop it here.
		log.Error("InsertToESGO: marshal document for index %q: %v", index, err)
		return
	}
	data := &pb.InnerESBulk{Index: index, Data: string(byt), ID: docID}
	Go(func() { Publish(natsClient.TopicInnerESBulk, data) })
}

var (
	// Bulk is the bulk request currently being filled. It is replaced by a
	// fresh service every time a batch is handed off for execution.
	Bulk *elastic.BulkService
	// WriteChan buffers documents waiting to be added to Bulk.
	WriteChan = make(chan *WriteData, 20000)
	// ESCloseChan asks the queue goroutine to write out pending data.
	ESCloseChan = make(chan struct{})
	// ESFinishChan is signalled once the shutdown flush has completed.
	ESFinishChan = make(chan struct{})
	// MaxSize is the estimated bulk size in bytes at which a flush is forced.
	MaxSize int64 = 50 * 1024 * 1024
)

// WriteData is one document waiting to be written to Elasticsearch.
type WriteData struct {
	ID    string      // document ID; empty means no explicit ID is set on the request
	Index string      // target index name
	Data  interface{} // document body handed to the bulk request
}

// InitBulkQueue creates the initial bulk service and starts the goroutine
// that consumes WriteChan. Pending documents are flushed once per second,
// whenever the estimated bulk size reaches MaxSize, and when a value is
// received on ESCloseChan.
func InitBulkQueue() {
	Bulk = db.ES().C().Bulk()
	util.Go(func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				Flush()
			case one := <-WriteChan:
				enqueue(one)
				if Bulk.EstimatedSizeInBytes() >= MaxSize {
					Flush()
				}
			case <-ESCloseChan:
				log.Debug("module closing")
				// Write everything out synchronously so that the finish
				// signal really means "data has been sent".
				drainAndFlush()
				ESFinishChan <- struct{}{}
				// The loop keeps running on purpose: late writers and
				// repeated close requests must not block forever.
			}
		}
	})
}

// enqueue appends one document to the current bulk request as a "create"
// operation. Nil entries are ignored.
func enqueue(w *WriteData) {
	if w == nil {
		return
	}
	req := elastic.NewBulkCreateRequest().Index(w.Index)
	if w.ID != "" {
		req.Id(w.ID)
	}
	req.Doc(w.Data)
	Bulk.Add(req)
}

// drainAndFlush moves the documents currently buffered in WriteChan into the
// bulk request and executes it synchronously. It must only be called from the
// queue goroutine.
func drainAndFlush() {
	// Bound the drain by the buffered count observed now, so that producers
	// that keep writing cannot stall shutdown indefinitely.
	for n := len(WriteChan); n > 0; n-- {
		select {
		case one := <-WriteChan:
			enqueue(one)
			if Bulk.EstimatedSizeInBytes() >= MaxSize {
				flushSync()
			}
		default:
			// Channel was emptied in the meantime; nothing left to take.
		}
	}
	flushSync()
}

// AddBulk places w on the write queue. It blocks while WriteChan is full.
func AddBulk(w *WriteData) {
	WriteChan <- w
}

// Flush hands the pending bulk request to a background goroutine for
// execution and installs a fresh bulk service. It does nothing when no bulk
// service exists yet or when nothing is pending.
func Flush() {
	pending := takePending()
	if pending == nil {
		return
	}
	util.Go(func() { runBulk(pending) })
}

// flushSync is like Flush but executes the bulk request on the calling
// goroutine and returns only after the request has finished.
func flushSync() {
	if pending := takePending(); pending != nil {
		runBulk(pending)
	}
}

// takePending detaches the current bulk request and replaces it with a new
// one. It returns nil when there is nothing to send.
func takePending() *elastic.BulkService {
	if Bulk == nil || Bulk.EstimatedSizeInBytes() <= 0 {
		return nil
	}
	pending := Bulk
	Bulk = db.ES().C().Bulk()
	return pending
}

// runBulk executes one bulk request and logs request-level and item-level
// failures.
func runBulk(bulk *elastic.BulkService) {
	resp, err := bulk.Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return
	}
	// A bulk call can succeed as a whole while individual items are rejected
	// (for example a "create" on an ID that already exists).
	if resp != nil && resp.Errors {
		log.Error("es bulk: %d item(s) failed", len(resp.Failed()))
	}
}

// Rollover makes indexName the index behind aliasName.
//
// Nothing happens when indexName already exists. When aliasName does not
// exist yet, indexName is created and the alias is attached to it; otherwise
// the alias is rolled over to indexName.
func Rollover(aliasName, indexName string) {
	if db.ES().IndexExist(indexName) {
		return
	}
	if db.ES().IndexExist(aliasName) {
		db.ES().Rollover(aliasName, indexName, nil)
		return
	}
	// First use of this alias: create the index, then point the alias at it.
	db.ES().CreateExist(indexName)
	db.ES().AddAlias(aliasName, indexName)
}