Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodImport;
2

    
3

    
4
import java.io.IOException;
5
import java.sql.Connection;
6
import java.sql.SQLException;
7
import java.sql.Statement;
8

    
9
import org.apache.hadoop.conf.Configuration;
10
import org.apache.hadoop.io.LongWritable;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Mapper;
13
import org.apache.log4j.Logger;
14
import org.joda.time.DateTime;
15
import org.joda.time.format.DateTimeFormat;
16
import org.joda.time.format.DateTimeFormatter;
17

    
18
import com.jolbox.bonecp.BoneCPDataSource;
19

    
20
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
21

    
22

    
23
public class LodImportMapper extends Mapper<LongWritable, Text, Text, Text> {
24
    private Logger log = Logger.getLogger(this.getClass());
25

    
26
    private String fileName = "";
27
    private String lastExecutionDate = "";
28

    
29

    
30
    private Configuration hadoopConf;
31
    private BoneCPDataSource ds;
32
    private Connection conn;
33
    String baseURI;
34
    private String defaultGraph = "";
35
    private String deleteQuery = "";
36
    private int counter;
37
    private int entitiesPerQuery;
38

    
39
    public static enum ENTITIES_COUNTER {
40
        STORED, DISCARDED,
41
        ENTITIES, RELATIONS, NULL_T_DATES, DELETED
42
    }
43

    
44
    @Override
45
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
46

    
47
        this.fileName = context.getConfiguration().get("lod.inputFile");
48

    
49
        this.hadoopConf = context.getConfiguration();
50

    
51
        try {
52
            DB db = new DB();
53
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), "1", "1", "1");
54

    
55
            conn = ds.getConnection();
56
            conn.setAutoCommit(false);
57

    
58
            Statement stmt1;
59
            stmt1 = conn.createStatement();
60
            stmt1.execute("__dbf_set ('enable_qp', 1)");
61
            stmt1.close();
62
            conn.close();
63

    
64
        } catch (Exception e) {
65
            log.error(e.toString(), e);
66

    
67
        }
68

    
69
    }
70

    
71
    @Override
72
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
73
        //TODO remove enclosing  delim from csv -not needed
74

    
75
        String split[] = value.toString().split(context.getConfiguration().get("lod.delim"));
76
        if (split.length > 1) {
77
            Text outputKey = null;
78
            Text outputValue = null;
79
            try {
80
                outputKey = new Text(split[1]);
81
                outputValue = new Text(value.toString().replace("\\", " "));
82
            } catch (Exception e) {
83
                log.error(split.toString(), e);
84
            }
85

    
86
            if (fileName.contains("relations")) {
87
                outputKey = new Text(key.toString());
88
                writeOut(context, outputKey, outputValue);
89
                context.getCounter(ENTITIES_COUNTER.RELATIONS).increment(1);
90
            } else {
91
                context.getCounter(ENTITIES_COUNTER.ENTITIES).increment(1);
92
                writeOut(context, outputKey, outputValue);
93
            }
94
        }
95
    }
96

    
97
    @Override
98
    protected void cleanup(Context context) throws IOException, InterruptedException {
99
        super.cleanup(context);
100
        if(conn!=null){
101
	        try {
102
	            conn.close();
103
	        } catch (SQLException e) {
104
	            log.error(e.toString(), e);
105
	        }
106
        }
107
    }
108

    
109
    private void writeOut(Context context, Text outputKey, Text outputValue) throws IOException, InterruptedException {
110
        context.write(outputKey, outputValue);
111
    }
112

    
113
}
(1-1/2)