Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.Map.Entry;
6

    
7
import org.apache.hadoop.conf.Configuration;
8
import org.apache.hadoop.io.Text;
9
import org.apache.hadoop.mapreduce.Mapper;
10
import org.dom4j.Document;
11
import org.dom4j.io.SAXReader;
12

    
13
public class DocumentDatabaseMapper extends Mapper<Text, Text, Text, Text> {
14

    
15
	@Override
16
	protected void setup(final Context context) throws IOException, InterruptedException {
17
		logConfiguration(context.getConfiguration());
18
	}
19

    
20
	@Override
21
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
22
		try {
23
			final SAXReader reader = new SAXReader();
24

    
25
			final Document doc = reader.read(new StringReader(value.toString()));
26

    
27
			final String id = doc.valueOf("//dri:objIdentifier");
28

    
29
			addDocument(context, id, value.toString());
30

    
31
			context.getCounter("Document Database", "records").increment(1);
32
		} catch (Exception e) {
33
			context.getCounter("Document Database", "errors").increment(1);
34
			context.write(key, value);
35
			e.printStackTrace(System.err);
36
		}
37

    
38
	}
39

    
40
	private void addDocument(final Context context, final String id, final String string) {
41
		// TODO
42

    
43
		// ADD the document to the DB
44

    
45
	}
46

    
47
	@Override
48
	protected void cleanup(final Context context) throws IOException, InterruptedException {
49
		super.cleanup(context);
50
	}
51

    
52
	private void logConfiguration(final Configuration conf) {
53
		System.out.println("job configutation #################");
54
		for (Entry<String, String> e : conf) {
55
			System.out.println("'" + e.getKey() + "' : '" + e.getValue() + "'");
56
		}
57
		System.out.println("end of job configutation #################\n\n");
58
	}
59

    
60
}
(3-3/7)