Revision 48459
Added by Claudio Atzori almost 7 years ago
JdbcApiDao.java | ||
---|---|---|
11 | 11 |
import java.time.Duration; |
12 | 12 |
import java.time.LocalDateTime; |
13 | 13 |
import java.util.List; |
14 |
import java.util.Map; |
|
15 |
import java.util.Set; |
|
14 | 16 |
import java.util.concurrent.atomic.AtomicInteger; |
15 | 17 |
import java.util.zip.GZIPOutputStream; |
16 | 18 |
import java.util.zip.ZipEntry; |
... | ... | |
18 | 20 |
|
19 | 21 |
import com.google.common.base.Joiner; |
20 | 22 |
import com.google.common.base.Splitter; |
23 |
import com.google.common.collect.Maps; |
|
24 |
import com.google.common.collect.Sets; |
|
21 | 25 |
import eu.dnetlib.OpenaireExporterConfig; |
22 | 26 |
import eu.dnetlib.openaire.exporter.datasource.ApiException; |
23 | 27 |
import eu.dnetlib.openaire.exporter.model.project.Project; |
... | ... | |
54 | 58 |
@Autowired |
55 | 59 |
private ProjectTsvRepository projectTsvRepository; |
56 | 60 |
|
61 |
|
|
62 |
public Map<String, String> readFundingpathIds() { |
|
63 |
|
|
64 |
log.debug("loading funding ids"); |
|
65 |
final String sql = "SELECT id FROM fundingpaths"; |
|
66 |
final Set<String> ids = Sets.newHashSet(); |
|
67 |
try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) { |
|
68 |
while (rs.next()) { |
|
69 |
ids.add(rs.getString("id")); |
|
70 |
} |
|
71 |
} catch (SQLException e) { |
|
72 |
throw new RuntimeException(e); |
|
73 |
} |
|
74 |
log.debug(String.format("loaded %s funding ids", ids.size())); |
|
75 |
|
|
76 |
final Map<String, String> res = Maps.newHashMap(); |
|
77 |
final Splitter sp = Splitter.on("::").trimResults(); |
|
78 |
ids.stream() |
|
79 |
.filter(s -> sp.splitToList(s).size() < 3) |
|
80 |
.forEach(s -> res.put(StringUtils.substringAfterLast(s, "::").toUpperCase(), s)); |
|
81 |
|
|
82 |
res.put("FP7", "ec__________::EC::FP7"); |
|
83 |
res.put("H2020", "ec__________::EC::H2020"); |
|
84 |
log.debug(String.format("processed %s funding ids", res.size())); |
|
85 |
res.forEach((k,v) -> log.debug(String.format("%s : '%s'", k, v))); |
|
86 |
return res; |
|
87 |
} |
|
88 |
|
|
57 | 89 |
/**
 * Streams every record returned by {@code projectDetailsRepository.findAll()} to the stream
 * obtained from {@code getOutputStream(outputStream, compress)}.
 *
 * <p>Each record is written as {@code p.asCSV()} when {@code format} is {@code "csv"} and as
 * {@code p.asJson()} when it is {@code "json"}. The switch has no default branch, so for any
 * other value nothing is written although the record is still counted. The stream is flushed
 * each time the record count is a multiple of {@code config.getProject().getGzipFlushSize()}.
 * In the {@code finally} block a {@link GZIPOutputStream} is finished, then the stream is closed.
 *
 * <p>NOTE(review): this span is a rendered side-by-side diff, so the method body appears twice.
 * Rows preceded by two numbers look like unchanged context; rows preceded by a single number
 * appear to belong to only one of the two revisions - confirm against the unified diff.
 *
 * @param outputStream destination handed to {@code getOutputStream}
 * @param format {@code "csv"} or {@code "json"}
 * @param compress passed through to {@code getOutputStream}
 * @throws IOException declared by the signature
 * @throws RuntimeException wrapping an {@link IOException} raised while writing or flushing a record
 */
public void processProjectDetails(final OutputStream outputStream, String format, Boolean compress) throws IOException { |
90 |
final OutputStream out = getOutputStream(outputStream, compress); |
|
58 | 91 |
try { |
59 |
final OutputStream out = getOutputStream(outputStream, compress); |
|
60 |
try { |
|
61 |
final AtomicInteger i = new AtomicInteger(0); |
|
62 |
projectDetailsRepository.findAll().forEach(p -> { |
|
63 |
try { |
|
64 |
switch (format) { |
|
65 |
case "csv": |
|
66 |
out.write(p.asCSV().getBytes(UTF8)); |
|
67 |
break; |
|
68 |
case "json": |
|
69 |
out.write(p.asJson().getBytes(UTF8)); |
|
70 |
break; |
|
71 |
} |
|
72 |
i.incrementAndGet(); |
|
73 |
if (i.intValue() % config.getProject().getGzipFlushSize() == 0) { |
|
74 |
log.debug("flushing output stream"); |
|
75 |
out.flush(); |
|
76 |
} |
|
77 |
} catch (IOException e) { |
|
78 |
throw new RuntimeException(e); |
|
92 |
final AtomicInteger i = new AtomicInteger(0); |
|
93 |
projectDetailsRepository.findAll().forEach(p -> { |
|
94 |
try { |
|
95 |
switch (format) { |
|
96 |
case "csv": |
|
97 |
out.write(p.asCSV().getBytes(UTF8)); |
|
98 |
break; |
|
99 |
case "json": |
|
100 |
out.write(p.asJson().getBytes(UTF8)); |
|
101 |
break; |
|
79 | 102 |
} |
103 |
i.incrementAndGet(); |
|
104 |
if (i.intValue() % config.getProject().getGzipFlushSize() == 0) { |
|
105 |
log.debug("flushing output stream"); |
|
106 |
out.flush(); |
|
107 |
} |
|
108 |
} catch (IOException e) { |
|
109 |
throw new RuntimeException(e); |
|
80 | 110 |
} |
81 |
); |
|
82 |
} finally { |
|
83 |
if (out instanceof GZIPOutputStream) { |
|
84 |
((GZIPOutputStream) out).finish(); |
|
85 | 111 |
} |
86 |
out.close(); |
|
112 |
); |
|
113 |
} finally { |
|
114 |
if (out instanceof GZIPOutputStream) { |
|
115 |
((GZIPOutputStream) out).finish(); |
|
87 | 116 |
} |
88 |
} catch (IOException e) { |
|
89 |
throw new RuntimeException(e); |
|
117 |
out.close(); |
|
90 | 118 |
} |
91 | 119 |
} |
92 | 120 |
|
... | ... | |
131 | 159 |
final String head, final StringTemplate projectTemplate, final String tail, |
132 | 160 |
final ValueCleaner cleaner) throws IOException, SQLException { |
133 | 161 |
|
134 |
try { |
|
135 |
if (log.isDebugEnabled()) { |
|
136 |
log.debug("Thread " + Thread.currentThread().getId() + " begin"); |
|
137 |
} |
|
138 |
final LocalDateTime start = LocalDateTime.now(); |
|
162 |
if (log.isDebugEnabled()) { |
|
163 |
log.debug("Thread " + Thread.currentThread().getId() + " begin"); |
|
164 |
} |
|
165 |
final LocalDateTime start = LocalDateTime.now(); |
|
139 | 166 |
|
140 |
if (StringUtils.isNotBlank(head)) {
|
|
141 |
out.write(head.getBytes(UTF8));
|
|
142 |
}
|
|
167 |
if (StringUtils.isNotBlank(head)) { |
|
168 |
out.write(head.getBytes(UTF8)); |
|
169 |
} |
|
143 | 170 |
|
144 |
try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) {
|
|
145 |
while (rs.next()) {
|
|
146 |
final Project p = new Project()
|
|
147 |
.setFunder(cleaner.clean(rs.getString("funder")))
|
|
148 |
.setJurisdiction(cleaner.clean(rs.getString("jurisdiction")))
|
|
149 |
.setFundingpathid(cleaner.clean(rs.getString("fundingpathid")))
|
|
150 |
.setAcronym(cleaner.clean(rs.getString("acronym")))
|
|
151 |
.setTitle(cleaner.clean(rs.getString("title")))
|
|
152 |
.setCode(cleaner.clean(rs.getString("code")))
|
|
153 |
.setStartdate(cleaner.clean(rs.getString("startdate")))
|
|
154 |
.setEnddate(cleaner.clean(rs.getString("enddate")));
|
|
171 |
try (final Connection con = getConn(); final PreparedStatement stm = getStm(sql, con); final ResultSet rs = stm.executeQuery()) { |
|
172 |
while (rs.next()) { |
|
173 |
final Project p = new Project() |
|
174 |
.setFunder(cleaner.clean(rs.getString("funder"))) |
|
175 |
.setJurisdiction(cleaner.clean(rs.getString("jurisdiction"))) |
|
176 |
.setFundingpathid(cleaner.clean(rs.getString("fundingpathid"))) |
|
177 |
.setAcronym(cleaner.clean(rs.getString("acronym"))) |
|
178 |
.setTitle(cleaner.clean(rs.getString("title"))) |
|
179 |
.setCode(cleaner.clean(rs.getString("code"))) |
|
180 |
.setStartdate(cleaner.clean(rs.getString("startdate"))) |
|
181 |
.setEnddate(cleaner.clean(rs.getString("enddate"))); |
|
155 | 182 |
|
156 |
projectTemplate.reset(); |
|
157 |
projectTemplate.setAttribute("p", p); |
|
158 |
out.write(projectTemplate.toString().getBytes(UTF8)); |
|
159 |
} |
|
160 |
if (StringUtils.isNotBlank(tail)) { |
|
161 |
out.write(tail.getBytes(UTF8)); |
|
162 |
} |
|
163 |
final LocalDateTime end = LocalDateTime.now(); |
|
164 |
if (log.isDebugEnabled()) { |
|
165 |
log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end).toMillis() + " ms"); |
|
166 |
} |
|
167 |
} finally { |
|
168 |
out.close(); |
|
169 |
if (log.isDebugEnabled()) { |
|
170 |
log.debug("Thread " + Thread.currentThread().getId() + " closed output stream"); |
|
171 |
} |
|
183 |
projectTemplate.reset(); |
|
184 |
projectTemplate.setAttribute("p", p); |
|
185 |
out.write(projectTemplate.toString().getBytes(UTF8)); |
|
172 | 186 |
} |
173 |
} catch (Throwable e) { |
|
174 |
log.error(e); |
|
175 |
throw new RuntimeException(e); |
|
187 |
if (StringUtils.isNotBlank(tail)) { |
|
188 |
out.write(tail.getBytes(UTF8)); |
|
189 |
} |
|
190 |
final LocalDateTime end = LocalDateTime.now(); |
|
191 |
if (log.isDebugEnabled()) { |
|
192 |
log.debug("Thread " + Thread.currentThread().getId() + " ends, took: " + Duration.between(start, end).toMillis() + " ms"); |
|
193 |
} |
|
194 |
} finally { |
|
195 |
out.close(); |
|
196 |
if (log.isDebugEnabled()) { |
|
197 |
log.debug("Thread " + Thread.currentThread().getId() + " closed output stream"); |
|
198 |
} |
|
176 | 199 |
} |
177 | 200 |
} |
178 | 201 |
|
Also available in: Unified diff
removed deprecated stuff