Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

    
3
import static java.lang.String.format;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Autowired;
9

    
10
import com.googlecode.sarasvati.NodeToken;
11

    
12
import eu.dnetlib.data.provision.index.rmi.IndexService;
13
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
14
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
15
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
16
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
17
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
18
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
19
import eu.dnetlib.msro.rmi.MSROException;
20
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
21

    
22
public class FinalizeIndexJobNode extends BlackboardJobNode {
23

    
24
	private static final Log log = LogFactory.getLog(FinalizeIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
25

    
26
	@Autowired
27
	private UniqueServiceLocator serviceLocator;
28

    
29
	@Override
30
	protected String obtainServiceId(final NodeToken token) {
31
		return getServiceLocator().getServiceId(IndexService.class);
32
	}
33

    
34
	@Override
35
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
36
		final String indexDsId = getEnvParam(token, "index_id");
37

    
38
		log.info("preparing blackboard job DELETE_BY_QUERY index: " + indexDsId);
39

    
40
		final String backendId = getBackendId(indexDsId);
41
		if (StringUtils.isBlank(backendId))
42
			throw new MSROException("empty index backend Id");
43

    
44
		job.setAction("DELETE_BY_QUERY");
45
		job.getParameters().put("id", indexDsId);
46
		job.getParameters().put("backend_Id", backendId);
47
		job.getParameters().put("query", buildQuery(getEnvParam(token, "index.feed.timestamp")));
48
	}
49

    
50
	private String buildQuery(final String version) {
51
		final String query =
52
				String.format("__dsversion:{* TO %s}", InputDocumentFactory.getParsedDateField(version));
53

    
54
		log.info("delete by query: " + query);
55

    
56
		return query;
57
	}
58

    
59
	private String getEnvParam(final NodeToken token, final String name) throws MSROException {
60
		final String value = token.getEnv().getAttribute(name);
61

    
62
		if (StringUtils.isBlank(value))
63
			throw new MSROException(format("unable to finalize index feeding, cannot find property '%s' in the workflow env.", name));
64

    
65
		return value;
66
	}
67

    
68
	public String getBackendId(final String indexDsId) throws ISLookUpDocumentNotFoundException, ISLookUpException {
69
		return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(
70
				"//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='" + indexDsId + "']//BACKEND/text()");
71
	}
72

    
73
}
(3-3/6)