Project

General

Profile

1
package eu.dnetlib.enabling.database;
2

    
3
import java.util.Map;
4
import java.util.concurrent.ExecutorService;
5
import java.util.concurrent.Executors;
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7

    
8
import com.google.gson.Gson;
9
import eu.dnetlib.enabling.database.rmi.DatabaseException;
10
import eu.dnetlib.enabling.resultset.XSLTMappedResultSetFactory;
11
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
12
import eu.dnetlib.enabling.tools.blackboard.AbstractBlackboardNotificationHandler;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
15
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
16
import org.apache.commons.codec.binary.Base64;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.springframework.beans.factory.annotation.Required;
20

    
21
public class DatabaseBlackBoardNotificationHandler extends AbstractBlackboardNotificationHandler<BlackboardServerHandler> implements NotificationHandler {
22

    
23
	private static final Log log = LogFactory.getLog(DatabaseBlackBoardNotificationHandler.class); // NOPMD by marko on 11/24/08 5:02 PM
24

    
25
	private DatabaseServiceCore core;
26

    
27
	private ExecutorService threadPool = Executors.newCachedThreadPool();
28

    
29
	private XSLTMappedResultSetFactory xsltResultsetFactory;
30

    
31
	@Override
32
	protected void processJob(final BlackboardJob job) {
33
		threadPool.execute(new Runnable() {
34

    
35
			@Override
36
			public void run() {
37
				try {
38
					processJobInternal(job);
39
				} catch (final Throwable e) {
40
					log.error(e.getMessage(), e);
41
					getBlackboardHandler().failed(job, e);
42
				}
43
			}
44
		});
45
	}
46

    
47
	@SuppressWarnings("unchecked")
48
	private void processJobInternal(final BlackboardJob job) throws DatabaseException {
49
		String action = job.getAction();
50

    
51
		log.info("processing database job: " + action);
52

    
53
		if (action.equals("IMPORT")) {
54
			String db = job.getParameters().get("db");
55
			String epr = decodeBase64(job.getParameters().get("epr"));
56
			String xslt = decodeBase64(job.getParameters().get("xslt"));
57
			String xsltParamsString = decodeBase64(job.getParameters().get("xsltParams"));
58

    
59
			Map<String, String> xsltParams = null;
60
			if ((xsltParamsString != null) && !xsltParamsString.isEmpty()) {
61
				xsltParams = (new Gson()).fromJson(xsltParamsString, Map.class);
62
			}
63

    
64
			if ((db == null) || db.isEmpty() || (epr == null) || epr.isEmpty())
65
				throw new IllegalArgumentException("Some needed params are null or empty.");
66

    
67
			W3CEndpointReference eprObject = (new EPRUtils()).getEpr(epr);
68

    
69
			W3CEndpointReference mappedEpr = null;
70
			if ((xslt == null) || xslt.isEmpty()) {
71
				mappedEpr = eprObject;
72
			} else if ((xsltParams == null) || xsltParams.isEmpty()) {
73
				mappedEpr = xsltResultsetFactory.createMappedResultSet(eprObject, xslt);
74
			} else {
75
				mappedEpr = xsltResultsetFactory.createMappedResultSet(eprObject, xslt, xsltParams);
76
			}
77

    
78
			core.importFromResultset(db, mappedEpr);
79
			getBlackboardHandler().done(job);
80
			log.info("IMPORT job set to DONE");
81
		} else if (action.equals("EXEC")) {
82
			String db = job.getParameters().get("db");
83
			String sql = job.getParameters().get("sql");
84

    
85
			if ((db == null) || db.isEmpty() || (sql == null) || sql.isEmpty())
86
				throw new IllegalArgumentException("Some needed params are null or empty.");
87

    
88
			log.info("EXECUTING SCRIPT: " + sql + " on " + db);
89
			core.getDbUtils().executeSql(db, sql);
90
			getBlackboardHandler().done(job);
91
			log.info("SCRIPT COMPLETED");
92
		} else {
93
			throw new IllegalArgumentException("unsupported message action: " + action);
94
		}
95
	}
96

    
97
	private String decodeBase64(final String s) {
98
		if ((s != null) && Base64.isBase64(s.getBytes())) return new String(Base64.decodeBase64(s.getBytes()));
99
		return s;
100
	}
101

    
102
	public DatabaseServiceCore getCore() {
103
		return core;
104
	}
105

    
106
	@Required
107
	public void setCore(final DatabaseServiceCore core) {
108
		this.core = core;
109
	}
110

    
111
	public ExecutorService getThreadPool() {
112
		return threadPool;
113
	}
114

    
115
	public void setThreadPool(final ExecutorService threadPool) {
116
		this.threadPool = threadPool;
117
	}
118

    
119
	public XSLTMappedResultSetFactory getXsltResultsetFactory() {
120
		return xsltResultsetFactory;
121
	}
122

    
123
	@Required
124
	public void setXsltResultsetFactory(final XSLTMappedResultSetFactory xsltResultsetFactory) {
125
		this.xsltResultsetFactory = xsltResultsetFactory;
126
	}
127
}
(3-3/7)