package eu.dnetlib.data.mapreduce.hbase.lodImport;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.jolbox.bonecp.BoneCPDataSource;

import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
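
/**
 * Reduce side of the LOD import job: converts delimited entity/relation rows
 * into RDF via {@link RDFizer}, accumulates the triples, and bulk-loads them
 * into a Virtuoso store over JDBC, flushing one batch every
 * lod.entitiesPerQuery (or lod.relationsPerQuery) reduce calls.
 */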
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

    private Logger log = Logger.getLogger(LodImportReducer.class);
    private BoneCPDataSource ds;
    private String fileName = "";
    private Configuration hadoopConf;
    private int counter;
    private String[] buildQuery = new String[2];
    private String insertQuery = "";
    private String deleteQuery = "";
    private String defaultGraph = "";
    private int entitiesPerQuery;
    private MapCountries mapCountries;
    private MapLanguages mapLanguages;
    private Connection conn;
    private JSONObject entitiesMappings;
    private JSONObject relationsMappings;
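
    /** Hadoop counters; currently only the TOTAL_* values are incremented (see TODO in reduce()). */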
    public enum ENTITIES_COUNTER {
        RESULT,
        PROJECT,
        DATASOURCE,
        PERSON,
        ORGANIZATION,
        TOTAL_ENTITIES,
        TOTAL_RELATIONS
    }
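
    /**
     * Reads the job configuration (target graph, JSON mappings, batch size)
     * and opens a pooled Virtuoso JDBC connection with auto-commit disabled.
     */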
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.hadoopConf = context.getConfiguration();
        this.fileName = hadoopConf.get("lod.inputFile");
        if (fileName.contains("entities")) defaultGraph = hadoopConf.get("lod.defaultGraph");
        else defaultGraph = hadoopConf.get("lod.relationsGraph");
        this.counter = 0;
        mapCountries = new MapCountries();
        mapLanguages = new MapLanguages();

        buildQuery[0] = "";
        buildQuery[1] = "";

        entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
        relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));

        if (fileName.contains("entities")) entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
        else entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));

        try {
            DB db = new DB();
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"),
                    hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
            conn = ds.getConnection();
            conn.setAutoCommit(false);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }
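
    /**
     * RDFizes each incoming row and appends the result to the pending insert
     * (and, for entities, delete) buffers; every entitiesPerQuery-th call
     * flushes the batch to Virtuoso.
     */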
    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {

        if (entitiesMappings == null || relationsMappings == null) {
            throw new InterruptedException("Could not read json mappings!");
        }
        Iterator<Text> it = values.iterator();

        while (it.hasNext()) {
            String data = it.next().toString();
            String[] split = data.split(hadoopConf.get("lod.delim"));
            List<String> row = new ArrayList<String>(Arrays.asList(split));

            if (fileName.contains("entities")) {
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries, mapLanguages, defaultGraph);
                insertQuery += buildQuery[0];
                deleteQuery += buildQuery[1];
            } else {
                buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
                insertQuery += buildQuery[0];
            }
        }

        counter++;
        if (counter > entitiesPerQuery) {
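            // For entity batches, first drop the previously loaded triples so
            // the bulk insert below does not pile up duplicates on re-runs.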
            if (fileName.contains("entities")) {
                try {
                    String deleteQueryString = "SPARQL DELETE DATA { GRAPH <" + defaultGraph + "> {" + deleteQuery + "}}";
                    Statement stmt = conn.createStatement();
                    stmt.execute(deleteQueryString);
                    conn.commit();
                    stmt.close();
                    deleteQuery = "";
                } catch (Exception e) {
                    log.error("Virtuoso write failed at query " + buildQuery[1] + "\nwith error " + e.toString(), e);
                    try {
                        if (conn != null) {
                            conn.rollback();
                            conn.close();
                        }
                        conn = ds.getConnection();
                        conn.setAutoCommit(false);
                    } catch (Exception e1) {
                        log.error("Getting another connection failed: " + e1.toString(), e1);
                    }
                }
            }
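            // Bulk-load the accumulated triples with Virtuoso's multi-threaded
            // Turtle loader DB.DBA.TTLP_MT(text, base URI, target graph, flags, ...);
            // the trailing integers are loader options (see the Virtuoso TTLP docs).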
            try {
                PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?, ?)");
                ps.setString(1, insertQuery);
                ps.setString(2, "");
                ps.setString(3, defaultGraph);
                final int NQUAD_LEVEL = 64;
                ps.setInt(4, NQUAD_LEVEL);
                ps.setInt(5, 0);
                ps.setInt(6, 15);
                ps.setInt(7, 0);
                ps.execute();
                conn.commit();
                ps.close();

                buildQuery = new String[2];
                buildQuery[0] = "";
                buildQuery[1] = "";
                insertQuery = "";
                counter = 0;

                if (fileName.contains("entities")) {
                    // TODO: keep counters for all entity types
                    context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
                } else {
                    context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
                }
            } catch (Exception e) {
                log.error("Virtuoso write failed at query " + buildQuery[0] + "\nwith error " + e.toString(), e);
                try {
                    if (conn != null) {
                        conn.rollback();
                        conn.close();
                    }
                    conn = ds.getConnection();
                    conn.setAutoCommit(false);
                } catch (Exception e1) {
                    log.error("Getting another connection failed: " + e1.toString(), e1);
                }
            }
        }
    }
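
    /**
     * Flushes whatever is still buffered in insertQuery, then commits and
     * closes the Virtuoso connection.
     */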
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?, ?)");
            ps.setString(1, insertQuery);
            ps.setString(2, "");
            ps.setString(3, defaultGraph);
            final int NQUAD_LEVEL = 64;
            ps.setInt(4, NQUAD_LEVEL);
            ps.setInt(5, 0);
            ps.setInt(6, 15);
            ps.setInt(7, 0);
            ps.execute();
            conn.commit();
            ps.close();
        } catch (Exception e) {
            log.error("Virtuoso write failed at query " + insertQuery + "\nwith error " + e.toString(), e);
        } finally {
            log.info("Cleaning up reducer...\nClosing connection and datasource");
            try {
                conn.commit();
                conn.close();
            } catch (Exception e) {
                log.error("Could not close connection\n" + e.toString(), e);
            }
            log.info("Connection closed...");
        }
    }
}