Project

General

Profile

1 40545 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.lodImport;
2
3 40979 giorgos.al
import java.io.IOException;
4 41076 giorgos.al
import java.sql.Connection;
5 41140 giorgos.al
import java.sql.PreparedStatement;
6 40979 giorgos.al
import java.util.ArrayList;
7
import java.util.Arrays;
8
import java.util.Iterator;
9
import java.util.List;
10
11 40728 giorgos.al
import org.apache.hadoop.conf.Configuration;
12 40729 eri.katsar
import org.apache.hadoop.io.NullWritable;
13 40545 eri.katsar
import org.apache.hadoop.io.Text;
14
import org.apache.hadoop.mapreduce.Reducer;
15
import org.apache.log4j.Logger;
16 40719 giorgos.al
import org.json.JSONObject;
17 45795 giorgos.al
18 41000 giorgos.al
import com.jolbox.bonecp.BoneCPDataSource;
19 40979 giorgos.al
20
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
21 41007 giorgos.al
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries;
22
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
23 40979 giorgos.al
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
24 40778 giorgos.al
25
26 40774 eri.katsar
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
27 40545 eri.katsar
28 40726 eri.katsar
    private Logger log = Logger.getLogger(LodImportReducer.class);
29 41000 giorgos.al
    private BoneCPDataSource ds;
30 40726 eri.katsar
    private String fileName = "";
31 40811 eri.katsar
    private Configuration hadoopConf;
32 42282 eri.katsar
    //    private VirtGraph graph;
33 40843 giorgos.al
    private int counter;
34 42719 giorgos.al
    private String[] buildQuery = new String[2];
35
    private String insertQuery = "";
36
    private String deleteQuery = "";
37 41286 giorgos.al
    private String defaultGraph = "";
38 41002 giorgos.al
    private int entitiesPerQuery;
39 41007 giorgos.al
    private MapCountries mapCountries;
40
    private MapLanguages mapLanguages;
41 41286 giorgos.al
    private Connection conn;
42 42722 giorgos.al
    private Connection dConn;
43 41335 giorgos.al
    private JSONObject entitiesMappings;
44
    private JSONObject relationsMappings;
45 40935 eri.katsar
46 41335 giorgos.al
47 45712 eri.katsar
    public static enum REDUCES_COUNTER {
48 41331 eri.katsar
        RESULT,
49
        PROJECT,
50
        DATASOURCE,
51
        PERSON,
52
        ORGANIZATION,
53
        TOTAL_ENTITIES,
54
        TOTAL_RELATIONS
55
    }
56
57
58 40726 eri.katsar
    @Override
59
    protected void setup(Context context) throws IOException, InterruptedException {
60 40796 eri.katsar
        this.hadoopConf = context.getConfiguration();
61 40778 giorgos.al
        this.fileName = hadoopConf.get("lod.inputFile");
62 45743 giorgos.al
        if (!fileName.contains("relations")) defaultGraph = hadoopConf.get("lod.defaultGraph");
63 40935 eri.katsar
        else defaultGraph = hadoopConf.get("lod.relationsGraph");
64
        this.counter = 0;
65 41007 giorgos.al
        mapCountries = new MapCountries();
66
        mapLanguages = new MapLanguages();
67 42282 eri.katsar
68 45712 eri.katsar
        buildQuery[0] = "";
69
        buildQuery[1] = "";
70
71 41335 giorgos.al
        entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
72
        relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));
73 41057 giorgos.al
74 45743 giorgos.al
        if (!fileName.contains("relations")) entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
75 41002 giorgos.al
        else entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));
76 40796 eri.katsar
77 40726 eri.katsar
        try {
78 42282 eri.katsar
            DB db = new DB();
79 41076 giorgos.al
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
80
            conn = ds.getConnection();
81 41140 giorgos.al
            conn.setAutoCommit(false);
82 40726 eri.katsar
        } catch (Exception e) {
83 40813 eri.katsar
            log.error(e.toString(), e);
84 40726 eri.katsar
        }
85 42282 eri.katsar
86 40726 eri.katsar
    }
87 40729 eri.katsar
88 40726 eri.katsar
    @Override
89 40811 eri.katsar
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
90 42282 eri.katsar
91 40935 eri.katsar
        if (entitiesMappings == null || relationsMappings == null) {
92
            throw new InterruptedException("Could not read json mappings!");
93
        }
94 40774 eri.katsar
        Iterator<Text> it = values.iterator();
95 42282 eri.katsar
96 40726 eri.katsar
        while (it.hasNext()) {
97 45712 eri.katsar
            String data = it.next().toString();
98 41331 eri.katsar
            String[] split = data.split(hadoopConf.get("lod.delim"));
99 40843 giorgos.al
            List<String> row = new ArrayList<String>(Arrays.asList(split));
100 41331 eri.katsar
101 45712 eri.katsar
            if (!fileName.contains("relations")) {
102
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries, mapLanguages, defaultGraph);
103
                insertQuery += buildQuery[0];
104
                deleteQuery += buildQuery[1];
105 40935 eri.katsar
            } else {
106 42719 giorgos.al
                buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
107 45712 eri.katsar
                insertQuery += buildQuery[0];
108 40843 giorgos.al
            }
109 41331 eri.katsar
110
        }
111 45743 giorgos.al
112 41076 giorgos.al
        counter++;
113 45743 giorgos.al
        if (counter > entitiesPerQuery ) {
114
        	 try {
115
                 PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
116
                 ps.setString(1, insertQuery);
117
                 ps.setString(2, "");
118
                 ps.setString(3, defaultGraph);
119
                 final int NQUAD_LEVEL = 64;
120
                 ps.setInt(4, NQUAD_LEVEL);
121
                 ps.setInt(5, 0);
122 45819 giorgos.al
                 ps.setInt(6, 15);
123 45743 giorgos.al
                 ps.setInt(7, 0);
124
                 ps.execute();
125
126
                 conn.commit();
127 45712 eri.katsar
128 45743 giorgos.al
                 ps.close();
129 45712 eri.katsar
130 45743 giorgos.al
                 buildQuery = new String[2];
131
                 buildQuery[0] = "";
132
                 buildQuery[1] = "";
133
                 insertQuery = "";
134 45712 eri.katsar
135 45743 giorgos.al
                 if (!fileName.contains("relations")) {
136
                     //s// TODO: keep counters for all entity types
137 45819 giorgos.al
                	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(counter);
138 45743 giorgos.al
                 } else {
139 45819 giorgos.al
                     context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(counter);
140 45743 giorgos.al
                 }
141 45819 giorgos.al
142
                 counter = 0;
143 45743 giorgos.al
        }catch(Exception e){
144
        	log.error("Virtuoso write failed  at  query: " + insertQuery + " \n and with error " + e.toString(), e);
145 40935 eri.katsar
            try {
146 45743 giorgos.al
                if (conn != null) {
147
                    conn.rollback();
148
                    conn.close();
149
                    conn = ds.getConnection();
150
                    conn.setAutoCommit(false);
151 45712 eri.katsar
                } else {
152 45743 giorgos.al
                    conn.close();
153
                    conn = ds.getConnection();
154
                    conn.setAutoCommit(false);
155 41331 eri.katsar
                }
156
157 45819 giorgos.al
                PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
158
                ps.setString(1, insertQuery);
159
                ps.setString(2, "");
160
                ps.setString(3, defaultGraph);
161
                final int NQUAD_LEVEL = 64;
162
                ps.setInt(4, NQUAD_LEVEL);
163
                ps.setInt(5, 0);
164
                ps.setInt(6, 15);
165
                ps.setInt(7, 0);
166
                ps.execute();
167
168
                conn.commit();
169
170
                ps.close();
171
172
                buildQuery = new String[2];
173
                buildQuery[0] = "";
174
                buildQuery[1] = "";
175
                insertQuery = "";
176
177
                if (!fileName.contains("relations")) {
178
                    //s// TODO: keep counters for all entity types
179
               	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(counter);
180
                } else {
181
                    context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(counter);
182
                }
183
184
                counter = 0;
185 45743 giorgos.al
            } catch (Exception e1) {
186
                log.error("Getting another Connection fail:" + e1.toString(), e1);
187 42282 eri.katsar
            }
188 45743 giorgos.al
          }
189
190 40843 giorgos.al
        }
191 40545 eri.katsar
192 40726 eri.katsar
    }
193 40545 eri.katsar
194 40942 eri.katsar
195 45743 giorgos.al
196 40726 eri.katsar
    @Override
197
    protected void cleanup(Context context) throws IOException, InterruptedException {
198 40935 eri.katsar
        try {
199 45769 giorgos.al
        	log.info("########### \n Cleanup Started. Commiting last Triples...\n##########");
200
        	PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
201 42719 giorgos.al
            ps.setString(1, insertQuery);
202 41140 giorgos.al
            ps.setString(2, "");
203
            ps.setString(3, defaultGraph);
204 41286 giorgos.al
            final int NQUAD_LEVEL = 64;
205 41140 giorgos.al
            ps.setInt(4, NQUAD_LEVEL);
206 42240 giorgos.al
            ps.setInt(5, 0);
207 45819 giorgos.al
            ps.setInt(6, 15);
208 41140 giorgos.al
            ps.setInt(7, 0);
209 45769 giorgos.al
            ps.execute();
210
211 41313 giorgos.al
            conn.commit();
212 42282 eri.katsar
213 42240 giorgos.al
            ps.close();
214 45769 giorgos.al
            if (!fileName.contains("relations")) {
215
                //s// TODO: keep counters for all entity types
216 45819 giorgos.al
           	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(counter);
217 45769 giorgos.al
            } else {
218 45819 giorgos.al
                context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(counter);
219 45769 giorgos.al
            }
220 42282 eri.katsar
221 40935 eri.katsar
        } catch (Exception e) {
222 45769 giorgos.al
        	log.error("Virtuoso write failed  at  query: " + insertQuery + " \n and with error " + e.toString(), e);
223
            try {
224
                if (conn != null) {
225
                    conn.rollback();
226
                    conn.close();
227
                    conn = ds.getConnection();
228
                    conn.setAutoCommit(false);
229
                    PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
230
                    ps.setString(1, insertQuery);
231
                    ps.setString(2, "");
232
                    ps.setString(3, defaultGraph);
233
                    final int NQUAD_LEVEL = 64;
234
                    ps.setInt(4, NQUAD_LEVEL);
235
                    ps.setInt(5, 0);
236 45819 giorgos.al
                    ps.setInt(6, 15);
237 45769 giorgos.al
                    ps.setInt(7, 0);
238
                    ps.execute();
239
240
                    conn.commit();
241
242
                    ps.close();
243
                    if (!fileName.contains("relations")) {
244
                        //s// TODO: keep counters for all entity types
245 45819 giorgos.al
                   	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(counter);
246 45769 giorgos.al
                    } else {
247 45819 giorgos.al
                        context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(counter);
248 45769 giorgos.al
                    }
249
                } else {
250
                    conn.close();
251
                    conn = ds.getConnection();
252
                    conn.setAutoCommit(false);
253
                    PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
254
                    ps.setString(1, insertQuery);
255
                    ps.setString(2, "");
256
                    ps.setString(3, defaultGraph);
257
                    final int NQUAD_LEVEL = 64;
258
                    ps.setInt(4, NQUAD_LEVEL);
259
                    ps.setInt(5, 0);
260 45819 giorgos.al
                    ps.setInt(6, 15);
261 45769 giorgos.al
                    ps.setInt(7, 0);
262
                    ps.execute();
263
264
                    conn.commit();
265
266
                    ps.close();
267
                    if (!fileName.contains("relations")) {
268
                        //s// TODO: keep counters for all entity types
269 45819 giorgos.al
                   	 context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(counter);
270 45769 giorgos.al
                    } else {
271 45819 giorgos.al
                        context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(counter);
272 45769 giorgos.al
                    }
273
                }
274
275
            } catch (Exception e1) {
276
                log.error("Getting another Connection fail:" + e1.toString(), e1);
277
            }
278 40935 eri.katsar
        } finally {
279 40966 giorgos.al
            log.info("Cleaning up reducer...\nClosing Graph and Datasource");
280 45712 eri.katsar
            try {
281 42282 eri.katsar
                conn.commit();
282
                conn.close();
283 45712 eri.katsar
            } catch (Exception e) {
284 42282 eri.katsar
                log.error("Could not Close connection \n" + e.toString(), e);
285 40966 giorgos.al
            }
286 42282 eri.katsar
287 40966 giorgos.al
            log.info("Graph and Datasource Closed...");
288 40935 eri.katsar
        }
289 40726 eri.katsar
    }
290 40935 eri.katsar
291 40545 eri.katsar
}