Project

General

Profile

1
package eu.dnetlib.openaire.exporter.project;
2

    
3
import java.io.BufferedOutputStream;
4
import java.io.IOException;
5
import java.io.OutputStream;
6
import java.nio.charset.Charset;
7
import java.sql.*;
8
import java.time.Duration;
9
import java.time.LocalDateTime;
10
import java.util.Arrays;
11
import java.util.List;
12
import java.util.Map;
13
import java.util.Set;
14
import java.util.zip.GZIPOutputStream;
15
import java.util.zip.ZipEntry;
16
import java.util.zip.ZipOutputStream;
17

    
18
import com.google.common.base.Joiner;
19
import com.google.common.base.Splitter;
20
import com.google.common.collect.Maps;
21
import com.google.common.collect.Sets;
22
import eu.dnetlib.OpenaireExporterConfig;
23
import eu.dnetlib.openaire.exporter.model.project.Project;
24
import eu.dnetlib.openaire.exporter.model.project.ProjectDetails;
25
import eu.dnetlib.openaire.exporter.model.project.ProjectTsv;
26
import eu.dnetlib.openaire.exporter.project.repository.ProjectTsvRepository;
27
import org.antlr.stringtemplate.StringTemplate;
28
import org.apache.commons.dbcp2.BasicDataSource;
29
import org.apache.commons.lang3.StringUtils;
30
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.LogFactory;
32
import org.springframework.beans.factory.annotation.Autowired;
33
import org.springframework.cache.annotation.CacheEvict;
34
import org.springframework.cache.annotation.Cacheable;
35
import org.springframework.stereotype.Component;
36

    
37
/**
38
 * Created by claudio on 20/09/16.
39
 */
40
@Component
41
public class JdbcApiDao {
42

    
43
	public static final Charset UTF8 = Charset.forName("UTF-8");
44

    
45
	private static final Log log = LogFactory.getLog(JdbcApiDao.class);
46

    
47
	@Autowired
48
	private OpenaireExporterConfig config;
49

    
50
	@Autowired
51
	private BasicDataSource projectApiDataSource;
52

    
53
	@Autowired
54
	private ProjectTsvRepository projectTsvRepository;
55

    
56
	@Cacheable("fundingpath-ids")
57
	public Map<String, String> readFundingpathIds() {
58

    
59
		log.debug("loading funding ids");
60
		final String sql = "SELECT id FROM fundingpaths";
61
		final Set<String> ids = Sets.newHashSet();
62
		try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
63
			while (rs.next()) {
64
				ids.add(rs.getString("id"));
65
			}
66
		} catch (SQLException e) {
67
			throw new RuntimeException(e);
68
		}
69
		log.debug(String.format("loaded %s funding ids", ids.size()));
70

    
71
		final Map<String, String> res = Maps.newHashMap();
72
		final Splitter sp = Splitter.on("::").trimResults();
73
		ids.stream()
74
				.filter(s -> sp.splitToList(s).size() < 3)
75
				.forEach(s -> res.put(StringUtils.substringAfterLast(s, "::").toUpperCase(), s));
76

    
77
		res.put("FP7", "ec__________::EC::FP7");
78
		res.put("H2020", "ec__________::EC::H2020");
79
		log.debug(String.format("processed %s funding ids", res.size()));
80
		res.forEach((k,v) -> log.debug(String.format("%s : '%s'", k, v)));
81
		return res;
82
	}
83

    
84
	@CacheEvict(cacheNames = "fundingpath-ids", allEntries = true)
85
	public void dropCache() {
86
		log.info("dropped project exporter fundingpath ids cache");
87
	}
88

    
89
	public void processProjectDetails(final OutputStream outputStream, String format, Boolean compress) throws IOException {
90
		final OutputStream out = getOutputStream(new BufferedOutputStream(outputStream), compress);
91
		try {
92
			final String sql = "SELECT * FROM project_details";
93
			try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
94
				while (rs.next()) {
95
					try {
96
						switch (format) {
97
						case "csv":
98
							out.write(getProjectDetails(rs).asCSV().getBytes(UTF8));
99
							break;
100
						case "json":
101
							out.write(getProjectDetails(rs).asJson().getBytes(UTF8));
102
							break;
103
						}
104
					} catch (IOException e) {
105
						throw new RuntimeException(e);
106
					}
107
				}
108
			} catch (SQLException e) {
109
				throw new RuntimeException(e);
110
			}
111
		} finally {
112
			if (out instanceof GZIPOutputStream) {
113
				((GZIPOutputStream) out).finish();
114
			}
115
			out.close();
116
		}
117
	}
118

    
119
	private OutputStream getOutputStream(final OutputStream outputStream, final Boolean compress) throws IOException {
120
		if (compress != null && compress) {
121
			return new GZIPOutputStream(outputStream);
122
		}
123
		return outputStream;
124
	}
125

    
126
	private ProjectDetails getProjectDetails(final ResultSet rs) throws SQLException {
127
		return new ProjectDetails()
128
				.setProjectId(rs.getString("projectid"))
129
				.setAcronym(rs.getString("acronym"))
130
				.setCode(rs.getString("code"))
131
				.setOptional1(rs.getString("optional1"))
132
				.setOptional2(rs.getString("optional2"))
133
				.setJsonextrainfo(rs.getString("jsonextrainfo"))
134
				.setFundingPath(asList(rs.getArray("fundingpath")));
135
	}
136

    
137
	private String[] asList(final Array value) throws SQLException {
138
		if (value != null) {
139
			final List<Object> list = Arrays.asList((Object[]) value.getArray());
140
			return list.stream()
141
					.map(o -> o != null ? o.toString() : null)
142
					.toArray(String[]::new);
143
		}
144
		return new String[0];
145
	}
146

    
147
	public void processTsvRequest(final ZipOutputStream out, final Boolean article293, final String fundingPrefix, final String filename) throws IOException {
148
		out.putNextEntry(new ZipEntry(filename));
149
		writeTsvLine(out, Splitter.on(",").trimResults().splitToList(config.getProject().getTsvFields()));
150
		queryForTsv(fundingPrefix, article293).forEach(p -> {
151
			try {
152
				writeTsvLine(out, p.asList());
153
			} catch (IOException e) {
154
				throw new RuntimeException(e);
155
			}
156
		});
157
	}
158

    
159
	private void writeTsvLine(final ZipOutputStream out, final List<String> s) throws IOException {
160
		out.write(Joiner.on('\t').useForNull("").join(s).getBytes(UTF8));
161
		out.write('\n');
162
	}
163

    
164
	private Iterable<ProjectTsv> queryForTsv(final String fundingPrefix, final Boolean article293) {
165
		log.debug(String.format("fundingPrefix:'%s' and oa_mandate_for_datasets:'%s'", fundingPrefix, article293));
166
		if (article293 != null) {
167
			return projectTsvRepository.findByFundingpathidStartingWithAndOaMandateForDatasetsOrderByAcronym(fundingPrefix, article293);
168
		} else {
169
			return projectTsvRepository.findByFundingpathidStartingWithOrderByAcronym(fundingPrefix);
170
		}
171
	}
172

    
173
	public void streamProjects(final String sql, final OutputStream out,
174
			final String head, final StringTemplate projectTemplate, final String tail,
175
			final ValueCleaner cleaner) throws IOException, SQLException {
176

    
177
		if (log.isDebugEnabled()) {
178
			log.debug("Thread " + Thread.currentThread().getId() + " begin");
179
		}
180
		final LocalDateTime start = LocalDateTime.now();
181

    
182
		if (StringUtils.isNotBlank(head)) {
183
			out.write(head.getBytes(UTF8));
184
		}
185

    
186
		try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
187
			while (rs.next()) {
188
				final Project p = new Project()
189
						.setFunder(cleaner.clean(rs.getString("funder")))
190
						.setJurisdiction(cleaner.clean(rs.getString("jurisdiction")))
191
						.setFundingpathid(cleaner.clean(rs.getString("fundingpathid")))
192
						.setAcronym(cleaner.clean(rs.getString("acronym")))
193
						.setTitle(cleaner.clean(rs.getString("title")))
194
						.setCode(cleaner.clean(rs.getString("code")))
195
						.setStartdate(cleaner.clean(rs.getString("startdate")))
196
						.setEnddate(cleaner.clean(rs.getString("enddate")));
197

    
198
				projectTemplate.reset();
199
				projectTemplate.setAttribute("p", p);
200
				out.write(projectTemplate.toString().getBytes(UTF8));
201
			}
202
			if (StringUtils.isNotBlank(tail)) {
203
				out.write(tail.getBytes(UTF8));
204
			}
205
			final LocalDateTime end = LocalDateTime.now();
206
			if (log.isDebugEnabled()) {
207
				log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end).toMillis() + " ms");
208
			}
209
		}
210
	}
211

    
212
	private Connection getConn() throws SQLException {
213
		final Connection conn = projectApiDataSource.getConnection();
214
		conn.setAutoCommit(false);
215
		return conn;
216
	}
217

    
218
	private PreparedStatement getStm(final String sql, final Connection con) throws SQLException {
219
		final PreparedStatement stm = con.prepareStatement(sql);
220
		stm.setFetchSize(config.getJdbc().getMaxRows());
221
		return stm;
222
	}
223

    
224
}
(1-1/5)