Project

General

Profile

1
package eu.dnetlib.validator.service.impls.listeners;
2

    
3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.Map;
6

    
7
import org.apache.log4j.Logger;
8

    
9
import eu.dnetlib.api.enabling.ResultSetService;
10
import eu.dnetlib.api.enabling.ResultSetServiceException;
11
import eu.dnetlib.domain.EPR;
12
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
15
import eu.dnetlib.utils.EPRUtils;
16
import eu.dnetlib.validator.engine.execution.CompletedTask;
17
import eu.dnetlib.validator.engine.execution.JobListener;
18
import gr.uoa.di.driver.util.ServiceLocator;
19

    
20
public class DnetListener implements JobListener {
21

    
22
	private static Logger logger = Logger.getLogger(DnetListener.class);
23
	
24
	private RecordXMLBuilder xmlBuilder;
25
	private BlackboardJob job;
26
	private BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler;
27
	
28
	private ServiceLocator<ResultSetService> resultSetServiceLocator = null;
29
	private ResultSetService resultSetService;
30
	private String resultSetPassID;
31
	private String resultSetFailID;
32
	private List<String> passBuffer;
33
	private List<String> failBuffer;
34
	private EPR eprPass; 
35
	private EPR eprFail; 
36
	
37

    
38
	public void init() { 
39
		try {
40
		// Get reference to service
41
		resultSetService = resultSetServiceLocator.getService();
42
		// Create a new result set
43
		eprPass = resultSetService.createPushRS(86400, 0);
44
		eprFail = resultSetService.createPushRS(86400, 0);
45
		// get result set ids
46
		resultSetPassID = eprPass.getParameter("ResourceIdentifier");
47
		resultSetFailID = eprFail.getParameter("ResourceIdentifier");
48
		// initializing buffers
49
		passBuffer = new ArrayList<String>();
50
		failBuffer = new ArrayList<String>();
51
		
52
		} catch (Exception e) {
53
			logger.error("Error initializing ResultSetService.", e);
54
		}
55
	}
56
    
57
    public void sendToPassQueue(String xmlString) {
58
    	logger.debug("received passed XMLresult");
59
    	// add records
60
    	passBuffer.add(xmlString);
61
    	if (passBuffer.size() > 30) {
62
    		try {
63
				resultSetService.populateRS(resultSetPassID, passBuffer);
64
				passBuffer.clear();
65
			} catch (ResultSetServiceException e) {
66
				logger.error("Error populating ResultSetService.", e);
67
			}    		
68
    	}
69
    	logger.debug("XML: " + xmlString);
70
    }
71

    
72
    public void sendToFailQueue(String xmlString) {
73
    	logger.debug("received failed XMLresult");
74
    	// add records
75
    	failBuffer.add(xmlString);
76
    	if (failBuffer.size() > 30) {
77
    		try {
78
				resultSetService.populateRS(resultSetFailID, failBuffer);
79
				failBuffer.clear();
80
			} catch (ResultSetServiceException e) {
81
				logger.error("Error populating ResultSetService.", e);
82
			}    		
83
    	}
84
    	logger.debug("XML: " + xmlString);
85
    }
86

    
87
	@Override
88
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId,
89
			Object record, Map<String, Object> recordContext, Throwable t) {
90
		
91
		@SuppressWarnings("unchecked")
92
		String xmlString = xmlBuilder.buildXml((List<Map<String, String>>) recordContext.get("veloList"),record, (Map<String,String>) recordContext.get("recordValidationResult"));
93
		if ( (Integer) recordContext.get("score") > 0)
94
			this.sendToPassQueue(xmlString);
95
		else
96
			this.sendToFailQueue(xmlString);
97
		
98
		logger.debug("XML: " + xmlString);
99
		blackboardHandler.getBlackboardHandler().ongoing(job);
100
		
101
	}
102

    
103
	@Override
104
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId,
105
			Object record, Map<String, Object> recordContext) {
106
		
107
		@SuppressWarnings("unchecked")
108
		String xmlString = xmlBuilder.buildXml((List<Map<String, String>>) recordContext.get("veloList"),record, (Map<String,String>) recordContext.get("recordValidationResult"));
109
		if ( (Integer) recordContext.get("score") > 0)
110
			this.sendToPassQueue(xmlString);
111
		else
112
			this.sendToFailQueue(xmlString);
113
		
114
		logger.debug("XML: " + xmlString);
115
		blackboardHandler.getBlackboardHandler().ongoing(job);
116
	}
117

    
118
	@Override
119
	public synchronized void finished(int jobId, Map<String, Object> jobContext) {
120
		//send remaining xmls from buffers
121
    	if (passBuffer.size() > 0) {
122
    		try {
123
				resultSetService.populateRS(resultSetPassID, passBuffer);
124
				passBuffer.clear();
125
			} catch (ResultSetServiceException e) {
126
				logger.error("Error populating ResultSetService.", e);
127
			}    		
128
    	}
129
		if (failBuffer.size() > 0) {
130
    		try {
131
				resultSetService.populateRS(resultSetFailID, failBuffer);
132
				failBuffer.clear();
133
			} catch (ResultSetServiceException e) {
134
				logger.error("Error populating ResultSetService.", e);
135
			}    		
136
    	}
137
		// finally, close rs.
138
		resultSetService.closeRS(resultSetPassID);
139
		resultSetService.closeRS(resultSetFailID);
140
		
141
		// update job status
142
		job.getParameters().put("passEpr", EPRUtils.eprToXml(eprPass));
143
		job.getParameters().put("failEpr", EPRUtils.eprToXml(eprFail));
144
		blackboardHandler.getBlackboardHandler().done(job);
145
	}
146

    
147
	@Override
148
	public synchronized void failed(int jobId, Map<String, Object> jobContext, Throwable t) {
149
		resultSetService.closeRS(resultSetPassID);
150
		resultSetService.closeRS(resultSetFailID);
151

    
152
		// update job status
153
		blackboardHandler.getBlackboardHandler().failed(job, t);
154
	}
155

    
156
	public RecordXMLBuilder getXmlBuilder() {
157
		return xmlBuilder;
158
	}
159

    
160
	public void setXmlBuilder(RecordXMLBuilder xmlBuilder) {
161
		this.xmlBuilder = xmlBuilder;
162
	}
163
	
164
	public BlackboardJob getJob() {
165
		return job;
166
	}
167

    
168
	public void setJob(BlackboardJob job) {
169
		this.job = job;
170
	}
171

    
172
	public ServiceLocator<ResultSetService> getResultSetServiceLocator() {
173
		return resultSetServiceLocator;
174
	}
175

    
176
	public void setResultSetServiceLocator(
177
			ServiceLocator<ResultSetService> resultSetServiceLocator) {
178
		this.resultSetServiceLocator = resultSetServiceLocator;
179
	}
180

    
181
	public BlackboardNotificationHandler<BlackboardServerHandler> getBlackboardHandler() {
182
		return blackboardHandler;
183
	}
184

    
185
	public void setBlackboardHandler(BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler) {
186
		this.blackboardHandler = blackboardHandler;
187
	}
188
}
(3-3/7)