Project

General

Profile

« Previous | Next » 

Revision 41076

DB changes

View differences:

modules/dnet-openaire-lodimport/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodImport/utils/DB.java
12 12

  
13 13
public class DB  {
14 14
	
15
	private static Logger log = Logger.getLogger(DB.class);
15
	private Logger log = Logger.getLogger(DB.class);
16 16

  
17
	 private static BoneCPDataSource ds;
18
	 private static BoneCP connectionPool;
17
	 private BoneCPDataSource ds;
18
	 private BoneCP connectionPool;
19 19
	
20 20
	 public DB(){
21 21
		 
......
52 52
//
53 53
//	    }
54 54
	 
55
	 public static BoneCPDataSource getDatasource(String conLine, String username, String password, String minCpart, String maxCpart, String part) throws IOException, SQLException, PropertyVetoException {
55
	 public BoneCPDataSource getDatasource(String conLine, String username, String password, String minCpart, String maxCpart, String part) throws IOException, SQLException, PropertyVetoException {
56 56
		 try {
57 57
//	            // load the database driver (make sure this is in your classpath!)
58 58
	            Class.forName("virtuoso.jdbc4.Driver");
......
75 75
	    }
76 76
	 
77 77
	 
78
	 public static BoneCP getConnectionPool(String conLine, String username, String password, String minCpart, String maxCpart, String part) throws IOException, SQLException, PropertyVetoException {
78
	 public  BoneCP getConnectionPool(String conLine, String username, String password, String minCpart, String maxCpart, String part) throws IOException, SQLException, PropertyVetoException {
79 79
		 try {
80 80
//	            // load the database driver (make sure this is in your classpath!)
81 81
	            Class.forName("virtuoso.jdbc4.Driver");
......
96 96
	       
97 97
	    }
98 98
 
99
	 public static void closeConnectionPool() throws SQLException {
99
	 public void closeConnectionPool() throws SQLException {
100 100
	        connectionPool.shutdown();
101 101
	    }
102 102
	 
103
	 public static void closeDatasource() throws SQLException {
103
	 public void closeDatasource() throws SQLException {
104 104
	        ds.close();
105 105
	    }
106 106
	
modules/dnet-openaire-lodimport/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodImport/utils/RDFizer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.lodImport.utils;
2 2

  
3 3
import java.net.URI;
4
import java.sql.Connection;
4 5
import java.sql.Statement;
5 6
import java.util.List;
6 7

  
7
import javax.sql.DataSource;
8

  
9 8
import org.apache.hadoop.conf.Configuration;
10 9
import org.apache.log4j.Logger;
11 10
import org.json.JSONArray;
12 11
import org.json.JSONObject;
13 12

  
14
import com.hp.hpl.jena.query.ResultSet;
13
import com.jolbox.bonecp.BoneCPDataSource;
15 14

  
16
import virtuoso.jena.driver.VirtGraph;
17
import virtuoso.jena.driver.VirtuosoQueryExecution;
18
import virtuoso.jena.driver.VirtuosoQueryExecutionFactory;
19
import virtuoso.jena.driver.VirtuosoUpdateFactory;
20
import virtuoso.jena.driver.VirtuosoUpdateRequest;
21

  
22 15
public class RDFizer {
23 16
	
24 17
	private static Logger log = Logger.getLogger(RDFizer.class);
......
26 19
	public RDFizer() {
27 20
	}
28 21

  
29
	public static String RDFizeEntityRow(List<String> row,VirtGraph graph, JSONObject mappings, Configuration conf, MapCountries mapCountries,MapLanguages mapLanguages){
30
//		VirtModel model = new VirtModel(graph);
22
//	public static String RDFizeEntityRow(List<String> row,VirtGraph graph, JSONObject mappings, Configuration conf, MapCountries mapCountries,MapLanguages mapLanguages){
23
	public static String RDFizeEntityRow(List<String> row,BoneCPDataSource ds, JSONObject mappings, Configuration conf, MapCountries mapCountries,MapLanguages mapLanguages){
31 24
		
25
		//		VirtModel model = new VirtModel(graph);
26
		
27
		String graph = conf.get("lod.defaultGraph");
28
		
32 29
		String insertQuery ="";
33 30
		
34 31
		String baseURI = conf.get("lod.baseURI");
35 32
		String id = row.get(1).toString();
36 33
		String type = row.get(0).toString();
37 34
		String resourceURI = baseURI+type+"/"+id;
38
		String askQuery = "SELECT ?o FROM <"+conf.get("lod.defaultGraph")+"> WHERE {<"+resourceURI+"> ?p ?o}";
39
		VirtuosoQueryExecution vqe = VirtuosoQueryExecutionFactory.create (askQuery, graph);
35
//		String askQuery = "SELECT ?o FROM <"+conf.get("lod.defaultGraph")+"> WHERE {<"+resourceURI+"> ?p ?o}";
36
//		VirtuosoQueryExecution vqe = VirtuosoQueryExecutionFactory.create (askQuery, graph);
40 37
		boolean ask = false;
41
		ResultSet rs = vqe.execSelect();
42
		if(rs.hasNext()) ask=true;
43
		vqe.close();
38
		try{
39
			ask = resourceExists(ds.getConnection(), resourceURI, graph);
40
		}catch(Exception e){
41
			log.error("Could not ASK "+e.toString(),e);
42
		}
43
//		ResultSet rs = vqe.execSelect();
44
//		if(rs.hasNext()) ask=true;
45
//		vqe.close();
44 46
		if(ask){
45
			String deleteQueryString ="DELETE FROM <"+conf.get("lod.defaultGraph")+"> {<"+resourceURI+"> ?p ?o} WHERE {<"+resourceURI+"> ?p ?o}" ;
46
			VirtuosoUpdateRequest vur = VirtuosoUpdateFactory.create(deleteQueryString, graph);
47
//			System.out.println("DELETED:\t"+deleteQueryString);
48
			vur.exec();
47
			try{
48
				deleteResource(ds.getConnection(), resourceURI, graph);
49
			}catch(Exception e){
50
				log.error("Could not DELETE  "+resourceURI+" "+e.toString(),e);
51
			}
52
//			String deleteQueryString ="DELETE FROM <"+conf.get("lod.defaultGraph")+"> {<"+resourceURI+"> ?p ?o} WHERE {<"+resourceURI+"> ?p ?o}" ;
53
//			VirtuosoUpdateRequest vur = VirtuosoUpdateFactory.create(deleteQueryString, graph);
54
////			System.out.println("DELETED:\t"+deleteQueryString);
55
//			vur.exec();
49 56
		}
50 57
		
51 58
	    JSONArray typeMappings = mappings.getJSONArray(type);
......
55 62
            String index = new Integer(i).toString();
56 63
            String propertyString;
57 64
            
65
            
66
            if(type.equals("project") && i>24) continue;
67
            if(type.equals("organization") && i>11) continue;
68
            
58 69
            try{
59 70
            	propertyString = propertyObject.getString(index);
60 71
            }catch(Exception e){
61
            	log.error("Could not get the property:  "+e.toString(),e);
72
            	log.error("Could not get the property for type "+type+" :  "+e.toString(),e);
62 73
            	continue;
63 74
            }
64 75
            
......
74 85
			if(i==16 && type.equals("result")  && !value.equals("und") || i==11 && type.equals("person")  && !value.equals("und") || i==9 && type.equals("organization") && !value.equals("und")){
75 86
				try{
76 87
					String countryURI = mapCountries.getCountryURI(value);
88
					URI uri = new URI(countryURI);
77 89
					if(countryURI.equals("")){
78 90
						insertQuery+="; <"+propertyString+"> \""+value+"\"";
79 91
						continue;
80 92
					}
81
					insertQuery+="; <"+propertyString+"> <"+countryURI+">";
93
					insertQuery+="; <"+propertyString+"> <"+uri+">";
82 94
//					log.info("COUNTRY "+countryURI+"  FROM  "+value);
83 95
				}catch(Exception e){
84 96
					insertQuery+="; <"+propertyString+"> \""+value+"\"";
......
91 103
			if(i==9 && type.equals("result") && !value.equals("und")){
92 104
				try{
93 105
					String langURI = mapLanguages.getLangURI(value);
106
					URI uri = new URI(langURI);
94 107
					if(langURI.equals("")){
95 108
						insertQuery+="; <"+propertyString+"> \""+value+"\"";
96 109
						continue;
97 110
					}
98
					insertQuery+="; <"+propertyString+"> <"+langURI+">";
111
					insertQuery+="; <"+propertyString+"> <"+uri+">";
99 112
//					log.info("LANG "+langURI+"  FROM  "+value.trim());
100 113
				}catch(Exception e){
101 114
					insertQuery+="; <"+propertyString+"> \""+value+"\"";
......
133 146
		
134 147
	}
135 148
	
136
	public static String RDFizeRelationRow(List<String> row, VirtGraph graph, JSONObject mappings, Configuration conf){
149
//	public static String RDFizeRelationRow(List<String> row, VirtGraph graph, JSONObject mappings, Configuration conf){
150
	public static String RDFizeRelationRow(List<String> row, JSONObject mappings, Configuration conf){
151
		
137 152
			String insertQuery = "";
138 153
			String baseURI = conf.get("lod.baseURI");
139 154
			JSONObject typeMappings = mappings.getJSONArray(row.get(0)).getJSONObject(0);
......
148 163
			return insertQuery;
149 164
		}
150 165
	
151
	public static void clearGraph(String graph, VirtGraph virtGraph){
152
		String clearQuery="DEFINE sql:log-enable 3 CLEAR GRAPH  <"+graph+">";
153
		VirtuosoUpdateRequest vur = VirtuosoUpdateFactory.create(clearQuery, virtGraph);
154
		vur.exec();
166
	public static boolean resourceExists(Connection conn, String resourceURI, String graph){
167
		boolean exists = false;
168
		String askQuery = "SPARQL SELECT ?o FROM <"+graph+"> WHERE {<"+resourceURI+"> ?p ?o}";
169
		Statement stmt;
170
		try {
171
			stmt = conn.createStatement();
172
//			ResultSet rs=	stmt.executeQuery(askQuery);
173
			exists = stmt.execute(askQuery);
174
//			if(rs.next()) exists=true;
175
			stmt.close();
176
			conn.commit();
177
			conn.close();
178
		} catch (Exception e) {
179
			log.error("Virtuoso ask Query failed. Checkpoint was "+ askQuery +"\n" + e.toString(), e);
180
		}
181
		return exists;
155 182
	}
156 183
	
157
	public static void setCheckpoint(DataSource ds, String checkpointValue){
184
	public static void deleteResource(Connection conn, String resourceURI, String graph){
185
		String deleteQueryString ="SPARQL DELETE FROM <"+graph+"> {<"+resourceURI+"> ?p ?o} WHERE {<"+resourceURI+"> ?p ?o}" ;
158 186
		Statement stmt;
159 187
		try {
160
			ds.getConnection().createStatement();
161
			stmt = ds.getConnection().createStatement();
188
			stmt = conn.createStatement();
189
			stmt.execute(deleteQueryString);
190
			stmt.close();
191
			conn.commit();
192
			conn.close();
193
		} catch (Exception e) {
194
			log.error("Virtuoso ask Query failed. Checkpoint was "+ deleteQueryString +"\n" + e.toString(), e);
195
		}
196
	}
197
	
198
	public static void clearGraph(String graph, Connection conn){
199
		String clearQuery="SPARQL DEFINE sql:log-enable 3 CLEAR GRAPH  <"+graph+">";
200
		Statement stmt;
201
		try {
202
			stmt = conn.createStatement();
203
			stmt.execute(clearQuery);
204
			stmt.close();
205
			conn.commit();
206
			conn.close();
207
		} catch (Exception e) {
208
			log.error("Virtuoso FAILED TO CLEAR graph "+ clearQuery +"\n" + e.toString(), e);
209
		}
210
	}
211

  
212
	public static void setCheckpoint(Connection conn, String checkpointValue){
213
		Statement stmt;
214
		try {
215
			stmt = conn.createStatement();
162 216
			stmt.execute("checkpoint_interval("+checkpointValue+");");
163 217
			stmt.close();
218
			conn.commit();
219
			conn.close();
164 220
		} catch (Exception e) {
165 221
			log.error("Virtuoso set checkpoint failed. Checkpoint was "+ checkpointValue +"\n" + e.toString(), e);
166 222
		}
167
		
168
		
169
		
170 223
	}
171 224
}
modules/dnet-openaire-lodimport/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/lodImport/LodImportReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.lodImport;
2 2

  
3 3
import java.io.IOException;
4
import java.sql.Connection;
5
import java.sql.Statement;
4 6
import java.util.ArrayList;
5 7
import java.util.Arrays;
6 8
import java.util.Iterator;
......
20 22
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
21 23
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;
22 24
import virtuoso.jena.driver.VirtGraph;
23
import virtuoso.jena.driver.VirtuosoUpdateFactory;
24
import virtuoso.jena.driver.VirtuosoUpdateRequest;
25 25

  
26 26

  
27 27
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
......
32 32
    private Configuration hadoopConf;
33 33
    private VirtGraph graph;
34 34
    private int counter;
35
    private int commitCounter;
35 36
    private String buildQuery = "";
36 37
    String defaultGraph = "";
37 38
    private int entitiesPerQuery;
38 39
    private MapCountries mapCountries;
39 40
    private MapLanguages mapLanguages;
41
    Connection conn;
40 42

  
41 43
    @Override
42 44
    protected void setup(Context context) throws IOException, InterruptedException {
......
52 54
        else entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));
53 55

  
54 56
        try {
55
            ds = DB.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
57
        	DB db = new DB();
58
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
59
            conn = ds.getConnection();
56 60
        } catch (Exception e) {
57 61
            log.error(e.toString(), e);
58 62

  
59 63
        }
60

  
61
        this.graph = new VirtGraph(ds);
64
       
65
//        this.graph = new VirtGraph(ds);
66
        
62 67
    }
63 68

  
64 69
    @Override
......
76 81
            List<String> row = new ArrayList<String>(Arrays.asList(split));
77 82
//            log.info("\nRow is:" +row.toString());
78 83
            if (fileName.contains("entities")) {
79
                buildQuery += RDFizer.RDFizeEntityRow(row, graph, entitiesMappings, hadoopConf, mapCountries,mapLanguages);
84
//                buildQuery += RDFizer.RDFizeEntityRow(row, graph, entitiesMappings, hadoopConf, mapCountries,mapLanguages);
85
            	  buildQuery += RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries,mapLanguages);
80 86
            } else {
81
                buildQuery += RDFizer.RDFizeRelationRow(row, graph, relationsMappings, hadoopConf);
87
//                buildQuery += RDFizer.RDFizeRelationRow(row, graph, relationsMappings, hadoopConf);
88
            	buildQuery += RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
82 89
            }
83 90
        }
91
        
84 92
//        System.out.println("/nID-> "+key.toString()+"\n");
85 93
//        System.out.println("/nITCOUNTER-> "+itCounter+"\n");
86 94
//        System.err.println("\nCOUNTER "+counter);
87
         counter++;
88 95
//        System.out.println("so FAR: "+buildQuery);
96
        counter++;
89 97
        if (counter > entitiesPerQuery) {
90
            String insertQuery = "INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}";
98
            String insertQuery = "SPARQL INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}";
99
            Statement stmt;
100
//            Connection conn;
91 101
            try {
92
//                Query q = QueryFactory.create(insertQuery);
93
//                VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(q.serialize(), graph);
94
                VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(insertQuery, graph);
95
                q2ur.exec();
96
                counter = 0;
97
                buildQuery = "";
98
            } catch (Exception e) {
99
                log.error("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString(), e);
100
//              throw new InterruptedException("Virtuoso write failed  at  query : " + insertQuery + " \n  and with error " + e.toString());
101
            }
102
//            	conn = ds.getConnection();
103
    			stmt = conn.createStatement();
104
    			stmt.execute(insertQuery);
105
    			stmt.close();
106
//    			conn.close();
107
    			buildQuery = "";
108
    			commitCounter+=counter;
109
    			counter = 0;
110
    			if(commitCounter>100){
111
    				commitCounter=0;
112
    				System.err.println("commit Started "+commitCounter);
113
    				conn.commit();
114
    				conn.close();
115
    				conn=ds.getConnection();
116
    			}
117
    			
118
    		} catch (Exception e) {
119
    			log.error("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString(), e);
120
    		}
121
            
122
            
123
            
124
//            try {
125
////                Query q = QueryFactory.create(insertQuery);
126
////                VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(q.serialize(), graph);
127
//                VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(insertQuery, graph);
128
//                q2ur.exec();
129
//                counter = 0;
130
//                buildQuery = "";
131
//            } catch (Exception e) {
132
//                log.error("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString(), e);
133
////              throw new InterruptedException("Virtuoso write failed  at  query : " + insertQuery + " \n  and with error " + e.toString());
134
//            }
102 135

  
103 136
        }
104 137

  
......
107 140

  
108 141
    @Override
109 142
    protected void cleanup(Context context) throws IOException, InterruptedException {
110
        String insertQuery = "INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}";
143
        String insertQuery = "SPARQL INSERT INTO <" + defaultGraph + "> { " + buildQuery + "}";
111 144
        System.out.println("CLEAN-> NEW INSERT QUERY:  " + insertQuery);
145
        Statement stmt;
146
//        Connection conn;
112 147
        try {
113

  
114

  
148
//        	conn.close();
149
//        	conn = ds.getConnection();
150
			stmt = conn.createStatement();
151
			stmt.execute(insertQuery);
152
			stmt.close();
115 153
//            Query q = QueryFactory.create(insertQuery);
116 154
//            VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(q.serialize(), graph);
117
            VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(insertQuery, graph);
118

  
119
            q2ur.exec();
155
//            VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(insertQuery, graph);
156
//            q2ur.exec();
120 157
        } catch (Exception e) {
121 158
            log.error("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString(), e);
122 159
//            throw new InterruptedException("Virtuoso write failed  at  query" + insertQuery + " \n and with error " + e.toString());
123 160
        } finally {
124

  
125 161
            log.info("Cleaning up reducer...\nClosing Graph and Datasource");
126 162
            try{
127
            	graph.close();
163
    			conn.commit();
164
            	conn.close();
165
//            	graph.close();
128 166
//            	ds.close();
129 167
            }catch(Exception e){
130 168
            	log.error("Could not Close Graph or Datasource \n and with error " + e.toString(), e);

Also available in: Unified diff