Project

General

Profile

1
package eu.dnetlib.validator.service.impls.providers;
2

    
3
import java.io.StringReader;
4
import java.util.ArrayList;
5
import java.util.Calendar;
6
import java.util.List;
7

    
8
import javax.xml.parsers.DocumentBuilder;
9
import javax.xml.parsers.DocumentBuilderFactory;
10
import javax.xml.parsers.ParserConfigurationException;
11
import javax.xml.xpath.XPath;
12
import javax.xml.xpath.XPathConstants;
13
import javax.xml.xpath.XPathExpression;
14
import javax.xml.xpath.XPathExpressionException;
15
import javax.xml.xpath.XPathFactory;
16

    
17
import org.w3c.dom.Document;
18
import org.w3c.dom.Element;
19
import org.w3c.dom.Node;
20
import org.w3c.dom.ls.DOMImplementationLS;
21
import org.w3c.dom.ls.LSSerializer;
22
import org.xml.sax.InputSource;
23

    
24
import eu.dnetlib.api.data.MDStoreService;
25
import eu.dnetlib.domain.EPR;
26
import eu.dnetlib.validator.engine.data.DataException;
27
import eu.dnetlib.validator.engine.data.Provider;
28
import eu.dnetlib.validator.engine.data.ResultSet;
29
import eu.dnetlib.validator.engine.execution.ValidationObject;
30
import eu.dnetlib.validator.service.impls.valobjs.XMLTextValidationObject;
31
import gr.uoa.di.driver.enabling.resultset.ResultSetFactory;
32
import gr.uoa.di.driver.util.ServiceLocator;
33

    
34
public class DnetProvider extends Provider{
35

    
36
	private static final long serialVersionUID = -4280319954962194170L;
37

    
38
	private static ServiceLocator<MDStoreService> mdStoreServiceLocator;
39

    
40
	private static ResultSetFactory rsFactory;
41
	
42
	public static final String DATASOURCE = "DATASOURCE";
43
	
44
	public static final String BATCH_SIZE = "BATCH_SIZE";
45

    
46
	public static final String RECORDS = "RECORDS";
47
	
48
	public static final String MDSTORE_ID = "MDSTORE_ID";
49
	
50
	public static final String FROM = "FROM";
51
	
52
	public static final String BEGIN_RECORD = "BEGIN_RECORD";
53
	
54
	public static final String UNTIL = "UNTIL";
55
	
56
	public static final String RECORD_FILTER = "RECORD_FILTER";
57

    
58
	public static final String WORKER_ID = "WORKER_ID";
59

    
60
	public static final String WORKERS = "WORKERS";
61
	
62
	
63

    
64
	/**
	 * Creates the provider, handing the constant {@code 3} to the {@link Provider}
	 * constructor (the meaning of that value is defined by {@code Provider}).
	 */
	public DnetProvider() {
		super(3);
	}
68

    
69
	@Override
70
	public ResultSet<ValidationObject> getValidationObjects() throws ProviderException {
71
		return new DnetRecordResultSet();
72
	}
73

    
74
	@Override
75
	public ResultSet<String> getValidationObjectIds() throws ProviderException,
76
			UnsupportedOperationException {
77
		// TODO Auto-generated method stub
78
		return null;
79
	}
80

    
81
	@Override
82
	public ValidationObject getValidationObject(String valObjId)
83
			throws ProviderException, UnsupportedOperationException {
84
		// TODO Auto-generated method stub
85
		return null;
86
	}
87
	
88
	public static void printXmlDocument(Document document) {
89
	    DOMImplementationLS domImplementationLS = 
90
	        (DOMImplementationLS) document.getImplementation();
91
	    LSSerializer lsSerializer = 
92
	        domImplementationLS.createLSSerializer();
93
	    String string = lsSerializer.writeToString(document);
94
	    System.out.println(string);
95
	}
96

    
97
	private class DnetResultSet {
98
				
99
		gr.uoa.di.driver.enabling.resultset.ResultSet<String> rs = null;
100

    
101
		protected DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
102
		protected DocumentBuilder builder;
103
		protected XPathFactory xfactory = XPathFactory.newInstance();
104
		protected List<Node> records = null;
105
		protected int records_sum = 0;
106
		protected int index = -1;
107
		protected int pointer = 0;
108
		protected int recordsNum = 0;
109
		protected int workers = 0;
110
		protected int workerId = 0;
111
		protected int beginRecord = 0;
112
		protected int endRecord = 0;
113
		protected String error = null;
114
		private long elapsed = 0;
115

    
116
		public DnetResultSet() {
117

    
118
			try {
119
				builder = factory.newDocumentBuilder();
120
				log.debug("Retrieving the datasource..");
121

    
122
				log.debug("RECORDS " + pros.getProperty(RECORDS));
123
				log.debug("MDSTORE_ID " + pros.getProperty(MDSTORE_ID));
124
				log.debug("DATASOURCE: " + pros.getProperty(DATASOURCE));
125
				log.debug("WORKER_ID: " + pros.getProperty(WORKER_ID));
126
				workerId = Integer.parseInt(pros.getProperty(WORKER_ID));
127
				workers = Integer.parseInt(pros.getProperty(WORKERS));
128
//				log.debug("BEGIN RECORD: " + pros.getProperty(BEGIN_RECORD));
129
//				EPR epr = mdStoreServiceServiceLocator.getService().deliverMDRecords(pros.getProperty(MD_ID), pros.getProperty(FROM), pros.getProperty(UNTIL), pros.getProperty(RECORD_FILTER));
130
				log.debug("Issuing request on mdstore: " + pros.getProperty(MDSTORE_ID));
131
				EPR epr = mdStoreServiceLocator.getService().deliverMDRecords(pros.getProperty(MDSTORE_ID), null, null, null);
132

    
133
				rs = rsFactory.createResultSet(epr);
134

    
135
				log.debug("rs created");
136
				records_sum = rs.size();
137
				log.debug("Number of records in ResultSet: " + records_sum);
138
				if (pros.getProperty(RECORDS).equalsIgnoreCase("-1") ) {
139
					pros.setProperty(RECORDS,Integer.toString(records_sum));
140
				} else if (Integer.parseInt(pros.getProperty(RECORDS)) > records_sum) {
141
					pros.setProperty(RECORDS,Integer.toString(records_sum));
142
				}
143
				recordsNum = Integer.parseInt(pros.getProperty(RECORDS));
144
				log.info("W"+ workerId + "# RECORDS TO TEST: " + recordsNum);
145
				log.info("W"+ workerId + "# WORKERS: " + workers);
146
//				pros.setProperty(DnetProvider.BEGIN_RECORD, Integer.toString(Integer.parseInt(pros.getProperty(WORKER_ID)) * (Integer.parseInt(pros.getProperty(RECORDS))/Integer.parseInt(pros.getProperty(WORKERS)))));
147
				beginRecord = workerId * (recordsNum/workers) + 1;
148
				endRecord = (recordsNum/workers) + beginRecord -1;
149
//				endRecord = Integer.parseInt(pros.getProperty(RECORDS))/Integer.parseInt(pros.getProperty(WORKERS)) + Integer.parseInt(pros.getProperty(BEGIN_RECORD));
150
				if (workerId == workers-1)
151
					endRecord += recordsNum % workers;
152
				log.info("W"+ workerId + "# BEGIN RECORD: " + beginRecord);
153
				log.info("W"+ workerId + "# END RECORD: " + endRecord);
154
				pointer = beginRecord;
155
			} catch (ParserConfigurationException e) {
156
				log.error("", e);
157
			} catch (Exception e) {
158
				log.error("", e);
159
			}
160
		}
161

    
162
		protected List<Node> getRecords() throws DataException {
163
			List<Node> records = new ArrayList<Node>();
164
			
165
			try {
166
				
167
				int to = pointer + Integer.parseInt(pros.getProperty(BATCH_SIZE)) -1;
168
				if ((pointer + Integer.parseInt(pros.getProperty(BATCH_SIZE)) -1) < endRecord) {
169
					to = pointer + Integer.parseInt(pros.getProperty(BATCH_SIZE)) -1;
170
				} else {
171
					if (pointer <= endRecord)
172
						to = endRecord ;
173
				}
174
				log.error("to : " + to + " and limit: " + endRecord); 
175
				if (to <= endRecord) {
176
					log.error("W"+ workerId + "# Issuing request for records. From : " + pointer + " to : " + to); 
177
					long time1 = Calendar.getInstance().getTimeInMillis();
178
					List<String> tempRecords = rs.get(pointer, to);
179
					long time2 = Calendar.getInstance().getTimeInMillis();
180
					log.debug("W"+ workerId + "#Rule fetching took " + ((time2 - time1))
181
							+ " milliseconds");
182
					elapsed += time2 - time1;
183
					log.error("W"+ workerId + "#records fetched : " + tempRecords.size());
184
					log.debug("W"+ workerId + "#Elapsed time till now is for rules fetching" + elapsed/1000
185
							+ " seconds");
186

    
187
					pointer += Integer.parseInt(pros.getProperty(BATCH_SIZE));
188
					for (String record : tempRecords) {
189
	//					log.debug("record from resultSet is : " + record);
190
						InputSource is = new InputSource(new StringReader(record));
191
						Document doc = builder.parse(is);
192
						XPath xpath = xfactory.newXPath();
193
						XPathExpression expr = xpath.compile("//*[local-name()='record']");
194
						records.add((Node) expr.evaluate(doc, XPathConstants.NODE));
195
					}
196
					log.error("W"+ workerId + "records to return : " + records.size());
197
				}
198
				if (records.size() == 0) {
199
					log.debug("There are no records. ");
200
					error = "There are no records";
201
					log.debug("Error: "+ error);
202
				}
203
				
204
			} catch (Exception e) {
205
				log.error("", e);
206
				throw new DataException();
207
			}
208
			return records;
209
		}		
210
	}
211

    
212
	private class DnetRecordResultSet extends DnetResultSet implements ResultSet<ValidationObject> {
213

    
214
		@Override
215
		public String getError() {
216
			if (error != null)
217
				log.debug("An error occured "+ this.error);
218
			else
219
				log.debug("No errors on request");
220
			return this.error;
221
		}
222
		
223
		@Override
224
		public boolean next() throws DataException {
225
			index++;
226
			log.debug("Moving cursor to result "+index);
227
			if (records == null || index >= records.size()) {
228
				if (records != null && (records.size() == 0))
229
					return false;
230
				index = -1;
231
				records = getRecords();
232
				return next();
233
			}
234
			return true;
235
		}
236

    
237
		@Override
238
		public ValidationObject get() throws DataException {
239
			XMLTextValidationObject ret = null;
240
			
241
			Document newXmlDocument;
242
			try {
243
				newXmlDocument = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
244

    
245
		        Element root = newXmlDocument.createElement("root");
246
		        newXmlDocument.appendChild(root);
247
	            Node node = records.get(index);
248
	            Node copyNode = newXmlDocument.importNode(node, true);
249
	            root.appendChild(copyNode);
250
	            printXmlDocument(newXmlDocument);
251
	            ret = new XMLTextValidationObject(newXmlDocument);
252
	            XPathFactory factory = XPathFactory.newInstance();
253
	            XPath xPath = factory.newXPath();
254
	            String id = xPath.evaluate("//*[local-name()='header']/*[name()='dri:objIdentifier']/text()", records.get(index));
255
	            if (id.isEmpty())
256
	            	id = xPath.evaluate("//*[local-name()='header']/*[name()='dri:recordIdentifier']/text()", records.get(index));
257
	            if (id.isEmpty())
258
	            	id = xPath.evaluate("//*[local-name()='header']/*[name()='identifier']/text()", records.get(index));
259
	            ret.setId(id);
260
	            ret.setStatus(xPath.evaluate("//*[local-name()='header']/@status", records.get(index)));
261
	            
262
	            log.debug("record id: " + ret.getId());
263
	            log.debug("record status: " + ret.getStatus());
264
	            
265
			} catch (ParserConfigurationException e) {
266
				log.error("error getting object"+ e);
267
			} catch (XPathExpressionException e) {
268
				log.error("error getting object"+ e);
269
			}			
270
			return ret;
271
		}
272

    
273

    
274
	}
275

    
276
	
277
	public static ResultSetFactory getRsFactory() {
278
		return rsFactory;
279
	}
280

    
281
	public static void setRsFactory(ResultSetFactory rsFactory) {
282
		DnetProvider.rsFactory = rsFactory;
283
	}
284

    
285
	public static ServiceLocator<MDStoreService> getMdStoreServiceLocator() {
286
		return mdStoreServiceLocator;
287
	}
288

    
289
	public static void setMdStoreServiceLocator(
290
			ServiceLocator<MDStoreService> mdStoreServiceLocator) {
291
		DnetProvider.mdStoreServiceLocator = mdStoreServiceLocator;
292
	}
293

    
294
	@Override
295
	public ResultSet<ValidationObject> getValidationObjects(String entity)
296
			throws ProviderException {
297
		// TODO Auto-generated method stub
298
		return null;
299
	}
300

    
301

    
302
}
(2-2/11)