Revision 57524
Added by Antonis Lempesis over 4 years ago
too late at night...
modules/dnet-openaire-stats-export-wf/branches/monetdb/trunk/dnet-openaire-stats/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/utils/ContextTransformer.java

package eu.dnetlib.data.mapreduce.hbase.statsExport.utils;

import org.apache.commons.io.IOUtils;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ContextTransformer {

    public String transformXSL(String xml) throws Exception {
        Transformer transformer;
        TransformerFactory tFactory = TransformerFactory.newInstance();

        if (xml == null) {
            throw new IllegalArgumentException("Input xml should not be null!");
        }

        InputStream inputStream = null;
        ByteArrayInputStream readerStream = null;
        ByteArrayOutputStream writerStream = null;

        try {
            inputStream = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/" + "context.xsl");
            transformer = tFactory.newTransformer(new StreamSource(inputStream));

            readerStream = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));

            writerStream = new ByteArrayOutputStream();
            transformer.transform(new StreamSource(readerStream), new StreamResult(writerStream));

            return writerStream.toString("UTF8");
        } finally {
            IOUtils.closeQuietly(inputStream);
            IOUtils.closeQuietly(readerStream);
            IOUtils.closeQuietly(writerStream);
        }
    }
}
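For reference, a minimal sketch of how this transformer might be driven. The context XML below is an invented example; the real output depends entirely on the bundled context.xsl stylesheet, which is not part of this revision, and the dnet-openaire-stats resources must be on the classpath for the stylesheet lookup to succeed.

import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.ContextTransformer;

public class ContextTransformerExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical context-profile snippet; the actual input schema is
        // whatever context.xsl expects.
        String xml = "<context id='egi' label='EGI Federation' type='community'/>";

        // transformXSL() loads context.xsl via ClassLoader.getSystemResourceAsStream(),
        // so this only works when the stats resources are on the system classpath.
        String transformed = new ContextTransformer().transformXSL(xml);
        System.out.println(transformed);
    }
}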
modules/dnet-openaire-stats-export-wf/branches/monetdb/trunk/dnet-openaire-stats/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/utils/FundingParser.java

package eu.dnetlib.data.mapreduce.hbase.statsExport.utils;

import org.apache.log4j.Logger;

/**
 * Created by envy17 j110ea on 4/5/2015.
 */

//Extract funder info for projects

public class FundingParser {

    private String DELIM;
    private String ENCLOSING;

    public FundingParser(String DELIM, String ENCLOSING) {
        this.DELIM = DELIM;
        this.ENCLOSING = ENCLOSING;
    }

    public String getFundingLevel(String funding_level, int level) {

        if (funding_level.isEmpty()) {
            return ENCLOSING + " " + ENCLOSING + DELIM;
        }

        if (!funding_level.contains("<funding_level_" + level + ">")) {
            return ENCLOSING + " " + ENCLOSING + DELIM;
        }

        String[] split = funding_level.split("<funding_level_" + level + ">");

        funding_level = split[1];

        split = funding_level.split("<name>");
        funding_level = split[1];

        funding_level = funding_level.substring(0, funding_level.indexOf("</name>"));
        funding_level = funding_level.replaceAll("\"", "");
        funding_level = funding_level.replaceAll("/>", "");
        funding_level = funding_level.replaceAll("<", "");
        funding_level = funding_level.replaceAll("&", "");

        if (level == 1) {
            if (funding_level.equalsIgnoreCase("SP1")) {
                funding_level = "SP1-Cooperation";
            } else if (funding_level.equalsIgnoreCase("SP2")) {
                funding_level = "SP2-Ideas";
            }

            if (funding_level.equalsIgnoreCase("SP3")) {
                funding_level = "SP3-People";
            } else if (funding_level.equalsIgnoreCase("SP4")) {
                funding_level = "SP4-Capacities";
            } else if (funding_level.equalsIgnoreCase("SP5")) {
                funding_level = "SP5-Euratom";
            }
        }

        funding_level = funding_level.replaceAll(">", "");
        funding_level = funding_level.replaceAll("</", "");
        funding_level = funding_level.replace(DELIM, " ");
        funding_level = funding_level.replace(ENCLOSING, " ");

        return ENCLOSING + funding_level + ENCLOSING + DELIM;
    }

    public String getFundingInfo(String buff) {
        return getFunder(buff) + getFundingLevel(buff, 0) + (getFundingLevel(buff, 1) + getFundingLevel(buff, 2)
                + getFundingLevel(buff, 3));
    }

    public String getFunder(String buff) {

        if (buff.isEmpty()) {
            return ENCLOSING + " " + ENCLOSING + DELIM;
        }
        if (!buff.contains("<funder>")) {
            return ENCLOSING + " " + ENCLOSING + DELIM;
        }

        String[] split = buff.split("<funder>");
        String funder = split[1];
        split = funder.split("<name>");

        funder = split[1];
        funder = funder.substring(0, funder.indexOf("</name>"));
        funder = funder.replaceAll(">", "");
        funder = funder.replaceAll("</", "");
        funder = funder.replaceAll("\"", "");
        funder = funder.replaceAll("&", "");
        funder = funder.replace(ENCLOSING, " ");

        return ENCLOSING + funder + ENCLOSING + DELIM;
    }
}
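As a sketch of the expected input, getFunder() and getFundingLevel() scan a funding-tree XML fragment and emit each extracted name wrapped in the ENCLOSING character and terminated by DELIM. The fragment and delimiter characters below are invented for illustration; the real values come from the project's fundingtree field and the workflow's stats.delim / stats.enclChar settings.

import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.FundingParser;

public class FundingParserExample {
    public static void main(String[] args) {
        // "!" and "#" stand in for the configured delimiter/enclosing characters.
        FundingParser parser = new FundingParser("!", "#");

        // Invented funding-tree fragment using the element names the parser expects.
        String fundingTree =
                "<fundingtree><funder><name>European Commission</name></funder>"
              + "<funding_level_0><name>FP7</name></funding_level_0>"
              + "<funding_level_1><name>SP1</name></funding_level_1></fundingtree>";

        // Prints funder + levels 0..3:
        // "#European Commission#!#FP7#!#SP1-Cooperation#!# #!# #!"
        // with "# #!" placeholders for the missing levels 2 and 3.
        System.out.println(parser.getFundingInfo(fundingTree));
    }
}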
modules/dnet-openaire-stats-export-wf/branches/monetdb/trunk/dnet-openaire-stats/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/utils/Serializer.java

package eu.dnetlib.data.mapreduce.hbase.statsExport.utils;

import com.google.common.collect.Multimap;

import eu.dnetlib.data.mapreduce.util.LicenseComparator;
import eu.dnetlib.data.proto.DatasourceProtos.Datasource;
import eu.dnetlib.data.proto.DatasourceProtos.Datasource.Metadata;
import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafEntity;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OrganizationProtos.Organization;
import eu.dnetlib.data.proto.ProjectProtos.Project;
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
import eu.dnetlib.data.proto.ResultProtos.Result;
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
import org.apache.log4j.Logger;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import com.sun.org.apache.xerces.internal.parsers.DOMParser;
import org.w3c.dom.Document;

/**
 * @author eri Simple serializer that parses input Oaf Protos and prepares them
 *         for sqoop
 */
public class Serializer {
    private static Logger logger = Logger.getLogger(Serializer.class);

    private String DELIM;
    private String ENCLOSING;

    public Serializer(String DELIM, String ENCLOSING) {
        this.DELIM = DELIM;
        this.ENCLOSING = ENCLOSING;
    }

    public String serialize(Oaf oaf) {

        switch (oaf.getKind()) {
            case entity:
                OafEntity valueEntity = oaf.getEntity();

                switch (valueEntity.getType()) {
                    case datasource:
                        return buildDatasource(oaf);
                    case organization:
                        return buildOrganization(oaf);
                    case project:
                        return buildProject(oaf);
                    case result:
                        return buildResult(oaf);
                    default:
                        break;
                }
                break;
            case relation:
                return buildRel(oaf.getRel());
        }

        return null;
    }

    public String serialize(OafRel oaf) {

        switch (oaf.getRelType()) {
            case resultProject:
                return getResultProject(oaf);
            default:
                return buildRel(oaf);
        }
    }

    private String buildRel(OafRel Rel) {
        return cleanId(Rel.getTarget()) + DELIM;
    }

    public void extractRelations(Oaf oaf, Multimap<String, String> relations) {
        OafEntity valueEntity = oaf.getEntity();
        getOriginalId(valueEntity, relations);

        switch (valueEntity.getType()) {
            case datasource:
                getDatasourceLanguages(valueEntity, relations);
                break;
            case result:
                getResultTopics(valueEntity, relations);
                getResultLanguages(valueEntity, relations);
                getResultClassifications(valueEntity, relations);
                getResultDatasources(valueEntity, relations);
                getResultConcepts(valueEntity, relations);
                getResultDois(valueEntity, relations);
                getResultCitations(valueEntity, relations);
                break;
            case project:
                getProjectKeywords(valueEntity, relations);
                getProjectSubjects(valueEntity, relations);
                break;
        }
    }

    private void getOriginalId(OafEntity oafEntity, Multimap<String, String> relations) {
        String relName = oafEntity.getType().toString().toLowerCase() + "Oid";
        for (String oid : oafEntity.getOriginalIdList()) {
            relations.put(relName, cleanId(oid));
        }
    }

    private void getProjectKeywords(OafEntity oafEntity, Multimap<String, String> relations) {
        relations.put("projectKeyword", getStringField(oafEntity.getProject().getMetadata().getKeywords().getValue()));
    }

    private void getProjectSubjects(OafEntity oafEntity, Multimap<String, String> relations) {
        for (StructuredProperty subj : oafEntity.getProject().getMetadata().getSubjectsList()) {
            relations.put("projectSubject", getStringField(subj.getValue()));
        }
    }

    private String getResultProject(OafRel oaf) {
        StringBuilder buff = new StringBuilder();
        buff.append(cleanId(oaf.getTarget())).append(DELIM);
        // is declared as int!!!
        long diff = DATEDIFF(oaf.getResultProject().getOutcome().getRelMetadata().getEnddate(), oaf.getResultProject().getOutcome().getRelMetadata().getStartdate());

        if (diff < 0) {
            diff = 0;
        }

        buff.append(getNumericField(String.valueOf(diff)));
        return buff.toString();
    }

    private void getDatasourceLanguages(OafEntity valueEntity, Multimap<String, String> rels) {
        Datasource d = valueEntity.getDatasource();
        Metadata metadata = d.getMetadata();

        for (StringField lang : metadata.getOdlanguagesList()) {
            rels.put("datasourceLanguage", getStringField(lang.getValue()));
        }
    }

    private void getResultLanguages(OafEntity valueEntity, Multimap<String, String> rels) {
        Result d = valueEntity.getResult();
        Result.Metadata metadata = d.getMetadata();
        if (metadata.getLanguage().getClassname() != null && !metadata.getLanguage().getClassname().isEmpty()) {
            rels.put("resultLanguage", getStringField(metadata.getLanguage().getClassname()));
        }
    }

    private void getResultDois(OafEntity valueEntity, Multimap<String, String> rels) {
        for (StructuredProperty pid : valueEntity.getPidList()) {
            rels.put("resultPid", getStringField(pid.getQualifier().getClassname()) + getStringField(pid.getValue()));
        }
    }

    private void getResultClassifications(OafEntity valueEntity, Multimap<String, String> rels) {
        Result result = valueEntity.getResult();

        for (Instance instance : (result.getInstanceList())) {
            String classification = instance.getInstancetype().getClassname();

            if (classification != null && !classification.isEmpty()) {
                rels.put("resultClassification", getStringField(instance.getInstancetype().getClassname()));
            }
        }
    }

    private void getResultConcepts(OafEntity valueEntity, Multimap<String, String> rels) {
        Result result = valueEntity.getResult();

        for (Result.Context context : result.getMetadata().getContextList()) {
            rels.put("resultConcept", cleanId(context.getId()));
        }
    }

    private void getResultDatasources(OafEntity valueEntity, Multimap<String, String> rels) {
        Result result = valueEntity.getResult();

        // hosted by
        for (Instance instance : (result.getInstanceList())) {
            String hostedBy = instance.getHostedby().getKey();

            if (hostedBy != null && !hostedBy.isEmpty()) {
                rels.put("resultDatasource", cleanId(hostedBy) + DELIM);
            }
        }

        // collected from
        for (FieldTypeProtos.KeyValue collectedFromValue : (valueEntity.getCollectedfromList())) {
            String collectedFrom = collectedFromValue.getKey();

            if (collectedFrom != null && !collectedFrom.isEmpty()) {
                rels.put("resultDatasource", cleanId(collectedFrom) + DELIM);
            }
        }
    }

    private void getResultTopics(OafEntity valueEntity, Multimap<String, String> rels) {
        Result d = valueEntity.getResult();
        Result.Metadata metadata = d.getMetadata();
        List<StructuredProperty> Topics = metadata.getSubjectList();

        for (StructuredProperty topic : Topics) {
            rels.put("resultTopic", getStringField(topic.getValue()));
        }
    }

    private void getResultCitations(OafEntity oafEntity, Multimap<String, String> rels) {
        for (FieldTypeProtos.ExtraInfo extraInfo : oafEntity.getExtraInfoList()) {
            if (extraInfo.getName().equals("result citations")) {
                DOMParser parser = new DOMParser();
                try {
                    parser.parse(new InputSource(new java.io.StringReader(extraInfo.getValue())));
                    Document doc = parser.getDocument();
                    doc.getDocumentElement().normalize();

                    NodeList citations = doc.getElementsByTagName("citation");
                    for (int temp = 0; temp < citations.getLength(); temp++) {
                        Element citation = (Element) citations.item(temp);
                        NodeList ids = citation.getElementsByTagName("id");
                        for (int temp1 = 0; temp1 < ids.getLength(); temp1++) {
                            Element id = (Element) ids.item(temp1);
                            if (id.getAttribute("type").equals("openaire")) {
                                //System.out.println(id.getAttribute("value"));
                                rels.put("resultCitation", id.getAttribute("value"));
                            }
                        }
                    }
                } catch (Exception e) {
                    logger.error("Error getting result citations", e);
                }
            }
        }
    }

    private String buildDatasource(Oaf oaf) {
        Metadata metadata = oaf.getEntity().getDatasource().getMetadata();
        StringBuilder buff = new StringBuilder();

        // name
        if (metadata.getOfficialname().getValue().equalsIgnoreCase("unknown")) {
            buff.append(getStringField("Unknown Repository"));
        } else {
            buff.append(getStringField(metadata.getOfficialname().getValue()));
        }

        // type
        if (metadata.hasDatasourcetype()) {
            buff.append(getStringField(metadata.getDatasourcetype().getClassname().replaceFirst(".*::", "")));
        }

        // compatibility,
        buff.append(getStringField(metadata.getOpenairecompatibility().getClassname()));

        // latitude
        buff.append(getLatLongField(metadata.getLatitude().getValue()));

        // longitude
        buff.append(getLatLongField(metadata.getLongitude().getValue()));

        // dateofvalidation,
        buff.append(getStringDateField(metadata.getDateofvalidation().getValue()));

        // yearofvalidation,
        buff.append(getYearInt(metadata.getDateofvalidation().getValue()));

        // harvested
        buff.append(getStringField("false"));

        // piwik_id
        String piwik_id = "";
        for (String oid : oaf.getEntity().getOriginalIdList()) {
            if (oid.contains("piwik")) {
                piwik_id = oid.split(":")[1];
                break;
            }
        }
        buff.append(getStringField(cleanNumber(piwik_id)));

        buff.append(getStringField(metadata.getWebsiteurl().getValue()));

        return buff.toString();
    }

    private String buildOrganization(Oaf oaf) {
        StringBuilder buff = new StringBuilder();
        Organization.Metadata metadata = oaf.getEntity().getOrganization().getMetadata();

        // `name`,
        buff.append(getStringField(metadata.getLegalname().getValue()));

        // `country`,
        buff.append(getStringField(metadata.getCountry().getClassid()));

        return buff.toString();
    }

    private String buildResult(Oaf oaf) {
        StringBuilder buff = new StringBuilder();

        Result.Metadata metadata = oaf.getEntity().getResult().getMetadata();

        // originalId
        buff.append(getId(oaf)).append(DELIM);

        String titleString = "";

        if (metadata.getTitleList().size() > 0) {
            StructuredProperty title = metadata.getTitleList().get(0);

            titleString = title.getValue().replaceAll("\\s+", " ");
            titleString = titleString.replaceAll("\n", " ");
        }

        // pubtitle
        buff.append(getStringField(titleString));

        // publisher
        buff.append(getStringField(metadata.getPublisher().getValue()));

        // journal
        buff.append(getStringField(metadata.getJournal().getName())); //#null#!

        // year
        buff.append(getYearInt(metadata.getDateofacceptance().getValue()));

        // date
        buff.append(getStringDateField(metadata.getDateofacceptance().getValue()));

        // bestlicense
        buff.append(getStringField(getBestLicense(oaf.getEntity().getResult())));

        // type
        buff.append(getStringField(metadata.getResulttype().getClassname()));

        // embargo_end_date
        buff.append(getStringDateField(metadata.getEmbargoenddate().getValue()));

        // `authors`,
        int authors = metadata.getAuthorCount();
        String delayed = "no";

        for (OafRel rel : oaf.getEntity().getCachedRelList()) {
            if (rel.getRelType().equals(RelType.resultProject))
            // remember : in result Project, first id is project, second is result.
            {
                String daysfromend = getYearDifferenceInteger(rel.getResultProject().getOutcome().getRelMetadata().getEnddate(),
                        rel.getResultProject().getOutcome().getRelMetadata().getStartdate());
                if (Integer.parseInt(daysfromend) > 0) {
                    delayed = "yes";
                }
            }
        }

        // `delayed`,
        buff.append(getStringField(delayed));
        // authors
        buff.append(getNumericField(String.valueOf(authors)));

        String authorNames = "";
        for (FieldTypeProtos.Author author : metadata.getAuthorList()) {
            authorNames += author.getFullname() + ";";
        }

        buff.append(getStringField(authorNames));

        String sources = "";

        for (Instance instance : (oaf.getEntity().getResult().getInstanceList())) {
            List<String> urls = instance.getUrlList();
            for (String url : urls) {
                sources += cleanUrl(url) + " ;";
            }
        }

        // sources
        sources = ENCLOSING + sources + ENCLOSING + DELIM;

        buff.append(sources);

        boolean hasAbstract = false;
        for (StringField desc : metadata.getDescriptionList()) {
            if (desc != null && desc.getValue() != null && !desc.getValue().trim().isEmpty())
                hasAbstract = true;
        }

        buff.append(getStringField(Boolean.toString(hasAbstract)));

        return buff.toString();
    }

    private String getBestLicense(Result result) {
        Qualifier bestLicense = null;
        LicenseComparator lc = new LicenseComparator();
        for (Instance instance : (result.getInstanceList())) {
            if (lc.compare(bestLicense, instance.getAccessright()) > 0) {
                bestLicense = instance.getAccessright();
            }
        }
        if (bestLicense != null) {
            return bestLicense.getClassname();
        } else {
            return "";
        }
    }

    private String buildProject(Oaf oaf) {
        FundingParser fundingParser = new FundingParser(DELIM, ENCLOSING);
        StringBuilder buff = new StringBuilder();
        Project.Metadata metadata = oaf.getEntity().getProject().getMetadata();

        // `acronym`,
        String acronym = metadata.getAcronym().getValue();
        if (acronym.equalsIgnoreCase("UNKNOWN")) {
            acronym = metadata.getTitle().getValue();
        }
        buff.append(getStringField(acronym));

        // title
        buff.append(getStringField(metadata.getTitle().getValue()));

        // funding_lvl
        List<StringField> fundList = metadata.getFundingtreeList();
        if (!fundList.isEmpty()) // `funding_lvl0`,
        {
            // funder + 3 funding levels
            buff.append(fundingParser.getFundingInfo(fundList.get(0).getValue()));
        } else {
            buff.append(fundingParser.getFundingInfo(""));
        }

        // sc39
        String sc39 = metadata.getEcsc39().getValue();
        if (sc39.equalsIgnoreCase("true") || sc39.equalsIgnoreCase("t") || sc39.contains("yes")) {
            sc39 = "yes";
        } else if (sc39.equalsIgnoreCase("false") || sc39.equalsIgnoreCase("f") || sc39.contains("no")) {
            sc39 = "no";
        }
        buff.append(getStringField(sc39));

        // project_type
        buff.append(getStringField(metadata.getContracttype().getClassid()));

        // start_year
        buff.append(getYearInt(metadata.getStartdate().getValue()));

        // end_year
        buff.append(getYearInt(metadata.getEnddate().getValue()));

        // duration enddate-startdate
        buff.append(getYearDifferenceInteger(metadata.getEnddate().getValue(), metadata.getStartdate().getValue()));

        // haspubs
        buff.append(getStringField("no"));

        // numpubs
        buff.append(getNumericField("0"));

        // enddate
        buff.append(getStringDateField(metadata.getEnddate().getValue()));

        // startdate
        buff.append(getStringDateField(metadata.getStartdate().getValue()));

        // `daysforlastpub`,
        buff.append(getNumericField(""));

        // `delayedpubs`,
        buff.append(getNumericField(""));

        // call identifier
        buff.append(getStringField(metadata.getCallidentifier().getValue()));

        // code
        buff.append(getStringField(metadata.getCode().getValue()));

        return buff.toString();
    }

    private String getYearDifferenceInteger(String enddate, String startdate) {

        if (enddate != null && !enddate.isEmpty() && startdate != null && !startdate.isEmpty()) {

            String[] split = startdate.split("-");

            if (split.length == 0) {
                return ENCLOSING + "0" + ENCLOSING + DELIM;
            }

            int Startdate = Integer.parseInt(split[0]);

            split = enddate.split("-");

            if (split.length == 0) {
                return ENCLOSING + "0" + ENCLOSING + DELIM;
            }

            int Enddate = Integer.parseInt(split[0]);

            int diff = Enddate - Startdate;

            return ENCLOSING + diff + ENCLOSING + DELIM;
        }

        return ENCLOSING + "0" + ENCLOSING + DELIM;
    }

    private String getYearInt(String data) {
        if (data == null || data.isEmpty() || data.equals("-1")) {
            return ENCLOSING + "0" + ENCLOSING + DELIM;
        }

        String[] split = data.split("-");

        if (split.length == 0) {
            return ENCLOSING + "0" + ENCLOSING + DELIM;
        }

        String year = split[0];

        year = cleanNumber(year);

        if (year == null || year.isEmpty()) year = "0";

        return ENCLOSING + year + ENCLOSING + DELIM;
    }

    private String cleanNumber(String number) {
        number = number.replaceAll("[^A-Za-z0-9:,_]", "");
        return number;
    }

    private String getLatLongField(String data) {

        if (data == null || data.isEmpty())
            return ENCLOSING + "null" + ENCLOSING + DELIM;

        return ENCLOSING + data.replaceAll("[^-0-9.]+", "") + ENCLOSING + DELIM;
    }

    private String getStringField(String data) {

        if (data == null || data.isEmpty())
            return ENCLOSING + "null" + ENCLOSING + DELIM;

        return ENCLOSING + clean(data) + ENCLOSING + DELIM;
    }

    private String getStringDateField(String data) {
        if (data == null || data.isEmpty() || data.equals("-1")) {
            return ENCLOSING + "0" + ENCLOSING + DELIM;
        } else {
            data = data.replace(DELIM, " ");
            data = data.replace(ENCLOSING, " ");
            data = data.replaceAll("\\r\\n|\\r|\\n", "");
            try {
                DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
                data = format.format(format.parse(data));
                return ENCLOSING + data + ENCLOSING + DELIM;
            } catch (ParseException e) {
                return ENCLOSING + "0" + ENCLOSING + DELIM;
            }
        }
    }

    private String getNumericField(String data) {
        if (data == null || data.isEmpty()) {
            return ENCLOSING + "0" + ENCLOSING + DELIM;
        } else {
            return ENCLOSING + data + ENCLOSING + DELIM;
        }
    }

    public String getId(Oaf oaf) {
        switch (oaf.getKind()) {
            case entity:
                return cleanId(oaf.getEntity().getId());
            case relation:
                return cleanId(oaf.getRel().getSource());
        }
        return null;
    }

    public String getId(OafRel relOaf) {
        return cleanId(relOaf.getSource());
    }

    private String clean(String value) {
        if (value != null) {

            value = value.replaceAll("[\"\\r\\\\;]", "");
            value = value.replace(DELIM, " ");
            value = value.replace(ENCLOSING, " ");
            value = value.replaceAll("\\r\\n|\\r|\\n", " ");

            return value;
        } else {
            return "";
        }
    }

    private String cleanId(String value) {
        if (value != null) {
            // DO NOT CHANGE THIS: IT REMOVES ID PREFIX ( "5|datacite____::" to "datacite____::")
            // AND REPLACES OCCURRENCES OF DELIM CHARS IN DATA
            value = value.replaceFirst(".*\\|", "");
            value = value.replace("\n", "");
            value = value.replace(ENCLOSING, "");
            value = value.replace(DELIM, "");
            value = value.replace("\"", "");
            value = value.replace("«", " ");
            value = value.replace("»", " ");
        }

        return ENCLOSING + value + ENCLOSING;
    }

    private String cleanUrl(String value) {
        value = value.replace(DELIM, " ");
        value = value.replace(ENCLOSING, " ");
        value = value.replace(" ", "");
        value = value.replace("\n", "");
        value = value.replace("\r", "");
        value = value.replace("\\n", "");
        value = value.replace("\\r", "");
        return value;
    }

    private long DATEDIFF(String startDate, String endDate) {
        long MILLISECS_PER_DAY = 24 * 60 * 60 * 1000L;
        long days;
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); // "dd/MM/yyyy HH:mm:ss");
        // <startdate>2011-09-01</startdate>
        // <enddate>2015-08-31</enddate>
        Date dateIni;
        Date dateFin;

        if (startDate == null || startDate.isEmpty() || endDate == null || endDate.isEmpty()) {
            return 0;
        }
        try {
            dateIni = format.parse(startDate);
            dateFin = format.parse(endDate);
            days = (dateFin.getTime() - dateIni.getTime()) / MILLISECS_PER_DAY;
        } catch (Exception e) {
            return 0;
        }

        return days;
    }
}
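A rough sketch of how this serializer is driven from a consumer's point of view: every field is emitted as ENCLOSING + value + ENCLOSING + DELIM, so a row for sqoop is just the concatenation of the per-field strings. The delimiter characters and the origin of the bytes below are assumptions for illustration; in the real job the bytes come from an HBase cell merged in StatsMapper.

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.proto.OafProtos.Oaf;

public class SerializerExample {
    // rowBytes is assumed to hold a serialized Oaf proto, e.g. an HBase cell body.
    public static void dump(byte[] rowBytes) throws Exception {
        Serializer serializer = new Serializer("!", "#"); // invented DELIM/ENCLOSING

        Oaf oaf = Oaf.parseFrom(rowBytes);

        // Entity rows: one delimited line per datasource/organization/project/result,
        // e.g. a project row would start "#ACRONYM#!#Project title#!#EC#!#FP7#!..."
        System.out.println(serializer.serialize(oaf));

        // Derived relations (languages, topics, concepts, pids, citations, ...)
        // are collected separately, keyed by relation name.
        Multimap<String, String> rels = ArrayListMultimap.create();
        serializer.extractRelations(oaf, rels);
        System.out.println(rels);
    }
}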
modules/dnet-openaire-stats-export-wf/branches/monetdb/trunk/dnet-openaire-stats/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/mapreduce/StatsMapper.java

package eu.dnetlib.data.mapreduce.hbase.statsExport.mapreduce;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.TypeProtos;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
 * export
 */
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
    private Logger log = Logger.getLogger(this.getClass());

    //Index configurations for OAFs
    private EntityConfigTable entityConfigTable;

    //Counters for produced entities. Visible in the JobTracker monitoring page.
    private enum STATS_COUNTERS {
        datasource,
        organization,
        result,
        person,
        project
    }

    private enum REL_COUNTERS {
        resultProject,
        datasourceOrganization,
        organizationOrganization,
        resultResult,
        personPerson,
        personResult,
        projectOrganization,
        projectPerson,
        resultOrganization
    }

    //Counter for corrupted records.
    private enum ERROR_COUNTERS {
        rottenRecords,
        rottenRelations
    }

    private Serializer serializer;
    private int id;
    private int increment;

    //Init class: Load Index config and mapping for DB tables.
    @Override
    protected void setup(Context context) {
        loadEntityConfig(context);

        this.serializer = new Serializer(context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"));

        id = context.getTaskAttemptID().getTaskID().getId();
        increment = context.getConfiguration().getInt("mapred.map.tasks", 0);
        if (increment == 0) {
            throw new IllegalArgumentException("mapred.map.tasks is zero");
        }
    }

    //Read HBASE table and decode Protos to OAF entities.
    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) {

        // generating a unique integer id for the entity.
        id += increment;

        try {
            OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
            Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(keyDecoder.getType().toString())));

            if (isValid(oaf))
                emitProtos(context, result, oaf);

        } catch (Exception e) {
            log.error("Unable to parse proto in row: " + new String(keyIn.copyBytes()), e);

            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
        }
    }

    private boolean isValid(Oaf oaf) {
        return oaf != null && oaf.isInitialized() && !oaf.getDataInfo().getDeletedbyinference() && !oaf.getDataInfo().getInvisible();
    }

    private void emitProtos(Context context, Result result, Oaf oaf) {

        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).
                setDataInfo(oaf.getDataInfo()).setLastupdatetimestamp(oaf.getLastupdatetimestamp());

        oafBuilder.setEntity(oaf.getEntity());

        // emit relation first so we can cache them to entity protos
        emitRelation(context, result, oaf);
        emitEntity(context, oafBuilder.build());
    }

    private void emitEntity(Context context, Oaf oaf) {

        String serialized = serializer.serialize(oaf);
        if (serialized != null) {
            try {
                Text TextKeyOut;

                switch (oaf.getEntity().getType()) {
                    case project:
                    case datasource:
                    case organization:
                        TextKeyOut = new Text(oaf.getEntity().getType().toString() + "," + serializer.getId(oaf));
                        break;
                    case result:
                    default:
                        TextKeyOut = new Text(oaf.getEntity().getType().toString() + "," + id);
                        break;
                }

                context.write(TextKeyOut, new ImmutableBytesWritable(serialized.getBytes()));
                context.getCounter(STATS_COUNTERS.valueOf(oaf.getEntity().getType().toString())).increment(1);

            } catch (Exception e) {
                log.error("Error writing entity to M/R output", e);
            }
        }
    }

    // may have multiple relations per each field
    private void emitRelation(Context context, Result result, Oaf oaf) {

        try {
            // writing out : personResult,ID,data
            // derived relations; ResultLanguages, resultConcepts etc are
            // created here
            Multimap<String, String> relMap = ArrayListMultimap.create();

            serializer.extractRelations(oaf, relMap);

            if (!relMap.isEmpty()) {
                for (Entry<String, String> rel : relMap.entries()) {
                    Text TextKeyOut;

                    switch (oaf.getEntity().getType()) {
                        case project:
                        case datasource:
                        case organization:
                            TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
                            break;
                        case result:
                        default:
                            TextKeyOut = new Text(rel.getKey() + "," + id);
                            break;
                    }

                    context.write((TextKeyOut), new ImmutableBytesWritable(rel.getValue().getBytes()));
                }
            }
        } catch (Throwable e) {
            log.error("Error writing relation to M/R output", e);
        }

        // Existing Hbase relations are generated here
        if (entityConfigTable.getDescriptors(oaf.getEntity().getType()) != null) {

            for (LinkDescriptor ld : entityConfigTable.getDescriptors(oaf.getEntity().getType())) {
                try {
                    ArrayList<OafRel> oafRels = new ArrayList<OafRel>();
                    decodeRelation(oaf, context, result, ld, oafRels);

                    for (OafRel rel : oafRels) {
                        {
                            Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
                            String buff = serializer.serialize(rel);

                            context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
                            context.getCounter(REL_COUNTERS.valueOf(rel.getRelType().toString())).increment(1);
                        }
                    }
                } catch (Throwable e) {
                    log.error("Error for record ", e);
                    context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
                }
            }
        }
    }

    private void decodeRelation(final Oaf body, final Context context, Result result, final LinkDescriptor ld, List<OafRel> rels) {

        try {
            final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(KindProtos.Kind.relation);
            final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));

            if (hasData(columnMap)) {

                for (Entry<byte[], byte[]> e : columnMap.entrySet()) {

                    Oaf decodedOaf = decodeProto(e.getValue(), context);
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());

                    if (ld.isSymmetric()) {
                        RelDescriptor rd = ld.getRelDescriptor();
                        relBuilder.setCachedTarget(body.getEntity()).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
                    }

                    OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
                    rels.add(oafBuilder.setDataInfo(decodedOaf.getDataInfo()).setRel(oafRel).build().getRel());
                }
            }

        } catch (Throwable throwable) {
            log.error("Error Decoding relation for: " + body.getRel().getRelType() + " " + body.getEntity().getId() + " ", throwable);
            context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
        }
    }

    private Oaf decodeProto(final byte[] body, Context context) {
        try {
            return Oaf.parseFrom(body);
        } catch (Exception e) {
            log.error(e);
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
        }
        return null;
    }

    private void loadEntityConfig(Context context) {
        String indexConf = context.getConfiguration().get("stats.indexConf");

        if (indexConf == null || indexConf.isEmpty()) {
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER : ");
        }

        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
    }

    private boolean isDedupSelf(final OafRelOrBuilder rel) {
        return rel.getSource().contains(rel.getTarget());
    }

    private boolean hasData(final Map<byte[], byte[]> columnMap) {
        return columnMap != null && !columnMap.isEmpty();
    }

    public EntityConfigTable getEntityConfigTable() {
        return entityConfigTable;
    }

    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
        this.entityConfigTable = entityConfigTable;
    }
}
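One detail worth calling out is the surrogate-key scheme in setup() and map(): each task seeds its counter with its own task id and strides by mapred.map.tasks, so integer ids generated by parallel mappers never collide. A self-contained sketch of the same idea (the task count is invented for illustration):

public class StrideIdExample {
    public static void main(String[] args) {
        int numTasks = 4; // stands in for the mapred.map.tasks setting

        // Each task t emits t + numTasks, t + 2*numTasks, ... -- disjoint sequences.
        for (int taskId = 0; taskId < numTasks; taskId++) {
            int id = taskId;
            StringBuilder ids = new StringBuilder("task " + taskId + ":");
            for (int record = 0; record < 3; record++) {
                id += numTasks; // same increment StatsMapper.map() applies per record
                ids.append(' ').append(id);
            }
            System.out.println(ids); // e.g. "task 0: 4 8 12", "task 1: 5 9 13", ...
        }
    }
}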
modules/dnet-openaire-stats-export-wf/branches/monetdb/trunk/dnet-openaire-stats/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/mapreduce/StatsReducer.java

package eu.dnetlib.data.mapreduce.hbase.statsExport.mapreduce;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Iterator;

public class StatsReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> {

    private Logger log = Logger.getLogger(StatsReducer.class);
    private MultipleOutputs MultipleOutputWriter;

    /**
     * Reducer that splits input according to their Type ( datasource, results
     * etc..) and writes each kind to a separate output
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        MultipleOutputWriter = new MultipleOutputs((TaskInputOutputContext) context);
    }

    @Override
    protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

        Iterator<ImmutableBytesWritable> it = values.iterator();
        while (it.hasNext()) {
            String[] split = key.toString().split(",");
            String type = split[0];
            String id = split[1];
            String value = new String(it.next().copyBytes(), Charset.forName("UTF-8"));
            value = value.trim();
            MultipleOutputWriter.write(type, new Text(id.getBytes(Charset.forName("UTF-8"))), new Text(value.getBytes(Charset.forName("UTF-8"))), type);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        log.info("Cleaning up reducer...");
        MultipleOutputWriter.close();
    }
}
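Since the reducer routes each record through MultipleOutputs under its entity or relation type name, the (not shown) job driver has to declare every named output up front; a sketch of what that registration might look like, with an illustrative subset of type names that must match the keys the mapper emits:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StatsJobOutputs {
    // Called while the export Job is being configured; the full list of named
    // outputs in the real workflow is assumed, not shown in this revision.
    static void registerNamedOutputs(Job job) {
        for (String type : new String[]{"datasource", "organization", "project",
                                        "result", "resultProject"}) {
            MultipleOutputs.addNamedOutput(job, type, TextOutputFormat.class,
                    Text.class, Text.class);
        }
    }
}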
modules/dnet-openaire-stats-export-wf/branches/monetdb/trunk/dnet-openaire-stats/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/daos/StatsDAO.java

package eu.dnetlib.data.mapreduce.hbase.statsExport.daos;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.junit.Test;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.simple.SimpleJdbcCall;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

import java.io.FileInputStream;
import java.io.InputStream;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

/**
 * @author eri
 *         <p/>
 *         DB DAO with methods for creating the temporary shadow schema of the
 *         statsDB, executing extra inserts, creating views and indexes .
 */
public class StatsDAO {
    private org.springframework.jdbc.datasource.DriverManagerDataSource statsDatasource;

    private Logger log = Logger.getLogger(this.getClass());
    private long startTime;
    private long endtime;

    public StatsDAO(String dbUrl, String dbUser, String dbPassword, String dbDriver) {
        statsDatasource = new DriverManagerDataSource(dbUrl, dbUser, dbPassword);
        statsDatasource.setDriverClassName(dbDriver);
    }

    /**
     * Reads create schema script from a file and executes it. Used to create a
     * temporary Shadow schema for data import
     *
     * @throws Exception
     */
    public void createSchema() throws Exception {

        InputStream in = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/stats_db_schema.sql");
        byte[] b = new byte[in.available()];
        in.read(b);
        String q = new String(b);

        in.close();
        Connection con = null;
        log.info(" Creating Shadow Schema Database ....");

        try {
            con = statsDatasource.getConnection();
            CallableStatement st = con.prepareCall(q);
            st.execute();
            st.close();
        } catch (Exception e) {
            log.error("Could not Create Tables in Temporary database :" + e.getMessage());
            throw new Exception("Could not Create Tables in Temporary database :", e);
        } finally {
            con.close();
        }

        log.info("Shadow Schema created.");
    }

    /**
     * Sets the default schema (search path ) for the sqoop user.
     *
     * @param user
     * @throws Exception
     */
    public void setSearchPathDB(String user) throws Exception {

        String q = "ALTER ROLE " + user + " SET search_path TO shadow,public; ";
        Connection con = null;
        try {
            con = statsDatasource.getConnection();

            CallableStatement st = con.prepareCall(q);
            st.execute();
            st.close();
        } catch (Exception e) {
            log.error("Could not change default schema for user :" + e.getMessage());
            throw new Exception("Could not change default schema for user ", e);
        } finally {
            con.close();
        }

        log.info(" Default user schema set to shadow");
    }

    /**
     * Keeps a backup of the current public schema and then replaces it with the new Shadow one.
     *
     * @throws Exception
     */
    public void renameSchema() throws Exception {
        Connection con = statsDatasource.getConnection();
        String q = "DROP SCHEMA IF EXISTS backup CASCADE;" +
                "DROP SCHEMA IF EXISTS public CASCADE;" +
                " ALTER SCHEMA shadow RENAME TO public; ";

        try {
            CallableStatement st = con.prepareCall(q);
            log.info("Renaming Shadow schema to Public...");
            st.execute();
            st.close();
            con.close();

            log.info("Renaming done");

        } catch (Exception e) {
            log.error("Could not rename schema " + e.getMessage());
            throw new Exception("Could not rename schema ", e);
        } finally {
            con.close();
        }
        log.info("Schema renamed");
    }

    public void fix_integer_ids() throws Exception {
        log.info("fixing integer ids...");
        Connection con = statsDatasource.getConnection();

        startTime = System.currentTimeMillis();
        String q = "{call shadow.fix_result_ids()}";

        CallableStatement st = con.prepareCall(q);
        st.execute();

        st.close();
        con.close();

        endtime = System.currentTimeMillis();
        log.info("Time to fix integer ids: " + ((endtime - startTime) / 60000) + " minutes ");
    }

    public void addArrayColumns() throws Exception {
        log.info("Adding new columns ...");
        Connection con = statsDatasource.getConnection();

        startTime = System.currentTimeMillis();
        String q = "{call shadow.create_arrays()}";

        CallableStatement st = con.prepareCall(q);
        st.execute();

        st.close();
        con.close();

        endtime = System.currentTimeMillis();
        log.info("Time to add columns: " + ((endtime - startTime) / 60000) + " minutes ");
    }

    public void cleanTables() throws Exception {
        log.info(" Cleaning Tables...");
        Connection con = statsDatasource.getConnection();
        startTime = System.currentTimeMillis();
        String q = "CREATE TABLE \"shadow\".rd_distinct AS SELECT DISTINCT * FROM \"shadow\".result_datasources;";
        CallableStatement st = con.prepareCall(q);
        st.execute();
        st.close();

        q = "TRUNCATE \"shadow\".result_datasources;";
        st = con.prepareCall(q);
        st.execute();
        st.close();

        q = "INSERT INTO \"shadow\".result_datasources SELECT * FROM \"shadow\".rd_distinct;";
        st = con.prepareCall(q);
        st.execute();
        st.close();

        q = "DROP TABLE \"shadow\".rd_distinct;";
        st = con.prepareCall(q);
        st.execute();
        st.close();

        con.close();
        endtime = System.currentTimeMillis();
        log.info("Time to clean tables : " + ((endtime - startTime) / 60000) + " minutes ");
    }

    /**
     * Create charts.
     *
     * @throws Exception
     */
    public void createCharts() throws Exception {
        log.info(" Creating Chart Tables...");
        Connection con = statsDatasource.getConnection();

        startTime = System.currentTimeMillis();
        String q = "{call shadow.create_charts()}";

        CallableStatement st = con.prepareCall(q);
        st.execute();

        st.close();
        con.close();

        endtime = System.currentTimeMillis();
        log.info("Time to create chart tables: " + ((endtime - startTime) / 60000) + " minutes ");
    }

    /**
     * Create chart indexes.
     *
     * @throws Exception
     */
    public void createChartIndexes() throws Exception {
        log.info(" Create Chart Indexes...");
        Connection con = statsDatasource.getConnection();

        startTime = System.currentTimeMillis();
        String q = "{call shadow.create_chart_indexes()}";

        CallableStatement st = con.prepareCall(q);
        st.execute();

        st.close();
        con.close();

        endtime = System.currentTimeMillis();
        log.info("Time to create chart indexes : " + ((endtime - startTime) / 60000) + " minutes ");
    }

    /**
     * Builds indexes.
     *
     * @throws Exception
     */
    public void buildIndexes() throws Exception {
        log.info(" Building Database Indexes...");
        Connection con = statsDatasource.getConnection();
        startTime = System.currentTimeMillis();
        String q = "{call shadow.create_indexes()}";
        CallableStatement st = con.prepareCall(q);
        st.execute();
        st.close();
        con.close();
        endtime = System.currentTimeMillis();
        log.info("Time to build indexes : " + ((endtime - startTime) / 60000) + " minutes ");
    }

    /**
     * Builds views.
     *
     * @throws Exception
     */
    public void buildViews() throws Exception {
        log.info(" Building Database Views...");
        Connection con = statsDatasource.getConnection();

        startTime = System.currentTimeMillis();
        String q = "{call shadow.create_views()}";

        CallableStatement st = con.prepareCall(q);
        st.execute();

        st.close();
        con.close();

        endtime = System.currentTimeMillis();
        log.info("Time to build Views : " + ((endtime - startTime) / 60000) + " minutes ");
    }

    /**
     * Executes extra inserts ( table updates )
     *
     * @throws Exception
     */
    public void executeExtraInserts() throws Exception {
        log.info(" Executing Extra Inserts...");

        startTime = System.currentTimeMillis();

        populateDefaults();
        //update_project_results();
        update_project_has_pubs();
        update_project_pubs_count();
        update_project_delated_pubs();
        update_project_daysforlastpub();
        update_project_delayed_pubs();
        cleanUp();

        endtime = System.currentTimeMillis();
        log.info("Total time for Extra Inserts : " + ((endtime - startTime) / 60000) + " minutes ");
    }
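The diff view truncates this file here, so helpers such as populateDefaults() and the update_project_* methods are not shown in this revision. For context, a sketch of how the DAO might be driven end to end by the export workflow; the connection settings are invented, and the ordering follows the shadow-schema lifecycle the methods above imply:

import eu.dnetlib.data.mapreduce.hbase.statsExport.daos.StatsDAO;

public class StatsDAOExample {
    public static void main(String[] args) throws Exception {
        // Invented connection settings; the workflow supplies the real ones.
        StatsDAO dao = new StatsDAO("jdbc:postgresql://localhost:5432/stats",
                "sqoop", "secret", "org.postgresql.Driver");

        dao.createSchema();   // build the shadow schema from the bundled SQL script
        // ... sqoop import into the shadow schema would happen here ...
        dao.cleanTables();    // deduplicate result_datasources
        dao.buildIndexes();
        dao.buildViews();
        dao.renameSchema();   // swap the shadow schema in as the new public schema
    }
}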