Project

General

Profile

« Previous | Next » 

Revision 45712

Added by Eri Katsari about 7 years ago

Cleaning up code, adding some counters

View differences:

LodImportReducer.java
3 3
import java.io.IOException;
4 4
import java.sql.Connection;
5 5
import java.sql.PreparedStatement;
6
import java.sql.SQLException;
7
import java.sql.Statement;
8 6
import java.util.ArrayList;
9 7
import java.util.Arrays;
10
import java.util.HashSet;
11 8
import java.util.Iterator;
12 9
import java.util.List;
13 10

  
......
17 14
import org.apache.hadoop.mapreduce.Reducer;
18 15
import org.apache.log4j.Logger;
19 16
import org.json.JSONObject;
20

  
21
import com.hp.hpl.jena.rdf.model.StmtIterator;
22 17
import com.jolbox.bonecp.BoneCPDataSource;
23 18

  
24 19
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
25 20
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries;
26 21
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
27 22
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
28
import virtuoso.jena.driver.VirtGraph;
29 23

  
30 24

  
31 25
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
......
50 44
    private JSONObject relationsMappings;
51 45

  
52 46

  
53
    public static enum ENTITIES_COUNTER {
47
    public static enum REDUCES_COUNTER {
54 48
        RESULT,
55 49
        PROJECT,
56 50
        DATASOURCE,
......
70 64
        this.counter = 0;
71 65
        mapCountries = new MapCountries();
72 66
        mapLanguages = new MapLanguages();
73
        
74
        buildQuery[0]="";
75
        buildQuery[1]="";
76 67

  
68
        buildQuery[0] = "";
69
        buildQuery[1] = "";
70

  
77 71
        entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
78 72
        relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));
79 73

  
......
83 77
        try {
84 78
            DB db = new DB();
85 79
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
86
            
87 80
            conn = ds.getConnection();
88 81
            conn.setAutoCommit(false);
89
            
90
//            dConn = ds.getConnection();
91
//            dConn.setAutoCommit(false);
92 82
        } catch (Exception e) {
93 83
            log.error(e.toString(), e);
94

  
95 84
        }
96 85

  
97
//        this.graph = new VirtGraph(ds);
98

  
99 86
    }
100 87

  
101 88
    @Override
......
107 94
        Iterator<Text> it = values.iterator();
108 95

  
109 96
        while (it.hasNext()) {
110
            String data=it.next().toString();
97
            String data = it.next().toString();
111 98
            String[] split = data.split(hadoopConf.get("lod.delim"));
112 99
            List<String> row = new ArrayList<String>(Arrays.asList(split));
113 100

  
114
            if (fileName.contains("entities")) {
115
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries,mapLanguages,defaultGraph);
116
                insertQuery+=buildQuery[0];
117
                deleteQuery+=buildQuery[1];
101
            if (!fileName.contains("relations")) {
102
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries, mapLanguages, defaultGraph);
103
                insertQuery += buildQuery[0];
104
                deleteQuery += buildQuery[1];
118 105
            } else {
119 106
                buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
120
                insertQuery+=buildQuery[0];
107
                insertQuery += buildQuery[0];
121 108
            }
122 109

  
123 110
        }
124

  
125
//        System.out.println("/nID-> "+key.toString()+"\n");
126
//        System.out.println("/nITCOUNTER-> "+itCounter+"\n");
127
//        System.err.println("\nCOUNTER "+counter);
128
//        System.out.println("so FAR: "+buildQuery);
129 111
        counter++;
130 112
        if (counter > entitiesPerQuery) {
131
//            String insertQuery = "SPARQL INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}";
132
//            Statement stmt;
133
////            Connection conn;
134
        	if (fileName.contains("entities")) { 	
135
	        	try{
113
            if (!fileName.contains("relations")) {
114
                try {
136 115
//	        		Statement stmt1;
137 116
//	        		stmt1 = dConn.createStatement();
138 117
//	        		stmt1.execute("__dbf_set ('enable_qp', 1)");
139 118
//	        		stmt1.close();
140
	        		
119

  
141 120
//	        		String deleteQueryString ="SPARQL DEFINE sql:log-enable 3 WITH <"+defaultGraph+"> DELETE {"+deleteQuery+"} WHERE {"+deleteQuery+"}" ;
142 121
//	        		Statement stmt;
143 122
//	        		stmt = dConn.createStatement();
......
145 124
//	    			dConn.commit();
146 125
//	    			stmt.close();
147 126
//	    			dConn.close();
148
	    			
149
	    			deleteQuery="";
150
	    			
151
	        	}catch (Exception e) {
127

  
128
                    deleteQuery = "";
129

  
130
                } catch (Exception e) {
152 131
//	                log.error("Virtuoso delete failed  at  query" + deleteQuery + " \n and with error " + e.toString(), e);
153 132
//	                try {
154 133
//	                    if(dConn!=null){
......
165 144
//	                } catch (Exception e1) {
166 145
//	                    log.error("Getting another Connection failed:"+ e1.toString(), e1);
167 146
//	                }
168
	
169
	            }
170
        	}
147

  
148
                }
149
            }
171 150
            try {
172 151
                PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
173 152
                ps.setString(1, insertQuery);
......
187 166
//    
188 167
//    			conn.close();
189 168
                buildQuery = new String[2];
190
                buildQuery[0]="";
191
                buildQuery[1]="";
192
                insertQuery="";
169
                buildQuery[0] = "";
170
                buildQuery[1] = "";
171
                insertQuery = "";
193 172
//    			commitCounter+=counter;
194 173
                counter = 0;
195 174

  
......
200 179
//    				conn=ds.getConnection();
201 180
//    			}
202 181
//
203
                if (fileName.contains("entities")) {
204
                    //// TODO: keep counters for all entity types
205
                    context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);
182
                if (!fileName.contains("relations")) {
183
                    // TODO: keep counters for all entity types
184
                    context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);
206 185

  
186
                } else {
187
                    context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(1);
207 188
                }
208
                else
209
                {  context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
210
                }
211 189

  
212 190
            } catch (Exception e) {
213 191
                log.error("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString(), e);
214 192
                try {
215
                    if(conn!=null){
193
                    if (conn != null) {
216 194
                        conn.rollback();
217 195
                        conn.close();
218
                        conn=ds.getConnection();
196
                        conn = ds.getConnection();
219 197
                        conn.setAutoCommit(false);
220
                    }else{
198
                    } else {
221 199
                        conn.close();
222
                        conn=ds.getConnection();
200
                        conn = ds.getConnection();
223 201
                        conn.setAutoCommit(false);
224 202
                    }
225 203

  
226 204
                } catch (Exception e1) {
227
                    log.error("Getting another Connection fail:"+ e1.toString(), e1);
205
                    log.error("Getting another Connection fail:" + e1.toString(), e1);
228 206
                }
229 207

  
230 208
            }
......
254 232
            ps.close();
255 233

  
256 234
            ps = conn.prepareStatement("?");
257
            ps.setString(1,"commit work");
235
            ps.setString(1, "commit work");
258 236
            ps.close();
259 237

  
260 238
        } catch (Exception e) {
......
262 240
//            throw new InterruptedException("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString());
263 241
        } finally {
264 242
            log.info("Cleaning up reducer...\nClosing Graph and Datasource");
265
            try{
243
            try {
266 244
                conn.commit();
267 245
                conn.close();
268 246
//            	graph.close();
269 247
//            	ds.close();
270
            }catch(Exception e){
248
            } catch (Exception e) {
271 249
                log.error("Could not Close connection \n" + e.toString(), e);
272 250
            }
273 251

  

Also available in: Unified diff