You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
60 lines
1021 B
60 lines
1021 B
package es |
|
|
|
import ( |
|
"context" |
|
"server/util" |
|
"time" |
|
|
|
"github.com/liangdas/mqant/log" |
|
"github.com/olivere/elastic/v7" |
|
) |
|
|
|
var ( |
|
Bulk *elastic.BulkService |
|
WriteChan = make(chan *WriteData, 10000) |
|
) |
|
|
|
type WriteData struct { |
|
ID string |
|
Index string |
|
Data interface{} |
|
} |
|
|
|
func InitBulkQueue(es *elastic.Client) { |
|
Bulk = es.Bulk() |
|
util.Go(func() { |
|
t := time.NewTimer(time.Second) |
|
for { |
|
select { |
|
case <-t.C: |
|
t.Reset(time.Second) |
|
if Bulk.EstimatedSizeInBytes() > 0 { |
|
bulk := Bulk |
|
// util.IndexTry(func() error { |
|
util.Go(func() { |
|
_, err := bulk.Do(context.Background()) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
// return err |
|
} |
|
}) |
|
// return nil |
|
// }) |
|
Bulk = es.Bulk() |
|
} |
|
case one := <-WriteChan: |
|
e := elastic.NewBulkCreateRequest().Index(one.Index) |
|
if one.ID != "" { |
|
e.Id(one.ID) |
|
} |
|
e.Doc(one.Data) |
|
Bulk.Add(e) |
|
} |
|
// fmt.Println(total) |
|
} |
|
}) |
|
} |
|
|
|
func AddBulk(w *WriteData) { |
|
WriteChan <- w |
|
}
|
|
|