Project

General

Profile

1 35869 claudio.at
package eu.dnetlib.msro.workflows.dedup;
2 35866 claudio.at
3
import com.googlecode.sarasvati.NodeToken;
4
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
5 36417 claudio.at
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
6 35869 claudio.at
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
7 36623 claudio.at
import eu.dnetlib.pace.config.DedupConfig;
8 39617 claudio.at
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11 35866 claudio.at
12 36761 claudio.at
public class DedupConfigurationAwareJobNode extends SubmitHadoopJobNode {
13 35866 claudio.at
14 36240 claudio.at
	private static final Log log = LogFactory.getLog(DedupConfigurationAwareJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
15 35866 claudio.at
16
	private String dedupConfigSequenceParam;
17
18
	@Override
19
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
20
		super.prepareJob(job, token);
21
22 36632 claudio.at
		final DedupConfigurationOrchestration dedupConfigurations = dedupConfigurations(token);
23
		final DedupConfig currentConf = dedupConfigurations.getConfigurations().peek();
24 35866 claudio.at
25 36623 claudio.at
		log.debug("using dedup configuration: '" + currentConf + "'");
26 35866 claudio.at
27 36623 claudio.at
		job.getParameters().put("dedup.conf", currentConf.toString());
28 35866 claudio.at
29 36623 claudio.at
		token.getEnv().setAttribute("dedup.conf", currentConf.toString());
30 35866 claudio.at
	}
31
32 36632 claudio.at
	protected DedupConfigurationOrchestration dedupConfigurations(final NodeToken token) {
33 35866 claudio.at
		final String configs = token.getFullEnv().getAttribute(getDedupConfigSequenceParam());
34 36632 claudio.at
		if ((configs == null) || configs.trim().isEmpty())
35 35866 claudio.at
			throw new IllegalStateException("Cannot find dedup configurations in workflow env: '" + getDedupConfigSequenceParam() + "'");
36
37 36632 claudio.at
		return DedupConfigurationOrchestration.fromJSON(configs);
38 35866 claudio.at
	}
39
40
	protected String getEntityType(final NodeToken token) {
41
		final String entityType = token.getEnv().getAttribute("entityType");
42
		if (StringUtils.isBlank(entityType)) throw new IllegalStateException("Cannot find 'entityType' parameter in workflow env.");
43
		return entityType;
44
	}
45
46
	// //////////
47
48
	public String getDedupConfigSequenceParam() {
49
		return dedupConfigSequenceParam;
50
	}
51
52
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
53
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
54
	}
55
56
}