Revision 35227
Added by Marek Horst over 9 years ago
modules/icm-iis-import/trunk/src/main/java/eu/dnetlib/iis/importer/converter/DocumentRelationConverter.java | ||
---|---|---|
1 |
package eu.dnetlib.iis.importer.converter; |
|
2 |
|
|
3 |
|
|
4 |
import java.util.ArrayList; |
|
5 |
import java.util.NavigableMap; |
|
6 |
|
|
7 |
import org.apache.hadoop.hbase.client.Result; |
|
8 |
|
|
9 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
10 |
|
|
11 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
12 |
import eu.dnetlib.data.proto.OafProtos.OafRel; |
|
13 |
import eu.dnetlib.iis.importer.OafHelper; |
|
14 |
import eu.dnetlib.iis.importer.input.approver.ResultApprover; |
|
15 |
import eu.dnetlib.iis.importer.schemas.DocumentRelation; |
|
16 |
|
|
17 |
/** |
|
18 |
* {@link DocumentRelation} converter. |
|
19 |
* @author mhorst |
|
20 |
* |
|
21 |
*/ |
|
22 |
public class DocumentRelationConverter extends AbstractAvroConverter<DocumentRelation> { |
|
23 |
|
|
24 |
/** |
|
25 |
* Result-result relation column family. |
|
26 |
*/ |
|
27 |
private final byte[] resultResultPublicationDatasetIsRelatedToColumnFamilyBytes; |
|
28 |
|
|
29 |
/** |
|
30 |
* Default constructor. |
|
31 |
* @param encoding |
|
32 |
* @param resultApprover |
|
33 |
* @param resultResultPublicationDatasetIsRelatedToColumnFamilyBytes |
|
34 |
*/ |
|
35 |
public DocumentRelationConverter(String encoding, |
|
36 |
ResultApprover resultApprover, |
|
37 |
byte[] resultResultPublicationDatasetIsRelatedToColumnFamilyBytes) { |
|
38 |
super(encoding, resultApprover); |
|
39 |
this.resultResultPublicationDatasetIsRelatedToColumnFamilyBytes = OafHelper.copyArrayWhenNotNull( |
|
40 |
resultResultPublicationDatasetIsRelatedToColumnFamilyBytes); |
|
41 |
} |
|
42 |
|
|
43 |
@Override |
|
44 |
public DocumentRelation buildObject(Result hbaseResult, |
|
45 |
Oaf resolvedOafObject) throws InvalidProtocolBufferException { |
|
46 |
NavigableMap<byte[],byte[]> resultResultRelations = hbaseResult.getFamilyMap( |
|
47 |
resultResultPublicationDatasetIsRelatedToColumnFamilyBytes); |
|
48 |
if (resultResultRelations!=null && resultResultRelations.size()>0) { |
|
49 |
DocumentRelation.Builder builder = DocumentRelation.newBuilder(); |
|
50 |
for (byte[] resultResultBytes : resultResultRelations.values()) { |
|
51 |
Oaf resResOAF = OafHelper.buildOaf(resultResultBytes); |
|
52 |
OafRel resResRel = resResOAF.getRel(); |
|
53 |
if (resultApprover!=null? |
|
54 |
resultApprover.approveBeforeBuilding(resResOAF): |
|
55 |
true) { |
|
56 |
if (builder.getReferencedIds()==null) { |
|
57 |
builder.setReferencedIds(new ArrayList<CharSequence>()); |
|
58 |
} |
|
59 |
builder.getReferencedIds().add(resResRel.getTarget()); |
|
60 |
} |
|
61 |
} |
|
62 |
if (builder.hasReferencedIds()) { |
|
63 |
builder.setId(resolvedOafObject.getEntity().getId()); |
|
64 |
return builder.build(); |
|
65 |
} |
|
66 |
} |
|
67 |
// fallback |
|
68 |
return null; |
|
69 |
} |
|
70 |
|
|
71 |
} |
|
72 | 0 |
modules/icm-iis-import/trunk/src/main/java/eu/dnetlib/iis/importer/converter/DocumentIdConverter.java | ||
---|---|---|
1 |
package eu.dnetlib.iis.importer.converter; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
|
|
5 |
import org.apache.hadoop.hbase.client.Result; |
|
6 |
import org.apache.log4j.Logger; |
|
7 |
|
|
8 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
9 |
import eu.dnetlib.data.proto.ResultProtos; |
|
10 |
import eu.dnetlib.iis.importer.input.approver.ResultApprover; |
|
11 |
import eu.dnetlib.iis.common.schemas.DocumentId; |
|
12 |
|
|
13 |
|
|
14 |
/** |
|
15 |
* HBase {@link Result} to avro {@link DocumentId} converter. |
|
16 |
* @author mhorst |
|
17 |
* |
|
18 |
*/public class DocumentIdConverter extends AbstractAvroConverter<DocumentId> { |
|
19 |
|
|
20 |
protected static final Logger log = Logger.getLogger(DocumentIdConverter.class); |
|
21 |
|
|
22 |
/** |
|
23 |
* Default constructor. |
|
24 |
* @param encoding |
|
25 |
* @param resultApprover |
|
26 |
*/ |
|
27 |
public DocumentIdConverter(String encoding, |
|
28 |
ResultApprover resultApprover) { |
|
29 |
super(encoding, resultApprover); |
|
30 |
} |
|
31 |
|
|
32 |
@Override |
|
33 |
public DocumentId buildObject(Result hbaseResult, |
|
34 |
Oaf resolvedOafObject) throws IOException { |
|
35 |
ResultProtos.Result sourceResult = resolvedOafObject.getEntity()!=null? |
|
36 |
resolvedOafObject.getEntity().getResult():null; |
|
37 |
if (sourceResult==null) { |
|
38 |
log.error("skipping: no result object " + |
|
39 |
"for a row " + new String(hbaseResult.getRow(), getEncoding())); |
|
40 |
return null; |
|
41 |
} |
|
42 |
if (resolvedOafObject.getEntity().getId()!=null) { |
|
43 |
DocumentId.Builder builder = DocumentId.newBuilder(); |
|
44 |
builder.setId(resolvedOafObject.getEntity().getId()); |
|
45 |
return builder.build(); |
|
46 |
} else { |
|
47 |
log.error("skipping: no id specified for " + |
|
48 |
"result of a row " + new String(hbaseResult.getRow(), getEncoding())); |
|
49 |
return null; |
|
50 |
} |
|
51 |
} |
|
52 |
|
|
53 |
} |
|
54 | 0 |
modules/icm-iis-import/trunk/src/main/java/eu/dnetlib/iis/importer/mapred/IISDataImporterMapper.java | ||
---|---|---|
33 | 33 |
import eu.dnetlib.data.proto.RelTypeProtos.RelType; |
34 | 34 |
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType; |
35 | 35 |
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome; |
36 |
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset; |
|
37 | 36 |
import eu.dnetlib.data.proto.TypeProtos; |
38 | 37 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
39 | 38 |
import eu.dnetlib.iis.citationmatching.schemas.Citation; |
40 | 39 |
import eu.dnetlib.iis.common.ByteArrayUtils; |
41 | 40 |
import eu.dnetlib.iis.common.WorkflowRuntimeParameters; |
42 | 41 |
import eu.dnetlib.iis.common.hbase.HBaseConstants; |
43 |
import eu.dnetlib.iis.common.schemas.DocumentId; |
|
44 | 42 |
import eu.dnetlib.iis.common.schemas.IdentifierMapping; |
45 | 43 |
import eu.dnetlib.iis.core.javamapreduce.MultipleOutputs; |
46 | 44 |
import eu.dnetlib.iis.importer.converter.CitationConverter; |
47 | 45 |
import eu.dnetlib.iis.importer.converter.DeduplicationMappingConverter; |
48 |
import eu.dnetlib.iis.importer.converter.DocumentIdConverter; |
|
49 | 46 |
import eu.dnetlib.iis.importer.converter.DocumentMetadataConverter; |
50 |
import eu.dnetlib.iis.importer.converter.DocumentRelationConverter; |
|
51 | 47 |
import eu.dnetlib.iis.importer.converter.DocumentToProjectConverter; |
52 | 48 |
import eu.dnetlib.iis.importer.converter.PersonConverter; |
53 | 49 |
import eu.dnetlib.iis.importer.converter.ProjectConverter; |
... | ... | |
55 | 51 |
import eu.dnetlib.iis.importer.input.approver.DataInfoBasedApprover; |
56 | 52 |
import eu.dnetlib.iis.importer.input.approver.FieldApprover; |
57 | 53 |
import eu.dnetlib.iis.importer.input.approver.OriginDatasourceApprover; |
58 |
import eu.dnetlib.iis.importer.input.approver.PublicationTypeApprover; |
|
59 | 54 |
import eu.dnetlib.iis.importer.input.approver.ResultApprover; |
60 | 55 |
import eu.dnetlib.iis.importer.schemas.DocumentMetadata; |
61 |
import eu.dnetlib.iis.importer.schemas.DocumentRelation; |
|
62 | 56 |
import eu.dnetlib.iis.importer.schemas.DocumentToProject; |
63 | 57 |
import eu.dnetlib.iis.importer.schemas.Person; |
64 | 58 |
import eu.dnetlib.iis.importer.schemas.Project; |
... | ... | |
78 | 72 |
|
79 | 73 |
private static final String OUTPUT_NAME_CITATION = "output.name.citation"; |
80 | 74 |
|
81 |
private static final String OUTPUT_NAME_DOCUMENT_RELATION = "output.name.document_relation"; |
|
82 |
|
|
83 | 75 |
private static final String OUTPUT_NAME_DOCUMENT_PROJECT = "output.name.document_project"; |
84 | 76 |
|
85 | 77 |
private static final String OUTPUT_NAME_PROJECT = "output.name.project"; |
86 | 78 |
|
87 | 79 |
private static final String OUTPUT_NAME_PERSON = "output.name.person"; |
88 | 80 |
|
89 |
private static final String OUTPUT_NAME_DATASET_ID = "output.name.dataset_id"; |
|
90 |
|
|
91 | 81 |
private static final String OUTPUT_NAME_DEDUP_MAPPING = "output.name.dedup_mapping"; |
92 | 82 |
|
93 | 83 |
private String outputNameDocumentMeta; |
94 | 84 |
|
95 | 85 |
private String outputNameCitation; |
96 | 86 |
|
97 |
private String outputNameDocumentRelation; |
|
98 |
|
|
99 | 87 |
private String outputNameDocumentProject; |
100 | 88 |
|
101 | 89 |
private String outputNameProject; |
102 | 90 |
|
103 | 91 |
private String outputNamePerson; |
104 | 92 |
|
105 |
private String outputNameDatasetId; |
|
106 |
|
|
107 | 93 |
private String outputNameDedupMapping; |
108 | 94 |
|
109 | 95 |
private String encoding = HBaseConstants.STATIC_FIELDS_ENCODING_UTF8; |
... | ... | |
114 | 100 |
|
115 | 101 |
private FieldApprover fieldApprover; |
116 | 102 |
|
117 |
private ResultApprover datasetApprover; |
|
118 |
|
|
119 | 103 |
private DocumentMetadataConverter docMetaConverter; |
120 | 104 |
|
121 | 105 |
private CitationConverter citationConverter; |
122 | 106 |
|
123 |
private DocumentRelationConverter docRelationConverter; |
|
124 |
|
|
125 | 107 |
private DocumentToProjectConverter docProjectConverter; |
126 | 108 |
|
127 | 109 |
private DeduplicationMappingConverter deduplicationMappingConverter; |
128 | 110 |
|
129 |
private DocumentIdConverter datasetConverter; |
|
130 |
|
|
131 | 111 |
private PersonConverter personConverter; |
132 | 112 |
|
133 | 113 |
private ProjectConverter projectConverter; |
... | ... | |
184 | 164 |
// field approver |
185 | 165 |
this.fieldApprover = dataInfoBasedApprover; |
186 | 166 |
|
187 |
// dataset approver does not limit datasets by datasource but filters candidates by publication type |
|
188 |
this.datasetApprover = new ComplexApprover( |
|
189 |
new PublicationTypeApprover( |
|
190 |
HBaseConstants.SEMANTIC_CLASS_INSTANCE_TYPE_DATASET), |
|
191 |
dataInfoBasedApprover); |
|
192 |
|
|
193 | 167 |
// initializing converters |
194 |
datasetConverter = new DocumentIdConverter(encoding, datasetApprover); |
|
195 | 168 |
docMetaConverter = new DocumentMetadataConverter(encoding, |
196 | 169 |
this.resultApprover, this.fieldApprover, |
197 | 170 |
getCollumnFamily(RelType.personResult, SubRelType.authorship, |
198 | 171 |
Authorship.RelName.hasAuthor.toString())); |
199 | 172 |
citationConverter = new CitationConverter(encoding, this.resultApprover); |
200 |
docRelationConverter = new DocumentRelationConverter( |
|
201 |
encoding, resultApprover, |
|
202 |
getCollumnFamily(RelType.resultResult, SubRelType.publicationDataset, |
|
203 |
PublicationDataset.RelName.isRelatedTo.toString())); |
|
204 | 173 |
deduplicationMappingConverter = new DeduplicationMappingConverter( |
205 | 174 |
encoding, resultApprover, |
206 | 175 |
getCollumnFamily(RelType.resultResult, SubRelType.dedup, |
... | ... | |
222 | 191 |
if (outputNameCitation==null) { |
223 | 192 |
throw new RuntimeException("citation output name not provided!"); |
224 | 193 |
} |
225 |
outputNameDocumentRelation = context.getConfiguration().get(OUTPUT_NAME_DOCUMENT_RELATION); |
|
226 |
if (outputNameDocumentRelation==null) { |
|
227 |
throw new RuntimeException("document relation output name not provided!"); |
|
228 |
} |
|
229 | 194 |
outputNameDocumentProject = context.getConfiguration().get(OUTPUT_NAME_DOCUMENT_PROJECT); |
230 | 195 |
if (outputNameDocumentProject==null) { |
231 | 196 |
throw new RuntimeException("document project relation output name not provided!"); |
... | ... | |
238 | 203 |
if (outputNamePerson==null) { |
239 | 204 |
throw new RuntimeException("person output name not provided!"); |
240 | 205 |
} |
241 |
outputNameDatasetId = context.getConfiguration().get(OUTPUT_NAME_DATASET_ID); |
|
242 |
if (outputNameDatasetId==null) { |
|
243 |
throw new RuntimeException("dataset identifier output name not provided!"); |
|
244 |
} |
|
245 | 206 |
outputNameDedupMapping = context.getConfiguration().get(OUTPUT_NAME_DEDUP_MAPPING); |
246 | 207 |
if (outputNameDedupMapping==null) { |
247 | 208 |
throw new RuntimeException("deduplication mapping output name not provided!"); |
... | ... | |
292 | 253 |
mos.write(outputNameCitation, new AvroKey<Citation>(citation)); |
293 | 254 |
} |
294 | 255 |
} |
295 |
// handling resultResult relations, required for filtering out existing dataset relations from inferenced dataset relations |
|
296 |
DocumentRelation docRel = docRelationConverter.buildObject(value, oafObj); |
|
297 |
if (docRel!=null) { |
|
298 |
mos.write(outputNameDocumentRelation, new AvroKey<DocumentRelation>(docRel)); |
|
299 |
} |
|
300 | 256 |
// handling project relations |
301 | 257 |
DocumentToProject[] docProjects = docProjectConverter.buildObject(value, oafObj); |
302 | 258 |
if (docProjects!=null && docProjects.length>0) { |
... | ... | |
312 | 268 |
} |
313 | 269 |
} |
314 | 270 |
} |
315 |
// producing datasetid datastore holding all existing datacite records |
|
316 |
if (datasetApprover.approveBeforeBuilding(oafObj)) { |
|
317 |
DocumentId datasetId = datasetConverter.buildObject(value, oafObj); |
|
318 |
if (datasetId!=null) { |
|
319 |
mos.write(outputNameDatasetId, new AvroKey<DocumentId>(datasetId)); |
|
320 |
} |
|
321 |
} |
|
322 | 271 |
} |
323 | 272 |
|
324 | 273 |
/** |
modules/icm-iis-import/trunk/src/main/resources/eu/dnetlib/iis/importer/mapred_import/oozie_app/workflow.xml | ||
---|---|---|
85 | 85 |
<description>citation output subdirectory name</description> |
86 | 86 |
</property> |
87 | 87 |
<property> |
88 |
<name>output_name_document_relation</name> |
|
89 |
<value>docrelation</value> |
|
90 |
<description>document to document relation output subdirectory name</description> |
|
91 |
</property> |
|
92 |
<property> |
|
93 | 88 |
<name>output_name_document_project</name> |
94 | 89 |
<value>docproject</value> |
95 | 90 |
<description>document to project relation output subdirectory name</description> |
... | ... | |
105 | 100 |
<description>person output subdirectory name</description> |
106 | 101 |
</property> |
107 | 102 |
<property> |
108 |
<name>output_name_dataset_id</name> |
|
109 |
<value>datasetid</value> |
|
110 |
<description>dataset identifier output subdirectory name</description> |
|
111 |
</property> |
|
112 |
<property> |
|
113 | 103 |
<name>output_name_dedup_mapping</name> |
114 | 104 |
<value>dedupmapping</value> |
115 | 105 |
<description>deduplication mapping output subdirectory name</description> |
... | ... | |
140 | 130 |
<!-- |
141 | 131 |
column family: |
142 | 132 |
--> |
143 |
<arg>-f person, project, result, resultProject_outcome_isProducedBy, personResult_authorship_hasAuthor, resultResult_publicationDataset_isRelatedTo, resultResult_dedup_merges</arg>
|
|
133 |
<arg>-f person, project, result, resultProject_outcome_isProducedBy, personResult_authorship_hasAuthor, resultResult_dedup_merges</arg> |
|
144 | 134 |
<capture-output /> |
145 | 135 |
</java> |
146 | 136 |
<ok to="mr_import" /> |
... | ... | |
287 | 277 |
<value>${output_name_citation}</value> |
288 | 278 |
</property> |
289 | 279 |
<property> |
290 |
<name>output.name.document_relation</name> |
|
291 |
<value>${output_name_document_relation}</value> |
|
292 |
</property> |
|
293 |
<property> |
|
294 | 280 |
<name>output.name.document_project</name> |
295 | 281 |
<value>${output_name_document_project}</value> |
296 | 282 |
</property> |
... | ... | |
303 | 289 |
<value>${output_name_person}</value> |
304 | 290 |
</property> |
305 | 291 |
<property> |
306 |
<name>output.name.dataset_id</name> |
|
307 |
<value>${output_name_dataset_id}</value> |
|
308 |
</property> |
|
309 |
<property> |
|
310 | 292 |
<name>output.name.dedup_mapping</name> |
311 | 293 |
<value>${output_name_dedup_mapping}</value> |
312 | 294 |
</property> |
313 |
<!-- disabling, currently content import is conducted by dedicated module --> |
|
314 |
<!-- |
|
315 | 295 |
<property> |
316 |
<name>import.content.object.store.location</name> |
|
317 |
<value>${object_store_location}</value> |
|
318 |
</property> |
|
319 |
<property> |
|
320 |
<name>import.content.lookup.service.location</name> |
|
321 |
<value>${lookup_service_location}</value> |
|
322 |
</property> |
|
323 |
--> |
|
324 |
<property> |
|
325 | 296 |
<name>import.approved.datasources.csv</name> |
326 | 297 |
<value>${approved_datasources_csv}</value> |
327 | 298 |
</property> |
... | ... | |
329 | 300 |
<!-- ## Names of all output ports --> |
330 | 301 |
<property> |
331 | 302 |
<name>avro.mapreduce.multipleoutputs</name> |
332 |
<value>${output_name_document_meta} ${output_name_citation} ${output_name_document_relation} ${output_name_document_project} ${output_name_dedup_mapping} ${output_name_dataset_id} ${output_name_person} ${output_name_project}</value>
|
|
303 |
<value>${output_name_document_meta} ${output_name_citation} ${output_name_document_project} ${output_name_dedup_mapping} ${output_name_person} ${output_name_project}</value>
|
|
333 | 304 |
</property> |
334 | 305 |
<!-- ## Output classes for all output ports --> |
335 | 306 |
<property> |
... | ... | |
343 | 314 |
<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value> |
344 | 315 |
</property> |
345 | 316 |
<property> |
346 |
<name>avro.mapreduce.multipleoutputs.namedOutput.${output_name_document_relation}.format |
|
347 |
</name> |
|
348 |
<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value> |
|
349 |
</property> |
|
350 |
<property> |
|
351 | 317 |
<name>avro.mapreduce.multipleoutputs.namedOutput.${output_name_document_project}.format |
352 | 318 |
</name> |
353 | 319 |
<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value> |
... | ... | |
358 | 324 |
<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value> |
359 | 325 |
</property> |
360 | 326 |
<property> |
361 |
<name>avro.mapreduce.multipleoutputs.namedOutput.${output_name_dataset_id}.format |
|
362 |
</name> |
|
363 |
<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value> |
|
364 |
</property> |
|
365 |
<property> |
|
366 | 327 |
<name>avro.mapreduce.multipleoutputs.namedOutput.${output_name_person}.format |
367 | 328 |
</name> |
368 | 329 |
<value>org.apache.avro.mapreduce.AvroKeyOutputFormat</value> |
... | ... | |
399 | 360 |
<value>eu.dnetlib.iis.citationmatching.schemas.Citation</value> |
400 | 361 |
</property> |
401 | 362 |
<property> |
402 |
<name>eu.dnetlib.iis.avro.multipleoutputs.class.${output_name_document_relation}</name> |
|
403 |
<value>eu.dnetlib.iis.importer.schemas.DocumentRelation</value> |
|
404 |
</property> |
|
405 |
<property> |
|
406 | 363 |
<name>eu.dnetlib.iis.avro.multipleoutputs.class.${output_name_document_project}</name> |
407 | 364 |
<value>eu.dnetlib.iis.importer.schemas.DocumentToProject</value> |
408 | 365 |
</property> |
... | ... | |
411 | 368 |
<value>eu.dnetlib.iis.common.schemas.IdentifierMapping</value> |
412 | 369 |
</property> |
413 | 370 |
<property> |
414 |
<name>eu.dnetlib.iis.avro.multipleoutputs.class.${output_name_dataset_id}</name> |
|
415 |
<value>eu.dnetlib.iis.common.schemas.DocumentId</value> |
|
416 |
</property> |
|
417 |
|
|
418 |
<property> |
|
419 | 371 |
<name>eu.dnetlib.iis.avro.multipleoutputs.class.${output_name_person}</name> |
420 | 372 |
<value>eu.dnetlib.iis.importer.schemas.Person</value> |
421 | 373 |
</property> |
Also available in: Unified diff
#1195 removing obsolete ports docrelation and datasetid from hbase mapred import, removing references to those ports in workflow.xml files, updating transformer by removing filtering by datasetid due to decisions made in #1072