Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular;
2

    
3
import com.google.common.collect.Maps;
4
import eu.dnetlib.data.mdstore.MDStoreServiceException;
5
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
6
import eu.dnetlib.data.mdstore.modular.action.FailedCallback;
7
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
8
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
9
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
10
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
11
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
12
import eu.dnetlib.miscutils.datetime.DateUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.springframework.beans.factory.annotation.Required;
16

    
17
import java.util.List;
18
import java.util.Map;
19

    
20
public class MDStoreFeeder {
21

    
22
	private static final Log log = LogFactory.getLog(MDStoreFeeder.class);
23

    
24
	private MDStoreDao dao;
25

    
26
	private ResultSetClientFactory resultSetClientFactory;
27

    
28
	private UniqueServiceLocator serviceLocator;
29

    
30
	private boolean syncFeed = true;
31

    
32
	public void feed(final String mdId,
33
                     final String rsEpr,
34
                     final String storingType,
35
                     final boolean sync,
36
                     final List<MDFormatDescription> mdformats,
37
                     final DoneCallback doneCallback,
38
                     final FailedCallback failCallback) throws MDStoreServiceException {
39
		log.info("Start feeding mdstore " + mdId);
40
		log.debug("Start feeding mdstore " + mdId + " with epr " + rsEpr);
41

    
42
		String transactionId = null;
43

    
44
		try {
45
			final boolean refresh = "REFRESH".equals(storingType);
46

    
47
			final MDStore mdstore = dao.startTransaction(mdId, refresh);
48
			transactionId = mdstore.getId();
49

    
50
			final Iterable<String> records = resultSetClientFactory.getClient(rsEpr);
51

    
52
			if (refresh) {
53
				mdstore.truncate();
54
			}
55
            int writeOps;
56

    
57
            if (mdformats == null) {
58
                writeOps = mdstore.feed(records, refresh);
59
            } else {
60
                writeOps = mdstore.feed(records, refresh, mdformats);
61
            }
62

    
63
			dao.commit(mdstore.getId(), mdId);
64

    
65
			int size = dao.refreshSize(mdId);
66

    
67
			touch(mdId, size);
68

    
69
			log.info("Finished feeding mdstore " + mdId + " - new size: " + size);
70

    
71
			doneCallback.call(buildParams(size, writeOps));
72
		} catch (Throwable e) {
73
			if (transactionId != null) {
74
				dao.invalidTransaction(transactionId, mdId);
75
			}
76
			log.error("Error feeding mdstore: " + mdId);
77
			failCallback.call(e);
78
		}
79
	}
80

    
81
	private Map<String, String> buildParams(final int size, final int storeCount) {
82
		Map<String, String> params = Maps.newHashMap();
83
		params.put("mdstoreSize", String.valueOf(size));
84
		params.put("writeOps", String.valueOf(storeCount));
85
		return params;
86
	}
87

    
88
	/**
89
	 * Sets the last modified date in the profile.
90
	 *
91
	 * @param mdId
92
	 */
93
	public void touch(final String mdId, final int size) {
94
		try {
95
			final String now = DateUtils.now_ISO8601();
96

    
97
			final String mdstoreXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + mdId + "']"
98
					+ "return update value $x//LAST_STORAGE_DATE with '" + now + "'";
99

    
100
			serviceLocator.getService(ISRegistryService.class).executeXUpdate(mdstoreXUpdate);
101

    
102
			touchSize(mdId, size);
103
		} catch (final Exception e) {
104
			throw new IllegalStateException(e);
105
		}
106
	}
107

    
108
	public void touchSize(final String mdId, final int size) {
109
		try {
110
			final String mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + mdId + "']"
111
					+ "return update value $x//NUMBER_OF_RECORDS with '" + size + "'";
112

    
113
			serviceLocator.getService(ISRegistryService.class).executeXUpdate(mdstoreNumberXUpdate);
114
		} catch (final Exception e) {
115
			throw new IllegalStateException(e);
116
		}
117
	}
118

    
119
	public MDStoreDao getDao() {
120
		return dao;
121
	}
122

    
123
	@Required
124
	public void setDao(final MDStoreDao dao) {
125
		this.dao = dao;
126
	}
127

    
128
	public ResultSetClientFactory getResultSetClientFactory() {
129
		return resultSetClientFactory;
130
	}
131

    
132
	@Required
133
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
134
		this.resultSetClientFactory = resultSetClientFactory;
135
	}
136

    
137
	public boolean isSyncFeed() {
138
		return syncFeed;
139
	}
140

    
141
	public void setSyncFeed(final boolean syncFeed) {
142
		this.syncFeed = syncFeed;
143
	}
144

    
145
	public UniqueServiceLocator getServiceLocator() {
146
		return serviceLocator;
147
	}
148

    
149
	@Required
150
	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
151
		this.serviceLocator = serviceLocator;
152
	}
153

    
154
}
(5-5/13)