Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

    
3
import java.nio.file.FileSystems;
4
import java.nio.file.Path;
5

    
6
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
7
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
8
import eu.dnetlib.msro.workflows.graph.Arc;
9
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
10
import eu.dnetlib.msro.workflows.procs.Env;
11
import eu.dnetlib.msro.workflows.procs.Token;
12
import eu.dnetlib.rmi.data.hadoop.HadoopService;
13
import org.apache.commons.lang3.StringUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17

    
18
/**
19
 * Created by claudio on 14/10/15.
20
 */
21
public class MinDistSearchHadoopJobNode extends DedupConfigurationLoaderJobNode {
22

    
23
	private static final Log log = LogFactory.getLog(MinDistSearchHadoopJobNode.class);
24

    
25
	private final static String StatusParam = "MinDistSearchHadoopJobNode.status";
26
	private final static String DepthParam = "mindist_recursion_depth";
27
	private final static String UpdateCounterParam = "UpdateCounter.UPDATED";
28
	private final static String DebugParam = "mindist_DEBUG";
29

    
30
	@Autowired
31
	private UniqueServiceLocator serviceLocator;
32
	private boolean debug = false;
33
	private String outPathParam;
34

    
35
	private String workDir;
36

    
37
	@Override
38
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
39

    
40
		String depthString = token.getEnv().getAttribute(DepthParam, String.class);
41
		log.debug(String.format("found depthParam: '%s'", depthString));
42
		if (StringUtils.isBlank(depthString)) {
43
			depthString = "0";
44
		}
45

    
46
		int depth = Integer.valueOf(depthString);
47
		final String outputPath = getPath(getWorkDir(), depth);
48

    
49
		final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
50
		switch (getStatusFromEnv(token.getEnv())) {
51

    
52
		case DATALOAD:
53

    
54
			setHadoopJob("dedupSimilarity2GraphJob");
55

    
56
			job.getParameters().put("mapred.output.dir", getPath(getWorkDir(), depth) + "/out");
57

    
58
			hadoopService.createHdfsDirectory(getCluster(), outputPath, true);
59

    
60
			break;
61
		case DEPTH_N:
62

    
63
			setHadoopJob("dedupMinDistGraphJob");
64

    
65
			final String newOutputPath = getPath(getWorkDir(), depth + 1);
66
			hadoopService.createHdfsDirectory(getCluster(), newOutputPath, true);
67

    
68
			job.getParameters().put(DepthParam, String.valueOf(depth));
69
			job.getParameters().put(DebugParam, String.valueOf(isDebug()));
70

    
71
			job.getParameters().put("mapred.input.dir", outputPath + "/out");
72
			job.getParameters().put("mapred.output.dir", newOutputPath + "/out");
73

    
74
			if (log.isDebugEnabled()) {
75
				log.debug(String.format("input job parameters: %s", job.getParameters()));
76
			}
77

    
78
			token.getEnv().setAttribute(DepthParam, String.valueOf(depth + 1));
79
			token.getEnv().setAttribute(getOutPathParam(), newOutputPath + "/out");
80

    
81
			break;
82
		}
83

    
84
		super.prepareJob(job, token);
85
	}
86

    
87
	private String getPath(final String basePath, final int depth) {
88

    
89
		log.info("got basePath: " + basePath);
90

    
91
		Path fsPath = FileSystems.getDefault().getPath(basePath, "depth_" + depth);
92
		final String path = fsPath.toAbsolutePath().toString();
93

    
94
		log.info("built outputPath: " + path);
95

    
96
		return path;
97
	}
98

    
99
	private STATUS getStatusFromEnv(final Env env) {
100
		if (StringUtils.isBlank(env.getAttribute(StatusParam, String.class))) {
101
			return STATUS.DATALOAD;
102
		}
103
		STATUS current = STATUS.DATALOAD;
104
		try {
105
			current = STATUS.valueOf(env.getAttribute(StatusParam, String.class));
106
			log.debug("found status: " + current.toString());
107
		} catch (IllegalArgumentException e) {}
108
		return current;
109
	}
110

    
111
	@Override
112
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Token token) {
113
		return new BlackboardWorkflowJobListener(token) {
114

    
115
			@Override
116
			protected void onDone(final BlackboardJob job) {
117

    
118
				final STATUS status = getStatusFromEnv(token.getEnv());
119
				log.debug("complete phase: " + status);
120
				switch (status) {
121
				case DATALOAD:
122
					token.getEnv().setAttribute(StatusParam, STATUS.DEPTH_N.toString());
123
					token.getEnv().setAttribute(DepthParam, "0");
124
					token.release("depth_n");
125
					break;
126
				case DEPTH_N:
127

    
128
					if (log.isDebugEnabled()) {
129
						log.debug(String.format("return job parameters: %s=%s, %s=%s", DepthParam, job.getParameters().get(DepthParam), UpdateCounterParam,
130
								job.getParameters().get(UpdateCounterParam)));
131
					}
132

    
133
					final String counter = job.getParameters().get(UpdateCounterParam);
134
					if (StringUtils.isBlank(counter)) {
135
						token.getEnv().removeAttribute(StatusParam);
136
						token.getEnv().removeAttribute(DepthParam);
137
						log.info(String.format("done iteration %s:%s", UpdateCounterParam, 0));
138

    
139
						token.release(Arc.DEFAULT_ARC);
140
					} else {
141
						log.info(String.format("continue with next iteration %s:%s", UpdateCounterParam, counter));
142
						token.release("depth_n");
143
					}
144

    
145
					break;
146
				}
147
			}
148
		};
149
	}
150

    
151
	public boolean isDebug() {
152
		return debug;
153
	}
154

    
155
	public void setDebug(boolean debug) {
156
		this.debug = debug;
157
	}
158

    
159
	public String getOutPathParam() {
160
		return outPathParam;
161
	}
162

    
163
	public void setOutPathParam(String outPathParam) {
164
		this.outPathParam = outPathParam;
165
	}
166

    
167
	public String getWorkDir() {
168
		return workDir;
169
	}
170

    
171
	public void setWorkDir(final String workDir) {
172
		this.workDir = workDir;
173
	}
174

    
175
	enum STATUS {DATALOAD, DEPTH_N}
176

    
177
}
(9-9/13)