package eu.dnetlib.data.mapreduce.hbase.lodImport;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import javax.sql.DataSource;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;

public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
26

    
27
    private Logger log = Logger.getLogger(LodImportReducer.class);
28
    private DataSource ds;
29
    private String fileName = "";
30
    private Configuration hadoopConf; 
31

    
32
    @Override
33
    protected void setup(Context context) throws IOException, InterruptedException {
34
        this.fileName = hadoopConf.get("lod.inputFile");
35
        this.hadoopConf = context.getConfiguration();
36
        try {
37
            ds = DB.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
38
        } catch (Exception e) {
39
            log.error("", e);
40
            	
41
        }
42

    
43
    }
44

    
45
    @Override
46
    protected void reduce(final Text key, final Iterable <Text> values, final Context context) throws IOException, InterruptedException {
47
        JSONObject entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
48
        JSONObject relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));
49

    
50
//		Connection conn = ds.getConnection();
51

    
52
        Iterator<Text> it = values.iterator();
53

    
54
        while (it.hasNext()) {
55
            String split= it.next().toString();
56
            List<String> row =   Arrays.asList(split);
57
            System.out.println(" Values : ");
58
            log.info(" Values : ");
59
            if (fileName.startsWith("entities")) RDFizer.RDFizeEntityRow(row, ds, entitiesMappings,hadoopConf);
60
            else RDFizer.RDFizeRelationRow(row, ds, relationsMappings,hadoopConf);
61
            
62

    
63
        }
64

    
65
    }
66

    
67
    @Override
68
    protected void cleanup(Context context) throws IOException, InterruptedException {
69
        log.info("Cleaning up reducer...");
70

    
71
    }
72
}