Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodImport.utils;
2

    
3
import java.net.URI;
4
import java.sql.Connection;
5
import java.sql.ResultSet;
6
import java.sql.Statement;
7
import java.util.List;
8

    
9
import org.apache.hadoop.conf.Configuration;
10
import org.apache.log4j.Logger;
11
import org.json.JSONArray;
12
import org.json.JSONObject;
13

    
14
import com.jolbox.bonecp.BoneCPDataSource;
15

    
16
public class RDFizer {
17
	
18
	private static Logger log = Logger.getLogger(RDFizer.class);
19
	
20
	public RDFizer() {
21
	}
22

    
23
//	public static String RDFizeEntityRow(List<String> row,VirtGraph graph, JSONObject mappings, Configuration conf, MapCountries mapCountries,MapLanguages mapLanguages){
24
	public static String[] RDFizeEntityRow(List<String> row,BoneCPDataSource ds, JSONObject mappings, Configuration conf, MapCountries mapCountries,MapLanguages mapLanguages, String defaultGraph){
25
		
26
		//		VirtModel model = new VirtModel(graph);
27
		
28
//		String graph = conf.get("lod.defaultGraph");
29
		
30
		String[] buildQuery = new String[2];
31
		buildQuery[0]="";
32
		buildQuery[1]="";
33

    
34
//		String insertQuery ="";
35
//		String deleteQuery="";
36
		
37
		String baseURI = conf.get("lod.baseURI");
38
		String id = row.get(1).toString();
39
		String type = row.get(0).toString();
40
		String resourceURI = baseURI+type+"/"+id;
41
	
42
//	####################################################################################	
43
//	THIS IS FOR INCREMENTAL UPDATE ON-HOLD FOR THE BATCH	
44
//		boolean ask = false;
45
//		try{
46
//			ask = resourceExists(ds.getConnection(), resourceURI, graph);
47
//		}catch(Exception e){
48
//			log.error("Could not ASK "+e.toString(),e);
49
//		}
50
//
51
		
52
		
53
		buildQuery[1]+= "<"+resourceURI+"> ?p ?o. ";
54
		
55
//		if(ask){
56
//			try{
57
//				deleteResource(ds.getConnection(), resourceURI, defaultGraph);
58
//			}catch(Exception e){
59
//				log.error("Could not DELETE  "+resourceURI+" "+e.toString(),e);
60
//		}
61
		//####################################################################################
62
		
63
	    JSONArray typeMappings = mappings.getJSONArray(type);
64
	    JSONObject propertyObject = typeMappings.getJSONObject(0);
65
	    
66
		for(int i=0; i<row.size(); i++){
67
            String index = new Integer(i).toString();
68
            String propertyString;
69
            
70
            
71
            if(type.equals("project") && i>24) continue;
72
            if(type.equals("organization") && i>11) continue;
73
            
74
            try{
75
            	propertyString = propertyObject.getString(index);
76
            }catch(Exception e){
77
//            	log.error("Could not get the property for type "+type+" and ID"+row.get(1).toString()+" :  "+e.toString(),e);
78
            	continue;
79
            }
80
            
81
			if(i==0){
82
				String resourceType = propertyObject.getString(propertyString);
83
//				insertQuery+="<"+resourceURI+"> <"+propertyString+"> <"+resourceType+">";
84
				buildQuery[0]+="<"+resourceURI+"> <"+propertyString+"> <"+resourceType+">";
85
				continue;
86
			}
87
			
88
			String value = row.get(i).trim();
89
			value = value.replace("\\", "");
90
			value = value.replace("\""," ");
91
			
92
			if(value.trim().equals("null") || value==null || value.trim().equals("")) continue;
93
			
94
			if(i==16 && type.equals("result")  && !value.equals("und") || i==11 && type.equals("person")  && !value.equals("und") || i==9 && type.equals("organization") && !value.equals("und")){
95
				try{
96
//					log.info("Country code is  "+value);
97
					String countryURI = mapCountries.getCountryURI(value);
98
					URI uri = new URI(countryURI);
99
					if(countryURI.equals("")){
100
//						insertQuery+="; <"+propertyString+"> \""+value+"\"";
101
						buildQuery[0]+="; <"+propertyString+"> \""+value+"\"";
102
						continue;
103
					}
104
//					insertQuery+="; <"+propertyString+"> <"+uri+">";
105
					buildQuery[0]+="; <"+propertyString+"> <"+uri+">";
106
//					log.info("COUNTRY "+countryURI+"  FROM  "+value);
107
				}catch(Exception e){
108
					buildQuery[0]+="; <"+propertyString+"> \""+value+"\"";
109
//					log.error("No country URI for: "+e.toString(),e);
110
					continue;
111
				}
112
				continue;	
113
			}
114
			
115
			if(i==9 && type.equals("result") && !value.equals("und")){
116
				try{
117
					String langURI = mapLanguages.getLangURI(value);
118
					URI uri = new URI(langURI);
119
					if(langURI.equals("")){
120
//						insertQuery+="; <"+propertyString+"> \""+value+"\"";
121
						buildQuery[0]+="; <"+propertyString+"> \""+value+"\"";
122
						continue;
123
					}
124
//					insertQuery+="; <"+propertyString+"> <"+uri+">";
125
					buildQuery[0]+="; <"+propertyString+"> <"+uri+">";
126
//					log.info("LANG "+langURI+"  FROM  "+value.trim());
127
				}catch(Exception e){
128
//					insertQuery+="; <"+propertyString+"> \""+value+"\"";
129
					buildQuery[0]+="; <"+propertyString+"> \""+value+"\"";
130
//					log.error("No Language URI for: "+e.toString(),e);
131
					continue;
132
				}
133
				continue;	
134
			}
135
			
136
			if(value.contains(conf.get("lod.seperator"))){
137
				String[] splittedValue = value.split(conf.get("lod.seperator"));
138
				for(String v:splittedValue){
139
					v= v.replace(conf.get("lod.seperator"), "").trim();
140
//					insertQuery+="; <"+propertyString+"> \""+v+"\"";
141
					buildQuery[0]+="; <"+propertyString+"> \""+v+"\"";
142
				}
143
			}else{
144
				if(value.startsWith("http://")){
145
					try{
146
						if(value.contains(" ")){
147
//							insertQuery+="; <"+propertyString+"> \""+value+"\"";
148
							buildQuery[0]+="; <"+propertyString+"> \""+value+"\"";
149
							
150
						}else{
151
							value = value.replaceAll("\\s","-");
152
							URI uri = new URI(value);
153
//							insertQuery+="; <"+propertyString+"> <"+uri+">";
154
							buildQuery[0]+="; <"+propertyString+"> <"+uri+">";
155
						}
156
					}catch(Exception e){
157
//						insertQuery+="; <"+propertyString+"> \""+value+"\"";
158
						buildQuery[0]+="; <"+propertyString+"> \""+value+"\"";
159
//						log.error("NOT URI "+e.toString(),e);
160
						continue;
161
					}
162
				}
163
				else buildQuery[0]+="; <"+propertyString+"> \""+value+"\"";
164
			}	
165
			
166
		}
167
		
168
//		insertQuery+=". ";
169
		buildQuery[0]+=". ";
170
		
171
		return buildQuery;
172
		
173
	}
174
	
175
//	public static String RDFizeRelationRow(List<String> row, VirtGraph graph, JSONObject mappings, Configuration conf){
176
	public static String[] RDFizeRelationRow(List<String> row, JSONObject mappings, Configuration conf){
177
		
178
//			String insertQuery = "";
179
			String[] buildQuery = new String[1];
180
			buildQuery[0]="";
181
			String baseURI = conf.get("lod.baseURI");
182
			JSONObject typeMappings = mappings.getJSONArray(row.get(0)).getJSONObject(0);
183
			String sourceType = typeMappings.getString("sourceType");
184
			String sourceId = typeMappings.getString("sourceId");
185
			String targetType = typeMappings.getString("targetType");
186
			String targetId = typeMappings.getString("targetId");
187
			String property = typeMappings.getString("property");
188
			String sourceURI = baseURI+row.get(Integer.parseInt(sourceType))+"/"+row.get(Integer.parseInt(sourceId));
189
			String targetURI = baseURI+row.get(Integer.parseInt(targetType))+"/"+row.get(Integer.parseInt(targetId));
190
//			insertQuery= "<"+sourceURI+"> <"+property+"> <"+targetURI+">. ";
191
			buildQuery[0]="<"+sourceURI+"> <"+property+"> <"+targetURI+">. ";
192
//			return insertQuery;
193
			return buildQuery;
194
		}
195
	
196
	public static boolean resourceExists(Connection conn, String resourceURI, String graph){
197
		boolean exists = false;
198
		String askQuery = "SPARQL SELECT ?o FROM <"+graph+"> WHERE {<"+resourceURI+"> ?p ?o}";
199
		Statement stmt;
200
		try {
201
			stmt = conn.createStatement();
202
			ResultSet rs=stmt.executeQuery(askQuery);
203
			if(rs.next())exists=true;
204
			rs.close();
205
//			if(rs.next()) exists=true;
206
			stmt.close();
207
//			conn.commit();
208
			conn.close();
209
		} catch (Exception e) {
210
			log.error("Virtuoso ask Query failed. Query was "+ askQuery +"\n" + e.toString(), e);
211
		}
212
		return exists;
213
	}
214
	
215
	public static void deleteResource(Connection conn, String resourceURI, String graph){
216
		String deleteQueryString ="SPARQL DELETE FROM <"+graph+"> {<"+resourceURI+"> ?p ?o} WHERE {<"+resourceURI+"> ?p ?o}" ;
217
		Statement stmt;
218
		try {
219
			stmt = conn.createStatement();
220
			stmt.execute(deleteQueryString);
221
			conn.commit();
222
			stmt.close();
223
			conn.close();
224
		} catch (Exception e) {
225
			log.error("Virtuoso ask Query failed.  "+ deleteQueryString +"\n" + e.toString(), e);
226
		}
227
	}
228
	
229
	public static void clearGraph(String graph, Connection conn){
230
		String clearQuery="SPARQL DEFINE sql:log-enable 0 CLEAR GRAPH  <"+graph+">";
231
		Statement stmt;
232
		try {
233
			stmt = conn.createStatement();
234
			stmt.execute(clearQuery);
235
			conn.commit();
236
			stmt.close();
237
//			conn.commit();
238
//			conn.close();
239
		} catch (Exception e) {
240
			log.error("Virtuoso FAILED TO CLEAR graph "+ clearQuery +"\n" + e.toString(), e);
241
		}
242
	}
243

    
244
	public static void setCheckpoint(Connection conn, int checkpointValue){
245
		Statement stmt;
246
		try {
247

    
248
			stmt = conn.createStatement();			
249
			stmt.execute("checkpoint_interval("+checkpointValue+")");			
250
			if(checkpointValue==120){
251
				log.info("CheckPoint Started");
252
				stmt.execute("checkpoint");
253
				stmt.execute("log_enable(3,1)");
254
				conn.commit();
255
				log.info("CheckPoint Finished");
256
			}
257
			
258
			stmt.close();
259
//			
260
//			conn.close();
261
		} catch (Exception e) {
262
			log.error("Virtuoso set checkpoint failed. Checkpoint was "+ checkpointValue +"\n" + e.toString(), e);
263
		}
264
	}
265
}
(4-4/4)