Skip to content

Elasticsearch生产实践

ES深分页

Elasticsearch 的From/Size方式提供了分页的功能,同时,也有相应的限制。

举个例子,一个索引,有10亿数据,分10个 shards,然后,一个搜索请求,from=1000000,size=100,这时候,会带来严重的性能问题:CPU,内存,IO,网络带宽。

在 query 阶段,每个shards需要返回 1000100 条数据给 coordinating node,而 coordinating node 需要接收10 * 1000,100 条数据,即使每条数据只有 _doc _id_score,这数据量也很大了。

「在另一方面,我们意识到,这种深度分页的请求并不合理,因为我们是很少人为的看很后面的请求的,在很多的业务场景中,都直接限制分页,比如只能看前100页。」

比如,有1千万粉丝的微信大V,要给所有粉丝群发消息,或者给某省粉丝群发,这时候就需要取得所有符合条件的粉丝,而最容易想到的就是利用 from + size 来实现,不过,这个是不现实的,这时,可以采用 Elasticsearch 提供的其他方式来实现遍历。

Scroll

可以把scroll理解为关系型数据库里的cursor,因此,scroll并不适合用来做实时搜索,而更适合用于后台批处理任务,比如群发。

这个分页的用法,「不是为了实时查询数据」,而是为了**「一次性查询大量的数据(甚至是全部的数据」**)。

因为这个scroll相当于维护了一份当前索引段的快照信息,这个快照信息是你执行这个scroll查询时的快照。在这个查询后的任何新索引进来的数据,都不会在这个快照中查询到。

但是它相对于from和size,不是查询所有数据然后剔除不要的部分,而是记录一个读取的位置,保证下一次快速继续读取。

不考虑排序的时候,可以结合SearchType.SCAN使用。

scroll可以分为初始化和遍历两部,初始化时将**「所有符合搜索条件的搜索结果缓存起来(注意,这里只是缓存的doc_id,而并不是真的缓存了所有的文档数据,取数据是在fetch阶段完成的)」**,可以想象成快照。

在遍历时,从这个快照里取数据,也就是说,在初始化后,对索引插入、删除、更新数据都不会影响遍历结果。

遍历结果。

「基本使用」

json
POST /twitter/tweet/_search?scroll=1m
{
    "size": 100,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}
POST /twitter/tweet/_search?scroll=1m
{
    "size": 100,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}

初始化指明 index 和 type,然后,加上参数 scroll,表示暂存搜索结果的时间,其它就像一个普通的search请求一样。会返回一个_scroll_id_scroll_id用来下次取数据用。

「遍历」

json
POST /_search?scroll=1m
{
    "scroll_id":"XXXXXXXXXXXXXXXXXXXXXXX I am scroll id XXXXXXXXXXXXXXX"
}
POST /_search?scroll=1m
{
    "scroll_id":"XXXXXXXXXXXXXXXXXXXXXXX I am scroll id XXXXXXXXXXXXXXX"
}

scroll_id即 上一次遍历取回的_scroll_id或者是初始化返回的_scroll_id,同样的,需要带 scroll 参数。

重复这一步骤,直到返回的数据为空,即遍历完成。

「注意,每次都要传参数 scroll,刷新搜索结果的缓存时间」。另外,「不需要指定 index 和 type」

设置scroll的时候,需要使搜索结果缓存到下一次遍历完成,「同时,也不能太长,毕竟空间有限。」

「缺点:」

  1. 「scroll_id会占用大量的资源(特别是排序的请求)」
  2. 同样的,scroll后接超时时间,频繁的发起scroll请求,会出现一些列问题。
  3. 「是生成的历史快照,对于数据的变更不会反映到快照上。」

「优点:」

适用于非实时处理大量数据的情况,比如要进行数据迁移或者索引变更之类的。

Scroll Scan

ES提供scroll scan方式进一步提高遍历性能,但是scroll scan不支持排序,因此scroll scan适合不需要排序的场景。

「基本使用」

Scroll Scan 的遍历与普通 Scroll 一样,初始化存在一点差别。

json
POST /my_index/my_type/_search?search_type=scan&scroll=1m&size=50
{
 "query": { "match_all": {}}
}
POST /my_index/my_type/_search?search_type=scan&scroll=1m&size=50
{
 "query": { "match_all": {}}
}

需要指明参数:

  • search_type:赋值为scan,表示采用 Scroll Scan 的方式遍历,同时告诉 Elasticsearch 搜索结果不需要排序。
  • scroll:同上,传时间。
  • size:与普通的 size 不同,这个 size 表示的是每个 shard 返回的 size 数,最终结果最大为 number_of_shards * size

「Scroll Scan与Scroll的区别」

  1. Scroll-Scan结果**「没有排序」**,按index顺序返回,没有排序,可以提高取数据性能。
  2. 初始化时只返回 _scroll_id,没有具体的hits结果
  3. size控制的是每个分片的返回的数据量,而不是整个请求返回的数据量。

Sliced Scroll

如果你数据量很大,用Scroll遍历数据那确实是接受不了,现在Scroll接口可以并发来进行数据遍历了。

每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,比用Scroll遍历要快很多倍。

json
POST /index/type/_search?scroll=1m
{
    "query": { "match_all": {}},
    "slice": {
        "id": 0,
        "max": 5
    }   
}
 
POST ip:port/index/type/_search?scroll=1m
{
    "query": { "match_all": {}},
    "slice": {
        "id": 1,
        "max": 5
    }   
}
POST /index/type/_search?scroll=1m
{
    "query": { "match_all": {}},
    "slice": {
        "id": 0,
        "max": 5
    }   
}
 
POST ip:port/index/type/_search?scroll=1m
{
    "query": { "match_all": {}},
    "slice": {
        "id": 1,
        "max": 5
    }   
}

上边的示例可以单独请求两块数据,最终五块数据合并的结果与直接scroll scan相同。

其中max是分块数,id是第几块。

官方文档中建议max的值不要超过shard的数量,否则可能会导致内存爆炸。

Search After

Search_after是 ES 5 新引入的一种分页查询机制,其原理几乎就是和scroll一样,因此代码也几乎是一样的。

「基本使用:」

第一步:

json
POST twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "es"
        }
    },
    "sort": [
        {"date": "asc"},
        {"_id": "desc"}
    ]
}
POST twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "es"
        }
    },
    "sort": [
        {"date": "asc"},
        {"_id": "desc"}
    ]
}

返回出的结果信息 :

json
{
      "took" : 29,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [
          {
            ...
            },
            "sort" : [
              ...
            ]
          },
          {
            ...
            },
            "sort" : [
              124648691,
              "624812"
            ]
          }
        ]
      }
    }
{
      "took" : 29,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : null,
        "hits" : [
          {
            ...
            },
            "sort" : [
              ...
            ]
          },
          {
            ...
            },
            "sort" : [
              124648691,
              "624812"
            ]
          }
        ]
      }
    }

上面的请求会为每一个文档返回一个包含sort排序值的数组。

这些sort排序值可以被用于search_after参数里以便抓取下一页的数据。

比如,我们可以使用最后的一个文档的sort排序值,将它传递给search_after参数:

json
GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "es"
        }
    },
    "search_after": [124648691, "624812"],
    "sort": [
        {"date": "asc"},
        {"_id": "desc"}
    ]
}
GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "es"
        }
    },
    "search_after": [124648691, "624812"],
    "sort": [
        {"date": "asc"},
        {"_id": "desc"}
    ]
}

若我们想接着上次读取的结果进行读取下一页数据,第二次查询在第一次查询时的语句基础上添加search_after,并指明从哪个数据后开始读取。

「基本原理」

es维护一个实时游标,它以上一次查询的最后一条记录为游标,方便对下一页的查询,它是一个无状态的查询,因此每次查询的都是最新的数据。

由于它采用记录作为游标,因此**「SearchAfter要求doc中至少有一条全局唯一变量(每个文档具有一个唯一值的字段应该用作排序规范)」**

「优缺点」

「优点:」

  1. 无状态查询,可以防止在查询过程中,数据的变更无法及时反映到查询中。
  2. 不需要维护scroll_id,不需要维护快照,因此可以避免消耗大量的资源。

「缺点:」

  1. 由于无状态查询,因此在查询期间的变更可能会导致跨页面的不一值。
  2. 排序顺序可能会在执行期间发生变化,具体取决于索引的更新和删除。
  3. 至少需要制定一个唯一的不重复字段来排序。
  4. 它不适用于大幅度跳页查询,或者全量导出,对第N页的跳转查询相当于对es不断重复的执行N次search after,而全量导出则是在短时间内执行大量的重复查询。

SEARCH_AFTER不是自由跳转到任意页面的解决方案,而是并行滚动多个查询的解决方案。

总结

分页方式性能优点缺点场景
from + size灵活性好,实现简单深度分页问题数据量比较小,能容忍深度分页问题
scroll解决了深度分页问题无法反应数据的实时性(快照版本)维护成本高,需要维护一个 scroll_id海量数据的导出需要查询海量结果集的数据
search_after性能最好不存在深度分页问题能够反映数据的实时变更实现复杂,需要有一个全局唯一的字段连续分页的实现会比较复杂,因为每一次查询都需要上次查询的结果,它不适用于大幅度跳页查询海量数据的分页

image-20240624174405578

  • 如果数据量小(from+size在10000条内),或者只关注结果集的TopN数据,可以使用from/size 分页,简单粗暴

  • 数据量大,深度翻页,后台批处理任务(数据迁移)之类的任务,使用 scroll 方式

  • 数据量大,深度翻页,用户实时、高并发查询需求,使用 search after 方式

ES7版本变更:带PIT的search_after来进行查询

7.*版本中,ES官方不再推荐使用Scroll方法来进行深分页,推荐使用带PIT的search_after来进行查询;

7.*版本开始,您可以使用SEARCH_AFTER参数通过上一页中的一组排序值检索下一页命中。

使用SEARCH_AFTER需要多个具有相同查询和排序值的搜索请求。

如果这些请求之间发生刷新,则结果的顺序可能会更改,从而导致页面之间的结果不一致。

为防止出现这种情况,您可以创建一个时间点(PIT)来在搜索过程中保留当前索引状态。

json
POST /my-index-000001/_pit?keep_alive=1m

返回一个PIT ID
{
  "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
POST /my-index-000001/_pit?keep_alive=1m

返回一个PIT ID
{
  "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}

在搜索请求中指定PIT:

json
GET /_search
{
  "size": 10000,
  "query": {
    "match" : {
      "user.id" : "elkbee"
    }
  },
  "pit": {
     "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", 
     "keep_alive": "1m"
  },
  "sort": [ 
    {"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" }}
  ]
}
GET /_search
{
  "size": 10000,
  "query": {
    "match" : {
      "user.id" : "elkbee"
    }
  },
  "pit": {
     "id":  "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", 
     "keep_alive": "1m"
  },
  "sort": [ 
    {"@timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" }}
  ]
}

ES数据清理

  1. 先扩容。
  2. 找到只读索引。
  3. 清除索引的数据。
  4. 通知用户。
  5. 完美。

参考笔记

ES的默认磁盘使用机制

当为85%时:Elasticsearch不会将碎片分配给磁盘使用率超过85%的节点(cluster.routing.allocation.disk.watermark.low: "85%")

当为90%时:Elasticsearch尝试重新分配给磁盘低于使用率90%的节点(cluster.routing.allocation.disk.watermark.high: "90%")

当为85%时:Elasticsearch执行只读模块(cluster.routing.allocation.disk.watermark.flood_stage: "85%")

查看 es 的 索引数据

GET /_cat/allocation?v

image-20240624170215002

参数解释
shards分片个数
disk.used已用磁盘大小
disk.indices索引所占磁盘大小
disk.avail可以使用磁盘大小
disk.total磁盘总容量
disk.percent磁盘使用百分比
node节点名称

从磁盘情况可以看出节点es-cn-oew1s63x6001fxazs-0e6b24df-0001 使用率基本在 60%-80%,如果超过85%,索引会变成只读。ES把这个索引置为 read_only_allow_delete: true

多表关联设计

关系型数据库中的多表之间的关联查询,ES中有什么好的解决方案?文档结构除了对象之间的嵌套还有什么好的解决方案?

以下四种常用的方法,用来在 Elasticsearch 中进行关联数据的管理:

应用端关联

这是普遍使用的技术,即在应用接口层面来处理关联关系。

  1. 存储层面:独立两个索引存储。
  2. 实际业务层面分两次请求:

第一次查询返回:Top5中文姓名和成绩; 根据第一次查询的结果,第二次查询返回:Top5中文姓名和英文姓名;

将第一次查询结果和第二次查询结果组合后,返回给用户。

即:实际业务层面是进行了两次查询,统一返回给用户。用户是无感知的。

适用场景数据量少的业务场景。

优点:数据量少时,用户体验好。

缺点:数据量大,两次查询耗时肯定会比较长,影响用户体验。

引申场景:关系型数据库和ES 结合,各取所长。将关系型数据库全量同步到 ES 存储,不做冗余存储。

如前所述:ES 擅长的是检索,而 MySQL 才擅长关系管理。所以可以考虑二者结合,使用 ES 多索引建立相同的别名,针对别名检索到对应 ID 后再回 MySQL 查询,业务层面通过关联 ID join 出需要的数据。

宽表冗余存储

冗余存储,对每个文档保持一定数量的冗余数据可以在需要访问时避免进行关联。

这点通过logstash 同步关联数据到ES时,通常会建议:先通过视图对Mysql数据做好多表关联,然后同步视图数据到ES。此处的视图就是宽表。

示例:姓名、英文名、成绩两张表合为一张表存储。

适用场景:一对多或者多对多关联。

优点:速度快。因为每个文档都包含了所需的所有信息,当这些信息需要在查询进行匹配时,并不需要进行昂贵的关联操作。

缺点:索引更新或删除数据,应用程序不得不处理宽表的冗余数据;由于冗余存储,导致某些搜索和聚合操作可能无法按照预期工作。

嵌套文档(Nested)存储

Nested类型是ES Mapping定义的集合类型之一,它是比object类型更NB的支持独立检索的类型。

举例:有一个文档描述了一个帖子和一个包含帖子上所有评论的内部对象评论。可以借助 Nested 实现。

注意1:当使用嵌套文档时,使用通用的查询方式是无法访问到的,必须使用合适的查询方式(nested query、nested filter、nested facet等),很多场景下,使用嵌套文档的复杂度在于索引阶段对关联关系的组织拼装。

注意2

index.mapping.nested_fields.limit 缺省值是50。

即:一个索引中最大允许拥有50个nested类型的数据。

index.mapping.nested_objects.limit 缺省值是10000。

即:1个文档中所有nested类型json对象数据的总量是10000。

适用场景:1 对少量,子文档偶尔更新、查询频繁的场景。如果需要索引对象数组并保持数组中每个对象的独立性,则应使用嵌套 Nested 数据类型而不是对象 Oject 数据类型。

优点:nested文档可以将父子关系的两部分数据(举例:博客+评论)关联起来做任何的查询。

缺点:查询相对较慢,更新子文档需要更新整篇文档。

父子文档存储

注意:6.X之前的版本的父子文档存储在相同索引的不同type中。而6.X之上的版本,单索引下已不存在多type的概念。父子文档Join的都是基于相同索引相同type实现的。

Join类型是ES Mapping定义的类型之一,用于在同一索引的文档中创建父/子关系。 关系部分定义文档中的一组可能关系,每个关系是父名称和子名称。

适用场景:子文档数据量要明显多于父文档的数据量,存在1 对多量的关系;子文档更新频繁的场景。

举例:1 个产品和供应商之间是1对N的关联关系。 当使用父子文档时,使用has_child 或者has_parent做父子关联查询。

优点:父子文档可独立更新。

缺点:维护Join关系需要占据部分内存,查询较Nested更耗资源。

别名重建索引

假设我们现在有一个索引 bucket_size_index

json
PUT /bucket_size_index 
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "size": {
        "type": "long"
      },
      "tenantId": {
        "type": "long"
      },
      "time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}
PUT /bucket_size_index 
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "size": {
        "type": "long"
      },
      "tenantId": {
        "type": "long"
      },
      "time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}

在项目中我们不要直接使用bucket_size_index,先创建一个别名bucket_size_alias

项目中使用别名替代索引

json
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "bucket_size_index",
        "alias": "bucket_size_alias"
      }
    }
  ]
}
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "bucket_size_index",
        "alias": "bucket_size_alias"
      }
    }
  ]
}

我们需要再添加一个字段bucket_name,我们可以创建了个新的索引 bucket_size_index_2

json
PUT /bucket_size_index_2
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "bucket_name": {
        "type": "keyword"
      },
      "size": {
        "type": "long"
      },
      "tenantId": {
        "type": "long"
      },
      "time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}
PUT /bucket_size_index_2
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "bucket_name": {
        "type": "keyword"
      },
      "size": {
        "type": "long"
      },
      "tenantId": {
        "type": "long"
      },
      "time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}

使用_reindex将bucket_size_index中的数据重建到 bucket_size_index_2

json
POST _reindex
{
  "source": {
    "index": "bucket_size_index"
  },
  "dest": {
    "index": "bucket_size_index_2"
  }
}
POST _reindex
{
  "source": {
    "index": "bucket_size_index"
  },
  "dest": {
    "index": "bucket_size_index_2"
  }
}

如果数据量非常大reindex会很慢,接口会超时,我们可以使用异步reindex

json
POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "bucket_size_index"
  },
  "dest": {
    "index": "bucket_size_index_2"
  }
}
POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "bucket_size_index"
  },
  "dest": {
    "index": "bucket_size_index_2"
  }
}

接口会返回任务ID

json
{"task" : "jnj5k6NlQK-LvEopzRycxw:90463975"}
{"task" : "jnj5k6NlQK-LvEopzRycxw:90463975"}

可以查询取消任务状态

json
// 查询所有状态
GET _tasks?detailed=true&actions=*reindex
// 查询指定任务状态
GET /_tasks/jnj5k6NlQK-LvEopzRycxw:90463975
// 取消任务
POST _tasks/jnj5k6NlQK-LvEopzRycxw:90463975/_cancel
// 查询所有状态
GET _tasks?detailed=true&actions=*reindex
// 查询指定任务状态
GET /_tasks/jnj5k6NlQK-LvEopzRycxw:90463975
// 取消任务
POST _tasks/jnj5k6NlQK-LvEopzRycxw:90463975/_cancel

修改别名指向

json
POST /_aliases
{
   "actions" : [
      { "remove":  { "index": "bucket_size_index", "alias": "bucket_size_alias" } },
      { "add":  { "index": "bucket_size_index_2", "alias": "bucket_size_alias" } }
  ]
}
POST /_aliases
{
   "actions" : [
      { "remove":  { "index": "bucket_size_index", "alias": "bucket_size_alias" } },
      { "add":  { "index": "bucket_size_index_2", "alias": "bucket_size_alias" } }
  ]
}

发现新添加的数据有bucket_mame这个字段,老数据没有bucket_name字段数据,这里我们可以通过_update_by_query批量给bucket_name添加个默认值

POST /bucket_size_alias/_update_by_query
{
    "query": {
        "bool": {
            "must_not": {
                "exists": {
                    "field": "bucket_name"
                }
            }
        }
    },
    "script":{
    "inline" : "ctx._source.bucket_name= 'default_bucket_name'",
    "lang" : "painless"
  }
}
POST /bucket_size_alias/_update_by_query
{
    "query": {
        "bool": {
            "must_not": {
                "exists": {
                    "field": "bucket_name"
                }
            }
        }
    },
    "script":{
    "inline" : "ctx._source.bucket_name= 'default_bucket_name'",
    "lang" : "painless"
  }
}

References