Project

General

Profile

1
package eu.dnetlib.msro.workflows.dedup;
2

    
3
import java.nio.file.FileSystems;
4
import java.nio.file.Path;
5

    
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.Engine;
8
import com.googlecode.sarasvati.NodeToken;
9
import eu.dnetlib.data.hadoop.rmi.HadoopService;
10
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
11
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
12
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
13
import org.apache.commons.lang.StringUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17

    
18
/**
19
 * Created by claudio on 14/10/15.
20
 */
21
public class MinDistSearchHadoopJobNode extends DedupConfigurationAwareJobNode {
22

    
23
	private static final Log log = LogFactory.getLog(MinDistSearchHadoopJobNode.class);
24
	private final static String StatusParam = "MinDistSearchHadoopJobNode.status";
25
	private final static String DepthParam = "mindist_recursion_depth";
26
	private final static String UpdateCounterParam = "UpdateCounter.UPDATED";
27
	private final static String DebugParam = "mindist_DEBUG";
28
	@Autowired
29
	private UniqueServiceLocator serviceLocator;
30
	private boolean debug = false;
31
	private String outPathParam;
32

    
33
	@Override
34
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
35

    
36
		String depthString = token.getFullEnv().getAttribute(DepthParam);
37
		log.debug(String.format("found depthParam: '%s'", depthString));
38
		if (StringUtils.isBlank(depthString)) {
39
			depthString = "0";
40
		}
41

    
42
		int depth = Integer.valueOf(depthString);
43

    
44
		final String cluster = token.getEnv().getAttribute("cluster");
45
		final String outputPath = getPath(token.getEnv().getAttribute("workDir"), depth);
46

    
47
		final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
48
		switch (getStatusFromEnv(token)) {
49

    
50
		case DATALOAD:
51

    
52
			setHadoopJob("dedupSimilarity2GraphJob");
53

    
54
			job.getParameters().put("mapred.output.dir", getPath(token.getEnv().getAttribute("workDir"), depth) + "/out");
55

    
56
			hadoopService.createHdfsDirectory(cluster, outputPath, true);
57

    
58
			break;
59
		case DEPTH_N:
60

    
61
			setHadoopJob("dedupMinDistGraphJob");
62

    
63
			final String newOutputPath = getPath(token.getEnv().getAttribute("workDir"), depth + 1);
64
			hadoopService.createHdfsDirectory(cluster, newOutputPath, true);
65

    
66
			job.getParameters().put(DepthParam, String.valueOf(depth));
67
			job.getParameters().put(DebugParam, String.valueOf(isDebug()));
68

    
69
			job.getParameters().put("mapred.input.dir", outputPath + "/out");
70
			job.getParameters().put("mapred.output.dir", newOutputPath + "/out");
71

    
72
			if (log.isDebugEnabled()) {
73
				log.debug(String.format("input job parameters: %s", job.getParameters()));
74
			}
75

    
76
			token.getFullEnv().setAttribute(DepthParam, String.valueOf(depth + 1));
77
			token.getFullEnv().setAttribute(getOutPathParam(), newOutputPath + "/out");
78

    
79
			break;
80
		}
81

    
82
		super.prepareJob(job, token);
83
	}
84

    
85
	private String getPath(final String basePath, final int depth) {
86
		Path path = FileSystems.getDefault().getPath(basePath, "depth_" + depth);
87
		return path.toAbsolutePath().toString();
88
	}
89

    
90
	private STATUS getStatusFromEnv(final NodeToken token) {
91
		if(StringUtils.isBlank(token.getEnv().getAttribute(StatusParam))) {
92
			return STATUS.DATALOAD;
93
		}
94
		STATUS current = STATUS.DATALOAD;
95
		try {
96
			current = STATUS.valueOf(token.getEnv().getAttribute(StatusParam));
97
			log.debug("found status: " + current.toString());
98
		} catch (IllegalArgumentException e) {}
99
		return current;
100
	}
101

    
102
	@Override
103
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
104
		return new BlackboardWorkflowJobListener(engine, token) {
105

    
106
			@Override
107
			protected void onDone(final BlackboardJob job) {
108

    
109
				final STATUS status = getStatusFromEnv(token);
110
				log.debug("complete phase: " + status);
111
				switch (status) {
112
				case DATALOAD:
113
					token.getFullEnv().setAttribute(StatusParam, STATUS.DEPTH_N.toString());
114
					token.getFullEnv().setAttribute(DepthParam, "0");
115
					complete("depth_n");
116
					break;
117
				case DEPTH_N:
118

    
119
					if (log.isDebugEnabled()) {
120
						log.debug(String.format("return job parameters: %s=%s, %s=%s", DepthParam, job.getParameters().get(DepthParam), UpdateCounterParam, job.getParameters().get(UpdateCounterParam)));
121
					}
122

    
123
					final String counter = job.getParameters().get(UpdateCounterParam);
124
					if (StringUtils.isBlank(counter)) {
125
						token.getFullEnv().removeAttribute(StatusParam);
126
						token.getFullEnv().removeAttribute(DepthParam);
127
						log.info(String.format("done iteration %s:%s", UpdateCounterParam, 0));
128
						complete(Arc.DEFAULT_ARC);
129
					} else {
130
						log.info(String.format("continue with next iteration %s:%s", UpdateCounterParam, counter));
131
						complete("depth_n");
132
					}
133

    
134
					break;
135
				}
136
			}
137

    
138
			private void complete(final String arc) {
139
				engine.complete(token, arc);
140
				engine.executeQueuedArcTokens(token.getProcess());
141
			}
142
		};
143
	}
144

    
145
	public boolean isDebug() {
146
		return debug;
147
	}
148

    
149
	public void setDebug(boolean debug) {
150
		this.debug = debug;
151
	}
152

    
153
	public String getOutPathParam() {
154
		return outPathParam;
155
	}
156

    
157
	public void setOutPathParam(String outPathParam) {
158
		this.outPathParam = outPathParam;
159
	}
160

    
161
	enum STATUS {DATALOAD, DEPTH_N}
162

    
163
}
(11-11/15)