β

Elasticsearch 不重启重建索引

技术尘埃@伯仁 934 阅读

很久没写博客了,一直在忙,今天有点时间写点,记录一下。

我们在用Elasticsearch 作搜索服务的时候要经常更新词库,es 的动态词库更新功能不错,新增加的数据可以按照最新的词库分词,但旧数据就不会有任何变化。

如果需要所有数据都按最新的词库分词,那就要重索引,如果手动重建则需要重启es。但作为生产系统这是不可接受的。

那怎么办呢?

es 有一个别名的东西(这玩意儿可好了),也就是索引别名,我们可以利用它来实现自动重建索引,而且不需要重启es。 在操作es 的时候使用别名来做 增,删,查,改 操作。

接下来就开干:

1 、我们在创建索引的时候为索引指定一个别名如下(我的索引名为: esblog   别名为:blog):

curl -X POST http://localhost:9200/_aliases -d'
 {
        "actions": [
            {"add": {"index": "esblog", "alias": "blog"}}
          ]
    }
'

2、自动重建索引(看java 代码)

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortParseElement;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

/**
 * elasticsearch 不重启重建索引
 *
 */
public class App 
{
    public static void main( String[] args )
    {
        
	     System.out.println( "elasticsearch 重建索引" );
	        
	   	 String aliasName = "blog"; //别名
	     String aliasNameOld = "blogOld"; //上一次的别名
	   	 String indexName = "prodblog"; //新索引名前缀
	   	 
	   	 String oldIndex = ""; //老索引
	   	 String preOldIndex = "";//备份索引
		   	 
	   	 Date nowTime = new Date();
	   	 SimpleDateFormat time=new SimpleDateFormat("yyyyMMddHHmmss"); 
	   	 //生成新索引
	   	 indexName += time.format(nowTime);//System.currentTimeMillis();
	   	
	   	 System.out.println("新索引名:"+indexName);
	   	
	   	 //初始化客户端
         Client client = null;
	        
			try {
				
				Settings settings = Settings.settingsBuilder()
				        .put("cluster.name", "elasticsearch")
				        .put("client.transport.ping_timeout", "120s")
				        .put("client.transport.sniff", true ).build();
				
				client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
				
			} catch (UnknownHostException e) {
				// TODO Auto-generated catch block
				System.out.print("client error:"+e.getMessage());
			}
		
		   //获取备份的索引	
		   SearchResponse oldres = null;
			try {
				oldres = client
					        .prepareSearch(aliasNameOld)
					        .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)//.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
					        .setFrom(0).setSize(1)
					        .execute()
					        .actionGet();
				
			} catch (Exception e) {
				// TODO Auto-generated catch block
				//e.printStackTrace();
				System.out.println("无此别名:"+e.getMessage());
				
			}	

			if(oldres !=null)
			{
				   SearchHit hitOld  =  oldres.getHits().getHits()[0];
				   
				   preOldIndex = hitOld.getIndex();
				   
				   System.out.println("preoldIndex:"+preOldIndex);
			}
		
			//获取mappings
			ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>>  mappingRes = client.admin().indices().prepareGetMappings(aliasName).get().getMappings(); 
			ImmutableOpenMap<String, MappingMetaData> mappings = null;
		   for( ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> cursor : mappingRes)
		   {
			    mappings = cursor.value;
			   	System.out.println("now index:"+cursor.key);
		   }
		   
		   //创建新索引
		   //client.admin().indices().prepareCreate(indexName).addMapping(typeName, indexMapping).get();
		   CreateIndexRequestBuilder create =  client.admin().indices().prepareCreate(indexName);
		   
		   for( ObjectObjectCursor<String, MappingMetaData> cursor :mappings)
		   {
			   System.out.println("typeName:"+cursor.key);
			   try {
					create.addMapping(cursor.key, cursor.value.getSourceAsMap());
				} catch (IOException e) {
					// TODO Auto-generated catch block
					System.out.println("createType for mapping error:"+e.getMessage());
				}
		   }
		   create.get();
		   
		   //重建索引
		   SearchResponse scrollResp = client
			        .prepareSearch(aliasName)
			        .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)//.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
			        .setScroll(new TimeValue(60000))//.setFrom(0).setSize(10).setExplain(true)
			        .setSize(300)
			        .execute()
			        .actionGet();
		   
		    int num = 0;
		   
		 	while(true)
		 	{
		 		
		     	// 批量处理request  
	 	        BulkRequestBuilder bulkRequest = client.prepareBulk();  
	 	        
		 		for(SearchHit hit : scrollResp.getHits().getHits())
		 		{
		 			if("".equals(oldIndex))
		 			{
		 				oldIndex = hit.getIndex();
		 			}
		 			 
		 			bulkRequest.add(new IndexRequest(indexName,hit.getType(),hit.getId()).source(hit.getSource()));
		 		}
		 	    
		 		// 执行批量处理request  
		        BulkResponse bulkResponse = bulkRequest.get(); 
		        
		        num += 300;
		        
		        System.out.println("已处理 "+ num +"条");
		        
		       // 处理错误信息  
		        if (bulkResponse.hasFailures()) {  
		            System.out.println("====================批量创建索引过程中出现错误 下面是错误信息==========================");  
		            long count = 0L;  
		            for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {  
		                System.out.println("发生错误的 索引id为 : "+bulkItemResponse.getId()+" ,错误信息为:"+ bulkItemResponse.getFailureMessage());  
		                count++;  
		            }  
		            System.out.println("====================批量创建索引过程中出现错误 上面是错误信息 共有: "+count+" 条记录==========================");  
		        }  
		 		
		 		scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
		 	   
		 		//Break condition: No hits are returned
		 	    if (scrollResp.getHits().getHits().length == 0) {
		 	    	
		 	    	System.out.println("remove oldIndex:"+oldIndex  +" deletepreoldIndex:"+preOldIndex);
		 	    	
		 	    	 //别名指向新的索引,旧索引从别名上移出
		 	    	client.admin().indices().prepareAliases().addAlias(indexName, aliasName).removeAlias(oldIndex, aliasName).get();
		 	    	
		 	    	if("".equals(preOldIndex)) //第一次没有备份索引
		 	    	{
		 	    		client.admin().indices().prepareAliases().addAlias(oldIndex, aliasNameOld).get();
		 	    		
		 	    	}else{
		 	    		//备份别名指向上次一的索引,老索引删除
		 	    		client.admin().indices().prepareAliases().addAlias(oldIndex, aliasNameOld).removeAlias(preOldIndex, aliasNameOld).get();
		 	    		//删除以前备份的索引
		 	    		DeleteIndexResponse dResponse =  client.admin().indices().prepareDelete(preOldIndex).execute().actionGet();
		 	    	}
		 	    	
		 	        break;
		 	    }
		 	}
		 	
			// on shutdown
			client.close();
    }
}

pom.xml (elasticsearch 客户端)

<!-- elasticsearch -->
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.3.3</version>
      </dependency>


3、当我们发现词库有更新时,我们运行第二步的代码将重新生成一个新的索引,并将别名指向新索引,以前的索引会从别名上移出,并由blogOld 别名指向(当然你也可以直接删除以前的索引,我这里是为了防止意外发生保留了上一次的索引)。以前blogOld 指向的索引将会被删除。

查看es 所有索引可以看到如下(上面一个是最新的索引,下面个是上一次重建时的索引,现在的别名是指向上面这个索引的):

health status index                      pri rep docs.count docs.deleted store.size pri.store.size
yellow open   prodblog20170512010001   5   1     237323          624      713mb          713mb
yellow open   prodblog20170505174709   5   1     227727            2    638.1mb        638.1mb

4、写一段 shell 脚本,检测如果词库发生了变化就执行第二步的代码,我这里在每天一个固定的时刻去检测词库有无变化。降低重建索引的频率,同时也能最大限度的保证搜索的准确性。

作者:技术尘埃@伯仁
别让梦想只是梦想而以......
原文地址:Elasticsearch 不重启重建索引, 感谢原作者分享。

发表评论