Project

General

Profile

1 40545 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.lodImport;
2
3 40979 giorgos.al
import java.io.IOException;
4 41076 giorgos.al
import java.sql.Connection;
5 41140 giorgos.al
import java.sql.PreparedStatement;
6
import java.sql.SQLException;
7 41076 giorgos.al
import java.sql.Statement;
8 40979 giorgos.al
import java.util.ArrayList;
9
import java.util.Arrays;
10 41286 giorgos.al
import java.util.HashSet;
11 40979 giorgos.al
import java.util.Iterator;
12
import java.util.List;
13
14 40728 giorgos.al
import org.apache.hadoop.conf.Configuration;
15 40729 eri.katsar
import org.apache.hadoop.io.NullWritable;
16 40545 eri.katsar
import org.apache.hadoop.io.Text;
17
import org.apache.hadoop.mapreduce.Reducer;
18
import org.apache.log4j.Logger;
19 40719 giorgos.al
import org.json.JSONObject;
20 40979 giorgos.al
21 42726 giorgos.al
import com.hp.hpl.jena.rdf.model.StmtIterator;
22 41000 giorgos.al
import com.jolbox.bonecp.BoneCPDataSource;
23 40979 giorgos.al
24
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
25 41007 giorgos.al
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries;
26
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
27 40979 giorgos.al
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
28 40843 giorgos.al
import virtuoso.jena.driver.VirtGraph;
29 40778 giorgos.al
30
31 40774 eri.katsar
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
32 40545 eri.katsar
33 40726 eri.katsar
    private Logger log = Logger.getLogger(LodImportReducer.class);
34 41000 giorgos.al
    private BoneCPDataSource ds;
35 40726 eri.katsar
    private String fileName = "";
36 40811 eri.katsar
    private Configuration hadoopConf;
37 42282 eri.katsar
    //    private VirtGraph graph;
38 40843 giorgos.al
    private int counter;
39 41076 giorgos.al
    private int commitCounter;
40 42719 giorgos.al
    private String[] buildQuery = new String[2];
41
    private String insertQuery = "";
42
    private String deleteQuery = "";
43 41286 giorgos.al
    private String defaultGraph = "";
44 41002 giorgos.al
    private int entitiesPerQuery;
45 41007 giorgos.al
    private MapCountries mapCountries;
46
    private MapLanguages mapLanguages;
47 41286 giorgos.al
    private Connection conn;
48 42722 giorgos.al
    private Connection dConn;
49 41335 giorgos.al
    private JSONObject entitiesMappings;
50
    private JSONObject relationsMappings;
51 40935 eri.katsar
52 41335 giorgos.al
53 41331 eri.katsar
    public static enum ENTITIES_COUNTER {
54
        RESULT,
55
        PROJECT,
56
        DATASOURCE,
57
        PERSON,
58
        ORGANIZATION,
59
        TOTAL_ENTITIES,
60
        TOTAL_RELATIONS
61
    }
62
63
64 40726 eri.katsar
    @Override
65
    protected void setup(Context context) throws IOException, InterruptedException {
66 40796 eri.katsar
        this.hadoopConf = context.getConfiguration();
67 40778 giorgos.al
        this.fileName = hadoopConf.get("lod.inputFile");
68 40935 eri.katsar
        if (fileName.contains("entities")) defaultGraph = hadoopConf.get("lod.defaultGraph");
69
        else defaultGraph = hadoopConf.get("lod.relationsGraph");
70
        this.counter = 0;
71 41007 giorgos.al
        mapCountries = new MapCountries();
72
        mapLanguages = new MapLanguages();
73 42719 giorgos.al
74
        buildQuery[0]="";
75
        buildQuery[1]="";
76 42282 eri.katsar
77 41335 giorgos.al
        entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
78
        relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));
79 41057 giorgos.al
80 41002 giorgos.al
        if (fileName.contains("entities")) entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
81
        else entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));
82 40796 eri.katsar
83 40726 eri.katsar
        try {
84 42282 eri.katsar
            DB db = new DB();
85 41076 giorgos.al
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
86 42722 giorgos.al
87 41076 giorgos.al
            conn = ds.getConnection();
88 41140 giorgos.al
            conn.setAutoCommit(false);
89 42722 giorgos.al
90 42822 giorgos.al
//            dConn = ds.getConnection();
91
//            dConn.setAutoCommit(false);
92 40726 eri.katsar
        } catch (Exception e) {
93 40813 eri.katsar
            log.error(e.toString(), e);
94 40811 eri.katsar
95 40726 eri.katsar
        }
96 42282 eri.katsar
97 41076 giorgos.al
//        this.graph = new VirtGraph(ds);
98 42282 eri.katsar
99 40726 eri.katsar
    }
100 40729 eri.katsar
101 40726 eri.katsar
    @Override
102 40811 eri.katsar
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
103 42282 eri.katsar
104 40935 eri.katsar
        if (entitiesMappings == null || relationsMappings == null) {
105
            throw new InterruptedException("Could not read json mappings!");
106
        }
107 40774 eri.katsar
        Iterator<Text> it = values.iterator();
108 42282 eri.katsar
109 40726 eri.katsar
        while (it.hasNext()) {
110 41331 eri.katsar
            String data=it.next().toString();
111
            String[] split = data.split(hadoopConf.get("lod.delim"));
112 40843 giorgos.al
            List<String> row = new ArrayList<String>(Arrays.asList(split));
113 41331 eri.katsar
114 40935 eri.katsar
            if (fileName.contains("entities")) {
115 42719 giorgos.al
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries,mapLanguages,defaultGraph);
116
                insertQuery+=buildQuery[0];
117
                deleteQuery+=buildQuery[1];
118 40935 eri.katsar
            } else {
119 42719 giorgos.al
                buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
120
                insertQuery+=buildQuery[0];
121 40843 giorgos.al
            }
122 41331 eri.katsar
123
        }
124 42282 eri.katsar
125 40934 giorgos.al
//        System.out.println("/nID-> "+key.toString()+"\n");
126
//        System.out.println("/nITCOUNTER-> "+itCounter+"\n");
127 41019 giorgos.al
//        System.err.println("\nCOUNTER "+counter);
128 40934 giorgos.al
//        System.out.println("so FAR: "+buildQuery);
129 41076 giorgos.al
        counter++;
130 41011 giorgos.al
        if (counter > entitiesPerQuery) {
131 41140 giorgos.al
//            String insertQuery = "SPARQL INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}";
132
//            Statement stmt;
133
////            Connection conn;
134 42719 giorgos.al
        	if (fileName.contains("entities")) {
135
	        	try{
136 42808 giorgos.al
//	        		Statement stmt1;
137
//	        		stmt1 = dConn.createStatement();
138
//	        		stmt1.execute("__dbf_set ('enable_qp', 1)");
139
//	        		stmt1.close();
140 42727 giorgos.al
141 42822 giorgos.al
//	        		String deleteQueryString ="SPARQL DEFINE sql:log-enable 3 WITH <"+defaultGraph+"> DELETE {"+deleteQuery+"} WHERE {"+deleteQuery+"}" ;
142
//	        		Statement stmt;
143
//	        		stmt = dConn.createStatement();
144
//	    			stmt.execute(deleteQueryString);
145
//	    			dConn.commit();
146
//	    			stmt.close();
147 42723 giorgos.al
//	    			dConn.close();
148 42719 giorgos.al
149
	    			deleteQuery="";
150
151
	        	}catch (Exception e) {
152 42822 giorgos.al
//	                log.error("Virtuoso delete failed  at  query" + deleteQuery + " \n and with error " + e.toString(), e);
153
//	                try {
154
//	                    if(dConn!=null){
155
//	                    	dConn.rollback();
156
//	                    	dConn.close();
157
//	                    	dConn=ds.getConnection();
158
//	                    	dConn.setAutoCommit(false);
159
//	                    }else{
160
//	                    	dConn.close();
161
//	                    	dConn=ds.getConnection();
162
//	                    	dConn.setAutoCommit(false);
163
//	                    }
164
//
165
//	                } catch (Exception e1) {
166
//	                    log.error("Getting another Connection failed:"+ e1.toString(), e1);
167
//	                }
168 42719 giorgos.al
169
	            }
170
        	}
171 40935 eri.katsar
            try {
172 42282 eri.katsar
                PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
173 42719 giorgos.al
                ps.setString(1, insertQuery);
174 41140 giorgos.al
                ps.setString(2, "");
175
                ps.setString(3, defaultGraph);
176 41331 eri.katsar
                final int NQUAD_LEVEL = 64;
177 41140 giorgos.al
                ps.setInt(4, NQUAD_LEVEL);
178 42240 giorgos.al
                ps.setInt(5, 0);
179
                ps.setInt(6, 15);
180 41140 giorgos.al
                ps.setInt(7, 0);
181
                ps.execute();
182 41313 giorgos.al
//                ps = conn.prepareStatement("commit work");
183
//                ps.execute();
184
                conn.commit();
185 42282 eri.katsar
186 41140 giorgos.al
                ps.close();
187
//
188 41076 giorgos.al
//    			conn.close();
189 42719 giorgos.al
                buildQuery = new String[2];
190
                buildQuery[0]="";
191
                buildQuery[1]="";
192
                insertQuery="";
193 41313 giorgos.al
//    			commitCounter+=counter;
194 42282 eri.katsar
                counter = 0;
195
196 41140 giorgos.al
//    			if(commitCounter>100){
197
//    				commitCounter=0;
198
////    				conn.commit();
199
//    				conn.close();
200
//    				conn=ds.getConnection();
201
//    			}
202 41331 eri.katsar
//
203
                if (fileName.contains("entities")) {
204
                    //// TODO: keep counters for all entity types
205 42732 giorgos.al
                    context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);
206 41331 eri.katsar
207
                }
208
                else
209
                {  context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
210
                }
211
212 42282 eri.katsar
            } catch (Exception e) {
213 42722 giorgos.al
                log.error("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString(), e);
214 42282 eri.katsar
                try {
215
                    if(conn!=null){
216
                        conn.rollback();
217
                        conn.close();
218
                        conn=ds.getConnection();
219
                        conn.setAutoCommit(false);
220
                    }else{
221
                        conn.close();
222
                        conn=ds.getConnection();
223
                        conn.setAutoCommit(false);
224
                    }
225 40935 eri.katsar
226 42282 eri.katsar
                } catch (Exception e1) {
227 42808 giorgos.al
                    log.error("Getting another Connection fail:"+ e1.toString(), e1);
228 42282 eri.katsar
                }
229
230
            }
231
232 40843 giorgos.al
        }
233 40545 eri.katsar
234 40726 eri.katsar
    }
235 40545 eri.katsar
236 40942 eri.katsar
237 40726 eri.katsar
    @Override
238
    protected void cleanup(Context context) throws IOException, InterruptedException {
239 41140 giorgos.al
//        String insertQuery = "SPARQL INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}";
240
//        System.out.println("CLEAN-> NEW INSERT QUERY:  " + insertQuery);
241
//        Statement stmt;
242 40935 eri.katsar
        try {
243 42282 eri.katsar
            PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?,?)");
244 42719 giorgos.al
            ps.setString(1, insertQuery);
245 41140 giorgos.al
            ps.setString(2, "");
246
            ps.setString(3, defaultGraph);
247 41286 giorgos.al
            final int NQUAD_LEVEL = 64;
248 41140 giorgos.al
            ps.setInt(4, NQUAD_LEVEL);
249 42240 giorgos.al
            ps.setInt(5, 0);
250
            ps.setInt(6, 15);
251 41140 giorgos.al
            ps.setInt(7, 0);
252
            ps.execute();
253 41313 giorgos.al
            conn.commit();
254 41140 giorgos.al
            ps.close();
255 42282 eri.katsar
256 42240 giorgos.al
            ps = conn.prepareStatement("?");
257
            ps.setString(1,"commit work");
258
            ps.close();
259 42282 eri.katsar
260 40935 eri.katsar
        } catch (Exception e) {
261 42282 eri.katsar
            log.error("Virtuoso write failed  at  query" + buildQuery + " \n and with error " + e.toString(), e);
262 41002 giorgos.al
//            throw new InterruptedException("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString());
263 40935 eri.katsar
        } finally {
264 40966 giorgos.al
            log.info("Cleaning up reducer...\nClosing Graph and Datasource");
265
            try{
266 42282 eri.katsar
                conn.commit();
267
                conn.close();
268 41076 giorgos.al
//            	graph.close();
269 41002 giorgos.al
//            	ds.close();
270 40966 giorgos.al
            }catch(Exception e){
271 42282 eri.katsar
                log.error("Could not Close connection \n" + e.toString(), e);
272 40966 giorgos.al
            }
273 42282 eri.katsar
274 40966 giorgos.al
            log.info("Graph and Datasource Closed...");
275 40935 eri.katsar
        }
276 40726 eri.katsar
    }
277 40935 eri.katsar
278 40545 eri.katsar
}