Project

General

Profile

1
package eu.dnetlib.msro.workflows.dedup;
2

    
3
import static java.lang.String.format;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.NodeToken;
10

    
11
import eu.dnetlib.data.provision.index.rmi.IndexService;
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
13
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
14
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
17
import eu.dnetlib.msro.rmi.MSROException;
18
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
19

    
20
public class FinalizeDedupIndexJobNode extends BlackboardJobNode {
21

    
22
	private static final Log log = LogFactory.getLog(FinalizeDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
23

    
24
	@Override
25
	protected String obtainServiceId(final NodeToken token) {
26
		return getServiceLocator().getServiceId(IndexService.class);
27
	}
28

    
29
	@Override
30
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
31
		final String indexDsId = getEnvParam(token, "index_id");
32

    
33
		log.info("preparing blackboard job DELETE_BY_QUERY index: " + indexDsId);
34

    
35
		final String backendId = getBackendId(indexDsId);
36
		if (StringUtils.isBlank(backendId))
37
			throw new MSROException("empty index backend Id");
38

    
39
		job.setAction("DELETE_BY_QUERY");
40
		job.getParameters().put("id", indexDsId);
41
		job.getParameters().put("backend_Id", backendId);
42
		job.getParameters().put("query",
43
				buildQuery(getEnvParam(token, "entityType"), getEnvParam(token, "index.feed.timestamp"), getEnvParam(token, "actionset")));
44
	}
45

    
46
	private String buildQuery(final String entityType, final String version, final String actionset) {
47
		final String query =
48
				String.format("__dsversion:{* TO %s} AND oaftype:%s AND actionset:%s", InputDocumentFactory.getParsedDateField(version), entityType, actionset);
49

    
50
		log.info("delete by query: " + query);
51

    
52
		return query;
53
	}
54

    
55
	private String getEnvParam(final NodeToken token, final String name) throws MSROException {
56
		final String value = token.getEnv().getAttribute(name);
57

    
58
		if (StringUtils.isBlank(value))
59
			throw new MSROException(format("unable to finalize index feeding, cannot find property '%s' in the workflow env.", name));
60

    
61
		return value;
62
	}
63

    
64
	public String getBackendId(final String indexDsId) throws ISLookUpDocumentNotFoundException, ISLookUpException {
65
		return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(
66
				"//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='" + indexDsId + "']//BACKEND/text()");
67
	}
68

    
69
}
(10-10/15)