Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

    
3
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
4
import eu.dnetlib.miscutils.datetime.DateUtils;
5
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
6
import eu.dnetlib.msro.workflows.procs.Env;
7
import eu.dnetlib.msro.workflows.procs.Token;
8
import eu.dnetlib.rmi.enabling.ISLookUpDocumentNotFoundException;
9
import eu.dnetlib.rmi.enabling.ISLookUpException;
10
import eu.dnetlib.rmi.enabling.ISLookUpService;
11
import eu.dnetlib.rmi.manager.MSROException;
12
import eu.dnetlib.rmi.provision.IndexService;
13
import org.apache.commons.lang3.StringUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16

    
17
import static java.lang.String.format;
18

    
19
public class FinalizeDedupIndexJobNode extends BlackboardJobNode {
20

    
21
	private static final Log log = LogFactory.getLog(FinalizeDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
22

    
23
	private String actionset;
24

    
25
	@Override
26
	protected String obtainServiceId(final Env env) {
27
		return getServiceLocator().getServiceId(IndexService.class);
28
	}
29

    
30
	@Override
31
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
32
		final String indexDsId = getEnvParam(token, "index_id");
33

    
34
		log.info("preparing blackboard job DELETE_BY_QUERY index: " + indexDsId);
35

    
36
		final String backendId = getBackendId(indexDsId);
37
		if (StringUtils.isBlank(backendId))
38
			throw new MSROException("empty index backend Id");
39

    
40
		job.setAction("DELETE_BY_QUERY");
41
		job.getParameters().put("id", indexDsId);
42
		job.getParameters().put("backend_Id", backendId);
43
		job.getParameters().put("query",
44
				buildQuery(getEnvParam(token, "entityType"), getEnvParam(token, "index.feed.timestamp"), getActionset()));
45
	}
46

    
47
	private String buildQuery(final String entityType, final String version, final String actionset) {
48
		final String query =
49
				String.format("__dsversion:{* TO %s} AND oaftype:%s AND actionset:%s", DateUtils.getParsedDateField(version), entityType, actionset);
50

    
51
		log.info("delete by query: " + query);
52

    
53
		return query;
54
	}
55

    
56
	private String getEnvParam(final Token token, final String name) throws MSROException {
57
		final String value = token.getEnv().getAttribute(name, String.class);
58

    
59
		if (StringUtils.isBlank(value))
60
			throw new MSROException(format("unable to finalize index feeding, cannot find property '%s' in the workflow env.", name));
61

    
62
		return value;
63
	}
64

    
65
	public String getBackendId(final String indexDsId) throws ISLookUpDocumentNotFoundException, ISLookUpException {
66
		return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(
67
				"//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='" + indexDsId + "']//BACKEND/text()");
68
	}
69

    
70
	public String getActionset() {
71
		return actionset;
72
	}
73

    
74
	public void setActionset(final String actionset) {
75
		this.actionset = actionset;
76
	}
77
}
(8-8/13)