Project

General

Profile

1
package eu.dnetlib.openaire.exporter;
2

    
3
import java.io.IOException;
4
import java.io.OutputStream;
5
import java.nio.charset.Charset;
6
import java.sql.*;
7
import java.time.Duration;
8
import java.time.LocalDateTime;
9
import java.util.Arrays;
10
import java.util.List;
11
import java.util.zip.GZIPOutputStream;
12
import java.util.zip.ZipOutputStream;
13

    
14
import com.google.common.base.Function;
15
import com.google.common.base.Joiner;
16
import com.google.common.base.Splitter;
17
import com.google.common.collect.Iterables;
18
import com.google.common.collect.Lists;
19
import eu.dnetlib.openaire.exporter.tmpModel.Project;
20
import eu.dnetlib.openaire.exporter.tmpModel.ProjectDetail;
21
import org.antlr.stringtemplate.StringTemplate;
22
import org.apache.commons.dbcp2.BasicDataSource;
23
import org.apache.commons.lang3.StringUtils;
24
import org.apache.commons.logging.Log;
25
import org.apache.commons.logging.LogFactory;
26
import org.springframework.beans.factory.annotation.Autowired;
27
import org.springframework.beans.factory.annotation.Value;
28
import org.springframework.stereotype.Component;
29

    
30
/**
31
 * Created by claudio on 20/09/16.
32
 */
33
@Component
34
public class JdbcApiDao {
35

    
36
	public static final Charset UTF8 = Charset.forName("UTF-8");
37
	private static final Log log = LogFactory.getLog(JdbcApiDao.class);
38
	@Value("${openaire.exporter.jdbc.maxrows}")
39
	private int maxRows;
40

    
41
	@Value("${openaire.exporter.projectdetails.flushsize}")
42
	private int gzipFlushSize;
43

    
44
	@Value("${openaire.exporter.projects2tsv.fields}")
45
	private String tsvFields;
46

    
47
	@Autowired
48
	private BasicDataSource apiDataSource;
49

    
50
	public void streamProjects(final String sql, final OutputStream out,
51
			final String head, final StringTemplate projectTemplate, final String tail,
52
			final ValueCleaner cleaner) throws IOException, SQLException {
53

    
54
		if (log.isDebugEnabled()) {
55
			log.debug("Thread " + Thread.currentThread().getId() + " begin");
56
		}
57
		final LocalDateTime start = LocalDateTime.now();
58

    
59
		if (StringUtils.isNotBlank(head)) {
60
			out.write(head.getBytes(UTF8));
61
		}
62

    
63
		try(final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
64
			while(rs.next()){
65
				final Project p = new Project()
66
						.setFunder(cleaner.clean(rs.getString("funder")))
67
						.setJurisdiction(cleaner.clean(rs.getString("jurisdiction")))
68
						.setFundingpathid(cleaner.clean(rs.getString("fundingpathid")))
69
						.setAcronym(cleaner.clean(rs.getString("acronym")))
70
						.setTitle(cleaner.clean(rs.getString("title")))
71
						.setCode(cleaner.clean(rs.getString("code")))
72
						.setStartdate(cleaner.clean(rs.getString("startdate")))
73
						.setEnddate(cleaner.clean(rs.getString("enddate")));
74

    
75
				projectTemplate.reset();
76
				projectTemplate.setAttribute("p", p);
77
				out.write(projectTemplate.toString().getBytes(UTF8));
78
			}
79
			if (StringUtils.isNotBlank(tail)) {
80
				out.write(tail.getBytes(UTF8));
81
			}
82
			final LocalDateTime end = LocalDateTime.now();
83
			if (log.isDebugEnabled()) {
84
				log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end));
85
			}
86
		} finally {
87
			out.close();
88
		}
89
	}
90

    
91
	public void streamProjectsTSV(final String sql, final ZipOutputStream out) throws IOException, SQLException {
92

    
93
		if (log.isDebugEnabled()) {
94
			log.debug("Thread " + Thread.currentThread().getId() + " begin");
95
		}
96
		final LocalDateTime start = LocalDateTime.now();
97
		final List<String> fields = Lists.newArrayList(Splitter.on(",").omitEmptyStrings().trimResults().split(tsvFields));
98
		writeCSVLine(out, fields);
99
		try(final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
100
			while(rs.next()) {
101
				final Project p = new Project()
102
						.setCode(rs.getString("code"))
103
						.setAcronym(rs.getString("acronym"))
104
						.setTitle(rs.getString("title"))
105
						.setCallIdentifier(rs.getString("call_identifier"))
106
						.setStartdate(rs.getString("startdate"))
107
						.setEnddate(rs.getString("enddate"))
108
						.setOaMandateForPublications(rs.getBoolean("oa_mandate_for_publications"))
109
						.setOaMandateForDatasets(rs.getBoolean("oa_mandate_for_datasets"))
110
						.setDescription(rs.getString("description"))
111
						.setOrgLegalname(rs.getString("legalname"))
112
						.setOrgCountry(rs.getString("country"))
113
						.setOrgRole(rs.getString("role"))
114
						.setFirstname(rs.getString("firstname"))
115
						.setSecondnames(rs.getString("secondnames"))
116
						.setEmail(rs.getString("email"));
117

    
118
				writeCSVLine(out, p.asList());
119
			}
120
			out.closeEntry();
121
			final LocalDateTime end = LocalDateTime.now();
122
			if (log.isDebugEnabled()) {
123
				log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end));
124
			}
125
		} finally {
126
			out.close();
127
		}
128
	}
129

    
130
	private void writeCSVLine(final ZipOutputStream out, final List<String> list) throws IOException {
131
		out.write(Joiner.on('\t').useForNull("").join(list).getBytes(UTF8));
132
		out.write('\n');
133
	}
134

    
135

    
136
	public void streamProjectDetails(final String sql, final OutputStream out, final String format) throws SQLException, IOException {
137
		if (log.isDebugEnabled()) {
138
			log.debug("Thread " + Thread.currentThread().getId() + " begin");
139
		}
140
		final LocalDateTime start = LocalDateTime.now();
141
		int i = 0;
142
		try(final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
143
			while (rs.next()) {
144
				final ProjectDetail p = getProjectDetail(rs);
145

    
146
				switch (format) {
147
				case "csv":
148
					out.write(p.asCSV().getBytes(UTF8));
149
					break;
150
				case "json":
151
					out.write(p.asJson().getBytes(UTF8));
152
					break;
153
				}
154
				if (++i % gzipFlushSize == 0) {
155
					log.debug("flushing output stream");
156
					out.flush();
157
				}
158
			}
159
			final LocalDateTime end = LocalDateTime.now();
160
			if (log.isDebugEnabled()) {
161
				log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end));
162
			}
163
		} finally {
164
			if (out instanceof GZIPOutputStream) {
165
				((GZIPOutputStream) out).finish();
166
			}
167
			out.close();
168
		}
169
	}
170

    
171
	private Connection getConn() throws SQLException {
172
		final Connection conn = apiDataSource.getConnection();
173
		conn.setAutoCommit(false);
174
		return conn;
175
	}
176

    
177
	private PreparedStatement getStm(final String sql, final Connection con) throws SQLException {
178
		final PreparedStatement stm = con.prepareStatement(sql);
179
		stm.setFetchSize(maxRows);
180
		return stm;
181
	}
182

    
183
	private ProjectDetail getProjectDetail(final ResultSet rs) throws SQLException {
184
		return new ProjectDetail()
185
				.setProjectId(rs.getString("projectid"))
186
				.setAcronym(rs.getString("acronym"))
187
				.setCode(rs.getString("code"))
188
				.setJsonextrainfo(rs.getString("jsonextrainfo"))
189
				.setFundingPath(asList(rs.getArray("fundingpath")));
190
	}
191

    
192
	private List<String> asList(final Array value) throws SQLException {
193
		if (value != null) {
194
			final List<Object> list = Arrays.asList((Object[]) value.getArray());
195
			return Lists.newArrayList(Iterables.transform(list, new Function<Object, String>() {
196
				@Override
197
				public String apply(final Object o) {
198
					return o != null ? o.toString() : null;
199

    
200
				}
201
			}));
202
		}
203
		return Lists.newArrayList();
204
	}
205

    
206
}
(1-1/5)