package eu.dnetlib.data.mapreduce.hbase.lodImport;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.jolbox.bonecp.BoneCPDataSource;

import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
import virtuoso.jena.driver.VirtGraph;

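/**
 * Reduce side of the LOD import job: turns mapped rows into RDF statements
 * (entities or relations, depending on the input file name) and bulk-loads
 * them into a Virtuoso graph in batches via the DB.DBA.TTLP_MT procedure.
 */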
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

    private Logger log = Logger.getLogger(LodImportReducer.class);
    private BoneCPDataSource ds;
    private String fileName = "";
    private Configuration hadoopConf;
//    private VirtGraph graph;
    private int counter;
    private int commitCounter;
    private String[] buildQuery = new String[2];
    private String insertQuery = "";
    private String deleteQuery = "";
    private String defaultGraph = "";
    private int entitiesPerQuery;
    private MapCountries mapCountries;
    private MapLanguages mapLanguages;
    private Connection conn;
    private Connection dConn;
    private JSONObject entitiesMappings;
    private JSONObject relationsMappings;

    public enum ENTITIES_COUNTER {
        RESULT,
        PROJECT,
        DATASOURCE,
        PERSON,
        ORGANIZATION,
        TOTAL_ENTITIES,
        TOTAL_RELATIONS
    }

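    /**
     * Reads the job configuration (input file, target graph, JSON mappings,
     * batch size) and opens a pooled Virtuoso connection through BoneCP.
     */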
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.hadoopConf = context.getConfiguration();
        this.fileName = hadoopConf.get("lod.inputFile");
        if (fileName.contains("entities")) defaultGraph = hadoopConf.get("lod.defaultGraph");
        else defaultGraph = hadoopConf.get("lod.relationsGraph");
        this.counter = 0;
        mapCountries = new MapCountries();
        mapLanguages = new MapLanguages();

        buildQuery[0] = "";
        buildQuery[1] = "";

        entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
        relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));

        if (fileName.contains("entities")) entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
        else entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));

        try {
            DB db = new DB();
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));

            conn = ds.getConnection();
            conn.setAutoCommit(false);

//            dConn = ds.getConnection();
//            dConn.setAutoCommit(false);
        } catch (Exception e) {
            log.error(e.toString(), e);
            // Fail fast: with no connection every subsequent reduce() would NPE.
            throw new IOException("Could not open a connection to Virtuoso", e);
        }

//        this.graph = new VirtGraph(ds);
    }

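    /**
     * RDFizes each incoming row and appends the result to the current batch;
     * after entitiesPerQuery keys the batch is bulk-loaded into Virtuoso.
     */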
    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {

        if (entitiesMappings == null || relationsMappings == null) {
            throw new InterruptedException("Could not read JSON mappings!");
        }
        Iterator<Text> it = values.iterator();

        while (it.hasNext()) {
            String data = it.next().toString();
            String[] split = data.split(hadoopConf.get("lod.delim"));
            List<String> row = new ArrayList<String>(Arrays.asList(split));

            if (fileName.contains("entities")) {
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries, mapLanguages, defaultGraph);
                insertQuery += buildQuery[0];
                deleteQuery += buildQuery[1];
            } else {
                buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
                insertQuery += buildQuery[0];
            }
        }
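        // One reduce key has been fully RDFized; flush to Virtuoso once the
        // configured batch size (entitiesPerQuery) is exceeded.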

        counter++;
        if (counter > entitiesPerQuery) {
            if (fileName.contains("entities")) {
                // The SPARQL DELETE of previously loaded statements is currently
                // disabled; only the accumulated delete buffer is reset.
//                String deleteQueryString = "SPARQL DEFINE sql:log-enable 3 WITH <" + defaultGraph + "> DELETE {" + deleteQuery + "} WHERE {" + deleteQuery + "}";
//                Statement stmt = dConn.createStatement();
//                stmt.execute(deleteQueryString);
//                dConn.commit();
//                stmt.close();
                deleteQuery = "";
            }
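            // Flush the batch through DB.DBA.TTLP_MT, Virtuoso's multi-threaded
            // RDF bulk loader: the buffered statements, a base URI, the target
            // graph, then parser/loader options.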
            try {
                PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?, ?)");
                ps.setString(1, insertQuery);
                ps.setString(2, "");
                ps.setString(3, defaultGraph);
                final int NQUAD_LEVEL = 64;
                ps.setInt(4, NQUAD_LEVEL);
                ps.setInt(5, 0);
                ps.setInt(6, 15);
                ps.setInt(7, 0);
                ps.execute();
                conn.commit();
                ps.close();

                // Reset the batch buffers.
                buildQuery = new String[2];
                buildQuery[0] = "";
                buildQuery[1] = "";
                insertQuery = "";
                counter = 0;

                if (fileName.contains("entities")) {
                    // TODO: keep counters for all entity types
                    context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);
                } else {
                    context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
                }
            } catch (Exception e) {
                log.error("Virtuoso write failed at query " + insertQuery + "\nwith error " + e.toString(), e);
                try {
                    // Discard the broken connection; either way open a fresh one.
                    if (conn != null) {
                        conn.rollback();
                        conn.close();
                    }
                    conn = ds.getConnection();
                    conn.setAutoCommit(false);
                } catch (Exception e1) {
                    log.error("Getting another connection failed: " + e1.toString(), e1);
                }
            }
        }
    }
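    /**
     * Flushes any remaining buffered statements with a final TTLP_MT call and
     * closes the Virtuoso connection.
     */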
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?, ?)");
            ps.setString(1, insertQuery);
            ps.setString(2, "");
            ps.setString(3, defaultGraph);
            final int NQUAD_LEVEL = 64;
            ps.setInt(4, NQUAD_LEVEL);
            ps.setInt(5, 0);
            ps.setInt(6, 15);
            ps.setInt(7, 0);
            ps.execute();
            conn.commit();
            ps.close();
        } catch (Exception e) {
            log.error("Virtuoso write failed at query " + insertQuery + "\nwith error " + e.toString(), e);
//            throw new InterruptedException("Virtuoso write failed at query " + insertQuery + "\nwith error " + e.toString());
        } finally {
            log.info("Cleaning up reducer...\nClosing Graph and Datasource");
            try {
                conn.commit();
                conn.close();
//                graph.close();
//                ds.close();
            } catch (Exception e) {
                log.error("Could not close connection\n" + e.toString(), e);
            }

            log.info("Graph and Datasource Closed...");
        }
    }

}