package eu.dnetlib.data.mapreduce.hbase.lodImport;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import com.jolbox.bonecp.BoneCPDataSource;

import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;

public class LodImportMapper extends Mapper<LongWritable, Text, Text, Text> {
24
    private Logger log = Logger.getLogger(this.getClass());
25

    
26
    private String fileName = "";
27
    private String lastExecutionDate = "";
28

    
29

    
30
    private Configuration hadoopConf;
31
    private BoneCPDataSource ds;
32
    private Connection conn;
33
    String baseURI;
34
    private String defaultGraph = "";
35
    private String deleteQuery = "";
36
    private int counter;
37
    private int entitiesPerQuery;
38

    
39
    public static enum ENTITIES_COUNTER {
40
        STORED, DISCARDED,
41
        ENTITIES, RELATIONS, NULL_T_DATES, DELETED
42
    }
43

    
44
    @Override
45
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
46

    
47
        this.fileName = context.getConfiguration().get("lod.inputFile");
48
        this.lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
49

    
50
        this.hadoopConf = context.getConfiguration();
51

    
52
        if (fileName.contains("entities")) {
53
            defaultGraph = hadoopConf.get("lod.defaultGraph");
54
            entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
55
        } else {
56
            defaultGraph = hadoopConf.get("lod.relationsGraph");
57
            entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));
58
        }
59

    
60
        baseURI = hadoopConf.get("lod.baseURI");
61

    
62
        try {
63
            DB db = new DB();
64
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), "1", "1", "1");
65

    
66
            conn = ds.getConnection();
67
            conn.setAutoCommit(false);
68

    
69
            Statement stmt1;
70
            stmt1 = conn.createStatement();
71
            stmt1.execute("__dbf_set ('enable_qp', 1)");
72
            stmt1.close();
73

    
74

    
75
        } catch (Exception e) {
76
            log.error(e.toString(), e);
77

    
78
        }
79

    
80
    }
81

    
82
    @Override
83
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
84
        //TODO remove enclosing  delim from csv -not needed
85

    
86
        String split[] = value.toString().split(context.getConfiguration().get("lod.delim"));
87
        if (split.length > 1) {
88
            Text outputKey = null;
89
            Text outputValue = null;
90
            String resourceURI = "";
91
            try {
92
                outputKey = new Text(split[1]);
93
                outputValue = new Text(value.toString().replace("\\", " "));
94
                resourceURI = baseURI + split[0] + "/" + split[1];
95
            } catch (Exception e) {
96
                log.error(split.toString(), e);
97
            }
98

    
99
            if (fileName.contains("relations")) {
100
                outputKey = new Text(key.toString());
101
                writeOut(context, outputKey, outputValue);
102
                context.getCounter(ENTITIES_COUNTER.RELATIONS).increment(1);
103
            } else {
104
                context.getCounter(ENTITIES_COUNTER.ENTITIES).increment(1);
105
                writeOut(context, outputKey, outputValue);
106
            }
107
        }
108
    }
109

    
110
    @Override
111
    protected void cleanup(Context context) throws IOException, InterruptedException {
112
        super.cleanup(context);
113
        try {
114
            conn.close();
115
        } catch (SQLException e) {
116
            log.error(e.toString(), e);
117
        }
118

    
119
    }
120

    
121
    private void writeOut(Context context, Text outputKey, Text outputValue) throws IOException, InterruptedException {
122
        context.write(outputKey, outputValue);
123
    }
124

    
125
}