Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular;
2

    
3
import com.google.common.collect.Maps;
4
import com.googlecode.sarasvati.GraphProcess;
5
import eu.dnetlib.data.mdstore.MDStoreServiceException;
6
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
7
import eu.dnetlib.data.mdstore.modular.action.FailedCallback;
8
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
9
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
10
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
11
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
12
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
13
import eu.dnetlib.miscutils.datetime.DateUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Required;
17

    
18
import java.util.List;
19
import java.util.Map;
20

    
21
public class MDStoreFeeder {
22

    
23
	private static final Log log = LogFactory.getLog(MDStoreFeeder.class);
24

    
25
	private MDStoreDao dao;
26

    
27
	private ResultSetClientFactory resultSetClientFactory;
28

    
29
	private UniqueServiceLocator serviceLocator;
30

    
31
	private boolean syncFeed = true;
32

    
33
	public void feed(final String mdId,
34
                     final String rsEpr,
35
                     final String storingType,
36
                     final boolean sync,
37
                     final List<MDFormatDescription> mdformats,
38
                     final GraphProcess graphProcess,
39
                     final DoneCallback doneCallback,
40
                     final FailedCallback failCallback) throws MDStoreServiceException {
41

    
42
		log.debug("Start feeding mdstore " + mdId + " with epr " + rsEpr);
43

    
44
		String transactionId = null;
45

    
46
		try {
47
			final boolean refresh = "REFRESH".equals(storingType);
48

    
49
			final MDStore mdstore = dao.startTransaction(mdId, refresh);
50
			transactionId = mdstore.getId();
51

    
52
			final Iterable<String> records = resultSetClientFactory.getClient(rsEpr);
53

    
54
			if (refresh) {
55
				mdstore.truncate();
56
			}
57
            int writeOps;
58

    
59
            if (mdformats == null) {
60
                writeOps = mdstore.feed(records, refresh);
61
            } else {
62
                writeOps = mdstore.feed(records, refresh, mdformats);
63
            }
64

    
65
            if(graphProcess.isCanceled()){
66
            	//means the process was cancelled, so we must not commit
67
				log.warn(String.format("The feeding process on mdstore %s has been cancelled. Records will not be committed. Transaction %s set as invalid.", mdId, transactionId));
68
				dao.invalidTransaction(transactionId, mdId);
69
				failCallback.call(new MDStoreServiceException(String.format("Feeding cancelled for mdstore %s on transaction %s", mdId, transactionId)));
70
            } else {
71
				dao.commit(mdstore.getId(), mdId);
72
				int size = dao.refreshSize(mdId);
73
				touch(mdId, size);
74
				log.info("Finished feeding mdstore " + mdId + " - new size: " + size);
75
				doneCallback.call(buildParams(size, writeOps));
76
			}
77
		} catch (Throwable e) {
78
			if (transactionId != null) {
79
				dao.invalidTransaction(transactionId, mdId);
80
			}
81
			log.error("Error feeding mdstore: " + mdId);
82
			failCallback.call(e);
83
		}
84
	}
85

    
86
	private Map<String, String> buildParams(final int size, final int storeCount) {
87
		Map<String, String> params = Maps.newHashMap();
88
		params.put("mdstoreSize", String.valueOf(size));
89
		params.put("writeOps", String.valueOf(storeCount));
90
		return params;
91
	}
92

    
93
	/**
94
	 * Sets the last modified date in the profile.
95
	 *
96
	 * @param mdId
97
	 */
98
	public void touch(final String mdId, final int size) {
99
		try {
100
			final String now = DateUtils.now_ISO8601();
101

    
102
			final String mdstoreXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + mdId + "']"
103
					+ "return update value $x//LAST_STORAGE_DATE with '" + now + "'";
104

    
105
			serviceLocator.getService(ISRegistryService.class).executeXUpdate(mdstoreXUpdate);
106

    
107
			touchSize(mdId, size);
108
		} catch (final Exception e) {
109
			throw new IllegalStateException(e);
110
		}
111
	}
112

    
113
	public void touchSize(final String mdId, final int size) {
114
		try {
115
			final String mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + mdId + "']"
116
					+ "return update value $x//NUMBER_OF_RECORDS with '" + size + "'";
117

    
118
			serviceLocator.getService(ISRegistryService.class).executeXUpdate(mdstoreNumberXUpdate);
119
		} catch (final Exception e) {
120
			throw new IllegalStateException(e);
121
		}
122
	}
123

    
124
	public MDStoreDao getDao() {
125
		return dao;
126
	}
127

    
128
	@Required
129
	public void setDao(final MDStoreDao dao) {
130
		this.dao = dao;
131
	}
132

    
133
	public ResultSetClientFactory getResultSetClientFactory() {
134
		return resultSetClientFactory;
135
	}
136

    
137
	@Required
138
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
139
		this.resultSetClientFactory = resultSetClientFactory;
140
	}
141

    
142
	public boolean isSyncFeed() {
143
		return syncFeed;
144
	}
145

    
146
	public void setSyncFeed(final boolean syncFeed) {
147
		this.syncFeed = syncFeed;
148
	}
149

    
150
	public UniqueServiceLocator getServiceLocator() {
151
		return serviceLocator;
152
	}
153

    
154
	@Required
155
	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
156
		this.serviceLocator = serviceLocator;
157
	}
158

    
159
}
(5-5/13)