服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - elasticsearch索引index之put mapping的设置分析

elasticsearch索引index之put mapping的设置分析

2022-11-20 14:30zziawan Java教程

这篇文章主要为大家介绍了elasticsearch索引index之put mapping的设置分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

mapping的设置过程

mapping机制使得elasticsearch索引数据变的更加灵活,近乎于no schema。mapping可以在建立索引时设置,也可以在后期设置。

后期设置可以是修改mapping(无法对已有的field属性进行修改,一般来说只是增加新的field)或者对没有mapping的索引设置mapping。

put mapping操作必须是master节点来完成,因为它涉及到集群matedata的修改,同时它跟index和type密切相关。修改只是针对特定index的特定type。

Action support分析中我们分析过几种Action的抽象类型,put mapping Action属于TransportMasterNodeOperationAction的子类。

put mapping

它实现了masterOperation方法,每个继承自TransportMasterNodeOperationAction的子类都会根据自己的具体功能来实现这个方法。

这里的实现如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener<PutMappingResponse> listener) throws ElasticsearchException {
        final String[] concreteIndices = clusterService.state().metaData().concreteIndices(request.indicesOptions(), request.indices());
      //构造request
        PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
                .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                .indices(concreteIndices).type(request.type())
                .source(request.source()).ignoreConflicts(request.ignoreConflicts());
      //调用putMapping方法,同时传入一个Listener
        metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new PutMappingResponse(response.isAcknowledged()));
            }
            @Override
            public void onFailure(Throwable t) {
                logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type());
                listener.onFailure(t);
            }
        });
    }

以上是TransportPutMappingAction对masterOperation方法的实现,这里并没有多少复杂的逻辑和操作。具体操作在matedataMappingService中。

updateTask响应

跟之前的CreateIndex一样,put Mapping也是向master提交一个updateTask。所有逻辑也都在execute方法中。这个task的基本跟CreateIndex一样,也需要在给定的时间内响应。它的代码如下所示:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener&lt;ClusterStateUpdateResponse&gt; listener) {
    //提交一个高基本的updateTask
        clusterService.submitStateUpdateTask("put-mapping [" + request.type() + "]", Priority.HIGH, new AckedClusterStateUpdateTask&lt;ClusterStateUpdateResponse&gt;(request, listener) {
            @Override
            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                return new ClusterStateUpdateResponse(acknowledged);
            }
            @Override
            public ClusterState execute(final ClusterState currentState) throws Exception {
                List&lt;String&gt; indicesToClose = Lists.newArrayList();
                try {
            //必须针对已经在matadata中存在的index,否则抛出异常
                    for (String index : request.indices()) {
                        if (!currentState.metaData().hasIndex(index)) {
                            throw new IndexMissingException(new Index(index));
                        }
                    }
                    //还需要存在于indices中,否则无法进行操作。所以这里要进行预建
                    for (String index : request.indices()) {
                        if (indicesService.hasIndex(index)) {
                            continue;
                        }
                        final IndexMetaData indexMetaData = currentState.metaData().index(index);
              //不存在就进行创建
                        IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), clusterService.localNode().id());
                        indicesToClose.add(indexMetaData.index());
                        // make sure to add custom default mapping if exists
                        if (indexMetaData.mappings().containsKey(MapperService.DEFAULT_MAPPING)) {
                            indexService.mapperService().merge(MapperService.DEFAULT_MAPPING, indexMetaData.mappings().get(MapperService.DEFAULT_MAPPING).source(), false);
                        }
                        // only add the current relevant mapping (if exists)
                        if (indexMetaData.mappings().containsKey(request.type())) {
                            indexService.mapperService().merge(request.type(), indexMetaData.mappings().get(request.type()).source(), false);
                        }
                    }
            //合并更新Mapping
                    Map&lt;String, DocumentMapper&gt; newMappers = newHashMap();
                    Map&lt;String, DocumentMapper&gt; existingMappers = newHashMap();
            //针对每个index进行Mapping合并
                    for (String index : request.indices()) {
                        IndexService indexService = indicesService.indexServiceSafe(index);
                        // try and parse it (no need to add it here) so we can bail early in case of parsing exception
                        DocumentMapper newMapper;
                        DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
                        if (MapperService.DEFAULT_MAPPING.equals(request.type())) {//存在defaultmapping则合并default mapping
                            // _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
                            newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), false);
                        } else {
                            newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), existingMapper == null);
                            if (existingMapper != null) {
                                // first, simulate
                                DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true));
                                // if we have conflicts, and we are not supposed to ignore them, throw an exception
                                if (!request.ignoreConflicts() &amp;&amp; mergeResult.hasConflicts()) {
                                    throw new MergeMappingException(mergeResult.conflicts());
                                }
                            }
                        }
                        newMappers.put(index, newMapper);
                        if (existingMapper != null) {
                            existingMappers.put(index, existingMapper);
                        }
                    }
                    String mappingType = request.type();
                    if (mappingType == null) {
                        mappingType = newMappers.values().iterator().next().type();
                    } else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
                        throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
                    }
                    if (!MapperService.DEFAULT_MAPPING.equals(mappingType) &amp;&amp; !PercolatorService.TYPE_NAME.equals(mappingType) &amp;&amp; mappingType.charAt(0) == '_') {
                        throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
                    }
                    final Map&lt;String, MappingMetaData&gt; mappings = newHashMap();
                    for (Map.Entry&lt;String, DocumentMapper&gt; entry : newMappers.entrySet()) {
                        String index = entry.getKey();
                        // do the actual merge here on the master, and update the mapping source
                        DocumentMapper newMapper = entry.getValue();
                        IndexService indexService = indicesService.indexService(index);
                        if (indexService == null) {
                            continue;
                        }
                        CompressedString existingSource = null;
                        if (existingMappers.containsKey(entry.getKey())) {
                            existingSource = existingMappers.get(entry.getKey()).mappingSource();
                        }
                        DocumentMapper mergedMapper = indexService.mapperService().merge(newMapper.type(), newMapper.mappingSource(), false);
                        CompressedString updatedSource = mergedMapper.mappingSource();
                        if (existingSource != null) {
                            if (existingSource.equals(updatedSource)) {
                                // same source, no changes, ignore it
                            } else {
                                // use the merged mapping source
                                mappings.put(index, new MappingMetaData(mergedMapper));
                                if (logger.isDebugEnabled()) {
                                    logger.debug("[{}] update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
                                } else if (logger.isInfoEnabled()) {
                                    logger.info("[{}] update_mapping [{}]", index, mergedMapper.type());
                                }
                            }
                        } else {
                            mappings.put(index, new MappingMetaData(mergedMapper));
                            if (logger.isDebugEnabled()) {
                                logger.debug("[{}] create_mapping [{}] with source [{}]", index, newMapper.type(), updatedSource);
                            } else if (logger.isInfoEnabled()) {
                                logger.info("[{}] create_mapping [{}]", index, newMapper.type());
                            }
                        }
                    }
                    if (mappings.isEmpty()) {
                        // no changes, return
                        return currentState;
                    }
            //根据mapping的更新情况重新生成matadata
                    MetaData.Builder builder = MetaData.builder(currentState.metaData());
                    for (String indexName : request.indices()) {
                        IndexMetaData indexMetaData = currentState.metaData().index(indexName);
                        if (indexMetaData == null) {
                            throw new IndexMissingException(new Index(indexName));
                        }
                        MappingMetaData mappingMd = mappings.get(indexName);
                        if (mappingMd != null) {
                            builder.put(IndexMetaData.builder(indexMetaData).putMapping(mappingMd));
                        }
                    }
                    return ClusterState.builder(currentState).metaData(builder).build();
                } finally {
                    for (String index : indicesToClose) {
                        indicesService.removeIndex(index, "created for mapping processing");
                    }
                }
            }
        });
    }

以上就是mapping的设置过程,首先它跟Create index一样,只有master节点才能操作,而且是以task的形式提交给master;其次它的本质是将request中的mapping和index现存的或者是default mapping合并,并最终生成新的matadata更新到集群的各个节点。

总结

集群中的master操作无论是index方面还是集群方面,最终都是集群matadata的更新过程。而这些操作只能在master上进行,并且都是会超时的任务。put mapping当然也不例外。上面的两段代码基本概况了mapping的设置过程。这里就不再重复了。

这里还有一个问题没有涉及到就是mapping的合并。mapping合并会在很多地方用到。在下一节中会它进行详细分析,更多关于elasticsearch索引index put mapping设置的资料请关注服务器之家其它相关文章!

原文链接:https://www.cnblogs.com/zziawanblog/p/7011367.html

延伸 · 阅读

精彩推荐