// Package es batches Elasticsearch document writes and flushes them
// periodically through the bulk API.
package es

import (
	"context"
	"time"

	"github.com/liangdas/mqant/log"
	"github.com/olivere/elastic/v7"

	"server/util"
)

var (
	// Bulk is the batch currently being filled by the worker started in
	// InitBulkQueue. The worker replaces it with a fresh bulk service each
	// time a non-empty batch is handed off for sending.
	Bulk *elastic.BulkService
	// WriteChan carries queued writes from AddBulk to the worker. It is
	// buffered to 10000 entries; senders block once it is full.
	WriteChan = make(chan *WriteData, 10000)
)

// WriteData describes a single document to be written via a bulk create
// request.
type WriteData struct {
	ID    string      // Document id; when empty no id is set on the request.
	Index string      // Target index name.
	Data  interface{} // Document body passed to the request's Doc.
}

// InitBulkQueue creates the initial bulk batch from es and starts a background
// worker that
//   - turns every WriteData received on WriteChan into a bulk create request
//     and appends it to the current batch, and
//   - once per second, if the batch is non-empty, swaps in a fresh batch and
//     sends the old one in its own goroutine.
//
// The worker runs forever; there is no shutdown path. Failed batches and
// failed items are logged and dropped, not retried.
func InitBulkQueue(es *elastic.Client) {
	Bulk = es.Bulk()
	util.Go(func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if Bulk.EstimatedSizeInBytes() <= 0 {
					continue
				}
				// Hand the filled batch to a separate goroutine so that a slow
				// request does not stall draining of WriteChan.
				pending := Bulk
				Bulk = es.Bulk()
				util.Go(func() { flushBulkBatch(pending) })

			case one := <-WriteChan:
				if one == nil {
					// A nil entry carries nothing to write; skip it rather
					// than dereferencing it inside the worker.
					continue
				}
				req := elastic.NewBulkCreateRequest().Index(one.Index)
				if one.ID != "" {
					req.Id(one.ID)
				}
				req.Doc(one.Data)
				Bulk.Add(req)
			}
		}
	})
}

// flushBulkBatch sends one batch and logs both request-level errors and
// per-item failures. Item failures are reported in the response rather than in
// the returned error, so they have to be inspected explicitly.
func flushBulkBatch(bulk *elastic.BulkService) {
	resp, err := bulk.Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return
	}
	if resp == nil {
		return
	}
	for _, item := range resp.Failed() {
		if item == nil {
			continue
		}
		if item.Error != nil {
			log.Error("bulk item failed index:%v id:%v status:%v type:%v reason:%v",
				item.Index, item.Id, item.Status, item.Error.Type, item.Error.Reason)
			continue
		}
		log.Error("bulk item failed index:%v id:%v status:%v", item.Index, item.Id, item.Status)
	}
}

// AddBulk queues w for the background worker. It blocks while WriteChan is
// full.
func AddBulk(w *WriteData) {
	WriteChan <- w
}