package eu.dnetlib.data.mapreduce.hbase.lodImport;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import javax.sql.DataSource;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;

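/**
 * Reducer of the LOD import job. Each incoming value is handed to
 * {@link RDFizer#RDFizeEntityRow} or {@link RDFizer#RDFizeRelationRow},
 * depending on whether the configured input file name starts with
 * "entities"; nothing is emitted to the MapReduce output (NullWritable).
 *
 * Job configuration keys read by this class: lod.inputFile, lod.conLine,
 * lod.username, lod.password, lod.minCpart, lod.maxCpart, lod.part,
 * lod.jsonEntities, lod.jsonRels.
 */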
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

    private Logger log = Logger.getLogger(LodImportReducer.class);
    private DataSource ds;
    private String fileName = "";
    private Configuration hadoopConf;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Grab the job configuration first; fileName and the datasource
        // settings are all read from it.
        this.hadoopConf = context.getConfiguration();
        this.fileName = hadoopConf.get("lod.inputFile");
        try {
            ds = DB.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
        } catch (Exception e) {
            log.error("Failed to create the datasource", e);
        }
    }

    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
        // Entity and relation mappings are shipped to the reducers as JSON
        // strings in the job configuration.
        JSONObject entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
        JSONObject relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));

//		Connection conn = ds.getConnection();

        Iterator<Text> it = values.iterator();

        while (it.hasNext()) {
            // Each value is forwarded as a single-element row.
            String split = it.next().toString();
            List<String> row = Arrays.asList(split);
            log.info(" Values : ");
            // Dispatch on the input file name set in setup(): entity rows and
            // relation rows are RDFized with different mappings.
            if (fileName.startsWith("entities")) RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf);
            else RDFizer.RDFizeRelationRow(row, ds, relationsMappings, hadoopConf);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        log.info("Cleaning up reducer...");
    }
}