// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package elastic import ( "context" "errors" "fmt" "strconv" "strings" "github.com/tidwall/gjson" elastic "gopkg.in/olivere/elastic.v5" ) const ( // 分割符 delimiter = "," // 分页常量 slideUp = 1 // 向上滑动 slideDown = 2 // 向下滑动 positive = 1 // 正序 reverse = 2 // 倒序 ) // SplitTopBottomData 分割分页的top/bottom func SplitTopBottomData(param string) (base, num, extra int64, err error) { if param == "" { return 0.0, 0, 0.0, nil } mems := strings.Split(param, delimiter) switch { case len(mems) == 0: return 0.0, 0, 0.0, errors.New("member is not enough") case len(mems) == 1: base, _ = strconv.ParseInt(mems[0], 10, 64) case len(mems) == 2: if base, err = strconv.ParseInt(mems[0], 10, 64); err != nil { return 0.0, 0, 0.0, err } else { num, _ = strconv.ParseInt(mems[1], 10, 64) } case len(mems) >= 3: if base, err = strconv.ParseInt(mems[0], 10, 64); err != nil { return 0.0, 0, 0.0, err } else { num, _ = strconv.ParseInt(mems[1], 10, 64) extra, _ = strconv.ParseInt(mems[2], 10, 64) } default: return 0.0, 0, 0.0, nil } return } // JoinTopBottomData 组装分页的top/bottom func JoinTopBottomData(base, num, extra int64) string { return fmt.Sprintf("%d%s%d%s%d", base, delimiter, num, delimiter, extra) } // ElasticSort 排序参数结构 type ElasticSort struct { Field string Asc bool } // ElasticPageParam 分页参数 type ElasticPageParam struct { Indexs []string Types []string Match map[string]string Query elastic.Query Sorts []ElasticSort GeoSort *elastic.GeoDistanceSort Size uint32 // 单次拉取条数 Slide uint32 // 滑动方向 1: 向上滑动 2:向下滑动 Top string // 页首数据 Bottom string // 页尾数据 SequenceType uint32 // 1:正序 2:反序 BaseFilter string // 基础过滤域, 默认为"timestamp" ExtraFilter string // 附加过滤域 Includes []string // includes, 要获取的字段名列表 Excludes []string // excludes,不获取的字段名列表 } // ElasticCountParam 计数参数 type ElasticCountParam struct { Indexs []string Types []string Match map[string]string Query elastic.Query } // ElasticSumParam 取和参数 type ElasticSumParam struct { Indexs []string Types []string Match map[string]string Query elastic.Query AggFields map[string]string } // ElasticRandomParam 随机取多条数据 type ElasticRandomParam struct { Indexs []string Types []string Query elastic.Query GeoSort *elastic.GeoDistanceSort Size uint32 // 单次拉取条数 BaseFilter string // 基础过滤域, 默认为"timestamp" ExtraFilter string // 附加过滤域 OtherKeys []string // 需要的其它key, 如target } // ElasticHit 结果参数 type ElasticHit struct { Index string Type string ID string Base int64 Extra int64 Source string } // PageResult 分页结果 type PageResult struct { Top string Bottom string HasMore bool TotalCount uint64 Hits []ElasticHit } // RandomResult 随机结果 type RandomResult struct { TotalCount uint64 Hits []ElasticHit } // GetPageFromElastic 从elastic中读单面数据 func GetPageFromElastic(param *ElasticPageParam) (result PageResult, err error) { // 参数检查 if client == nil || param == nil { return result, fmt.Errorf("client or param is nil.") } if len(param.Indexs) <= 0 || len(param.Types) <= 0 { return result, fmt.Errorf("Invalid es params. %v", param) } // 支持多个索引和type查询 searchServer := client.Search() if len(param.Indexs) > 0 { searchServer = searchServer.Index(param.Indexs...) } if len(param.Types) > 0 { searchServer = searchServer.Type(param.Types...) } // 解析top topTime, topSameNum, topExtra, err := SplitTopBottomData(param.Top) if err != nil { return result, err } // 解析bottom botTime, botSameNum, botExtra, err := SplitTopBottomData(param.Bottom) if err != nil { return result, err } // 默认为"timestamp" if param.BaseFilter == "" { param.BaseFilter = "timestamp" } if len(param.Includes) > 0 || len(param.Excludes) > 0 { fetchSourceContext := elastic.NewFetchSourceContext(true) if len(param.Includes) > 0 { fetchSourceContext.Include(param.Includes...) } if len(param.Excludes) > 0 { fetchSourceContext.Exclude(param.Excludes...) } searchServer = searchServer.FetchSourceContext(fetchSourceContext) } // 组装match条件 // 注意: Match 与 Query 互斥 boolQuery := elastic.NewBoolQuery() if len(param.Match) == 0 { if param.Query != nil { boolQuery = boolQuery.Must(param.Query) } else { boolQuery = boolQuery.Must(elastic.NewMatchAllQuery()) } } else { for key, value := range param.Match { boolQuery = boolQuery.Must(elastic.NewMatchQuery(key, value)) } } // 按具体字段排序 if len(param.Sorts) > 0 { for _, s := range param.Sorts { searchServer = searchServer.Sort(s.Field, s.Asc) } } // 位置排序 if param.GeoSort != nil { searchServer = searchServer.SortBy(param.GeoSort) } topRemove := true removeCount := uint32(0) count := uint32(0) positiveFlag := true switch param.Slide { case slideDown: switch param.SequenceType { case positive: // 正序向下滑 timestamp >=0 if topTime == 0 { // timestamp >= 0 正序排列 if param.ExtraFilter != "" { searchServer = searchServer.Sort(param.ExtraFilter, true) boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0)) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0)) searchServer = searchServer.Sort(param.BaseFilter, true) } else { // timestamp <= top 倒序排列 if param.ExtraFilter != "" { searchServer = searchServer.Sort(param.ExtraFilter, false) boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Lte(topExtra)) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Lte(topTime)) searchServer = searchServer.Sort(param.BaseFilter, false) positiveFlag = false } default: // 倒序向下滑 if topTime == 0 { // timestamp >= 0 倒序排列 if param.ExtraFilter != "" { searchServer = searchServer.Sort(param.ExtraFilter, false) boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0)) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0)) searchServer = searchServer.Sort(param.BaseFilter, false) } else { // timestamp >= top 倒序排列 if param.ExtraFilter != "" { searchServer = searchServer.Sort(param.ExtraFilter, false) boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(topExtra)) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(topTime)) searchServer = searchServer.Sort(param.BaseFilter, false) } } // 删除的数量 removeCount = uint32(topSameNum) // 需要拉取的条数 count = removeCount + param.Size // 去掉头部相同数据 topRemove = false default: switch param.SequenceType { case positive: // 正序向上滑 if botTime == 0 { // timestamp >= 0 正序排序 if param.ExtraFilter != "" { boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0)) searchServer = searchServer.Sort(param.ExtraFilter, true) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0)) searchServer = searchServer.Sort(param.BaseFilter, true) } else { // timestamp >= bot 正序排序 if param.ExtraFilter != "" { boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(botExtra)) searchServer = searchServer.Sort(param.ExtraFilter, true) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(botTime)) searchServer = searchServer.Sort(param.BaseFilter, true) } default: // 倒序向上滑 if botTime == 0 { // timestamp >= 0 倒序排序 if param.ExtraFilter != "" { boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0)) searchServer = searchServer.Sort(param.ExtraFilter, false) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0)) searchServer = searchServer.Sort(param.BaseFilter, false) } else { // timestamp <= bot 倒序排序 if param.ExtraFilter != "" { boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Lte(botExtra)) searchServer = searchServer.Sort(param.ExtraFilter, false) } boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Lte(botTime)) searchServer = searchServer.Sort(param.BaseFilter, false) } } // 删除的数量 removeCount = uint32(botSameNum) // 需要拉取的条数 count = removeCount + param.Size } // 执行搜索 searchServer = searchServer.From(0).Size(int(count)) res, err := searchServer.Query(boolQuery).Do(context.Background()) if err != nil { return result, err } result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits)) if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 { result.HasMore = res.Hits.TotalHits > int64(count) if res.Hits.TotalHits > int64(removeCount) { result.TotalCount = uint64(res.Hits.TotalHits - int64(removeCount)) } for _, hit := range res.Hits.Hits { if hit.Source != nil { result.Hits = append(result.Hits, ElasticHit{ ID: hit.Id, Index: hit.Index, Type: hit.Type, Base: gjson.GetBytes(*hit.Source, param.BaseFilter).Int(), Extra: gjson.GetBytes(*hit.Source, param.ExtraFilter).Int(), Source: string(*hit.Source), }) } } } // 计算top与bottom length := len(result.Hits) if length > 0 { // 检查 删除数据比查找到的数据要多直接返回空 if length <= int(removeCount) { return PageResult{}, nil } // 倒序 if !positiveFlag { //需要倒序结果 对其数据进行正序 for i := 0; i < length/2; i++ { result.Hits[i], result.Hits[length-1-i] = result.Hits[length-1-i], result.Hits[i] } } topBase := result.Hits[0].Base topExtra := result.Hits[0].Extra botBase := result.Hits[length-1].Base botExtra := result.Hits[length-1].Extra // 去掉上一次访问过的数据 if topRemove { // 去掉头部 result.Hits = result.Hits[removeCount:] } else { // 去掉尾部 result.Hits = result.Hits[:length-int(removeCount)] } // 对重复数据删除后的处理时需要重新计算length与topbase/botbase等, 否则会可能内存越界 length = len(result.Hits) if length > 0 { topBase = result.Hits[0].Base topExtra = result.Hits[0].Extra botBase = result.Hits[length-1].Base botExtra = result.Hits[length-1].Extra } topNum := int64(0) bottomNum := int64(0) for _, item := range result.Hits { if item.Base == topBase { topNum += 1 } else { break } } for i := length - 1; i >= 0; i -= 1 { if result.Hits[i].Base == botBase { bottomNum += 1 } else { break } } result.Top = JoinTopBottomData(topBase, topNum, topExtra) result.Bottom = JoinTopBottomData(botBase, bottomNum, botExtra) } return result, nil } // GetPageFromElasticByIndex 索引取分页,类似于mysql的 from to 分页 func GetPageFromElasticByIndex(param *ElasticPageParam) (result PageResult, err error) { // 参数检查 if client == nil || param == nil { return result, fmt.Errorf("client or param is nil.") } if len(param.Indexs) <= 0 || len(param.Types) <= 0 { return result, fmt.Errorf("Invalid es params. %v", param) } // 支持多个索引和type查询 searchServer := client.Search() if len(param.Indexs) > 0 { searchServer = searchServer.Index(param.Indexs...) } if len(param.Types) > 0 { searchServer = searchServer.Type(param.Types...) } // 组装match条件 // 注意: Match 与 Query 互斥 boolQuery := elastic.NewBoolQuery() if len(param.Match) == 0 { if param.Query != nil { boolQuery = boolQuery.Must(param.Query) } else { boolQuery = boolQuery.Must(elastic.NewMatchAllQuery()) } } else { for key, value := range param.Match { boolQuery = boolQuery.Must(elastic.NewMatchQuery(key, value)) } } if len(param.Sorts) > 0 { for _, s := range param.Sorts { searchServer = searchServer.Sort(s.Field, s.Asc) } } if param.GeoSort != nil { searchServer = searchServer.SortBy(param.GeoSort) } // 解析top from, _, _, err := SplitTopBottomData(param.Bottom) if err != nil { return result, err } searchServer = searchServer.From(int(from)).Size(int(param.Size)) res, err := searchServer.Query(boolQuery).Do(context.Background()) if err != nil { return result, err } result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits)) if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 { result.HasMore = res.Hits.TotalHits > int64(param.Size) result.TotalCount = uint64(res.Hits.TotalHits) for _, hit := range res.Hits.Hits { if hit.Source != nil { result.Hits = append(result.Hits, ElasticHit{ ID: hit.Id, Index: hit.Index, Type: hit.Type, Source: string(*hit.Source), }) } } } // 计算top与bottom len := int64(len(result.Hits)) if len > 0 { result.Top = JoinTopBottomData(from, 0, 0) result.Bottom = JoinTopBottomData(from+len, 0, 0) } return result, nil } // GetSumFromElastic 取和函数 func GetSumFromElastic(param *ElasticSumParam) (map[string]int64, error) { // 参数检查 if client == nil || param == nil { return nil, fmt.Errorf("client or param is nil.") } if len(param.Indexs) <= 0 || len(param.Types) <= 0 { return nil, fmt.Errorf("Invalid es params. %v", param) } // 支持多个索引和type查询 searchServer := client.Search() if len(param.Indexs) > 0 { searchServer = searchServer.Index(param.Indexs...) } if len(param.Types) > 0 { searchServer = searchServer.Type(param.Types...) } // 组装要汇总的参数字段 for key, field := range param.AggFields { agg := elastic.NewSumAggregation().Field(field) searchServer = searchServer.Aggregation(key, agg) } // 开始查询 res, err := searchServer.Query(param.Query).Do(context.Background()) if err != nil { return nil, err } // 取出结果 result := make(map[string]int64, len(param.AggFields)) if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 { agg := res.Aggregations if agg != nil { for k, v := range agg { result[k] = gjson.GetBytes(*v, "value").Int() } } } return result, nil } // GetCountFromElastic 计数函数 func GetCountFromElastic(param *ElasticCountParam) (int64, error) { // 参数检查 if client == nil || param == nil { return 0, fmt.Errorf("client or param is nil.") } if len(param.Indexs) <= 0 || len(param.Types) <= 0 { return 0, fmt.Errorf("Invalid es params. %v", param) } return client.Count(param.Indexs...).Type(param.Types...).Query(param.Query).Do(context.TODO()) } // GGetRandomFromElastic 随机取多条数据函数 func GetRandomFromElastic(param *ElasticRandomParam) (result RandomResult, err error) { // 参数检查 if client == nil || param == nil { return result, fmt.Errorf("client or param is nil.") } if len(param.Indexs) <= 0 || len(param.Types) <= 0 { return result, fmt.Errorf("Invalid es params. %v", param) } // 支持多个索引和type查询 searchServer := client.Search() if len(param.Indexs) > 0 { searchServer = searchServer.Index(param.Indexs...) } if len(param.Types) > 0 { searchServer = searchServer.Type(param.Types...) } if param.GeoSort != nil { searchServer = searchServer.SortBy(param.GeoSort) } searchServer = searchServer.Size(int(param.Size)) res, err := searchServer.Query(param.Query).Do(context.Background()) if err != nil { return result, err } result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits)) if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 { result.TotalCount = uint64(res.Hits.TotalHits) for _, hit := range res.Hits.Hits { if hit.Source != nil { result.Hits = append(result.Hits, ElasticHit{ ID: hit.Id, Index: hit.Index, Type: hit.Type, Source: string(*hit.Source), }) } } } return result, nil }