Project

General

Profile

1 39617 claudio.at
package eu.dnetlib.msro.workflows.dedup;

import java.nio.file.FileSystems;
import java.nio.file.Path;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
18
/**
19
 * Created by claudio on 14/10/15.
20
 */
21 39629 claudio.at
public class MinDistSearchHadoopJobNode extends DedupConfigurationAwareJobNode {
22 39617 claudio.at
23
	private static final Log log = LogFactory.getLog(MinDistSearchHadoopJobNode.class);
24 40247 claudio.at
	private final static String StatusParam = "MinDistSearchHadoopJobNode.status";
25
	private final static String DepthParam = "mindist_recursion_depth";
26
	private final static String UpdateCounterParam = "UpdateCounter.UPDATED";
27
	private final static String DebugParam = "mindist_DEBUG";
28 39617 claudio.at
	@Autowired
29
	private UniqueServiceLocator serviceLocator;
30
	private boolean debug = false;
31
	private String outPathParam;
32
33
	@Override
34
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
35
36
		String depthString = token.getFullEnv().getAttribute(DepthParam);
37
		log.debug(String.format("found depthParam: '%s'", depthString));
38
		if (StringUtils.isBlank(depthString)) {
39
			depthString = "0";
40
		}
41
42
		int depth = Integer.valueOf(depthString);
43
44
		final String cluster = token.getEnv().getAttribute("cluster");
45
		final String outputPath = getPath(token.getEnv().getAttribute("workDir"), depth);
46
47
		final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
48
		switch (getStatusFromEnv(token)) {
49
50
		case DATALOAD:
51
52
			setHadoopJob("dedupSimilarity2GraphJob");
53
54
			job.getParameters().put("mapred.output.dir", getPath(token.getEnv().getAttribute("workDir"), depth) + "/out");
55
56
			hadoopService.createHdfsDirectory(cluster, outputPath, true);
57
58
			break;
59
		case DEPTH_N:
60
61
			setHadoopJob("dedupMinDistGraphJob");
62
63
			final String newOutputPath = getPath(token.getEnv().getAttribute("workDir"), depth + 1);
64
			hadoopService.createHdfsDirectory(cluster, newOutputPath, true);
65
66
			job.getParameters().put(DepthParam, String.valueOf(depth));
67
			job.getParameters().put(DebugParam, String.valueOf(isDebug()));
68
69
			job.getParameters().put("mapred.input.dir", outputPath + "/out");
70
			job.getParameters().put("mapred.output.dir", newOutputPath + "/out");
71
72
			if (log.isDebugEnabled()) {
73
				log.debug(String.format("input job parameters: %s", job.getParameters()));
74
			}
75
76
			token.getFullEnv().setAttribute(DepthParam, String.valueOf(depth + 1));
77
			token.getFullEnv().setAttribute(getOutPathParam(), newOutputPath + "/out");
78
79
			break;
80
		}
81
82
		super.prepareJob(job, token);
83
	}
84
85
	private String getPath(final String basePath, final int depth) {
86
		Path path = FileSystems.getDefault().getPath(basePath, "depth_" + depth);
87
		return path.toAbsolutePath().toString();
88
	}
89
90
	private STATUS getStatusFromEnv(final NodeToken token) {
91
		if(StringUtils.isBlank(token.getEnv().getAttribute(StatusParam))) {
92
			return STATUS.DATALOAD;
93
		}
94
		STATUS current = STATUS.DATALOAD;
95
		try {
96
			current = STATUS.valueOf(token.getEnv().getAttribute(StatusParam));
97
			log.debug("found status: " + current.toString());
98
		} catch (IllegalArgumentException e) {}
99
		return current;
100
	}
101
102
	@Override
103
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
104
		return new BlackboardWorkflowJobListener(engine, token) {
105
106
			@Override
107
			protected void onDone(final BlackboardJob job) {
108
109
				final STATUS status = getStatusFromEnv(token);
110
				log.debug("complete phase: " + status);
111
				switch (status) {
112
				case DATALOAD:
113
					token.getFullEnv().setAttribute(StatusParam, STATUS.DEPTH_N.toString());
114
					token.getFullEnv().setAttribute(DepthParam, "0");
115 44037 claudio.at
					complete("depth_n");
116 39617 claudio.at
					break;
117
				case DEPTH_N:
118
119
					if (log.isDebugEnabled()) {
120
						log.debug(String.format("return job parameters: %s=%s, %s=%s", DepthParam, job.getParameters().get(DepthParam), UpdateCounterParam, job.getParameters().get(UpdateCounterParam)));
121
					}
122
123
					final String counter = job.getParameters().get(UpdateCounterParam);
124
					if (StringUtils.isBlank(counter)) {
125 40247 claudio.at
						token.getFullEnv().removeAttribute(StatusParam);
126
						token.getFullEnv().removeAttribute(DepthParam);
127 39617 claudio.at
						log.info(String.format("done iteration %s:%s", UpdateCounterParam, 0));
128 44037 claudio.at
						complete(Arc.DEFAULT_ARC);
129 39617 claudio.at
					} else {
130
						log.info(String.format("continue with next iteration %s:%s", UpdateCounterParam, counter));
131 44037 claudio.at
						complete("depth_n");
132 39617 claudio.at
					}
133
134
					break;
135
				}
136
			}
137 44037 claudio.at
138
			private void complete(final String arc) {
139
				engine.complete(token, arc);
140
				engine.executeQueuedArcTokens(token.getProcess());
141
			}
142 39617 claudio.at
		};
143
	}
144
145
	public boolean isDebug() {
146
		return debug;
147
	}
148
149
	public void setDebug(boolean debug) {
150
		this.debug = debug;
151
	}
152
153
	public String getOutPathParam() {
154
		return outPathParam;
155
	}
156
157
	public void setOutPathParam(String outPathParam) {
158
		this.outPathParam = outPathParam;
159
	}
160
161 40247 claudio.at
	enum STATUS {DATALOAD, DEPTH_N}
162
163 39617 claudio.at
}