1 |
40545
|
eri.katsar
|
package eu.dnetlib.data.mapreduce.hbase.lodImport;
|
2 |
|
|
|
3 |
40778
|
giorgos.al
|
import java.io.IOException;
|
4 |
|
|
import java.util.Iterator;
|
5 |
|
|
import java.util.List;
|
6 |
|
|
|
7 |
|
|
import javax.sql.DataSource;
|
8 |
|
|
|
9 |
40728
|
giorgos.al
|
import org.apache.hadoop.conf.Configuration;
|
10 |
40729
|
eri.katsar
|
import org.apache.hadoop.io.NullWritable;
|
11 |
40545
|
eri.katsar
|
import org.apache.hadoop.io.Text;
|
12 |
|
|
import org.apache.hadoop.mapreduce.Reducer;
|
13 |
|
|
import org.apache.log4j.Logger;
|
14 |
40719
|
giorgos.al
|
import org.json.JSONObject;
|
15 |
40545
|
eri.katsar
|
|
16 |
40778
|
giorgos.al
|
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
|
17 |
|
|
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
|
18 |
|
|
|
19 |
|
|
|
20 |
40774
|
eri.katsar
|
import java.util.Arrays;
|
21 |
40725
|
eri.katsar
|
|
22 |
40718
|
giorgos.al
|
|
23 |
40778
|
giorgos.al
|
|
24 |
|
|
|
25 |
40774
|
eri.katsar
|
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
|
26 |
40545
|
eri.katsar
|
|
27 |
40726
|
eri.katsar
|
private Logger log = Logger.getLogger(LodImportReducer.class);
|
28 |
|
|
private DataSource ds;
|
29 |
|
|
private String fileName = "";
|
30 |
40728
|
giorgos.al
|
private Configuration hadoopConf;
|
31 |
40545
|
eri.katsar
|
|
32 |
40726
|
eri.katsar
|
@Override
|
33 |
|
|
protected void setup(Context context) throws IOException, InterruptedException {
|
34 |
40778
|
giorgos.al
|
this.fileName = hadoopConf.get("lod.inputFile");
|
35 |
40728
|
giorgos.al
|
this.hadoopConf = context.getConfiguration();
|
36 |
40726
|
eri.katsar
|
try {
|
37 |
40778
|
giorgos.al
|
ds = DB.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
|
38 |
40726
|
eri.katsar
|
} catch (Exception e) {
|
39 |
|
|
log.error("", e);
|
40 |
40728
|
giorgos.al
|
|
41 |
40726
|
eri.katsar
|
}
|
42 |
40545
|
eri.katsar
|
|
43 |
40726
|
eri.katsar
|
}
|
44 |
40729
|
eri.katsar
|
|
45 |
40726
|
eri.katsar
|
@Override
|
46 |
40774
|
eri.katsar
|
protected void reduce(final Text key, final Iterable <Text> values, final Context context) throws IOException, InterruptedException {
|
47 |
40778
|
giorgos.al
|
JSONObject entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
|
48 |
|
|
JSONObject relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));
|
49 |
40725
|
eri.katsar
|
|
50 |
40723
|
giorgos.al
|
// Connection conn = ds.getConnection();
|
51 |
40725
|
eri.katsar
|
|
52 |
40774
|
eri.katsar
|
Iterator<Text> it = values.iterator();
|
53 |
40545
|
eri.katsar
|
|
54 |
40726
|
eri.katsar
|
while (it.hasNext()) {
|
55 |
40774
|
eri.katsar
|
String split= it.next().toString();
|
56 |
|
|
List<String> row = Arrays.asList(split);
|
57 |
40726
|
eri.katsar
|
System.out.println(" Values : ");
|
58 |
|
|
log.info(" Values : ");
|
59 |
40778
|
giorgos.al
|
if (fileName.startsWith("entities")) RDFizer.RDFizeEntityRow(row, ds, entitiesMappings,hadoopConf);
|
60 |
|
|
else RDFizer.RDFizeRelationRow(row, ds, relationsMappings,hadoopConf);
|
61 |
40728
|
giorgos.al
|
|
62 |
40545
|
eri.katsar
|
|
63 |
40726
|
eri.katsar
|
}
|
64 |
40545
|
eri.katsar
|
|
65 |
40726
|
eri.katsar
|
}
|
66 |
40545
|
eri.katsar
|
|
67 |
40726
|
eri.katsar
|
@Override
|
68 |
|
|
protected void cleanup(Context context) throws IOException, InterruptedException {
|
69 |
|
|
log.info("Cleaning up reducer...");
|
70 |
|
|
|
71 |
|
|
}
|
72 |
40545
|
eri.katsar
|
}
|