Revision 33106

cleanup, more logging

modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/SequenceFileFeeder.java
@@ -2,6 +2,7 @@
 
 import java.io.IOException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -17,41 +18,49 @@
 import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
 
 public class SequenceFileFeeder {
-	
+
 	private static final Log log = LogFactory.getLog(SequenceFileFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
-	
+
 	private ResultSetClientFactory resultSetClientFactory;
-	
+
 	@Autowired
 	protected ConfigurationEnumerator configurationEnumerator;
-	
+
 	@Autowired
 	protected SequenceFileWriterFactory sequenceFileWriterFactory;
-	
+
 	public int feed(final String epr, final ClusterName clusterName, final String path) throws IOException {
-        return doWrite(epr, clusterName, path);
+		return doWrite(epr, clusterName, path);
 	}
 
-    private int doWrite(final String epr, final ClusterName clusterName, final String path) throws IOException {
-        final SequenceFile.Writer writer = sequenceFileWriterFactory.getSequenceFileWriter(Text.class, Text.class, getConf(clusterName), new Path(path));
+	private int doWrite(final String epr, final ClusterName clusterName, final String path) throws IOException {
+		final SequenceFile.Writer writer = sequenceFileWriterFactory.getSequenceFileWriter(Text.class, Text.class, getConf(clusterName), new Path(path));
 
-        final Text idText = new Text();
-        final Text bodyText = new Text();
+		log.info("Opened sequence file writer: " + writer.toString());
 
-        int count = 0;
+		final Text idText = new Text();
+		final Text bodyText = new Text();
+		int count = 0;
+		int nulls = 0;
+		for (String record : getResultSetClientFactory().getClient(epr)) {
+			if (StringUtils.isBlank(record)) {
+				nulls++;
+			} else {
+				idText.set(String.valueOf(count++));
+				bodyText.set(record);
+				writer.append(idText, bodyText);
+			}
+		}
+		writer.close();
 
-        for (String record : getResultSetClientFactory().getClient(epr)) {
-            idText.set(String.valueOf(count++));
-            bodyText.set(record);
-            writer.append(idText, bodyText);
-        }
-        writer.close();
+		log.info("written " + count + " records in sequence file: " + path);
+		if (nulls > 0) {
+			log.warn("found " + nulls + " records in epr!");
+		}
+		return count;
+	}
 
-        log.info("written " + count + " records in sequence file: " + path);
-        return count;
-    }
-
-	protected Configuration getConf(ClusterName clusterName) {
+	protected Configuration getConf(final ClusterName clusterName) {
 		return configurationEnumerator.get(clusterName);
 	}
 
@@ -60,8 +69,8 @@
 	}
 
 	@Required
-	public void setResultSetClientFactory(ResultSetClientFactory resultSetClientFactory) {
+	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
 		this.resultSetClientFactory = resultSetClientFactory;
-	}	
-	
+	}
+
 }
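
Note on the rewritten doWrite: it is the standard SequenceFile write loop, reusing a single pair of Text instances across the whole iteration and counting blank records instead of appending them. Below is a minimal standalone sketch of the same pattern, assuming the stock SequenceFile.createWriter in place of the module's SequenceFileWriterFactory; the class name, output path and sample records are made up for illustration.

import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteSketch {

	public static void main(final String[] args) throws IOException {
		final Configuration conf = new Configuration();
		final FileSystem fs = FileSystem.get(conf);
		// Hypothetical output path, standing in for the blackboard "path" parameter.
		final Path path = new Path("/tmp/records.seq");

		// One reusable key/value pair, as in the feeder: avoids two allocations per record.
		final Text key = new Text();
		final Text value = new Text();
		int count = 0;
		int nulls = 0;

		final SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, Text.class, Text.class);
		try {
			for (final String record : Arrays.asList("<record>a</record>", "", "<record>b</record>")) {
				if (StringUtils.isBlank(record)) {
					nulls++; // blank records are counted and reported, never appended
				} else {
					key.set(String.valueOf(count++));
					value.set(record);
					writer.append(key, value);
				}
			}
		} finally {
			writer.close(); // flushes buffered records and releases the file
		}
		System.out.println("written " + count + " records, skipped " + nulls + " blanks");
	}
}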
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/ImportEprHdfsAction.java
@@ -1,17 +1,18 @@
 package eu.dnetlib.data.hadoop.action;
 
-import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.beans.factory.annotation.Required;
 
 import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
 import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
 
-import java.io.IOException;
-
 public class ImportEprHdfsAction extends AbstractHadoopAction {
 
 	private static final Log log = LogFactory.getLog(ImportEprHdfsAction.class); // NOPMD by marko on 11/24/08 5:02 PM
@@ -21,21 +22,20 @@
 	@Override
 	public void executeAsync(final BlackboardServerHandler handler, final BlackboardJob job) throws HadoopServiceException {
 
-        final String epr = DnetXsltFunctions.decodeBase64(job.getParameters().get("input_epr"));
-        final ClusterName clusterName = ClusterName.valueOf(job.getParameters().get("cluster"));
-        final String path = job.getParameters().get("path");
+		final String epr = DnetXsltFunctions.decodeBase64(job.getParameters().get("input_epr"));
+		final ClusterName clusterName = ClusterName.valueOf(job.getParameters().get("cluster"));
+		final String path = job.getParameters().get("path");
 
-        log.info("Starting import in hdfs sequence file: " + path);
-
-        try {
+		log.info(String.format("Starting import in hdfs sequence file '%s', cluster '%s', epr '%s...", path, clusterName.toString(),
+				StringUtils.substring(epr, 0, 20)));
+		try {
 			Integer count = getSequenceFileFeeder().feed(epr, clusterName, path);
+			if (count != null) {
+				log.info("Import completed successfully");
+				job.getParameters().put("count", String.valueOf(count));
+			}
 
-            if (count != null) {
-                log.info("Import completed successfully");
-                job.getParameters().put("count", String.valueOf(count));
-            }
-
-            handler.done(job);
+			handler.done(job);
 		} catch (IOException e) {
 			throw new HadoopServiceException("Import failed", e);
 		}
@@ -46,7 +46,7 @@
 	}
 
 	@Required
-	public void setSequenceFileFeeder(SequenceFileFeeder sequenceFileFeeder) {
+	public void setSequenceFileFeeder(final SequenceFileFeeder sequenceFileFeeder) {
 		this.sequenceFileFeeder = sequenceFileFeeder;
 	}
 
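
The expanded log line leans on commons-lang StringUtils.substring rather than String.substring for the epr preview: it is null-safe and clamps out-of-range indices, so a missing or short input_epr parameter can never turn the log statement itself into an exception. A tiny sketch of that behavior (values are illustrative):

import org.apache.commons.lang.StringUtils;

public class SubstringSketch {

	public static void main(final String[] args) {
		// Null-safe: returns null instead of throwing a NullPointerException.
		System.out.println(StringUtils.substring(null, 0, 20));    // null
		// Clamping: an end index past the string length is treated as the length.
		System.out.println(StringUtils.substring("short", 0, 20)); // short
	}
}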
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/AbstractHadoopAction.java
@@ -31,7 +31,6 @@
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.enabling.locators.UniqueServiceLocator;
-import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
@@ -46,7 +45,7 @@
 	public final static Set<String> HDFS_SPECIAL_PROPERTIES = Sets.newHashSet("mapred.input.dir", "mapred.output.dir");
 
 	@Resource
-    protected UniqueServiceLocator serviceLocator;
+	protected UniqueServiceLocator serviceLocator;
 
 	@Autowired
 	protected ConfigurationEnumerator configurationEnumerator;
@@ -59,18 +58,18 @@
 	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
 		executor.execute(new Runnable() {
 
-            @Override
-            public void run() {
-                try {
-                    handler.ongoing(job);
-                    executeAsync(handler, job);
-                } catch (Throwable e) {
-                    log.error(e);
-                    log.error(e.getCause());
-                    handler.failed(job, e);
-                }
-            }
-        });
+			@Override
+			public void run() {
+				try {
+					handler.ongoing(job);
+					executeAsync(handler, job);
+				} catch (Throwable e) {
+					log.error("exception: " + e);
+					log.error("cause: " + e.getCause());
+					handler.failed(job, e);
+				}
+			}
+		});
 	}
 
 	protected JobProfile loadISJobConfiguration(final String jobName, final Map<String, String> bbParams) throws HadoopServiceException {
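
execute hands every blackboard job to an executor and funnels any Throwable from the worker thread into handler.failed, so a crashing action cannot die silently; this revision only prefixes the two log.error calls so the exception and its cause are distinguishable in the log. A self-contained sketch of the same submit-and-report pattern, where the Handler interface is a made-up stand-in for BlackboardServerHandler and the job is reduced to a plain String:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncActionSketch {

	// Hypothetical stand-in for the BlackboardServerHandler callbacks.
	interface Handler {
		void ongoing(String job);
		void done(String job);
		void failed(String job, Throwable e);
	}

	private final ExecutorService executor = Executors.newCachedThreadPool();

	public void execute(final Handler handler, final String job) {
		executor.execute(new Runnable() {

			@Override
			public void run() {
				try {
					handler.ongoing(job); // mark the job as running before doing any work
					// ... the action's real work would happen here ...
					handler.done(job);
				} catch (Throwable e) {
					// Report both the exception and its cause, as the revised code does.
					System.err.println("exception: " + e);
					System.err.println("cause: " + e.getCause());
					handler.failed(job, e);
				}
			}
		});
	}
}

Catching Throwable rather than Exception is deliberate here: with no caller left on the worker thread to propagate to, anything short of reporting through the handler would lose the failure entirely.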
