Revision 45712
Added by Eri Katsari about 7 years ago
LodImportReducer.java | ||
---|---|---|
3 | 3 |
import java.io.IOException; |
4 | 4 |
import java.sql.Connection; |
5 | 5 |
import java.sql.PreparedStatement; |
6 |
import java.sql.SQLException; |
|
7 |
import java.sql.Statement; |
|
8 | 6 |
import java.util.ArrayList; |
9 | 7 |
import java.util.Arrays; |
10 |
import java.util.HashSet; |
|
11 | 8 |
import java.util.Iterator; |
12 | 9 |
import java.util.List; |
13 | 10 |
|
... | ... | |
17 | 14 |
import org.apache.hadoop.mapreduce.Reducer; |
18 | 15 |
import org.apache.log4j.Logger; |
19 | 16 |
import org.json.JSONObject; |
20 |
|
|
21 |
import com.hp.hpl.jena.rdf.model.StmtIterator; |
|
22 | 17 |
import com.jolbox.bonecp.BoneCPDataSource; |
23 | 18 |
|
24 | 19 |
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB; |
25 | 20 |
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries; |
26 | 21 |
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages; |
27 | 22 |
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer; |
28 |
import virtuoso.jena.driver.VirtGraph; |
|
29 | 23 |
|
30 | 24 |
|
31 | 25 |
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> { |
... | ... | |
50 | 44 |
private JSONObject relationsMappings; |
51 | 45 |
|
52 | 46 |
|
53 |
public static enum ENTITIES_COUNTER {
|
|
47 |
public static enum REDUCES_COUNTER {
|
|
54 | 48 |
RESULT, |
55 | 49 |
PROJECT, |
56 | 50 |
DATASOURCE, |
... | ... | |
70 | 64 |
this.counter = 0; |
71 | 65 |
mapCountries = new MapCountries(); |
72 | 66 |
mapLanguages = new MapLanguages(); |
73 |
|
|
74 |
buildQuery[0]=""; |
|
75 |
buildQuery[1]=""; |
|
76 | 67 |
|
68 |
buildQuery[0] = ""; |
|
69 |
buildQuery[1] = ""; |
|
70 |
|
|
77 | 71 |
entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities")); |
78 | 72 |
relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels")); |
79 | 73 |
|
... | ... | |
83 | 77 |
try { |
84 | 78 |
DB db = new DB(); |
85 | 79 |
ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part")); |
86 |
|
|
87 | 80 |
conn = ds.getConnection(); |
88 | 81 |
conn.setAutoCommit(false); |
89 |
|
|
90 |
// dConn = ds.getConnection(); |
|
91 |
// dConn.setAutoCommit(false); |
|
92 | 82 |
} catch (Exception e) { |
93 | 83 |
log.error(e.toString(), e); |
94 |
|
|
95 | 84 |
} |
96 | 85 |
|
97 |
// this.graph = new VirtGraph(ds); |
|
98 |
|
|
99 | 86 |
} |
100 | 87 |
|
101 | 88 |
@Override |
... | ... | |
107 | 94 |
Iterator<Text> it = values.iterator(); |
108 | 95 |
|
109 | 96 |
while (it.hasNext()) { |
110 |
String data=it.next().toString();
|
|
97 |
String data = it.next().toString();
|
|
111 | 98 |
String[] split = data.split(hadoopConf.get("lod.delim")); |
112 | 99 |
List<String> row = new ArrayList<String>(Arrays.asList(split)); |
113 | 100 |
|
114 |
if (fileName.contains("entities")) {
|
|
115 |
buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries,mapLanguages,defaultGraph);
|
|
116 |
insertQuery+=buildQuery[0];
|
|
117 |
deleteQuery+=buildQuery[1];
|
|
101 |
if (!fileName.contains("relations")) {
|
|
102 |
buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries, mapLanguages, defaultGraph);
|
|
103 |
insertQuery += buildQuery[0];
|
|
104 |
deleteQuery += buildQuery[1];
|
|
118 | 105 |
} else { |
119 | 106 |
buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf); |
120 |
insertQuery+=buildQuery[0];
|
|
107 |
insertQuery += buildQuery[0];
|
|
121 | 108 |
} |
122 | 109 |
|
123 | 110 |
} |
124 |
|
|
125 |
// System.out.println("/nID-> "+key.toString()+"\n"); |
|
126 |
// System.out.println("/nITCOUNTER-> "+itCounter+"\n"); |
|
127 |
// System.err.println("\nCOUNTER "+counter); |
|
128 |
// System.out.println("so FAR: "+buildQuery); |
|
129 | 111 |
counter++; |
130 | 112 |
if (counter > entitiesPerQuery) { |
131 |
// String insertQuery = "SPARQL INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}"; |
|
132 |
// Statement stmt; |
|
133 |
//// Connection conn; |
|
134 |
if (fileName.contains("entities")) { |
|
135 |
try{ |
|
113 |
if (!fileName.contains("relations")) { |
|
114 |
try { |
|
136 | 115 |
// Statement stmt1; |
137 | 116 |
// stmt1 = dConn.createStatement(); |
138 | 117 |
// stmt1.execute("__dbf_set ('enable_qp', 1)"); |
139 | 118 |
// stmt1.close(); |
140 |
|
|
119 |
|
|
141 | 120 |
// String deleteQueryString ="SPARQL DEFINE sql:log-enable 3 WITH <"+defaultGraph+"> DELETE {"+deleteQuery+"} WHERE {"+deleteQuery+"}" ; |
142 | 121 |
// Statement stmt; |
143 | 122 |
// stmt = dConn.createStatement(); |
... | ... | |
145 | 124 |
// dConn.commit(); |
146 | 125 |
// stmt.close(); |
147 | 126 |
// dConn.close(); |
148 |
|
|
149 |
deleteQuery="";
|
|
150 |
|
|
151 |
}catch (Exception e) {
|
|
127 |
|
|
128 |
deleteQuery = "";
|
|
129 |
|
|
130 |
} catch (Exception e) {
|
|
152 | 131 |
// log.error("Virtuoso delete failed at query" + deleteQuery + " \n and with error " + e.toString(), e); |
153 | 132 |
// try { |
154 | 133 |
// if(dConn!=null){ |
... | ... | |
165 | 144 |
// } catch (Exception e1) { |
166 | 145 |
// log.error("Getting another Connection failed:"+ e1.toString(), e1); |
167 | 146 |
// } |
168 |
|
|
169 |
}
|
|
170 |
}
|
|
147 |
|
|
148 |
}
|
|
149 |
}
|
|
171 | 150 |
try { |
172 | 151 |
PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)"); |
173 | 152 |
ps.setString(1, insertQuery); |
... | ... | |
187 | 166 |
// |
188 | 167 |
// conn.close(); |
189 | 168 |
buildQuery = new String[2]; |
190 |
buildQuery[0]="";
|
|
191 |
buildQuery[1]="";
|
|
192 |
insertQuery="";
|
|
169 |
buildQuery[0] = "";
|
|
170 |
buildQuery[1] = "";
|
|
171 |
insertQuery = "";
|
|
193 | 172 |
// commitCounter+=counter; |
194 | 173 |
counter = 0; |
195 | 174 |
|
... | ... | |
200 | 179 |
// conn=ds.getConnection(); |
201 | 180 |
// } |
202 | 181 |
// |
203 |
if (fileName.contains("entities")) {
|
|
204 |
//// TODO: keep counters for all entity types |
|
205 |
context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);
|
|
182 |
if (!fileName.contains("relations")) {
|
|
183 |
//s// TODO: keep counters for all entity types
|
|
184 |
context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);
|
|
206 | 185 |
|
186 |
} else { |
|
187 |
context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(1); |
|
207 | 188 |
} |
208 |
else |
|
209 |
{ context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1); |
|
210 |
} |
|
211 | 189 |
|
212 | 190 |
} catch (Exception e) { |
213 | 191 |
log.error("Virtuoso write failed at query" + insertQuery + " \n and with error " + e.toString(), e); |
214 | 192 |
try { |
215 |
if(conn!=null){
|
|
193 |
if (conn != null) {
|
|
216 | 194 |
conn.rollback(); |
217 | 195 |
conn.close(); |
218 |
conn=ds.getConnection();
|
|
196 |
conn = ds.getConnection();
|
|
219 | 197 |
conn.setAutoCommit(false); |
220 |
}else{
|
|
198 |
} else {
|
|
221 | 199 |
conn.close(); |
222 |
conn=ds.getConnection();
|
|
200 |
conn = ds.getConnection();
|
|
223 | 201 |
conn.setAutoCommit(false); |
224 | 202 |
} |
225 | 203 |
|
226 | 204 |
} catch (Exception e1) { |
227 |
log.error("Getting another Connection fail:"+ e1.toString(), e1); |
|
205 |
log.error("Getting another Connection fail:" + e1.toString(), e1);
|
|
228 | 206 |
} |
229 | 207 |
|
230 | 208 |
} |
... | ... | |
254 | 232 |
ps.close(); |
255 | 233 |
|
256 | 234 |
ps = conn.prepareStatement("?"); |
257 |
ps.setString(1,"commit work"); |
|
235 |
ps.setString(1, "commit work");
|
|
258 | 236 |
ps.close(); |
259 | 237 |
|
260 | 238 |
} catch (Exception e) { |
... | ... | |
262 | 240 |
// throw new InterruptedException("Virtuoso write failed at query" + insertQuery + " \n and with error " + e.toString()); |
263 | 241 |
} finally { |
264 | 242 |
log.info("Cleaning up reducer...\nClosing Graph and Datasource"); |
265 |
try{ |
|
243 |
try {
|
|
266 | 244 |
conn.commit(); |
267 | 245 |
conn.close(); |
268 | 246 |
// graph.close(); |
269 | 247 |
// ds.close(); |
270 |
}catch(Exception e){
|
|
248 |
} catch (Exception e) {
|
|
271 | 249 |
log.error("Could not Close connection \n" + e.toString(), e); |
272 | 250 |
} |
273 | 251 |
|
Also available in: Unified diff
Cleaning up code, adding some counters