Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

    
3
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
4
import eu.dnetlib.msro.workflows.nodes.dedup.utils.DedupConfigurationOrchestration;
5
import eu.dnetlib.msro.workflows.nodes.hadoop.SubmitHadoopJobNode;
6
import eu.dnetlib.msro.workflows.procs.Token;
7
import eu.dnetlib.pace.config.DedupConfig;
8
import eu.dnetlib.rmi.manager.MSROException;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

    
12
public class DedupConfigurationAwareJobNode extends SubmitHadoopJobNode {
13

    
14
	public static final String DEDUP_CONF = "dedup.conf";
15

    
16
	private static final Log log = LogFactory.getLog(DedupConfigurationAwareJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
17

    
18
	private DedupConfigurationOrchestration dedupConfigurationOrchestration;
19

    
20
	@Override
21
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
22
		super.prepareJob(job, token);
23

    
24
		final DedupConfigurationOrchestration conf = getDedupConfigurationOrchestration();
25
		if (conf == null) {
26
			throw new MSROException("Cannot find dedup configurations in workflow env");
27
		}
28
		final DedupConfig currentConf = conf.getConfigurations().peek();
29

    
30
		log.debug("using dedup configuration: '" + currentConf + "'");
31

    
32
		job.getParameters().put(DEDUP_CONF, currentConf.toString());
33

    
34
		token.getEnv().setAttribute(DEDUP_CONF, currentConf);
35
	}
36

    
37
	// //////////
38

    
39
	public DedupConfigurationOrchestration getDedupConfigurationOrchestration() {
40
		return dedupConfigurationOrchestration;
41
	}
42

    
43
	public void setDedupConfigurationOrchestration(final DedupConfigurationOrchestration dedupConfigurationOrchestration) {
44
		this.dedupConfigurationOrchestration = dedupConfigurationOrchestration;
45
	}
46
}
(3-3/13)