Project

General

Profile

1
package eu.dnetlib.enabling.datasources;
2

    
3
import java.io.StringReader;
4
import java.text.ParseException;
5
import java.util.Date;
6
import java.util.HashMap;
7
import java.util.List;
8
import java.util.Map;
9
import java.util.stream.Collectors;
10

    
11
import org.apache.commons.io.IOUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.dom4j.Document;
15
import org.dom4j.io.SAXReader;
16
import org.quartz.CronExpression;
17
import org.springframework.beans.factory.annotation.Autowired;
18
import org.springframework.beans.factory.annotation.Required;
19
import org.springframework.core.io.ClassPathResource;
20
import org.springframework.core.io.Resource;
21
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
22

    
23
import com.google.common.collect.ImmutableMap;
24

    
25
import eu.dnetlib.enabling.datasources.common.Api;
26
import eu.dnetlib.enabling.datasources.common.ApiParam;
27
import eu.dnetlib.enabling.datasources.common.Datasource;
28
import eu.dnetlib.enabling.datasources.common.DsmException;
29
import eu.dnetlib.enabling.datasources.common.Identity;
30
import eu.dnetlib.enabling.datasources.common.Organization;
31
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
32
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
33
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
34
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
35
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
36
import org.springframework.transaction.annotation.Transactional;
37

    
38
public class DatasourceManagerClients {
39

    
40
	private static final String REPOSITORY_SERVICE_RESOURCE_TYPE = "RepositoryServiceResourceType";
41
	private static final Log log = LogFactory.getLog(DatasourceManagerClients.class);
42

    
43
	private static final Resource dsQuery = new ClassPathResource(LocalOpenaireDatasourceManagerImpl.QUERY_BASEDIR + "getDatasource.sql");
44
	private static final Resource dsIdentitiesQuery = new ClassPathResource(LocalOpenaireDatasourceManagerImpl.QUERY_BASEDIR + "dsIdentitiesQuery.sql");
45
	private static final Resource dsOrganizationsQuery = new ClassPathResource(LocalOpenaireDatasourceManagerImpl.QUERY_BASEDIR + "dsOrganizationsQuery.sql");
46
	private static final Resource listApisByDsId = new ClassPathResource(LocalOpenaireDatasourceManagerImpl.QUERY_BASEDIR + "listApisByDsId.sql");
47
	private static final Resource isDefinedParamQuery = new ClassPathResource(LocalOpenaireDatasourceManagerImpl.QUERY_BASEDIR + "isDefinedParam.sql");
48

    
49
	private NamedParameterJdbcTemplate jdbcTemplate;
50

    
51
	public enum AfterSqlUpdate {
52
		DELETE_DS_PROFILE, UPDATE_DS_PROFILE, NONE
53
	}
54

    
55
	@Autowired
56
	private UniqueServiceLocator serviceLocator;
57

    
58
	public String findDatasourceId(final String profileId) throws DsmException {
59
		try {
60
			return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
61
					"/*[.//RESOURCE_IDENTIFIER/@value='" + profileId + "']//EXTRA_FIELDS/FIELD[./key='OpenAireDataSourceId']/value/text()");
62
		} catch (final Exception e) {
63
			log.error("Error finding dsId of profile " + profileId, e);
64
			throw new DsmException(-1, "Error finding dsId of profile " + profileId, e);
65
		}
66
	}
67

    
68
	public String getDatasourceProfile(final String dsId) throws DsmException {
69
		try {
70
			return serviceLocator.getService(ISLookUpService.class)
71
					.getResourceProfileByQuery(
72
							"collection('/db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType')/*[.//EXTRA_FIELDS/FIELD[./key='OpenAireDataSourceId']/value/text() = '"
73
									+ dsId + "']");
74
		} catch (final Exception e) {
75
			return null;
76
		}
77
	}
78

    
79
	public boolean deleteProfile(final String dsId) throws DsmException {
80
		try {
81
			final SAXReader reader = new SAXReader();
82

    
83
			final String profile = getDatasourceProfile(dsId);
84

    
85
			if (profile != null) {
86
				final Document docOld = reader.read(new StringReader(profile));
87
				final String profId = docOld.valueOf("//RESOURCE_IDENTIFIER/@value");
88
				serviceLocator.getService(ISRegistryService.class).deleteProfile(profId);
89
			}
90
			return true;
91
		} catch (final Exception e) {
92
			log.error("Error deleting profile", e);
93
			throw new DsmException(-1, "Error deleting profile", e);
94
		}
95
	}
96

    
97
	public boolean regenerateProfile(final String dsId) throws DsmException {
98
		return regenerateProfile(dsId, serviceLocator.getService(ISRegistryService.class));
99
	}
100

    
101
	public boolean regenerateProfile(final String dsId, final ISRegistryService registry) throws DsmException {
102

    
103
		final Datasource<Organization<?>, Identity> ds = getDatasourceById(dsId);
104
		final List<Api<ApiParam>> apis = getApis(dsId);
105

    
106
		try {
107

    
108
			final String oldProfile = getDatasourceProfile(dsId);
109

    
110
			if (oldProfile != null) {
111
				final Document docOld = new SAXReader().read(new StringReader(oldProfile));
112
				final String profId = docOld.valueOf("//RESOURCE_IDENTIFIER/@value");
113
				final String profile = DatasourceFunctions.dsToProfile(ds, apis, profId);
114
				registry.updateProfile(profId, profile, REPOSITORY_SERVICE_RESOURCE_TYPE);
115
				log.info("Profile " + profId + " UPDATED for ds " + dsId);
116
			} else {
117
				final String profile = DatasourceFunctions.dsToProfile(ds, apis, "");
118
				final String profId = registry.registerProfile(profile);
119
				log.info("Valid Profile " + profId + " REGISTERED for ds " + dsId);
120
			}
121
			return true;
122
		} catch (final Exception e) {
123
			log.error("Error saving profile, id: " + dsId, e);
124
			throw new DsmException(-1, "Error regenerating profile", e);
125
		}
126
	}
127

    
128

    
129
	@Transactional(readOnly = true)
130
	public List<Map<String, Object>> searchSQL(final String sql, final Map<String, Object> sqlParams) throws DsmException {
131
		try {
132
			log.debug("Executing SQL: " + sql);
133
			return jdbcTemplate.queryForList(sql, sqlParams);
134
		} catch (final Exception e) {
135
			log.error("Error executing sql", e);
136

    
137
			throw new DsmException(-1, "Error obtaining datasources from db", e);
138
		}
139
	}
140

    
141

    
142
	@Transactional(readOnly = true)
143
	public List<Map<String, Object>> searchSQL(final Resource sqlResource, final Map<String, Object> sqlParams) throws DsmException {
144
		try {
145
			return searchSQL(IOUtils.toString(sqlResource.getInputStream()), sqlParams);
146
		} catch (final Exception e) {
147
			log.error("Error executing sql", e);
148
			throw new DsmException(-1, "Error obtaining datasources from db", e);
149
		}
150
	}
151

    
152

    
153
	@Transactional
154
	public void updateSQL(final String dsId, final String sql, final AfterSqlUpdate op, final Map<String, Object> sqlparams)
155
			throws DsmException {
156
		log.debug("Executing query SQL: " + sql);
157

    
158
		jdbcTemplate.update(sql, sqlparams);
159

    
160
		switch (op) {
161
		case DELETE_DS_PROFILE:
162
			deleteProfile(dsId);
163
			break;
164
		case UPDATE_DS_PROFILE:
165

    
166
			regenerateProfile(dsId, serviceLocator.getService(ISRegistryService.class));
167
			break;
168
		default:
169
			break;
170
		}
171

    
172
	}
173
	@Transactional
174
	public void updateSQL(final String dsId, final Resource sqlResource, final AfterSqlUpdate op, final Map<String, Object> sqlparams)
175
			throws DsmException {
176
		try {
177
			updateSQL(dsId, IOUtils.toString(sqlResource.getInputStream()), op, sqlparams);
178
		} catch (final Exception e) {
179
			log.error("Error in updateSQL", e);
180
			throw new DsmException(-1, "Error in updateSQL", e);
181
		}
182
	}
183

    
184
	@Transactional(readOnly = true)
185
	public Datasource<Organization<?>, Identity> getDatasourceById(final String id) throws DsmException {
186
		final List<Map<String, Object>> list = searchSQL(dsQuery, ImmutableMap.of("dsId", id));
187

    
188
		if (list.size() != 1) { throw new DsmException("Invalid number of ds with id: " + id); }
189

    
190
		final Datasource<Organization<?>, Identity> ds = DatasourceFunctions.mapToDatasource(list.get(0));
191
		ds.setIdentities(searchSQL(dsIdentitiesQuery, ImmutableMap.of("dsId", id))
192
				.stream()
193
				.map(DatasourceFunctions::mapToDsIdentity)
194
				.collect(Collectors.toSet()));
195
		ds.setOrganizations(searchSQL(dsOrganizationsQuery, ImmutableMap.of("dsId", id))
196
				.stream()
197
				.map(DatasourceFunctions::mapToDsOrganization)
198
				.collect(Collectors.toSet()));
199

    
200
		return ds;
201
	}
202

    
203

    
204
	@Transactional(readOnly = true)
205
	public List<Api<ApiParam>> getApis(final String dsId) throws DsmException {
206

    
207
		return searchSQL(listApisByDsId, ImmutableMap.of("dsId", dsId))
208
				.stream()
209
				.map(DatasourceFunctions::mapToApi)
210
				.collect(Collectors.toList());
211
	}
212

    
213
	@Transactional(readOnly = true)
214
	public boolean isDefinedParam(final String apiId, final String param) throws DsmException {
215
		return !searchSQL(isDefinedParamQuery, ImmutableMap.of("apiId", apiId, "param", param)).isEmpty();
216
	}
217

    
218
	public Date findNextScheduledExecution(final String dsId, final String ifaceId) throws DsmException {
219
		final String xquery = "/*[.//DATAPROVIDER/@interface='" + ifaceId + "' and .//SCHEDULING/@enabled='true']//CRON/text()";
220
		try {
221
			final String cronExpression = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
222
			final CronExpression cron = new CronExpression(cronExpression);
223
			return cron.getNextValidTimeAfter(new Date());
224
		} catch (final ISLookUpDocumentNotFoundException e) {
225
			// When the value is not found a null value must be returned
226
			return null;
227
		} catch (final ISLookUpException e) {
228
			log.error("Error in xquery: " + xquery, e);
229
			throw new DsmException(-1, "Error in xquery: " + xquery, e);
230
		} catch (final ParseException e) {
231
			log.error("Error parsing cron expression", e);
232
			throw new DsmException(-1, "Error parsing cron expression", e);
233
		}
234
	}
235

    
236
	@Transactional(readOnly = true)
237
	public void regenerateProfiles() throws DsmException {
238
		final ISRegistryService registry = serviceLocator.getService(ISRegistryService.class);
239
		searchSQL("SELECT id FROM dsm_datasources", new HashMap<>()).stream()
240
				.map(m -> m.get("id").toString())
241
				.forEach(id -> {
242
					try {
243
						regenerateProfile(id, registry);
244
					} catch (final DsmException e) {
245
						log.error("Error regeneating profile: " + id, e);
246
					}
247
				});
248
	}
249

    
250
	public NamedParameterJdbcTemplate getJdbcTemplate() {
251
		return jdbcTemplate;
252
	}
253

    
254
	@Required
255
	public void setJdbcTemplate(final NamedParameterJdbcTemplate jdbcTemplate) {
256
		this.jdbcTemplate = jdbcTemplate;
257
	}
258

    
259
}
(2-2/5)