Project

General

Profile

1
package eu.dnetlib.msro.workflows.dedup;
2

    
3
import com.googlecode.sarasvati.NodeToken;
4
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
5
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
6
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
7
import eu.dnetlib.pace.config.DedupConfig;
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

    
12
public class DedupConfigurationAwareJobNode extends SubmitHadoopJobNode {
13

    
14
	private static final Log log = LogFactory.getLog(DedupConfigurationAwareJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
15

    
16
	private String dedupConfigSequenceParam;
17

    
18
	@Override
19
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
20
		super.prepareJob(job, token);
21

    
22
		final DedupConfigurationOrchestration dedupConfigurations = dedupConfigurations(token);
23
		final DedupConfig currentConf = dedupConfigurations.getConfigurations().peek();
24

    
25
		log.debug("using dedup configuration: '" + currentConf + "'");
26

    
27
		job.getParameters().put("dedup.conf", currentConf.toString());
28

    
29
		token.getEnv().setAttribute("dedup.conf", currentConf.toString());
30
	}
31

    
32
	protected DedupConfigurationOrchestration dedupConfigurations(final NodeToken token) {
33
		final String configs = token.getFullEnv().getAttribute(getDedupConfigSequenceParam());
34
		if ((configs == null) || configs.trim().isEmpty())
35
			throw new IllegalStateException("Cannot find dedup configurations in workflow env: '" + getDedupConfigSequenceParam() + "'");
36

    
37
		return DedupConfigurationOrchestration.fromJSON(configs);
38
	}
39

    
40
	protected String getEntityType(final NodeToken token) {
41
		final String entityType = token.getEnv().getAttribute("entityType");
42
		if (StringUtils.isBlank(entityType)) throw new IllegalStateException("Cannot find 'entityType' parameter in workflow env.");
43
		return entityType;
44
	}
45

    
46
	// //////////
47

    
48
	public String getDedupConfigSequenceParam() {
49
		return dedupConfigSequenceParam;
50
	}
51

    
52
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
53
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
54
	}
55

    
56
}
(5-5/15)