// Package es wraps an olivere/elastic v7 client with helpers for indexing,
// querying, updating and aggregating Elasticsearch documents.
package es

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"reflect"
	"server/common"
	"server/util"
	"time"

	"github.com/liangdas/mqant/log"
	"github.com/olivere/elastic/v7"
)

// InsertToES indexes params as a new document in index without specifying a
// document ID. params is sent as the JSON body of the request. Failures are
// logged and returned.
func (ES *EsClient) InsertToES(index string, params interface{}) error {
	if _, err := ES.client.Index().Index(index).BodyJson(params).Do(context.Background()); err != nil {
		log.Error("index:%v,write:%v,err:%v", index, params, err)
		return err
	}
	return nil
}

// InsertToESGO hands params to AddBulk for index from a function passed to
// util.Go. No result or error is reported back to the caller.
//
// NOTE(review): util.Go presumably runs the function asynchronously and
// AddBulk presumably batches writes; both are defined outside this file —
// confirm.
func (ES *EsClient) InsertToESGO(index string, params interface{}) {
	util.Go(func() {
		AddBulk(&WriteData{Index: index, Data: params})
	})
}

// InsertToESByID indexes params into index under the document ID id. params
// is sent as the JSON body of the request. Failures are logged and returned.
func (ES *EsClient) InsertToESByID(index, id string, params interface{}) error {
	if _, err := ES.client.Index().Index(index).Id(id).BodyJson(params).Do(context.Background()); err != nil {
		log.Error("index:%v,write:%v,err:%v", index, params, err)
		return err
	}
	return nil
}

// InsertToESByIDGO is the InsertToESGO variant that also carries the document
// ID: it hands a WriteData with ID, Index and Data to AddBulk from a function
// passed to util.Go. No result or error is reported back to the caller.
func (ES *EsClient) InsertToESByIDGO(index, id string, params interface{}) {
	util.Go(func() {
		AddBulk(&WriteData{ID: id, Index: index, Data: params})
	})
}

// Count returns the number of documents in index matching q. On error it
// logs and returns 0, so callers cannot tell a failure from an empty result.
func (ES *EsClient) Count(index string, q elastic.Query) int64 {
	count, err := ES.client.Count(index).Query(q).Do(context.Background())
	if err != nil {
		log.Error("count index:%v,err:%v", index, err)
		return 0
	}
	return count
}

// CountCard returns the cardinality (distinct-value count) of field among the
// documents in index matching q. On any error it logs and returns 0.
func (ES *EsClient) CountCard(index, field string, q elastic.Query) int64 {
	agg := elastic.NewCardinalityAggregation().Field(field)
	// Size(0): only the aggregation is read, so no hits are requested.
	result, err := ES.C().Search().Index(index).Query(q).Aggregation("Total", agg).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return 0
	}
	ret := map[string]int64{}
	if err := json.Unmarshal(result.Aggregations["Total"], &ret); err != nil {
		log.Error("err:%v", err)
		return 0
	}
	return ret["value"]
}

// Update updates document id in index. When val is an *elastic.Script the
// script is executed; any other value (struct or map) is sent as a partial
// document.
func (ES *EsClient) Update(index, id string, val interface{}) (res *elastic.UpdateResponse, err error) {
	update := ES.client.Update().Index(index).Id(id)
	if script, ok := val.(*elastic.Script); ok {
		return update.Script(script).Do(context.Background())
	}
	return update.Doc(val).Do(context.Background())
}

// DeleteByID deletes document id from index and returns the request error.
func (ES *EsClient) DeleteByID(index string, id string) error {
	_, err := ES.client.Delete().Index(index).Id(id).Do(context.Background())
	return err
}

// DeleteByQuery starts a delete-by-query on indices for documents matching q.
// The request is sent with WaitForCompletion(false), so a nil error only
// means the request was accepted. Errors are logged and returned.
//
// NOTE(review): this is the only method with a value receiver; it is kept to
// avoid changing the type's method set.
func (ES EsClient) DeleteByQuery(indices string, q elastic.Query) (err error) {
	_, err = ES.client.DeleteByQuery(indices).Query(q).WaitForCompletion(false).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
	}
	return err
}

// DeleteIndex deletes the given indices and returns the raw response.
func (ES *EsClient) DeleteIndex(index ...string) (*elastic.IndicesDeleteResponse, error) {
	return ES.client.DeleteIndex(index...).Do(context.Background())
}

// C returns the underlying elastic client.
func (ES *EsClient) C() *elastic.Client {
	return ES.client
}

// sliceElemType checks that kind is a non-nil pointer to a slice and returns
// the slice's element type. Anything else yields an "invalid kind" error
// instead of a reflect panic further down.
func sliceElemType(kind interface{}) (reflect.Type, error) {
	v := reflect.ValueOf(kind)
	if v.Kind() != reflect.Ptr || v.IsNil() || v.Elem().Kind() != reflect.Slice {
		return nil, errors.New("invalid kind")
	}
	return v.Type().Elem().Elem(), nil
}

// setDocID copies the document _id into the string field named ID of elem,
// where elem is a struct or a pointer to a struct. Any other shape, or a
// struct without a settable string ID field, is left untouched.
func setDocID(elem reflect.Value, id string) {
	if elem.Kind() == reflect.Ptr {
		elem = elem.Elem()
	}
	if elem.Kind() != reflect.Struct {
		return
	}
	if f := elem.FieldByName("ID"); f.Kind() == reflect.String && f.CanSet() {
		f.SetString(id)
	}
}

// appendHits decodes the _source of every hit into a new value of elemType,
// syncs its ID field with the hit's _id, and appends all of them to the slice
// dst. Nothing is appended if any hit fails to decode.
func appendHits(dst reflect.Value, elemType reflect.Type, hits []*elastic.SearchHit) error {
	items := make([]reflect.Value, 0, len(hits))
	for _, hit := range hits {
		item := reflect.New(elemType)
		if err := json.Unmarshal(hit.Source, item.Interface()); err != nil {
			return err
		}
		setDocID(item.Elem(), hit.Id)
		items = append(items, item.Elem())
	}
	dst.Set(reflect.Append(dst, items...))
	return nil
}

// firstSortPair interprets the leading (field string, ascending bool) pair of
// a variadic sort argument. sorted is false when no sort was supplied; a
// single element or wrongly typed elements yield an "invalid sort" error.
func firstSortPair(sort []interface{}) (field string, ascending, sorted bool, err error) {
	if len(sort) == 0 {
		return "", false, false, nil
	}
	if len(sort) < 2 {
		return "", false, false, errors.New("invalid sort")
	}
	field, isStr := sort[0].(string)
	ascending, isBool := sort[1].(bool)
	if !isStr || !isBool {
		return "", false, false, errors.New("invalid sort")
	}
	return field, ascending, true, nil
}

// QueryList fetches one page of documents from index matching q and appends
// them to the slice kind points at.
//
//   - page, num: page number and page size; the search offset is page*num.
//   - kind: must be a non-nil pointer to a slice of structs or struct
//     pointers. A string field named ID on each element is overwritten with
//     the document's _id.
//   - sort: optional (field string, ascending bool) pairs; false sorts
//     descending. A trailing unpaired element is ignored.
//
// The search runs with a 2 second timeout. The returned count comes from a
// separate Count call for q, not from the search response. A not-found error
// from the search is reported as (0, nil).
func (ES *EsClient) QueryList(index string, page, num int, q elastic.Query, kind interface{}, sort ...interface{}) (int64, error) {
	from := page * num
	if from < 0 || num == 0 {
		return 0, errors.New("invalid page or num")
	}
	elemType, err := sliceElemType(kind)
	if err != nil {
		return 0, err
	}

	search := ES.client.Search(index).Query(q)
	for i := 0; i+1 < len(sort); i += 2 {
		field, isStr := sort[i].(string)
		ascending, isBool := sort[i+1].(bool)
		if !isStr || !isBool {
			log.Error("invalid sort:%v", sort)
			return 0, errors.New("invalid sort")
		}
		search = search.Sort(field, ascending)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	res, err := search.From(from).Size(num).Do(ctx)
	if elastic.IsNotFound(err) {
		return 0, nil
	}
	if err != nil {
		log.Error("search error:%v", err)
		return 0, err
	}

	// Hits is a pointer in the response type and may be absent.
	if res != nil && res.Hits != nil {
		if err := appendHits(reflect.ValueOf(kind).Elem(), elemType, res.Hits.Hits); err != nil {
			return 0, err
		}
	}
	return ES.Count(index, q), nil
}

// SearchAfter fetches up to num documents from index matching q, sorted by
// _seq_no (ascending when seqSort is true), and appends them to the slice
// kind points at.
//
//   - after: the value returned by the previous call, or nil for the first
//     page.
//   - kind: must be a non-nil pointer to a slice of structs or struct
//     pointers. A string field named ID on each element is overwritten with
//     the document's _id.
//
// It returns the _seq_no of the last hit, to be passed as after on the next
// call, or 0 when there are no hits. The search runs with a 5 second timeout
// and a not-found error is reported as (0, nil).
//
// NOTE(review): _seq_no is presumably assigned per shard, so paging by it
// across a multi-shard index may skip or repeat documents — confirm.
func (ES *EsClient) SearchAfter(index string, after interface{}, num int, q elastic.Query, kind interface{}, seqSort bool) (int64, error) {
	elemType, err := sliceElemType(kind)
	if err != nil {
		return 0, err
	}

	search := ES.client.Search(index).Query(q).Sort("_seq_no", seqSort).Size(num).SeqNoPrimaryTerm(true)
	if after != nil {
		search = search.SearchAfter(after)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	res, err := search.Do(ctx)
	if elastic.IsNotFound(err) {
		return 0, nil
	}
	if err != nil {
		log.Error("search error:%v", err)
		return 0, err
	}
	if res == nil || res.Hits == nil || len(res.Hits.Hits) == 0 {
		return 0, nil
	}

	hits := res.Hits.Hits
	if err := appendHits(reflect.ValueOf(kind).Elem(), elemType, hits); err != nil {
		return 0, err
	}
	// SeqNo is a pointer; guard against a response that omitted it.
	if last := hits[len(hits)-1]; last.SeqNo != nil {
		return *last.SeqNo, nil
	}
	return 0, nil
}

// UpdateByScript runs the inline script on every document in index matching
// q and returns how many documents were updated. The request runs with a
// 2 second timeout; errors are logged and returned.
func (ES *EsClient) UpdateByScript(index string, q elastic.Query, script string) (updated int64, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	res, err := ES.client.UpdateByQuery(index).Query(q).Script(elastic.NewScriptInline(script)).Do(ctx)
	if err != nil {
		log.Error("err:%v", err)
		return 0, err
	}
	return res.Updated, nil
}

// QueryOne loads the first document in index matching q into kind.
//
//   - kind: must be a pointer; when it points at a struct (or struct pointer)
//     with a string field named ID, that field is set to the document's _id.
//   - sort: optional (field string, ascending bool) pair; false sorts
//     descending. Only the first pair is used.
//
// It returns nil and leaves kind untouched when nothing matches or the search
// reports not-found. A malformed sort argument yields an "invalid sort" error.
func (ES *EsClient) QueryOne(index string, q elastic.Query, kind interface{}, sort ...interface{}) error {
	target := reflect.ValueOf(kind)
	if target.Kind() != reflect.Ptr {
		return errors.New("invalid kind")
	}
	field, ascending, sorted, err := firstSortPair(sort)
	if err != nil {
		return err
	}

	search := ES.client.Search(index).Query(q).Size(1)
	if sorted {
		search = search.Sort(field, ascending)
	}
	res, err := search.Do(context.Background())
	if err != nil && !elastic.IsNotFound(err) {
		log.Error("err:%v", err)
		return err
	}
	if res == nil || res.Hits == nil || len(res.Hits.Hits) == 0 {
		return nil
	}

	hit := res.Hits.Hits[0]
	if err := json.Unmarshal(hit.Source, kind); err != nil {
		log.Error("err:%v,source:%v", err, string(hit.Source))
		return err
	}
	setDocID(target.Elem(), hit.Id)
	return nil
}

// QueryOneRandom loads one randomly chosen document into kind, picked from
// the hits returned by a single search of index for q. No size is set on the
// search, so the choice is made among the server's default page of hits, not
// among all matches.
//
//   - kind: must be a pointer; when it points at a struct (or struct pointer)
//     with a string field named ID, that field is set to the document's _id.
//   - sort: optional (field string, ascending bool) pair; false sorts
//     descending. Only the first pair is used.
//
// It returns nil and leaves kind untouched when nothing matches. Unlike
// QueryOne, a not-found error from the search is returned to the caller
// (without being logged).
func (ES *EsClient) QueryOneRandom(index string, q elastic.Query, kind interface{}, sort ...interface{}) error {
	target := reflect.ValueOf(kind)
	if target.Kind() != reflect.Ptr {
		return errors.New("invalid kind")
	}
	field, ascending, sorted, err := firstSortPair(sort)
	if err != nil {
		return err
	}

	search := ES.client.Search(index).Query(q)
	if sorted {
		search = search.Sort(field, ascending)
	}
	res, err := search.Do(context.Background())
	if err != nil {
		if !elastic.IsNotFound(err) {
			log.Error("err:%v", err)
		}
		return err
	}
	if res == nil || res.Hits == nil || len(res.Hits.Hits) == 0 {
		return nil
	}

	hit := res.Hits.Hits[rand.Intn(len(res.Hits.Hits))]
	if err := json.Unmarshal(hit.Source, kind); err != nil {
		log.Error("source:%v", string(hit.Source))
		return err
	}
	setDocID(target.Elem(), hit.Id)
	return nil
}

// Upsert updates document id in index, optionally inserting it when missing.
//
//   - val: an *elastic.Script to execute, or a struct/map partial document.
//   - uVal[0]: the document to insert when id does not exist. When omitted,
//     a plain update is performed.
//   - uVal[1]: optional bool passed to ScriptedUpsert (whether the script
//     also runs on insert). A non-bool value yields an error.
func (ES *EsClient) Upsert(index, id string, val interface{}, uVal ...interface{}) (ret *elastic.UpdateResponse, err error) {
	ctx := context.Background()
	update := ES.client.Update().Index(index).Id(id)
	if script, ok := val.(*elastic.Script); ok {
		update = update.Script(script)
	} else {
		update = update.Doc(val)
	}
	if len(uVal) == 0 {
		return update.Do(ctx)
	}

	update = update.Upsert(uVal[0])
	if len(uVal) > 1 {
		scripted, ok := uVal[1].(bool)
		if !ok {
			return nil, errors.New("invalid scripted upsert flag")
		}
		update = update.ScriptedUpsert(scripted)
	}
	return update.Do(ctx)
}

// IncrBy adds val to field of document id in index, or sets field to val when
// the field is null. When uVal is non-nil it is used as the upsert document
// with scripted upsert enabled; otherwise a plain scripted update is sent.
//
// NOTE(review): field and val are formatted directly into the script source.
// Callers must not pass untrusted input, and every distinct val produces a
// distinct script text. Moving val into script params would avoid both, but
// would change how string-typed vals are interpreted, so it is left as is.
func (ES *EsClient) IncrBy(index, id, field string, val interface{}, uVal interface{}) (ret *elastic.UpdateResponse, err error) {
	ctx := context.Background()
	source := fmt.Sprintf("if(ctx._source.%v==null){ctx._source.%v=%v}else{ctx._source.%v+=%v}", field, field, val, field, val)
	update := ES.client.Update().Index(index).Id(id).Script(elastic.NewScript(source))
	if uVal == nil {
		return update.Do(ctx)
	}
	return update.Upsert(uVal).ScriptedUpsert(true).Do(ctx)
}

// Refresh issues a refresh request for index with a 1 second timeout. Errors
// other than not-found are logged; nothing is returned.
func (ES *EsClient) Refresh(index string) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := ES.client.Refresh(index).Do(ctx); err != nil && !elastic.IsNotFound(err) {
		log.Error("refresh err:%v", err)
	}
}

// GroupBy runs a terms aggregation on field over the documents in index
// matching q and decodes the result into common.GroupBuckets. size limits the
// number of buckets; when size <= 0 the aggregation's own default applies.
func (ES *EsClient) GroupBy(index, field string, q elastic.Query, size int) (*common.GroupBuckets, error) {
	agg := elastic.NewTermsAggregation().Field(field)
	if size > 0 {
		agg = agg.Size(size)
	}
	// Size(0): only the aggregation is read, so no hits are requested.
	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", agg).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	res := new(common.GroupBuckets)
	if err := json.Unmarshal(result.Aggregations["total"], res); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return res, nil
}

// GroupBySub runs a terms aggregation on field with a nested terms
// aggregation ("sub1") on sub, over the documents in index matching q. size
// limits the number of top-level buckets; when size <= 0 the aggregation's
// own default applies.
func (ES *EsClient) GroupBySub(index, field, sub string, q elastic.Query, size int) (*common.Group2CardBuckets, error) {
	agg := elastic.NewTermsAggregation().Field(field).
		SubAggregation("sub1", elastic.NewTermsAggregation().Field(sub))
	if size > 0 {
		// size bounds the buckets (as in GroupBy), not the search hits,
		// which are never read here.
		agg = agg.Size(size)
	}
	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", agg).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	res := new(common.Group2CardBuckets)
	if err := json.Unmarshal(result.Aggregations["total"], res); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return res, nil
}

// GroupByCard runs a terms aggregation on field with a cardinality
// sub-aggregation ("sub") on sub, over the documents in index matching q.
// size limits the number of buckets; when size <= 0 the aggregation's own
// default applies.
func (ES *EsClient) GroupByCard(index, field, sub string, q elastic.Query, size int) (*common.GroupCardBuckets, error) {
	agg := elastic.NewTermsAggregation().Field(field).
		SubAggregation("sub", elastic.NewCardinalityAggregation().Field(sub))
	if size > 0 {
		agg = agg.Size(size)
	}
	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", agg).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	res := new(common.GroupCardBuckets)
	if err := json.Unmarshal(result.Aggregations["total"], res); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return res, nil
}

// GroupBy2Card runs a terms aggregation on field, a nested terms aggregation
// ("sub1") on sub1, and inside that a cardinality aggregation ("sub2") on
// sub2, over the documents in index matching q. size limits the number of
// top-level buckets; when size <= 0 the aggregation's own default applies.
func (ES *EsClient) GroupBy2Card(index, field, sub1, sub2 string, q elastic.Query, size int) (*common.Group2CardBuckets, error) {
	inner := elastic.NewTermsAggregation().Field(sub1).
		SubAggregation("sub2", elastic.NewCardinalityAggregation().Field(sub2))
	agg := elastic.NewTermsAggregation().Field(field).SubAggregation("sub1", inner)
	if size > 0 {
		// size bounds the buckets (as in GroupByCard), not the search hits,
		// which are never read here.
		agg = agg.Size(size)
	}
	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", agg).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	res := new(common.Group2CardBuckets)
	if err := json.Unmarshal(result.Aggregations["total"], res); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return res, nil
}

// GroupSumBy groups the documents in index matching q by field and, per
// bucket, sums every field listed in sumField (each sum aggregation is named
// after its field) and attaches one top hit under the name "Top".
//
//   - ret: must be a non-nil pointer; the raw "total" aggregation is decoded
//     into it.
//   - order, or: when order is non-empty, buckets are ordered by the
//     aggregation of that name, ascending when or is true.
//   - size: number of buckets; 0 means 5000.
func (ES *EsClient) GroupSumBy(index, field string, q elastic.Query, ret interface{}, order string, or bool, size int, sumField ...string) error {
	agg := elastic.NewTermsAggregation().Field(field)
	for _, name := range sumField {
		agg = agg.SubAggregation(name, elastic.NewSumAggregation().Field(name))
	}
	if order != "" {
		agg = agg.OrderByAggregation(order, or)
	}
	agg = agg.SubAggregation("Top", elastic.NewTopHitsAggregation().From(0).Size(1))
	if size == 0 {
		size = 5000
	}
	agg = agg.Size(size)

	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", agg).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return err
	}
	// Decode straight into ret so a nil or non-pointer ret surfaces as an
	// error instead of being silently dropped.
	if err := json.Unmarshal(result.Aggregations["total"], ret); err != nil {
		log.Error("err:%v", err)
		return err
	}
	return nil
}

// Group2SumBy groups the documents in index matching q by field, then by sub1
// ("sub1"), and sums sub2 inside each inner bucket ("sub2").
//
//   - order, or: when order is non-empty, top-level buckets are ordered by
//     the aggregation of that name, ascending when or is true.
//   - size: number of top-level buckets; 0 means 5000.
func (ES *EsClient) Group2SumBy(index, field, sub1, sub2 string, q elastic.Query, order string, or bool, size int) (*common.Group2SumBuckets, error) {
	inner := elastic.NewTermsAggregation().Field(sub1).
		SubAggregation("sub2", elastic.NewSumAggregation().Field(sub2))
	agg := elastic.NewTermsAggregation().Field(field).SubAggregation("sub1", inner)
	if order != "" {
		agg = agg.OrderByAggregation(order, or)
	}
	if size == 0 {
		size = 5000
	}
	agg = agg.Size(size)

	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", agg).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	ret := new(common.Group2SumBuckets)
	if err := json.Unmarshal(result.Aggregations["total"], ret); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return ret, nil
}

// SumBy sums field over the documents in index matching q.
func (ES *EsClient) SumBy(index, field string, q elastic.Query) (float64, error) { result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", elastic.NewSumAggregation().Field(field)).Do(context.Background()) if err != nil { log.Error("err:%v", err) return 0, err } // fmt.Println(string(result.Aggregations["total"])) res := new(common.SumByResult) if err := json.Unmarshal(result.Aggregations["total"], &res); err != nil { log.Error("err:%v", err) return res.Value, err } return res.Value, nil } // SumByInt64 聚合求和数量 func (ES *EsClient) SumByInt64(index, field string, q elastic.Query) int64 { result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", elastic.NewSumAggregation().Field(field)).Size(0).Do(context.Background()) if err != nil { log.Error("err:%v", err) return 0 } // fmt.Println(string(result.Aggregations["total"])) res := new(common.SumByResult) if err := json.Unmarshal(result.Aggregations["total"], &res); err != nil { log.Error("err:%v", err) return int64(res.Value) } return int64(res.Value) } // AVG 聚合求平均值 func (ES *EsClient) AVG(index, field string, q elastic.Query) (float64, error) { result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", elastic.NewAvgAggregation().Field(field)).Do(context.Background()) if err != nil { log.Error("err:%v", err) return 0, err } // fmt.Println(string(result.Aggregations["total"])) res := new(common.SumByResult) if err := json.Unmarshal(result.Aggregations["total"], &res); err != nil { log.Error("err:%v", err) return res.Value, err } return res.Value, nil } // Exist 判断是否存在 func (ES *EsClient) Exist(index, id string) bool { ret, err := ES.C().Exists().Index(index).Id(id).Do(context.Background()) if err != nil { log.Error("err:%v", err) return false } return ret } // Exist 判断是否存在 func (ES *EsClient) IndexExist(index string) bool { ret, err := ES.C().IndexExists(index).Do(context.Background()) if err != nil { log.Error("err:%v", err) return false } return ret } // Exist 判断是否存在 
func (ES *EsClient) CreateExist(index string) error { _, err := ES.C().CreateIndex(index).Do(context.Background()) if err != nil { log.Error("err:%v", err) return err } return nil } // Exist 判断是否存在 func (ES *EsClient) AddAlias(aliasName, indexName string) error { alias := elastic.NewAliasAddAction(aliasName).Index(indexName).IsWriteIndex(true) aliasResult, err := ES.client.Alias().Action(alias).Do(context.Background()) if err != nil || !aliasResult.Acknowledged { log.Error("err:%v", err) return err } return nil } func (ES *EsClient) Rollover(aliasName, indexName string, conditon map[string]interface{}) error { req := ES.client.RolloverIndex(aliasName).NewIndex(indexName) if conditon != nil { for k, v := range conditon { req.AddCondition(k, v) } } aliasResult, err := req.Do(context.Background()) if err != nil || !aliasResult.Acknowledged { log.Error("err:%v", err) return err } return nil }