Project

General

Profile

1
package eu.dnetlib.openaire.exporter.project;
2

    
3
import java.io.BufferedOutputStream;
4
import java.io.IOException;
5
import java.io.OutputStream;
6
import java.nio.charset.Charset;
7
import java.sql.Connection;
8
import java.sql.PreparedStatement;
9
import java.sql.ResultSet;
10
import java.sql.SQLException;
11
import java.time.Duration;
12
import java.time.LocalDateTime;
13
import java.util.List;
14
import java.util.concurrent.ScheduledThreadPoolExecutor;
15
import java.util.concurrent.atomic.AtomicInteger;
16
import java.util.zip.GZIPOutputStream;
17
import java.util.zip.ZipEntry;
18
import java.util.zip.ZipOutputStream;
19

    
20
import javax.annotation.PostConstruct;
21

    
22
import com.google.common.base.Joiner;
23
import com.google.common.base.Splitter;
24
import com.google.common.util.concurrent.ListeningExecutorService;
25
import com.google.common.util.concurrent.MoreExecutors;
26
import com.google.common.util.concurrent.ThreadFactoryBuilder;
27
import eu.dnetlib.OpenaireExporterConfig;
28
import eu.dnetlib.openaire.exporter.datasource.ApiException;
29
import eu.dnetlib.openaire.exporter.model.project.Project;
30
import eu.dnetlib.openaire.exporter.model.project.ProjectTsv;
31
import eu.dnetlib.openaire.exporter.project.repository.ProjectDetailsRepository;
32
import eu.dnetlib.openaire.exporter.project.repository.ProjectTsvRepository;
33
import org.antlr.stringtemplate.StringTemplate;
34
import org.apache.commons.dbcp2.BasicDataSource;
35
import org.apache.commons.lang3.StringUtils;
36
import org.apache.commons.logging.Log;
37
import org.apache.commons.logging.LogFactory;
38
import org.springframework.beans.factory.annotation.Autowired;
39
import org.springframework.stereotype.Component;
40

    
41
/**
42
 * Created by claudio on 20/09/16.
43
 */
44
@Component
45
public class JdbcApiDao {
46

    
47
	public static final Charset UTF8 = Charset.forName("UTF-8");
48

    
49
	private static final Log log = LogFactory.getLog(JdbcApiDao.class);
50

    
51
	@Autowired
52
	private OpenaireExporterConfig config;
53

    
54
	@Autowired
55
	private BasicDataSource projectApiDataSource;
56

    
57
	@Autowired
58
	private ProjectDetailsRepository projectDetailsRepository;
59

    
60
	@Autowired
61
	private ProjectTsvRepository projectTsvRepository;
62

    
63
	private ListeningExecutorService service;
64

    
65
	@PostConstruct
66
	public void init() {
67
		service = MoreExecutors.listeningDecorator(
68
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
69
						new ThreadFactoryBuilder().setNameFormat("project-exporter-dao-%d").build()));
70
	}
71

    
72
	public void processProjectDetails(final OutputStream outputStream, String format, Boolean compress) throws IOException {
73
		service.execute(() -> {
74
			try {
75
				final OutputStream out = getOutputStream(outputStream, compress);
76
				try {
77
					final AtomicInteger i = new AtomicInteger(0);
78
					projectDetailsRepository.findAll().forEach(p -> {
79
							try {
80
								switch (format) {
81
								case "csv":
82
									out.write(p.asCSV().getBytes(UTF8));
83
									break;
84
								case "json":
85
									out.write(p.asJson().getBytes(UTF8));
86
									break;
87
								}
88
								i.incrementAndGet();
89
								if (i.intValue() % config.getProject().getGzipFlushSize() == 0) {
90
									log.debug("flushing output stream");
91
									out.flush();
92
								}
93
							} catch (IOException e) {
94
								throw new RuntimeException(e);
95
							}
96
						}
97
					);
98
				} finally {
99
					if (out instanceof GZIPOutputStream) {
100
						((GZIPOutputStream) out).finish();
101
					}
102
					out.close();
103
				}
104
			} catch (IOException e) {
105
				throw new RuntimeException(e);
106
			}
107
		});
108
	}
109

    
110
	private OutputStream getOutputStream(final OutputStream outputStream, final Boolean compress) throws IOException {
111
		if (compress != null && compress) {
112
			return new GZIPOutputStream(outputStream);
113
		}
114
		return outputStream;
115
	}
116

    
117
	public void processTsvRequest(final OutputStream outputStream, final Boolean article293, final String fundingPrefix, final String filename) throws ApiException {
118
		service.execute(() -> {
119
			try(final ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(outputStream))) {
120
				out.putNextEntry(new ZipEntry(filename));
121
				writeTsvLine(out, Splitter.on(",").trimResults().splitToList(config.getProject().getTsvFields()));
122
				queryForTsv(fundingPrefix, article293).forEach(p -> {
123
					try {
124
						writeTsvLine(out, p.asList());
125
					} catch (IOException e) {
126
						throw new RuntimeException(e);
127
					}
128
				});
129
			} catch (Throwable e) {
130
				throw new RuntimeException("Error processing the request", e);
131
			}
132
		});
133
	}
134

    
135
	private void writeTsvLine(final ZipOutputStream out, final List<String> s) throws IOException {
136
		out.write(Joiner.on('\t').useForNull("").join(s).getBytes(UTF8));
137
		out.write('\n');
138
	}
139

    
140
	private Iterable<ProjectTsv> queryForTsv(final String fundingPrefix, final Boolean article293) {
141
		log.debug(String.format("fundingPrefix:'%s' and article293:'%s'", fundingPrefix, article293));
142
		if (article293 != null) {
143
			return projectTsvRepository.findByFundingpathidStartingWithAndOaMandateForDatasetsOrderByAcronym(fundingPrefix, article293);
144
		} else {
145
			return projectTsvRepository.findByFundingpathidStartingWithOrderByAcronym(fundingPrefix);
146
		}
147
	}
148

    
149
	public void streamProjects(final String sql, final OutputStream out,
150
			final String head, final StringTemplate projectTemplate, final String tail,
151
			final ValueCleaner cleaner) throws IOException, SQLException {
152
		service.execute(() -> {
153
			try {
154

    
155
				if (log.isDebugEnabled()) {
156
					log.debug("Thread " + Thread.currentThread().getId() + " begin");
157
				}
158
				final LocalDateTime start = LocalDateTime.now();
159

    
160
				if (StringUtils.isNotBlank(head)) {
161
					out.write(head.getBytes(UTF8));
162
				}
163

    
164
				try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
165
					while (rs.next()) {
166
						final Project p = new Project()
167
								.setFunder(cleaner.clean(rs.getString("funder")))
168
								.setJurisdiction(cleaner.clean(rs.getString("jurisdiction")))
169
								.setFundingpathid(cleaner.clean(rs.getString("fundingpathid")))
170
								.setAcronym(cleaner.clean(rs.getString("acronym")))
171
								.setTitle(cleaner.clean(rs.getString("title")))
172
								.setCode(cleaner.clean(rs.getString("code")))
173
								.setStartdate(cleaner.clean(rs.getString("startdate")))
174
								.setEnddate(cleaner.clean(rs.getString("enddate")));
175

    
176
						projectTemplate.reset();
177
						projectTemplate.setAttribute("p", p);
178
						out.write(projectTemplate.toString().getBytes(UTF8));
179
					}
180
					if (StringUtils.isNotBlank(tail)) {
181
						out.write(tail.getBytes(UTF8));
182
					}
183
					final LocalDateTime end = LocalDateTime.now();
184
					if (log.isDebugEnabled()) {
185
						log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end));
186
					}
187
				} finally {
188
					out.close();
189
				}
190
			} catch (IOException | SQLException e) {
191
				log.error(e);
192
				throw new RuntimeException(e);
193
			}
194
		});
195
	}
196

    
197
	private Connection getConn() throws SQLException {
198
		final Connection conn = projectApiDataSource.getConnection();
199
		conn.setAutoCommit(false);
200
		return conn;
201
	}
202

    
203
	private PreparedStatement getStm(final String sql, final Connection con) throws SQLException {
204
		final PreparedStatement stm = con.prepareStatement(sql);
205
		stm.setFetchSize(config.getJdbc().getMaxRows());
206
		return stm;
207
	}
208

    
209
}
(1-1/5)