package eu.dnetlib.data.mapreduce.hbase.lodImport;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.jolbox.bonecp.BoneCPDataSource;

import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
27

    
28
    private Logger log = Logger.getLogger(LodImportReducer.class);
29
    private BoneCPDataSource ds;
30
    private String fileName = "";
31
    private Configuration hadoopConf;
32
    //    private VirtGraph graph;
33
    private int counter;
34
    private int commitCounter;
35
    private String[] buildQuery = new String[2];
36
    private String insertQuery = "";
37
    private String deleteQuery = "";
38
    private String defaultGraph = "";
39
    private int entitiesPerQuery;
40
    private MapCountries mapCountries;
41
    private MapLanguages mapLanguages;
42
    private Connection conn;
43
    private Connection dConn;
44
    private JSONObject entitiesMappings;
45
    private JSONObject relationsMappings;
46

    
47

    
48
    public static enum REDUCES_COUNTER {
49
        RESULT,
50
        PROJECT,
51
        DATASOURCE,
52
        PERSON,
53
        ORGANIZATION,
54
        TOTAL_ENTITIES,
55
        TOTAL_RELATIONS
56
    }
57

    
58

    
59
    @Override
60
    protected void setup(Context context) throws IOException, InterruptedException {
61
        this.hadoopConf = context.getConfiguration();
62
        this.fileName = hadoopConf.get("lod.inputFile");
63
        if (!fileName.contains("relations")) defaultGraph = hadoopConf.get("lod.defaultGraph");
64
        else defaultGraph = hadoopConf.get("lod.relationsGraph");
65
        this.counter = 0;
66
        mapCountries = new MapCountries();
67
        mapLanguages = new MapLanguages();
68

    
69
        buildQuery[0] = "";
70
        buildQuery[1] = "";
71

    
72
        entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
73
        relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));
74

    
75
        if (!fileName.contains("relations")) entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
76
        else entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));
77

    
78
        try {
79
            DB db = new DB();
80
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
81
            conn = ds.getConnection();
82
            conn.setAutoCommit(false);
83
        } catch (Exception e) {
84
            log.error(e.toString(), e);
85
        }
86

    
87
    }
88

    
89
    @Override
90
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
91

    
92
        if (entitiesMappings == null || relationsMappings == null) {
93
            throw new InterruptedException("Could not read json mappings!");
94
        }
95
        Iterator<Text> it = values.iterator();
96

    
97
        while (it.hasNext()) {
98
            String data = it.next().toString();
99
            String[] split = data.split(hadoopConf.get("lod.delim"));
100
            List<String> row = new ArrayList<String>(Arrays.asList(split));
101

    
102
            if (!fileName.contains("relations")) {
103
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries, mapLanguages, defaultGraph);
104
                insertQuery += buildQuery[0];
105
                deleteQuery += buildQuery[1];
106
            } else {
107
                buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
108
                insertQuery += buildQuery[0];
109
            }
110

    
111
        }
112
        
113
        counter++;
114
        if (counter > entitiesPerQuery ) {
115
        	 try {
116
                 PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
117
                 ps.setString(1, insertQuery);
118
                 ps.setString(2, "");
119
                 ps.setString(3, defaultGraph);
120
                 final int NQUAD_LEVEL = 64;
121
                 ps.setInt(4, NQUAD_LEVEL);
122
                 ps.setInt(5, 0);
123
                 ps.setInt(6, 12);
124
                 ps.setInt(7, 0);
125
                 ps.execute();                 
126
                 
127
                 conn.commit();
128

    
129
                 ps.close();
130

    
131
                 buildQuery = new String[2];
132
                 buildQuery[0] = "";
133
                 buildQuery[1] = "";
134
                 insertQuery = "";
135

    
136
                 counter = 0;
137
                 if (!fileName.contains("relations")) {
138
                     //s// TODO: keep counters for all entity types
139
                	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);    
140
                 } else {
141
                     context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(entitiesPerQuery);
142
                 }   	
143
        }catch(Exception e){
144
        	log.error("Virtuoso write failed  at  query: " + insertQuery + " \n and with error " + e.toString(), e);
145
            try {
146
                if (conn != null) {
147
                    conn.rollback();
148
                    conn.close();
149
                    conn = ds.getConnection();
150
                    conn.setAutoCommit(false);
151
                } else {
152
                    conn.close();
153
                    conn = ds.getConnection();
154
                    conn.setAutoCommit(false);
155
                }
156

    
157
            } catch (Exception e1) {
158
                log.error("Getting another Connection fail:" + e1.toString(), e1);
159
            }
160
          }
161
       	
162
        }
163

    
164
    }
165

    
166

    
167

    
168
    @Override
169
    protected void cleanup(Context context) throws IOException, InterruptedException {
170
        try {
171
        	log.info("########### \n Cleanup Started. Commiting last Triples...\n##########");
172
        	PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
173
            ps.setString(1, insertQuery);
174
            ps.setString(2, "");
175
            ps.setString(3, defaultGraph);
176
            final int NQUAD_LEVEL = 64;
177
            ps.setInt(4, NQUAD_LEVEL);
178
            ps.setInt(5, 0);
179
            ps.setInt(6, 12);
180
            ps.setInt(7, 0);
181
            ps.execute();                 
182
            
183
            conn.commit();
184

    
185
            ps.close();
186
            if (!fileName.contains("relations")) {
187
                //s// TODO: keep counters for all entity types
188
           	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);    
189
            } else {
190
                context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(entitiesPerQuery);
191
            }   	
192

    
193
        } catch (Exception e) {
194
        	log.error("Virtuoso write failed  at  query: " + insertQuery + " \n and with error " + e.toString(), e);
195
            try {
196
                if (conn != null) {
197
                    conn.rollback();
198
                    conn.close();
199
                    conn = ds.getConnection();
200
                    conn.setAutoCommit(false);
201
                    PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
202
                    ps.setString(1, insertQuery);
203
                    ps.setString(2, "");
204
                    ps.setString(3, defaultGraph);
205
                    final int NQUAD_LEVEL = 64;
206
                    ps.setInt(4, NQUAD_LEVEL);
207
                    ps.setInt(5, 0);
208
                    ps.setInt(6, 12);
209
                    ps.setInt(7, 0);
210
                    ps.execute();                 
211
                    
212
                    conn.commit();
213

    
214
                    ps.close();
215
                    if (!fileName.contains("relations")) {
216
                        //s// TODO: keep counters for all entity types
217
                   	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);    
218
                    } else {
219
                        context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(entitiesPerQuery);
220
                    }   	
221
                } else {
222
                    conn.close();
223
                    conn = ds.getConnection();
224
                    conn.setAutoCommit(false);
225
                    PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
226
                    ps.setString(1, insertQuery);
227
                    ps.setString(2, "");
228
                    ps.setString(3, defaultGraph);
229
                    final int NQUAD_LEVEL = 64;
230
                    ps.setInt(4, NQUAD_LEVEL);
231
                    ps.setInt(5, 0);
232
                    ps.setInt(6, 12);
233
                    ps.setInt(7, 0);
234
                    ps.execute();                 
235
                    
236
                    conn.commit();
237

    
238
                    ps.close();
239
                    if (!fileName.contains("relations")) {
240
                        //s// TODO: keep counters for all entity types
241
                   	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);    
242
                    } else {
243
                        context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(entitiesPerQuery);
244
                    }   	
245
                }
246

    
247
            } catch (Exception e1) {
248
                log.error("Getting another Connection fail:" + e1.toString(), e1);
249
            }
250
        } finally {
251
            log.info("Cleaning up reducer...\nClosing Graph and Datasource");
252
            try {
253
                conn.commit();
254
                conn.close();
255
            } catch (Exception e) {
256
                log.error("Could not Close connection \n" + e.toString(), e);
257
            }
258

    
259
            log.info("Graph and Datasource Closed...");
260
        }
261
    }
262

    
263
}
(2-2/2)