Revision 33106
Added by Claudio Atzori over 9 years ago

cleanup, more logging
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/SequenceFileFeeder.java

 
 import java.io.IOException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
...
 import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
 
 public class SequenceFileFeeder {
 
     private static final Log log = LogFactory.getLog(SequenceFileFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
 
     private ResultSetClientFactory resultSetClientFactory;
 
     @Autowired
     protected ConfigurationEnumerator configurationEnumerator;
 
     @Autowired
     protected SequenceFileWriterFactory sequenceFileWriterFactory;
 
     public int feed(final String epr, final ClusterName clusterName, final String path) throws IOException {
         return doWrite(epr, clusterName, path);
     }
 
     private int doWrite(final String epr, final ClusterName clusterName, final String path) throws IOException {
         final SequenceFile.Writer writer = sequenceFileWriterFactory.getSequenceFileWriter(Text.class, Text.class, getConf(clusterName), new Path(path));
 
-        final Text idText = new Text();
-        final Text bodyText = new Text();
+        log.info("Opened sequence file writer: " + writer.toString());
 
-        int count = 0;
+        final Text idText = new Text();
+        final Text bodyText = new Text();
+        int count = 0;
+        int nulls = 0;
+        for (String record : getResultSetClientFactory().getClient(epr)) {
+            if (StringUtils.isBlank(record)) {
+                nulls++;
+            } else {
+                idText.set(String.valueOf(count++));
+                bodyText.set(record);
+                writer.append(idText, bodyText);
+            }
+        }
+        writer.close();
 
-        for (String record : getResultSetClientFactory().getClient(epr)) {
-            idText.set(String.valueOf(count++));
-            bodyText.set(record);
-            writer.append(idText, bodyText);
-        }
-        writer.close();
+        log.info("written " + count + " records in sequence file: " + path);
+        if (nulls > 0) {
+            log.warn("found " + nulls + " records in epr!");
+        }
+        return count;
+    }
 
-        log.info("written " + count + " records in sequence file: " + path);
-        return count;
-    }
-
-    protected Configuration getConf(ClusterName clusterName) {
+    protected Configuration getConf(final ClusterName clusterName) {
         return configurationEnumerator.get(clusterName);
     }
 
...
     }
 
     @Required
-    public void setResultSetClientFactory(ResultSetClientFactory resultSetClientFactory) {
+    public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
         this.resultSetClientFactory = resultSetClientFactory;
     }
 
 }
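For readers who want the new feeding logic in isolation, here is a minimal, self-contained sketch of the same pattern: skip blank records, number the rest with a progressive Text key, and report counts. It is illustrative only — it assumes a plain Iterable<String> in place of the D-Net ResultSet client, uses the stock Hadoop SequenceFile.createWriter API instead of the project's SequenceFileWriterFactory, and FeedSketch is a hypothetical class name, not part of this commit:

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical stand-in for SequenceFileFeeder.doWrite(...)
public class FeedSketch {

    public static int feed(final Iterable<String> records, final Configuration conf, final String path) throws IOException {
        final FileSystem fs = FileSystem.get(conf);
        final SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(path), Text.class, Text.class);

        final Text idText = new Text();
        final Text bodyText = new Text();
        int count = 0;
        int nulls = 0;
        try {
            for (final String record : records) {
                if (StringUtils.isBlank(record)) {
                    nulls++; // count blank records instead of writing empty values
                } else {
                    idText.set(String.valueOf(count++)); // progressive numeric key
                    bodyText.set(record);
                    writer.append(idText, bodyText);
                }
            }
        } finally {
            writer.close(); // close even on failure (the patched code closes only on success)
        }
        if (nulls > 0) {
            System.err.println("found " + nulls + " blank records");
        }
        return count;
    }
}

Note the reused Text instances: Hadoop Writable objects are mutable, so allocating them once outside the loop avoids per-record garbage, which matters when feeding large result sets.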
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/ImportEprHdfsAction.java

 package eu.dnetlib.data.hadoop.action;
 
-import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.beans.factory.annotation.Required;
 
 import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
 import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
 
-import java.io.IOException;
-
 public class ImportEprHdfsAction extends AbstractHadoopAction {
 
     private static final Log log = LogFactory.getLog(ImportEprHdfsAction.class); // NOPMD by marko on 11/24/08 5:02 PM
...
     @Override
     public void executeAsync(final BlackboardServerHandler handler, final BlackboardJob job) throws HadoopServiceException {
 
         final String epr = DnetXsltFunctions.decodeBase64(job.getParameters().get("input_epr"));
         final ClusterName clusterName = ClusterName.valueOf(job.getParameters().get("cluster"));
         final String path = job.getParameters().get("path");
 
-        log.info("Starting import in hdfs sequence file: " + path);
-
+        log.info(String.format("Starting import in hdfs sequence file '%s', cluster '%s', epr '%s...", path, clusterName.toString(),
+                StringUtils.substring(epr, 0, 20)));
         try {
             Integer count = getSequenceFileFeeder().feed(epr, clusterName, path);
+            if (count != null) {
+                log.info("Import completed successfully");
+                job.getParameters().put("count", String.valueOf(count));
+            }
 
-            if (count != null) {
-                log.info("Import completed successfully");
-                job.getParameters().put("count", String.valueOf(count));
-            }
-
             handler.done(job);
         } catch (IOException e) {
             throw new HadoopServiceException("Import failed", e);
         }
...
     }
 
     @Required
-    public void setSequenceFileFeeder(SequenceFileFeeder sequenceFileFeeder) {
+    public void setSequenceFileFeeder(final SequenceFileFeeder sequenceFileFeeder) {
         this.sequenceFileFeeder = sequenceFileFeeder;
     }
 
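The reworked log statement avoids dumping the whole (base64-encoded, potentially very long) EPR by truncating it to 20 characters. A rough, hypothetical sketch of the parameter handling follows, with commons-codec standing in for DnetXsltFunctions.decodeBase64 and an invented ClusterName enum; none of these names come from the commit itself:

import java.nio.charset.StandardCharsets;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;

public class ImportParamsSketch {

    // invented values, only for illustration
    enum ClusterName { DM, IIS }

    public static void main(final String[] args) {
        // a blackboard job would carry these as string parameters
        final String encodedEpr = Base64.encodeBase64String(
                "http://localhost:8280/resultset?id=rs-123".getBytes(StandardCharsets.UTF_8));

        final String epr = new String(Base64.decodeBase64(encodedEpr), StandardCharsets.UTF_8);
        final ClusterName clusterName = ClusterName.valueOf("DM");
        final String path = "/tmp/import.seq";

        // same truncation trick as the new log.info above
        System.out.println(String.format("Starting import in hdfs sequence file '%s', cluster '%s', epr '%s...'",
                path, clusterName.toString(), StringUtils.substring(epr, 0, 20)));
    }
}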
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/AbstractHadoopAction.java

 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.enabling.locators.UniqueServiceLocator;
-import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
...
     public final static Set<String> HDFS_SPECIAL_PROPERTIES = Sets.newHashSet("mapred.input.dir", "mapred.output.dir");
 
     @Resource
     protected UniqueServiceLocator serviceLocator;
 
     @Autowired
     protected ConfigurationEnumerator configurationEnumerator;
...
     public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
         executor.execute(new Runnable() {
 
             @Override
             public void run() {
                 try {
                     handler.ongoing(job);
                     executeAsync(handler, job);
                 } catch (Throwable e) {
-                    log.error(e);
-                    log.error(e.getCause());
+                    log.error("exception: " + e);
+                    log.error("cause: " + e.getCause());
                     handler.failed(job, e);
                 }
             }
         });
     }
 
     protected JobProfile loadISJobConfiguration(final String jobName, final Map<String, String> bbParams) throws HadoopServiceException {
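The AbstractHadoopAction change is purely about log readability: the bare log.error(e) and log.error(e.getCause()) calls produced two unlabelled lines that were hard to tell apart, and e.getCause() is often null. A stripped-down sketch of the same execute-asynchronously-and-report pattern, with plain JDK Runnables standing in for the blackboard handler and job (AsyncActionSketch and its parameters are invented for illustration):

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class AsyncActionSketch {

    private final Executor executor = Executors.newCachedThreadPool();

    // mirrors AbstractHadoopAction.execute(...): mark ongoing, run the action,
    // mark failed on any Throwable
    public void execute(final Runnable action, final Runnable onOngoing, final Runnable onFailed) {
        executor.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    onOngoing.run();   // handler.ongoing(job)
                    action.run();      // executeAsync(handler, job)
                } catch (final Throwable e) {
                    // labelled messages replace the bare log.error(e) calls
                    System.err.println("exception: " + e);
                    System.err.println("cause: " + e.getCause());
                    onFailed.run();    // handler.failed(job, e)
                }
            }
        });
    }
}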