Project

General

Profile

« Previous | Next » 

Revision 61279

Added by Alessia Bardi 7 months ago

catch CloudIndexClientException trying to avoid connection problems between SolrJClient and ZooKeeper

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/FeedMissingClaimsJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2 2

  
3
import java.io.IOException;
3 4
import java.io.StringReader;
4 5
import java.util.ArrayList;
5 6
import java.util.List;
......
32 33
 */
33 34
public class FeedMissingClaimsJobNode extends AsyncJobNode {
34 35

  
35
	private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
36
	public static final int BATCH_SIZE = 1000;
37
	private RecentResultsQueue queue;
38
	private OafToIndexRecordFactory oafToIndexRecordFactory;
36
    private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
37
    public static final int BATCH_SIZE = 1000;
38
    public static final int ATTEMPTS = 3;
39
    public static final int SLEEP_MS_SOLR_CLIENT = 5000;
40
    private RecentResultsQueue queue;
41
    private OafToIndexRecordFactory oafToIndexRecordFactory;
39 42

  
40
	@Resource
41
	private UniqueServiceLocator serviceLocator;
43
    @Resource
44
    private UniqueServiceLocator serviceLocator;
42 45

  
43
	@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
44
	private ClassPathResource findSolrIndexUrl;
46
    @Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
47
    private ClassPathResource findSolrIndexUrl;
45 48

  
46
	@Override
47
	protected String execute(final NodeToken nodeToken) throws Exception {
49
    @Override
50
    protected String execute(final NodeToken nodeToken) throws Exception {
48 51

  
49
		final String format =
50
				nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
51
		final String coll = format + "-index-openaire";
52
		final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
53
		final String baseUrl = calculateIndexBaseUrl();
52
        final String format =
53
                nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
54
        final String coll = format + "-index-openaire";
55
        final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
56
        final String baseUrl = calculateIndexBaseUrl();
54 57

  
55
		CloudIndexClient idxClient = null;
58
        CloudIndexClient idxClient = null;
56 59

  
57
		try {
58
			final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
59
			final List<String> toDeleteFromCache = new ArrayList<String>();
60
        try {
61
            final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
62
            final List<String> toDeleteFromCache = new ArrayList<String>();
60 63

  
61
			final SAXReader reader = new SAXReader();
62
			final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
64
            final SAXReader reader = new SAXReader();
65
            final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
63 66

  
64
			idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
65
			log.info("Starting to feed claims in index collection "+coll);
66
			int count = 0;
67
			for (String record : queue) {
68
				try {
69
					final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
70
					count++;
71
					if (log.isDebugEnabled()) {
72
						log.debug("Processing record " + count);
73
					}
74
					if (idxClient.isRecordIndexed(id)) {
75
						toDeleteFromCache.add(id);
76
					} else {
77
						toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
78
					}
79
					if (count % BATCH_SIZE == 0) processLists(idxClient, toFeed, toDeleteFromCache);
80
				}catch(CloudIndexClientException cie){
81
					log.error("Error feeding missing claims", cie);
82
					if (idxClient != null) {
83
						idxClient.close();
84
					}
85
					idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
86
					log.info("Got new CloudIndexClient");
87
				}
88
			}
89
			if(!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, toFeed, toDeleteFromCache);
90
			log.info(String.format("Finished feeding of claims in index collection %s, total: %d", coll, count));
67
            idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
68
            log.info("Starting to feed claims in index collection " + coll);
69
            int count = 0;
70
            for (String record : queue) {
71
                int max_attempts = ATTEMPTS;
72
                count++;
73
                final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
74
                if (log.isDebugEnabled()) {
75
                    log.debug(String.format("Processing record %s, number: %d", id, count));
76
                }
77
                if (isRecordIndexed(idxClient, baseUrl, coll, id, max_attempts)) {
78
                    toDeleteFromCache.add(id);
79
                } else {
80
                    max_attempts = ATTEMPTS;
81
                    toFeed.add(prepareSolrDoc(idxClient, baseUrl, coll, id, record, indexDsId, xslt, max_attempts));
82
                }
83
                if (count % BATCH_SIZE == 0) processLists(idxClient, baseUrl, coll, toFeed, toDeleteFromCache);
84
            }
85
            if (!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, baseUrl, coll, toFeed, toDeleteFromCache);
86
            log.info(String.format("Finished feeding of claims in index collection %s, total: %d", coll, count));
91 87

  
92
		} catch(Throwable e) {
93
			log.error("Error feeding missing claims", e);
94
			throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
95
		} finally {
96
			if (idxClient != null) {
97
				idxClient.close();
98
			}
99
			log.info("Closed Solr index client");
100
		}
101
		log.info("Now proceeding to Arc.DEFAULT_ARC");
102
		return Arc.DEFAULT_ARC;
103
	}
88
        } catch (Throwable e) {
89
            log.error("Error feeding missing claims", e);
90
            throw e;
91
        } finally {
92
            if (idxClient != null) {
93
                idxClient.close();
94
            }
95
            log.info("Closed Solr index client");
96
        }
97
        log.info("Now proceeding to Arc.DEFAULT_ARC");
98
        return Arc.DEFAULT_ARC;
99
    }
104 100

  
101
    protected boolean isRecordIndexed(CloudIndexClient idxClient, String baseUrl, String coll, String id, int attempt) throws IOException, CloudIndexClientException, MSROException, InterruptedException {
102
        try {
103
            return idxClient.isRecordIndexed(id);
104
        } catch (CloudIndexClientException cie) {
105
            log.error(String.format("Error querying for %s, message: %s. Trying again, remaining attempts:", id, cie, attempt));
106
            idxClient = resetCloudIndexClient(idxClient, baseUrl, coll);
107
            if (attempt > 0) return isRecordIndexed(idxClient, baseUrl, coll, id, --attempt);
108
            else {
109
                String msg = String.format("Too many attempts %d to recreate the index client for checking if record %s exists.", ATTEMPTS, id);
110
                log.error(msg);
111
                throw new MSROException(cie);
112
            }
113
        }
114
    }
105 115

  
116
    protected SolrInputDocument prepareSolrDoc(CloudIndexClient idxClient, String baseUrl, String coll, String recordId, String record, String indexDsId, ApplyXslt xslt, int attempt) throws IOException, CloudIndexClientException, MSROException, InterruptedException {
117
        try {
118
            return idxClient.prepareSolrDocument(record, indexDsId, xslt);
119
        } catch (CloudIndexClientException cie) {
120
            log.error(String.format("Error preparing Solr doc for %s, message: %s. Trying again, remaining attempts:", recordId, cie, attempt));
121
            idxClient = resetCloudIndexClient(idxClient, baseUrl, coll);
122
            if (attempt > 0)
123
                return prepareSolrDoc(idxClient, baseUrl, coll, recordId, record, indexDsId, xslt, --attempt);
124
            else {
125
                String msg = String.format("Too many attempts %d to recreate the index client for preparing SolrDocument for %s", ATTEMPTS, id);
126
                log.error(msg);
127
                throw new MSROException(cie);
128
            }
129
        }
130
    }
106 131

  
107
	private void processLists(final CloudIndexClient idxClient, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException{
108
		idxClient.feed(toFeed, null);
109
		queue.remove(toDeleteFromCache);
110
		log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
111
		toFeed.clear();
112
		toDeleteFromCache.clear();
113
		log.info("Cleaned temporary lists");
114
	}
132
    protected void tryToFeed(CloudIndexClient idxClient, String baseUrl, String coll, List<SolrInputDocument> toFeed, int attempt) throws MSROException, IOException, CloudIndexClientException, InterruptedException {
133
        try {
134
            idxClient.feed(toFeed, null);
135
        } catch (CloudIndexClientException cie) {
136
            log.error(String.format("Error feeding Solr in attempt number %d", attempt));
137
            idxClient = resetCloudIndexClient(idxClient, baseUrl, coll);
138
            if (attempt > 0) tryToFeed(idxClient, baseUrl, coll, toFeed, --attempt);
139
            else {
140
                String msg = String.format("Too many attempts %d to recreate the index client for feeding Solr", ATTEMPTS);
141
                log.error(msg);
142
                throw new MSROException(cie);
143
            }
144
        }
145
    }
115 146

  
116
	public RecentResultsQueue getQueue() {
117
		return queue;
118
	}
147
    private CloudIndexClient resetCloudIndexClient(CloudIndexClient idxClient, String baseUrl, String coll) throws IOException, CloudIndexClientException, InterruptedException {
148
        if (idxClient != null) {
149
            idxClient.close();
150
        }
151
        Thread.sleep(SLEEP_MS_SOLR_CLIENT);
152
        CloudIndexClient newclient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
153
        log.info("Got new CloudIndexClient");
154
        return newclient;
155
    }
119 156

  
120
	@Required
121
	public void setQueue(final RecentResultsQueue queue) {
122
		this.queue = queue;
123
	}
124 157

  
125
	public OafToIndexRecordFactory getOafToIndexRecordFactory() {
126
		return oafToIndexRecordFactory;
127
	}
158
    private void processLists(final CloudIndexClient idxClient, String baseUrl, String coll, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException, MSROException, IOException, InterruptedException {
159
        int max_attempts = ATTEMPTS;
160
        tryToFeed(idxClient, baseUrl, coll, toFeed, max_attempts);
161
        queue.remove(toDeleteFromCache);
162
        log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
163
        toFeed.clear();
164
        toDeleteFromCache.clear();
165
        log.info("Cleaned temporary lists");
166
    }
128 167

  
129
	@Required
130
	public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
131
		this.oafToIndexRecordFactory = oafToIndexRecordFactory;
132
	}
168
    public RecentResultsQueue getQueue() {
169
        return queue;
170
    }
133 171

  
134
	private String calculateIndexBaseUrl() throws Exception {
135
		final String query = IOUtils.toString(findSolrIndexUrl.getInputStream());
136
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
137
	}
172
    @Required
173
    public void setQueue(final RecentResultsQueue queue) {
174
        this.queue = queue;
175
    }
176

  
177
    public OafToIndexRecordFactory getOafToIndexRecordFactory() {
178
        return oafToIndexRecordFactory;
179
    }
180

  
181
    @Required
182
    public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
183
        this.oafToIndexRecordFactory = oafToIndexRecordFactory;
184
    }
185

  
186
    private String calculateIndexBaseUrl() throws Exception {
187
        final String query = IOUtils.toString(findSolrIndexUrl.getInputStream());
188
        return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
189
    }
138 190
}

Also available in: Unified diff