Revision 51302
Added by Claudio Atzori over 6 years ago
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/EnrichmentMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.broker.enrich; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import com.google.common.collect.Iterables; |
|
7 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
8 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
9 |
import eu.dnetlib.data.mapreduce.util.UpdateMerger; |
|
10 |
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName; |
|
11 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
12 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
13 |
import org.apache.commons.collections.MapUtils; |
|
14 |
import org.apache.hadoop.hbase.client.Result; |
|
15 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
16 |
import org.apache.hadoop.hbase.util.Bytes; |
|
17 |
|
|
18 |
/** |
|
19 |
* Created by claudio on 08/07/16. |
|
20 |
*/ |
|
21 |
public class EnrichmentMapper extends AbstractEnrichmentMapper { |
|
22 |
|
|
23 |
@Override |
|
24 |
protected String counterGroup() { |
|
25 |
return "Broker Enrichment"; |
|
26 |
} |
|
27 |
|
|
28 |
@Override |
|
29 |
protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException { |
|
30 |
|
|
31 |
final String type = OafRowKeyDecoder.decode(key.copyBytes()).getType().toString(); |
|
32 |
|
|
33 |
final Map<byte[], byte[]> result = value.getFamilyMap(Bytes.toBytes(Type.result.name())); |
|
34 |
if (MapUtils.isEmpty(result)) { |
|
35 |
context.getCounter(counterGroup(), type + ": empty family map").increment(1); |
|
36 |
return; |
|
37 |
} |
|
38 |
|
|
39 |
final Oaf body = UpdateMerger.mergeBodyUpdates(context, result); |
|
40 |
|
|
41 |
if (body == null) { |
|
42 |
context.getCounter(counterGroup(), type + ": body null").increment(1); |
|
43 |
return; |
|
44 |
} |
|
45 |
|
|
46 |
final String mergedInCF = DedupUtils.getDedupCF_mergedIn(Type.result); |
|
47 |
final Map<byte[], byte[]> mergedIn = value.getFamilyMap(Bytes.toBytes(mergedInCF)); |
|
48 |
|
|
49 |
final byte[] outKey = getEmitKey(context, key, mergedIn); |
|
50 |
|
|
51 |
emit(context, outKey, body.toByteArray(), type); |
|
52 |
} |
|
53 |
|
|
54 |
private byte[] getEmitKey(final Context context, final ImmutableBytesWritable key, final Map<byte[], byte[]> mergedIn) { |
|
55 |
if (MapUtils.isNotEmpty(mergedIn)) { |
|
56 |
context.getCounter(Type.result.name(), RelName.isMergedIn.name()).increment(1); |
|
57 |
return Iterables.getOnlyElement(mergedIn.keySet()); |
|
58 |
} else { |
|
59 |
return key.copyBytes(); |
|
60 |
} |
|
61 |
} |
|
62 |
|
|
63 |
} |
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/ProjectEnrichmentMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.broker.enrich; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Arrays; |
|
5 |
import java.util.Map; |
|
6 |
|
|
7 |
import com.google.common.base.Function; |
|
8 |
import com.google.common.base.Predicate; |
|
9 |
import com.google.common.collect.Iterables; |
|
10 |
import com.google.common.collect.Lists; |
|
11 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
12 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
13 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
14 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
15 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
16 |
import org.apache.commons.collections.MapUtils; |
|
17 |
import org.apache.hadoop.hbase.client.Result; |
|
18 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
19 |
import org.apache.hadoop.hbase.util.Bytes; |
|
20 |
|
|
21 |
/** |
|
22 |
* Created by claudio on 08/07/16. |
|
23 |
*/ |
|
24 |
public class ProjectEnrichmentMapper extends AbstractEnrichmentMapper { |
|
25 |
|
|
26 |
@Override |
|
27 |
protected String counterGroup() { |
|
28 |
return "Broker Enrichment projects"; |
|
29 |
} |
|
30 |
|
|
31 |
@Override |
|
32 |
protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException { |
|
33 |
|
|
34 |
final Type type = getEntityType(key); |
|
35 |
|
|
36 |
final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body")); |
|
37 |
|
|
38 |
if (body == null) { |
|
39 |
context.getCounter(counterGroup(), "missing metadata").increment(1); |
|
40 |
return; |
|
41 |
} |
|
42 |
|
|
43 |
switch (type) { |
|
44 |
case project: |
|
45 |
for (final byte[] resultId : listRelatedIds(value, "resultProject_outcome_produces")) { |
|
46 |
emit(context, resultId, body, "project"); |
|
47 |
} |
|
48 |
break; |
|
49 |
case result: |
|
50 |
|
|
51 |
final Oaf.Builder oafBuilder = Oaf.newBuilder(OafDecoder.decode(body).getOaf()); |
|
52 |
for (final String relName : Arrays.asList("resultProject_outcome_isProducedBy")) { // Â TODO add dataset rels |
|
53 |
for (final Oaf rel : listRelations(value, relName)) { |
|
54 |
oafBuilder.getEntityBuilder().addCachedOafRel(rel); |
|
55 |
context.getCounter(counterGroup(), "rel: " + relName).increment(1); |
|
56 |
} |
|
57 |
} |
|
58 |
|
|
59 |
final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.result)); |
|
60 |
|
|
61 |
if (MapUtils.isEmpty(mergedIn) & !DedupUtils.isRoot(key)) { |
|
62 |
emit(context, key.copyBytes(), oafBuilder.build().toByteArray(), "result not deduped"); |
|
63 |
} else if (DedupUtils.isRoot(key)) { |
|
64 |
emit(context, key.copyBytes(), oafBuilder.build().toByteArray(), "result merges"); |
|
65 |
} else { |
|
66 |
emit(context, getRootId(mergedIn), oafBuilder.build().toByteArray(), "result mergedIn"); |
|
67 |
} |
|
68 |
|
|
69 |
break; |
|
70 |
default: |
|
71 |
throw new IllegalArgumentException("invalid type: " + type); |
|
72 |
} |
|
73 |
} |
|
74 |
|
|
75 |
private Iterable<Oaf> listRelations(final Result value, final String relType) { |
|
76 |
|
|
77 |
//TODO consider only relationshipt not deletedbyinference |
|
78 |
|
|
79 |
final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(relType)); |
|
80 |
if (MapUtils.isNotEmpty(map)) { |
|
81 |
return Iterables.filter(Iterables.transform(map.values(), new Function<byte[], Oaf>() { |
|
82 |
|
|
83 |
@Override |
|
84 |
public Oaf apply(final byte[] input) { |
|
85 |
return OafDecoder.decode(input).getOaf(); |
|
86 |
} |
|
87 |
}), new Predicate<Oaf>() { |
|
88 |
@Override |
|
89 |
public boolean apply(final Oaf rel) { |
|
90 |
return !rel.getRel().getTarget().contains("unidentified"); |
|
91 |
} |
|
92 |
}); |
|
93 |
} else { |
|
94 |
return Lists.newArrayList(); |
|
95 |
} |
|
96 |
} |
|
97 |
|
|
98 |
private Iterable<byte[]> listRelatedIds(final Result value, final String relType) { |
|
99 |
|
|
100 |
//TODO consider only relationshipt not deletedbyinference |
|
101 |
|
|
102 |
final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(relType)); |
|
103 |
if (MapUtils.isNotEmpty(map)) { |
|
104 |
return map.keySet(); |
|
105 |
} else { |
|
106 |
return Lists.newArrayList(); |
|
107 |
} |
|
108 |
} |
|
109 |
|
|
110 |
private Type getEntityType(final ImmutableBytesWritable key) { |
|
111 |
return OafRowKeyDecoder.decode(key.copyBytes()).getType(); |
|
112 |
} |
|
113 |
|
|
114 |
private byte[] getRootId(final Map<byte[], byte[]> mergedIn) { |
|
115 |
return Iterables.getOnlyElement(mergedIn.keySet()); |
|
116 |
} |
|
117 |
|
|
118 |
} |
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/AbstractEventFactory.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.broker; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.List; |
|
5 |
|
|
6 |
import com.google.common.collect.Lists; |
|
7 |
import eu.dnetlib.broker.objects.OpenAireEventPayload; |
|
8 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory; |
|
9 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory; |
|
10 |
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper; |
|
11 |
import eu.dnetlib.data.proto.FieldTypeProtos.StringField; |
|
12 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
13 |
import org.apache.commons.lang.StringUtils; |
|
14 |
import org.dom4j.DocumentException; |
|
15 |
|
|
16 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
17 |
|
|
18 |
/** |
|
19 |
* Created by claudio on 26/07/16. |
|
20 |
*/ |
|
21 |
public class AbstractEventFactory { |
|
22 |
|
|
23 |
public static List<EventWrapper> process(final Oaf current, final Oaf other, final float trust) |
|
24 |
throws IOException, InterruptedException, DocumentException { |
|
25 |
return new AbstractEventFactory().processAbstract(current, other, trust); |
|
26 |
} |
|
27 |
|
|
28 |
private List<EventWrapper> processAbstract(final Oaf current, final Oaf other, final float trust) |
|
29 |
throws IOException, InterruptedException, DocumentException { |
|
30 |
|
|
31 |
final List<EventWrapper> events = Lists.newArrayList(); |
|
32 |
|
|
33 |
if (hasAbstract(other)) { |
|
34 |
// doProcessAbstract(context, current, other, Topic.MORE_ABSTRACT); |
|
35 |
|
|
36 |
if (!hasAbstract(current)) { |
|
37 |
events.add(doProcessAbstract(current, other, Topic.ENRICH_MISSING_ABSTRACT, trust)); |
|
38 |
} |
|
39 |
} |
|
40 |
|
|
41 |
return events; |
|
42 |
} |
|
43 |
|
|
44 |
private EventWrapper doProcessAbstract(final Oaf current, final Oaf other, final Topic topic, final float trust) |
|
45 |
throws IOException, InterruptedException, DocumentException { |
|
46 |
final Oaf.Builder prototype = Oaf.newBuilder(current); |
|
47 |
final List<StringField> descriptionList = other.getEntity().getResult().getMetadata().getDescriptionList(); |
|
48 |
prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addAllDescription(descriptionList); |
|
49 |
|
|
50 |
final Oaf oaf = prototype.build(); |
|
51 |
|
|
52 |
final OpenAireEventPayload payload = |
|
53 |
HighlightFactory.highlightEnrichAbstract(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), other.getEntity(), trust), descriptionList); |
|
54 |
|
|
55 |
return EventWrapper.newInstance( |
|
56 |
asEvent(oaf.getEntity(), topic, payload, other.getEntity(), trust), |
|
57 |
topic.getValue()); |
|
58 |
} |
|
59 |
|
|
60 |
private boolean hasAbstract(final Oaf oaf) { |
|
61 |
return oaf.getEntity().getResult().getMetadata().getDescriptionList() |
|
62 |
.stream() |
|
63 |
.anyMatch(s -> StringUtils.isNotBlank(s.getValue())); |
|
64 |
} |
|
65 |
|
|
66 |
} |
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/pom.xml | ||
---|---|---|
1 |
<?xml version="1.0" ?> |
|
2 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> |
|
3 |
<parent> |
|
4 |
<groupId>eu.dnetlib</groupId> |
|
5 |
<artifactId>dnet45-parent</artifactId> |
|
6 |
<version>1.0.0</version> |
|
7 |
<relativePath /> |
|
8 |
</parent> |
|
9 |
<modelVersion>4.0.0</modelVersion> |
|
10 |
<groupId>eu.dnetlib</groupId> |
|
11 |
<artifactId>dnet-mapreduce-jobs</artifactId> |
|
12 |
<version>1.1.1-PROD</version> |
|
13 |
<packaging>jar</packaging> |
|
14 |
<scm> |
|
15 |
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD</developerConnection> |
|
16 |
</scm> |
|
17 |
<build> |
|
18 |
<plugins> |
|
19 |
<plugin> |
|
20 |
<artifactId>maven-assembly-plugin</artifactId> |
|
21 |
<configuration> |
|
22 |
<archive> |
|
23 |
<manifest> |
|
24 |
<mainClass>eu.dnetlib.data.mapreduce.hbase.dataimport.ImportRecordsJob</mainClass> |
|
25 |
</manifest> |
|
26 |
</archive> |
|
27 |
<descriptorRefs> |
|
28 |
<descriptorRef>jar-with-dependencies</descriptorRef> |
|
29 |
</descriptorRefs> |
|
30 |
</configuration> |
|
31 |
</plugin> |
|
32 |
</plugins> |
|
33 |
</build> |
|
34 |
<repositories> |
|
35 |
<!-- Cloudera Repositories --> |
|
36 |
<repository> |
|
37 |
<snapshots> |
|
38 |
<enabled>false</enabled> |
|
39 |
</snapshots> |
|
40 |
<id>cloudera-central</id> |
|
41 |
<name>cloudera-libs-release</name> |
|
42 |
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-central</url> |
|
43 |
</repository> |
|
44 |
<repository> |
|
45 |
<id>cloudera-snapshots</id> |
|
46 |
<name>cloudera-libs-snapshot</name> |
|
47 |
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-snapshots</url> |
|
48 |
</repository> |
|
49 |
</repositories> |
|
50 |
<dependencies> |
|
51 |
<dependency> |
|
52 |
<groupId>commons-logging</groupId> |
|
53 |
<artifactId>commons-logging</artifactId> |
|
54 |
<version>${commons.logging.version}</version> |
|
55 |
</dependency> |
|
56 |
<dependency> |
|
57 |
<groupId>junit</groupId> |
|
58 |
<artifactId>junit</artifactId> |
|
59 |
<version>${junit.version}</version> |
|
60 |
<scope>test</scope> |
|
61 |
</dependency> |
|
62 |
<dependency> |
|
63 |
<groupId>com.google.code.gson</groupId> |
|
64 |
<artifactId>gson</artifactId> |
|
65 |
<version>${google.gson.version}</version> |
|
66 |
</dependency> |
|
67 |
<dependency> |
|
68 |
<groupId>eu.dnetlib</groupId> |
|
69 |
<artifactId>dnet-index-solr-common</artifactId> |
|
70 |
<version>[1.0.0,2.0.0)</version> |
|
71 |
</dependency> |
|
72 |
<dependency> |
|
73 |
<groupId>eu.dnetlib</groupId> |
|
74 |
<artifactId>dnet-openaire-broker-common</artifactId> |
|
75 |
<version>[1.0.0,2.0.0)</version> |
|
76 |
</dependency> |
|
77 |
<dependency> |
|
78 |
<groupId>com.mycila</groupId> |
|
79 |
<artifactId>xmltool</artifactId> |
|
80 |
<version>3.3</version> |
|
81 |
</dependency> |
|
82 |
<dependency> |
|
83 |
<groupId>eu.dnetlib</groupId> |
|
84 |
<artifactId>cnr-misc-utils</artifactId> |
|
85 |
<version>[1.0.0,2.0.0)</version> |
|
86 |
<exclusions> |
|
87 |
<exclusion> |
|
88 |
<groupId>apache</groupId> |
|
89 |
<artifactId>commons-lang</artifactId> |
|
90 |
</exclusion> |
|
91 |
</exclusions> |
|
92 |
</dependency> |
|
93 |
<dependency> |
|
94 |
<groupId>eu.dnetlib</groupId> |
|
95 |
<artifactId>dnet-hadoop-commons</artifactId> |
|
96 |
<version>[2.0.0,3.0.0)</version> |
|
97 |
<exclusions> |
|
98 |
<exclusion> |
|
99 |
<artifactId>commons-httpclient</artifactId> |
|
100 |
<groupId>commons-httpclient</groupId> |
|
101 |
</exclusion> |
|
102 |
</exclusions> |
|
103 |
</dependency> |
|
104 |
<dependency> |
|
105 |
<groupId>org.apache.hbase</groupId> |
|
106 |
<artifactId>hbase</artifactId> |
|
107 |
<version>${apache.hbase.version}</version> |
|
108 |
<exclusions> |
|
109 |
<exclusion> |
|
110 |
<groupId>tomcat</groupId> |
|
111 |
<artifactId>jasper-runtime</artifactId> |
|
112 |
</exclusion> |
|
113 |
<exclusion> |
|
114 |
<groupId>tomcat</groupId> |
|
115 |
<artifactId>jasper-compiler</artifactId> |
|
116 |
</exclusion> |
|
117 |
<exclusion> |
|
118 |
<artifactId>slf4j-api</artifactId> |
|
119 |
<groupId>org.slf4j</groupId> |
|
120 |
</exclusion> |
|
121 |
<exclusion> |
|
122 |
<artifactId>slf4j-log4j12</artifactId> |
|
123 |
<groupId>org.slf4j</groupId> |
|
124 |
</exclusion> |
|
125 |
<exclusion> |
|
126 |
<artifactId>commons-lang</artifactId> |
|
127 |
<groupId>commons-lang</groupId> |
|
128 |
</exclusion> |
|
129 |
<exclusion> |
|
130 |
<artifactId>commons-httpclient</artifactId> |
|
131 |
<groupId>commons-httpclient</groupId> |
|
132 |
</exclusion> |
|
133 |
<exclusion> |
|
134 |
<artifactId>httpclient</artifactId> |
|
135 |
<groupId>org.apache.httpcomponents</groupId> |
|
136 |
</exclusion> |
|
137 |
<exclusion> |
|
138 |
<artifactId>httpcore</artifactId> |
|
139 |
<groupId>org.apache.httpcomponents</groupId> |
|
140 |
</exclusion> |
|
141 |
</exclusions> |
|
142 |
</dependency> |
|
143 |
<dependency> |
|
144 |
<groupId>com.googlecode.protobuf-java-format</groupId> |
|
145 |
<artifactId>protobuf-java-format</artifactId> |
|
146 |
<version>1.2</version> |
|
147 |
</dependency> |
|
148 |
<dependency> |
|
149 |
<groupId>eu.dnetlib</groupId> |
|
150 |
<artifactId>dnet-openaireplus-mapping-utils</artifactId> |
|
151 |
<version>[6.0.0,6.1.5]</version> |
|
152 |
</dependency> |
|
153 |
<dependency> |
|
154 |
<groupId>org.antlr</groupId> |
|
155 |
<artifactId>stringtemplate</artifactId> |
|
156 |
<version>3.2</version> |
|
157 |
</dependency> |
|
158 |
<dependency> |
|
159 |
<groupId>org.json</groupId> |
|
160 |
<artifactId>json</artifactId> |
|
161 |
<version>20140107</version> |
|
162 |
</dependency> |
|
163 |
<dependency> |
|
164 |
<groupId>com.typesafe</groupId> |
|
165 |
<artifactId>config</artifactId> |
|
166 |
<version>1.2.1</version> |
|
167 |
</dependency> |
|
168 |
<dependency> |
|
169 |
<groupId>eu.dnetlib</groupId> |
|
170 |
<artifactId>dnet-pace-core</artifactId> |
|
171 |
<version>[2.0.0,3.0.0)</version> |
|
172 |
</dependency> |
|
173 |
<dependency> |
|
174 |
<groupId>org.mongodb</groupId> |
|
175 |
<artifactId>mongo-java-driver</artifactId> |
|
176 |
<version>${mongodb.driver.version}</version> |
|
177 |
</dependency> |
|
178 |
<dependency> |
|
179 |
<groupId>eu.dnetlib</groupId> |
|
180 |
<artifactId>dnet-actionmanager-common</artifactId> |
|
181 |
<version>[6.0.0,7.0.0)</version> |
|
182 |
<exclusions> |
|
183 |
<exclusion> |
|
184 |
<groupId>commons-httpclient</groupId> |
|
185 |
<artifactId>commons-httpclient</artifactId> |
|
186 |
</exclusion> |
|
187 |
</exclusions> |
|
188 |
</dependency> |
|
189 |
|
|
190 |
<dependency> |
|
191 |
<groupId>org.elasticsearch</groupId> |
|
192 |
<artifactId>elasticsearch-hadoop-mr</artifactId> |
|
193 |
<version>5.2.0</version> |
|
194 |
<exclusions> |
|
195 |
<exclusion> |
|
196 |
<groupId>tomcat</groupId> |
|
197 |
<artifactId>jasper-compiler</artifactId> |
|
198 |
</exclusion> |
|
199 |
<exclusion> |
|
200 |
<groupId>org.antlr</groupId> |
|
201 |
<artifactId>antlr-runtime</artifactId> |
|
202 |
</exclusion> |
|
203 |
<exclusion> |
|
204 |
<groupId>org.eclipse.jetty.aggregate</groupId> |
|
205 |
<artifactId>jetty-all</artifactId> |
|
206 |
</exclusion> |
|
207 |
<exclusion> |
|
208 |
<groupId>org.slf4j</groupId> |
|
209 |
<artifactId>slf4j-log4j12</artifactId> |
|
210 |
</exclusion> |
|
211 |
<exclusion> |
|
212 |
<groupId>org.glassfish.jersey.core</groupId> |
|
213 |
<artifactId>jersey-client</artifactId> |
|
214 |
</exclusion> |
|
215 |
<exclusion> |
|
216 |
<groupId>org.glassfish.jersey.core</groupId> |
|
217 |
<artifactId>jersey-common</artifactId> |
|
218 |
</exclusion> |
|
219 |
<exclusion> |
|
220 |
<groupId>org.glassfish.jersey.core</groupId> |
|
221 |
<artifactId>jersey-server</artifactId> |
|
222 |
</exclusion> |
|
223 |
<exclusion> |
|
224 |
<groupId>org.glassfish.jersey.containers</groupId> |
|
225 |
<artifactId>jersey-container-servlet</artifactId> |
|
226 |
</exclusion> |
|
227 |
<exclusion> |
|
228 |
<groupId>org.glassfish.jersey.containers</groupId> |
|
229 |
<artifactId>jersey-container-servlet-core</artifactId> |
|
230 |
</exclusion> |
|
231 |
|
|
232 |
<exclusion> |
|
233 |
<groupId>org.codehaus.groovy</groupId> |
|
234 |
<artifactId>groovy-all</artifactId> |
|
235 |
</exclusion> |
|
236 |
|
|
237 |
<exclusion> |
|
238 |
<groupId>org.apache.hive</groupId> |
|
239 |
<artifactId>hive-service</artifactId> |
|
240 |
</exclusion> |
|
241 |
|
|
242 |
<exclusion> |
|
243 |
<groupId>org.apache.spark</groupId> |
|
244 |
<artifactId>spark-core_2.10</artifactId> |
|
245 |
</exclusion> |
|
246 |
<exclusion> |
|
247 |
<groupId>org.apache.spark</groupId> |
|
248 |
<artifactId>spark-sql_2.10</artifactId> |
|
249 |
</exclusion> |
|
250 |
<exclusion> |
|
251 |
<groupId>org.apache.spark</groupId> |
|
252 |
<artifactId>spark-streaming_2.10</artifactId> |
|
253 |
</exclusion> |
|
254 |
<exclusion> |
|
255 |
<groupId>cascading</groupId> |
|
256 |
<artifactId>cascading-hadoop</artifactId> |
|
257 |
</exclusion> |
|
258 |
<exclusion> |
|
259 |
<groupId>cascading</groupId> |
|
260 |
<artifactId>cascading-local</artifactId> |
|
261 |
</exclusion> |
|
262 |
<exclusion> |
|
263 |
<groupId>org.apache.storm</groupId> |
|
264 |
<artifactId>storm-core</artifactId> |
|
265 |
</exclusion> |
|
266 |
<exclusion> |
|
267 |
<groupId>org.apache.pig</groupId> |
|
268 |
<artifactId>pig</artifactId> |
|
269 |
</exclusion> |
|
270 |
</exclusions> |
|
271 |
|
|
272 |
</dependency> |
|
273 |
<dependency> |
|
274 |
<groupId>org.mockito</groupId> |
|
275 |
<artifactId>mockito-core</artifactId> |
|
276 |
<version>${mockito.version}</version> |
|
277 |
<scope>test</scope> |
|
278 |
</dependency> |
|
279 |
<dependency> |
|
280 |
<groupId>eu.dnetlib</groupId> |
|
281 |
<artifactId>dnet-openaireplus-profiles</artifactId> |
|
282 |
<version>[1.0.0,2.0.0)</version> |
|
283 |
<scope>test</scope> |
|
284 |
</dependency> |
|
285 |
<dependency> |
|
286 |
<groupId>com.google.guava</groupId> |
|
287 |
<artifactId>guava</artifactId> |
|
288 |
<version>${google.guava.version}</version> |
|
289 |
</dependency> |
|
290 |
|
|
291 |
</dependencies> |
|
292 |
</project> |
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/add/AdditionMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.broker.add; |
|
2 |
|
|
3 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
4 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues; |
|
5 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.listKeys; |
|
6 |
|
|
7 |
import java.io.IOException; |
|
8 |
import java.util.HashSet; |
|
9 |
import java.util.Map; |
|
10 |
import java.util.Set; |
|
11 |
|
|
12 |
import org.apache.commons.collections.MapUtils; |
|
13 |
import org.apache.commons.lang.StringUtils; |
|
14 |
import org.apache.commons.lang.math.RandomUtils; |
|
15 |
import org.apache.hadoop.hbase.client.HTable; |
|
16 |
import org.apache.hadoop.hbase.client.Result; |
|
17 |
import org.apache.hadoop.hbase.client.ResultScanner; |
|
18 |
import org.apache.hadoop.hbase.client.Scan; |
|
19 |
import org.apache.hadoop.hbase.filter.FilterList; |
|
20 |
import org.apache.hadoop.hbase.filter.FilterList.Operator; |
|
21 |
import org.apache.hadoop.hbase.filter.PrefixFilter; |
|
22 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
23 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
24 |
import org.apache.hadoop.hbase.util.Bytes; |
|
25 |
import org.apache.hadoop.io.Text; |
|
26 |
|
|
27 |
import com.google.common.base.Function; |
|
28 |
import com.google.common.collect.Iterables; |
|
29 |
import com.google.common.collect.Maps; |
|
30 |
import com.google.common.collect.Sets; |
|
31 |
|
|
32 |
import eu.dnetlib.broker.objects.OpenAireEventPayload; |
|
33 |
import eu.dnetlib.data.mapreduce.hbase.broker.Topic; |
|
34 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory; |
|
35 |
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage; |
|
36 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
37 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
38 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
39 |
|
|
40 |
/** |
|
41 |
* Created by claudio on 08/07/16. |
|
42 |
*/ |
|
43 |
public class AdditionMapper extends TableMapper<Text, Text> { |
|
44 |
|
|
45 |
private Text outKey; |
|
46 |
private Text outValue; |
|
47 |
|
|
48 |
/** |
|
49 |
* Map ProjectID -> Set of Organization info (id, name) |
|
50 |
*/ |
|
51 |
private Map<String, Set<EntityInfo>> projectOrganization; |
|
52 |
|
|
53 |
/** |
|
54 |
* Map OrganizationID -> Set of Datasource info (id, name) |
|
55 |
*/ |
|
56 |
private Map<String, Set<EntityInfo>> organizationDatasource; |
|
57 |
|
|
58 |
private Set<String> organizationPrefixBlacklist = Sets.newHashSet(); |
|
59 |
|
|
60 |
// White list for datasource typologies. |
|
61 |
private Set<String> dsTypeWhitelist = Sets.newHashSet(); |
|
62 |
|
|
63 |
@Override |
|
64 |
protected void setup(final Context context) throws IOException { |
|
65 |
|
|
66 |
organizationPrefixBlacklist = Sets.newHashSet("nsf_________"); |
|
67 |
dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist")); |
|
68 |
|
|
69 |
projectOrganization = getRelMap(context, "20", "organization", "projectOrganization_participation_isParticipant", organizationPrefixBlacklist); |
|
70 |
organizationDatasource = getRelMap(context, "10", "datasource", "datasourceOrganization_provision_provides", dsTypeWhitelist); |
|
71 |
|
|
72 |
outKey = new Text(""); |
|
73 |
outValue = new Text(); |
|
74 |
} |
|
75 |
|
|
76 |
class EntityInfo { |
|
77 |
|
|
78 |
private String id; |
|
79 |
private String name; |
|
80 |
|
|
81 |
public EntityInfo(final String id, final String name) { |
|
82 |
this.id = id; |
|
83 |
this.name = name; |
|
84 |
} |
|
85 |
|
|
86 |
@Override |
|
87 |
public int hashCode() { |
|
88 |
return getId().hashCode(); |
|
89 |
} |
|
90 |
|
|
91 |
@Override |
|
92 |
public boolean equals(final Object obj) { |
|
93 |
return getId().equals(obj); |
|
94 |
} |
|
95 |
|
|
96 |
public String getId() { |
|
97 |
return id; |
|
98 |
} |
|
99 |
|
|
100 |
public void setId(final String id) { |
|
101 |
this.id = id; |
|
102 |
} |
|
103 |
|
|
104 |
public String getName() { |
|
105 |
return name; |
|
106 |
} |
|
107 |
|
|
108 |
public void setName(final String name) { |
|
109 |
this.name = name; |
|
110 |
} |
|
111 |
} |
|
112 |
|
|
113 |
@Override |
|
114 |
protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException { |
|
115 |
try { |
|
116 |
final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes("result")); |
|
117 |
final byte[] bodyB = map.get(Bytes.toBytes("body")); |
|
118 |
|
|
119 |
if (MapUtils.isEmpty(map) || (bodyB == null)) { |
|
120 |
context.getCounter("result", "empty body").increment(1); |
|
121 |
return; |
|
122 |
} |
|
123 |
|
|
124 |
final Oaf oaf = Oaf.parseFrom(bodyB); |
|
125 |
|
|
126 |
if (oaf.getDataInfo().getDeletedbyinference()) { |
|
127 |
context.getCounter("result", "deletedbyinference = true").increment(1); |
|
128 |
return; |
|
129 |
} |
|
130 |
|
|
131 |
final Set<String> currentDatasourceIds = Sets.newHashSet(listKeys(oaf.getEntity().getCollectedfromList())); |
|
132 |
|
|
133 |
final Map<byte[], byte[]> resultProject = value.getFamilyMap(Bytes.toBytes("resultProject_outcome_isProducedBy")); |
|
134 |
if (!MapUtils.isEmpty(resultProject)) { |
|
135 |
|
|
136 |
for (final String projectId : asStringID(resultProject.keySet())) { |
|
137 |
|
|
138 |
final Set<EntityInfo> organizations = projectOrganization.get(projectId); |
|
139 |
|
|
140 |
if ((organizations != null) && !organizations.isEmpty()) { |
|
141 |
|
|
142 |
for (final EntityInfo organization : organizations) { |
|
143 |
|
|
144 |
final Set<EntityInfo> datasources = organizationDatasource.get(organization.getId()); |
|
145 |
|
|
146 |
if ((datasources != null) && !datasources.isEmpty()) { |
|
147 |
|
|
148 |
for (final EntityInfo datasource : datasources) { |
|
149 |
|
|
150 |
if (!currentDatasourceIds.contains(datasource.getId())) { |
|
151 |
|
|
152 |
// emit event for datasourceId |
|
153 |
final float trust = RandomUtils.nextFloat(); |
|
154 |
final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), oaf.getEntity(), trust); |
|
155 |
// event.setPayload(HighlightFactory.highlightEnrichPid(payload, |
|
156 |
// Lists.newArrayList(pids)).toJSON()); |
|
157 |
final EventMessage event = |
|
158 |
asEvent(oaf.getEntity(), Topic.ADD_BY_PROJECT, payload, datasource.getId(), datasource.getName(), trust); |
|
159 |
|
|
160 |
emit(event, context); |
|
161 |
|
|
162 |
context.getCounter("event", Topic.ADD_BY_PROJECT.getValue()).increment(1); |
|
163 |
} |
|
164 |
} |
|
165 |
} |
|
166 |
} |
|
167 |
} |
|
168 |
} |
|
169 |
} |
|
170 |
} catch (final Exception e) { |
|
171 |
throw new RuntimeException(e); |
|
172 |
} |
|
173 |
} |
|
174 |
|
|
175 |
private void emit(final EventMessage e, final Context context) throws IOException, InterruptedException { |
|
176 |
// tKey.set(e.getMap().get("id")); |
|
177 |
outValue.set(e.toString()); |
|
178 |
context.write(outKey, outValue); |
|
179 |
} |
|
180 |
|
|
181 |
private Iterable<String> asStringID(final Iterable<byte[]> in) { |
|
182 |
return Iterables.transform(in, new Function<byte[], String>() { |
|
183 |
|
|
184 |
@Override |
|
185 |
public String apply(final byte[] input) { |
|
186 |
return getID(new String(input)); |
|
187 |
} |
|
188 |
}); |
|
189 |
} |
|
190 |
|
|
191 |
private Map<String, Set<EntityInfo>> getRelMap(final Context context, |
|
192 |
final String prefixFilter, |
|
193 |
final String entity, |
|
194 |
final String columnFamily, |
|
195 |
final Set<String> filter) throws IOException { |
|
196 |
System.out.println(String.format("loading %s, %s", entity, columnFamily)); |
|
197 |
|
|
198 |
final Map<String, Set<EntityInfo>> out = Maps.newHashMap(); |
|
199 |
final String tableName = context.getConfiguration().get("hbase.mapred.inputtable"); |
|
200 |
|
|
201 |
System.out.println(String.format("table name: '%s'", tableName)); |
|
202 |
|
|
203 |
try (final HTable table = new HTable(context.getConfiguration(), tableName); |
|
204 |
final ResultScanner res = scanTable(table, prefixFilter, entity, columnFamily)) { |
|
205 |
|
|
206 |
for (final Result r : res) { |
|
207 |
|
|
208 |
final byte[] bodyB = r.getValue(Bytes.toBytes(entity), Bytes.toBytes("body")); |
|
209 |
|
|
210 |
if (bodyB == null) { |
|
211 |
context.getCounter("missing body", entity).increment(1); |
|
212 |
} else { |
|
213 |
|
|
214 |
final OafEntity oafEntity = OafDecoder.decode(bodyB).getEntity(); |
|
215 |
final EntityInfo kv = getEntityInfo(oafEntity, filter); |
|
216 |
if (kv != null) { |
|
217 |
|
|
218 |
final Map<byte[], byte[]> relMap = r.getFamilyMap(Bytes.toBytes(columnFamily)); |
|
219 |
|
|
220 |
if (MapUtils.isNotEmpty(relMap)) { |
|
221 |
for (final String id : asStringID(relMap.keySet())) { |
|
222 |
|
|
223 |
if (!out.containsKey(id)) { |
|
224 |
out.put(id, new HashSet<EntityInfo>()); |
|
225 |
} |
|
226 |
out.get(id).add(kv); |
|
227 |
} |
|
228 |
} else { |
|
229 |
context.getCounter("skipped", entity).increment(1); |
|
230 |
} |
|
231 |
} |
|
232 |
} |
|
233 |
} |
|
234 |
} |
|
235 |
|
|
236 |
System.out.println(String.format("loaded map for %s, %s, size: %s", entity, columnFamily, out.size())); |
|
237 |
return out; |
|
238 |
} |
|
239 |
|
|
240 |
private EntityInfo getEntityInfo(final OafEntity entity, final Set<String> filter) { |
|
241 |
|
|
242 |
final String id = getID(entity.getId()); |
|
243 |
switch (entity.getType()) { |
|
244 |
case datasource: |
|
245 |
final String dsType = entity.getDatasource().getMetadata().getDatasourcetype().getClassid(); |
|
246 |
if (!filter.contains(dsType)) { return null; } |
|
247 |
return new EntityInfo(id, entity.getDatasource().getMetadata().getOfficialname().getValue()); |
|
248 |
case organization: |
|
249 |
if (filter.contains(prefix(id))) { return null; } |
|
250 |
return new EntityInfo(id, entity.getOrganization().getMetadata().getLegalname().getValue()); |
|
251 |
default: |
|
252 |
throw new IllegalArgumentException("invalid entity: " + entity); |
|
253 |
} |
|
254 |
} |
|
255 |
|
|
256 |
private ResultScanner scanTable(final HTable table, final String prefixFilter, final String entity, final String columnFamily) throws IOException { |
|
257 |
final Scan scan = new Scan(); |
|
258 |
final FilterList fl = new FilterList(Operator.MUST_PASS_ALL); |
|
259 |
fl.addFilter(new PrefixFilter(Bytes.toBytes(prefixFilter))); |
|
260 |
scan.setFilter(fl); |
|
261 |
scan.addFamily(Bytes.toBytes(entity)); |
|
262 |
scan.addFamily(Bytes.toBytes(columnFamily)); |
|
263 |
|
|
264 |
return table.getScanner(scan); |
|
265 |
} |
|
266 |
|
|
267 |
private String getID(final String s) { |
|
268 |
return StringUtils.substringAfter(s, "|"); |
|
269 |
} |
|
270 |
|
|
271 |
private String prefix(final String s) { |
|
272 |
return StringUtils.substringBefore(s, "::"); |
|
273 |
} |
|
274 |
|
|
275 |
} |
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.List; |
|
5 |
|
|
6 |
import com.google.common.collect.Iterables; |
|
7 |
import com.google.common.collect.Lists; |
|
8 |
import eu.dnetlib.data.mapreduce.JobParams; |
|
9 |
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses; |
|
10 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
11 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
12 |
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils; |
|
13 |
import eu.dnetlib.data.mapreduce.util.OafRelDecoder; |
|
14 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
15 |
import eu.dnetlib.data.transform.OafEntityMerger; |
|
16 |
import eu.dnetlib.pace.config.DedupConfig; |
|
17 |
import org.apache.commons.lang.StringUtils; |
|
18 |
import org.apache.hadoop.hbase.client.Put; |
|
19 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
20 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
21 |
import org.apache.hadoop.hbase.util.Bytes; |
|
22 |
import org.apache.hadoop.io.Text; |
|
23 |
|
|
24 |
/**
 * Reducer that materializes dedup "root" (representative) records in HBase.
 *
 * For each root row key it receives the merged group of entity bodies plus the
 * relations of the duplicates. It:
 * <ul>
 *   <li>merges the entity payloads into a single root body,</li>
 *   <li>re-emits each relation twice, re-pointed at the root (root -&gt; entity and entity -&gt; root),</li>
 *   <li>marks the original duplicate-side relations (both directions) as deleted by inference.</li>
 * </ul>
 */
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

    // direction in which a relation gets re-pointed at the root
    private enum OafPatch {
        rootToEntity, entityToRoot
    }

    // dedup workflow configuration, loaded in setup() from the job configuration
    private DedupConfig dedupConf;

    // lookup table mapping each relation class to its inverse
    private RelClasses relClasses;

    /**
     * Loads the dedup configuration and the relation-class inverse table from the job configuration.
     */
    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());

        final String relClassJson = context.getConfiguration().get("relClasses");
        System.out.println("relClassesJson:\n" + relClassJson);
        relClasses = RelClasses.fromJSon(relClassJson);
        System.out.println("relClasses:\n" + relClasses);
    }

    /**
     * Builds and writes the root record for one dedup group, dispatching each incoming
     * Oaf payload by kind: entities are collected for merging, relations are re-emitted
     * via {@link #handleRels}.
     */
    @Override
    protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

        // ensures we're dealing with a root, otherwise returns
        if (!DedupUtils.isRoot(key.toString())) {
            System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
            context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
            return;
        }

        final byte[] rowkey = Bytes.toBytes(key.toString());
        final List<Oaf> entities = Lists.newArrayList();

        for (final Oaf oaf : toOaf(values)) {
            switch (oaf.getKind()) {
            case entity:
                entities.add(oaf);
                break;
            case relation:
                handleRels(context, rowkey, oaf);
                break;
            default:
                break;
            }
        }

        // build and emit the root body
        final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
        // track group-size distribution; sizes beyond MAX_COUNTERS share a single overflow counter
        if (entities.size() < JobParams.MAX_COUNTERS) {
            context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
        } else {
            context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
        }

        emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");

    }

    // decodes each serialized cell value into an Oaf protobuf, lazily
    private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
        return Iterables.transform(values, OafHbaseUtils.oafDecoder());
    }

    /**
     * Emits four Put operations for a single duplicate-side relation:
     * the two root-repointed copies, and the two original copies marked deleted.
     *
     * @param rowkey the root row key
     * @param oaf    a relation payload originally attached to a duplicate
     */
    private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {

        // emit relation from the root to the related entities
        OafDecoder decoder = rootToEntity(rowkey, oaf);
        emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");

        // emit relation from the related entities to the root
        decoder = entityToRoot(rowkey, oaf);
        byte[] revKey = Bytes.toBytes(decoder.relSourceId());
        emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");

        // mark relation from the related entities to the duplicate as deleted
        decoder = markDeleted(oaf, true);
        revKey = Bytes.toBytes(decoder.relSourceId());
        emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");

        // mark relation from the related entities to the duplicate as deleted
        decoder = markDeleted(oaf, false);
        revKey = Bytes.toBytes(decoder.relSourceId());
        emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
    }

    // writes one cell (family:qualifier = value) at the given row and bumps a per-family counter
    private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
            throws IOException, InterruptedException {
        final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
        put.setWriteToWAL(JobParams.WRITE_TO_WAL);
        context.write(new ImmutableBytesWritable(rowkey), put);
        context.getCounter(family, label).increment(1);
    }

    // /////////////////

    private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
        return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
    }

    private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
        return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
    }

    private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
        return deleteRelations(rel, reverse);
    }

    // patches relation objects setting the source field with the root id
    // rootToEntity: source := root id (target untouched)
    // entityToRoot: relation class inverted, source := original target, target := root id
    private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
        final String id = new String(rootRowkey);
        final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
        final Oaf.Builder builder = Oaf.newBuilder(rel);
        // patched copies are inferred by the dedup process and explicitly not deleted
        builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
        switch (patchKind) {
        case rootToEntity:
            // builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
            builder.getRelBuilder().setSource(id);
            break;

        case entityToRoot:
            final String relClass = rel.getRel().getRelClass();
            /*
            if(StringUtils.isBlank(relClass)) {
                throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
            }
            */
            final String inverse = relClasses.getInverse(relClass);
            if(StringUtils.isBlank(inverse)) {
                throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
            }
            builder.setRel(decoder.setClassId(inverse));
            // builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
            // NOTE: setRel above replaced the rel; read target from the rebuilt rel before overwriting
            builder.getRelBuilder().setSource(builder.getRel().getTarget());
            builder.getRelBuilder().setTarget(id);
            break;

        default:
            break;
        }

        return OafDecoder.decode(builder.build());
    }

    // returns a copy of the relation flagged deletedbyinference; when reverse is true the
    // relation class is inverted and source/target are swapped
    private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
        final Oaf.Builder builder = Oaf.newBuilder(rel);
        // builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
        builder.getDataInfoBuilder().setDeletedbyinference(true);

        if (reverse) {
            final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
            builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
            // swap source and target
            final String tmp = builder.getRel().getSource();
            builder.getRelBuilder().setSource(builder.getRel().getTarget());
            builder.getRelBuilder().setTarget(tmp);
        }

        return OafDecoder.decode(builder.build());
    }

    // left-pads a group size to width 5 so counter names sort lexicographically
    private String lpad(final int s) {
        return StringUtils.leftPad(String.valueOf(s), 5);
    }

}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/MindistSearchReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.cc; |
|
2 |
|
|
3 |
/** |
|
4 |
* Created by claudio on 14/10/15. |
|
5 |
*/ |
|
6 |
|
|
7 |
import java.io.IOException; |
|
8 |
|
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.apache.hadoop.io.Text; |
|
12 |
import org.apache.hadoop.mapreduce.Reducer; |
|
13 |
|
|
14 |
/**
 * One iteration of the "minimum distance search" connected-components algorithm:
 * each vertex adopts the smallest vertex id seen among itself and the messages
 * sent by its neighbours. A vertex stays "activated" while its label keeps
 * shrinking; the driver iterates until no vertex is updated.
 */
public class MindistSearchReducer extends Reducer<Text, VertexWritable, Text, VertexWritable> {

    private static final Log log = LogFactory.getLog(MindistSearchReducer.class);

    public static final String UPDATE_COUNTER = "UpdateCounter";
    public static final String SKIPPED = "SKIPPED";
    public static final String UPDATED = "UPDATED";

    // true on the very first iteration (recursion depth 0): every vertex is initialized
    private boolean depthOne;

    // when true, every emitted vertex is also logged as JSON
    private boolean debug = false;

    /**
     * Reads the iteration number and debug flag from the job configuration.
     * NOTE(review): throws NumberFormatException/NPE when "mindist_recursion_depth" is unset — confirm the driver always sets it.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        final String recursionDepth = context.getConfiguration().get("mindist_recursion_depth");
        log.info("got recursion depth: " + recursionDepth);
        if (Integer.parseInt(recursionDepth) == 0) {
            depthOne = true;
        }

        debug = context.getConfiguration().getBoolean("mindist_DEBUG", false);
        log.info("debug mode: " + debug);
    }

    /**
     * Merges the real vertex (the value carrying edges) with the label messages
     * received from neighbours, keeping the minimum vertex id as the new label.
     * Groups containing no real vertex are counted as SKIPPED and dropped.
     */
    @Override
    protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {

        VertexWritable realVertex = null;
        Text currentMinimalKey = null;
        //boolean foundEdges = false;

        if (depthOne) {
            // first iteration: find the real vertex (clone — Hadoop reuses value objects)
            for (VertexWritable vertex : values) {
                if (!vertex.isMessage()) {
                    //log.info(String.format("found vertex with edges: %s", key.toString()));
                    realVertex = vertex.clone();
                }
            }

            if (realVertex == null) {
                context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1);
                return;
            }

            // initial label: min(own key, smallest neighbour id)
            realVertex.setActivated(true);
            realVertex.setVertexId(realVertex.getEdges().first());

            if (key.compareTo(realVertex.getVertexId()) < 0) {
                realVertex.setVertexId(key);
            }

            context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
        } else {
            // subsequent iterations: track the smallest label proposed by any message
            for (VertexWritable vertex : values) {
                if (!vertex.isMessage()) {
                    if (realVertex == null) {
                        realVertex = vertex.clone();
                    }
                } else {
                    if (currentMinimalKey == null) {
                        currentMinimalKey = new Text(vertex.getVertexId());
                    } else {

                        if (currentMinimalKey.compareTo(vertex.getVertexId()) > 0) {
                            currentMinimalKey = new Text(vertex.getVertexId());
                        }
                    }
                }
            }

            if (realVertex == null) {
                context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1);
                return;
            }

            // adopt the proposed label only if it improves (is smaller than) the current one;
            // the activated flag records whether this vertex changed in this iteration
            if (currentMinimalKey != null && currentMinimalKey.compareTo(realVertex.getVertexId()) < 0) {
                realVertex.setVertexId(currentMinimalKey);
                realVertex.setActivated(true);
                context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
            } else {
                realVertex.setActivated(false);
            }
        }

        context.write(key, realVertex);
        if (debug) {
            log.info(realVertex.toJSON());
        }
    }

}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/HBaseToSimilarityGraphMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.cc; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
|
|
5 |
import org.apache.hadoop.hbase.KeyValue; |
|
6 |
import org.apache.hadoop.hbase.client.Result; |
|
7 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
8 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
9 |
import org.apache.hadoop.io.Text; |
|
10 |
|
|
11 |
/** |
|
12 |
* Created by claudio on 14/10/15. |
|
13 |
*/ |
|
14 |
public class HBaseToSimilarityGraphMapper extends TableMapper<Text, VertexWritable> { |
|
15 |
|
|
16 |
@Override |
|
17 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
18 |
|
|
19 |
final VertexWritable vertex = new VertexWritable(); |
|
20 |
final Text realKey = new Text(keyIn.copyBytes()); |
|
21 |
|
|
22 |
vertex.checkAndSetMinimalVertex(realKey); |
|
23 |
vertex.addVertex(realKey); |
|
24 |
|
|
25 |
for (KeyValue kv : value.list()) { |
|
26 |
|
|
27 |
Text tmp = new Text(kv.getQualifier()); |
|
28 |
vertex.checkAndSetMinimalVertex(tmp); |
|
29 |
vertex.addVertex(tmp); |
|
30 |
} |
|
31 |
|
|
32 |
context.write(realKey, vertex); |
|
33 |
|
|
34 |
for (Text edge : vertex.getEdges()) { |
|
35 |
context.write(edge, vertex.makeMessage()); |
|
36 |
} |
|
37 |
} |
|
38 |
|
|
39 |
} |
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactory.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.util; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.io.StringWriter; |
|
5 |
import java.util.List; |
|
6 |
import java.util.Map; |
|
7 |
import java.util.Map.Entry; |
|
8 |
import java.util.Set; |
|
9 |
import javax.xml.transform.*; |
|
10 |
import javax.xml.transform.dom.DOMSource; |
|
11 |
import javax.xml.transform.stream.StreamResult; |
|
12 |
|
|
13 |
import com.google.common.base.Joiner; |
|
14 |
import com.google.common.base.Predicate; |
|
15 |
import com.google.common.base.Splitter; |
|
16 |
import com.google.common.collect.Iterables; |
|
17 |
import com.google.common.collect.Lists; |
|
18 |
import com.google.common.collect.Maps; |
|
19 |
import com.google.common.collect.Sets; |
|
20 |
import com.google.protobuf.Descriptors.EnumValueDescriptor; |
|
21 |
import com.google.protobuf.Descriptors.FieldDescriptor; |
|
22 |
import com.google.protobuf.GeneratedMessage; |
|
23 |
import com.mycila.xmltool.XMLDoc; |
|
24 |
import com.mycila.xmltool.XMLTag; |
|
25 |
import eu.dnetlib.data.mapreduce.hbase.index.config.*; |
|
26 |
import eu.dnetlib.data.proto.FieldTypeProtos.*; |
|
27 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
28 |
import eu.dnetlib.data.proto.OafProtos.OafRel; |
|
29 |
import eu.dnetlib.data.proto.ProjectProtos.Project; |
|
30 |
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata; |
|
31 |
import eu.dnetlib.data.proto.ResultProtos.Result; |
|
32 |
import eu.dnetlib.data.proto.ResultProtos.Result.Context; |
|
33 |
import eu.dnetlib.data.proto.ResultProtos.Result.ExternalReference; |
|
34 |
import eu.dnetlib.data.proto.ResultProtos.Result.Instance; |
|
35 |
import eu.dnetlib.data.proto.ResultProtos.Result.Journal; |
|
36 |
import eu.dnetlib.data.proto.TypeProtos; |
|
37 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
38 |
import org.apache.commons.lang.StringUtils; |
|
39 |
import org.dom4j.Document; |
|
40 |
import org.dom4j.DocumentException; |
|
41 |
import org.dom4j.Element; |
|
42 |
import org.dom4j.Node; |
|
43 |
import org.dom4j.io.SAXReader; |
|
44 |
|
|
45 |
import static eu.dnetlib.miscutils.collections.MappedCollection.listMap; |
|
46 |
|
|
47 |
public class XmlRecordFactory { |
|
48 |
|
|
49 |
// private static final Log log = LogFactory.getLog(XmlRecordFactory.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
50 |
|
|
51 |
private final Map<String, Integer> relCounters = Maps.newHashMap(); |
|
52 |
protected Set<String> specialDatasourceTypes; |
|
53 |
protected TemplateFactory templateFactory = new TemplateFactory(); |
|
54 |
protected OafDecoder mainEntity = null; |
|
55 |
protected String key = null; |
|
56 |
protected List<OafDecoder> relations = Lists.newLinkedList(); |
|
57 |
protected List<OafDecoder> children = Lists.newLinkedList(); |
|
58 |
protected EntityConfigTable entityConfigTable; |
|
59 |
protected ContextMapper contextMapper; |
|
60 |
protected RelClasses relClasses; |
|
61 |
protected String schemaLocation; |
|
62 |
protected boolean entityDefaults; |
|
63 |
protected boolean relDefaults; |
|
64 |
protected boolean childDefaults; |
|
65 |
protected Set<String> contextes = Sets.newHashSet(); |
|
66 |
protected List<String> extraInfo = Lists.newArrayList(); |
|
67 |
protected Map<String, Integer> counters = Maps.newHashMap(); |
|
68 |
protected Transformer transformer; |
|
69 |
|
|
70 |
protected static Predicate<String> instanceFilter = new Predicate<String>() { |
|
71 |
final Set<String> instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance"); |
|
72 |
@Override |
|
73 |
public boolean apply(final String s) { |
|
74 |
return instanceFieldFilter.contains(s); |
|
75 |
} |
|
76 |
}; |
|
77 |
|
|
78 |
/**
 * Creates a record factory configured for a given entity layout.
 *
 * @param entityConfigTable          per-type field/relation configuration
 * @param contextMapper              resolver for context/funding identifiers
 * @param relClasses                 relation class inverse lookup table
 * @param schemaLocation             xsi:schemaLocation emitted on each record
 * @param entityDefaults             emit default values for the main entity fields
 * @param relDefaults                emit default values for related-entity fields
 * @param childDefaults              emit default values for child-entity fields
 * @param otherDatasourceTypesUForUI datasource types given special UI treatment
 * @throws TransformerConfigurationException if the XML transformer cannot be created
 * @throws TransformerFactoryConfigurationError if no transformer factory is available
 */
public XmlRecordFactory(final EntityConfigTable entityConfigTable, final ContextMapper contextMapper, final RelClasses relClasses,
        final String schemaLocation, final boolean entityDefaults, final boolean relDefaults, final boolean childDefaults, final Set<String> otherDatasourceTypesUForUI)
        throws TransformerConfigurationException, TransformerFactoryConfigurationError {
    this.entityConfigTable = entityConfigTable;
    this.contextMapper = contextMapper;
    this.relClasses = relClasses;
    this.schemaLocation = schemaLocation;
    this.entityDefaults = entityDefaults;
    this.relDefaults = relDefaults;
    this.childDefaults = childDefaults;
    this.specialDatasourceTypes = otherDatasourceTypesUForUI;

    // records are serialized without the XML declaration
    transformer = TransformerFactory.newInstance().newTransformer();
    transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
}
|
93 |
|
|
94 |
public static String removePrefix(final String s) { |
|
95 |
if (s.contains("|")) return StringUtils.substringAfter(s, "|"); |
|
96 |
return s; |
|
97 |
} |
|
98 |
|
|
99 |
public static String escapeXml(final String value) { |
|
100 |
return value.replaceAll("&", "&").replaceAll("<", "<").replaceAll(">", ">").replaceAll("\"", """).replaceAll("'", "'"); |
|
101 |
} |
|
102 |
|
|
103 |
/**
 * Returns the per-relation-descriptor emission counters.
 * NOTE(review): exposes the internal mutable map — callers (and this class) mutate it directly.
 */
public Map<String, Integer> getRelCounters() {
    return relCounters;
}
|
106 |
|
|
107 |
/** Returns the relation class inverse lookup table this factory was configured with. */
public RelClasses getRelClasses() {
    return relClasses;
}
|
110 |
|
|
111 |
/** Returns the id of the main entity, or null until {@link #setMainEntity} is called. */
public String getId() {
    return key;
}
|
114 |
|
|
115 |
/** A record is buildable only once a main entity has been set. */
public boolean isValid() {
    return mainEntity != null;
}
|
118 |
|
|
119 |
/**
 * Sets the entity the record is built around; also caches its id as the record key.
 */
public void setMainEntity(final OafDecoder mainEntity) {
    this.mainEntity = mainEntity;
    this.key = mainEntity.decodeEntity().getId();
}
|
123 |
|
|
124 |
/** Registers a relation of the main entity, subject to the per-descriptor cap. */
public void addRelation(final Type type, final OafDecoder rel) {
    addRelOrChild(type, relations, rel);
}
|
127 |
|
|
128 |
/** Registers a child entity of the main entity, subject to the per-descriptor cap. */
public void addChild(final Type type, final OafDecoder child) {
    addRelOrChild(type, children, child);
}
|
131 |
|
|
132 |
private void addRelOrChild(final Type type, final List<OafDecoder> list, final OafDecoder decoder) { |
|
133 |
|
|
134 |
final OafRel oafRel = decoder.getOafRel(); |
|
135 |
final String rd = oafRel.getRelType().toString() + "_" + oafRel.getSubRelType() + "_" + relClasses.getInverse(oafRel.getRelClass()); |
|
136 |
final LinkDescriptor ld = entityConfigTable.getDescriptor(type, new RelDescriptor(rd)); |
|
137 |
|
|
138 |
if (getRelCounters().get(rd) == null) { |
|
139 |
getRelCounters().put(rd, 0); |
|
140 |
} |
|
141 |
|
|
142 |
if (ld == null) { |
|
143 |
list.add(decoder); |
|
144 |
return; |
|
145 |
} |
|
146 |
|
|
147 |
if (ld.getMax() < 0) { |
|
148 |
list.add(decoder); |
|
149 |
return; |
|
150 |
} |
|
151 |
|
|
152 |
if (getRelCounters().get(rd) < ld.getMax()) { |
|
153 |
getRelCounters().put(rd, getRelCounters().get(rd) + 1); |
|
154 |
list.add(decoder); |
|
155 |
} |
|
156 |
} |
|
157 |
|
|
158 |
/**
 * Assembles the full XML record for the main entity: metadata fields, relations,
 * contexts, provenance and children, rendered via the template factory.
 *
 * @return the serialized record
 * @throws RuntimeException wrapping any failure, tagged with the record key
 */
public String build() {
    try {
        final OafEntityDecoder entity = mainEntity.decodeEntity();
        // log.info("building");
        // log.info("main: " + mainEntity);
        // log.info("rel: " + relations);
        // log.info("chi: " + children);
        // log.info("=============");

        final Predicate<String> filter = entityConfigTable.getFilter(entity.getType());
        final List<String> metadata = decodeType(entity, filter, entityDefaults, false);

        // rels has to be processed before the contexts because they enrich the contextMap with the funding info.
        final List<String> rels = listRelations();
        metadata.addAll(buildContexts(entity.getType()));
        metadata.add(parseDataInfo(mainEntity));

        final String body = templateFactory.buildBody(entity.getType(), metadata, rels, listChildren(), extraInfo);

        return templateFactory
                .buildRecord(key, entity.getDateOfCollection(), entity.getDateOfTransformation(), schemaLocation, body, countersAsXml());
    } catch (final Throwable e) {
        // include the record key so failures can be traced back to the source row
        throw new RuntimeException(String.format("error building record '%s'", this.key), e);
    }
}
|
183 |
|
|
184 |
private String parseDataInfo(final OafDecoder decoder) { |
|
185 |
final DataInfo dataInfo = decoder.getOaf().getDataInfo(); |
|
186 |
|
|
187 |
final StringBuilder sb = new StringBuilder(); |
|
188 |
sb.append("<datainfo>"); |
|
189 |
sb.append(asXmlElement("inferred", dataInfo.getInferred() + "", null, null)); |
|
190 |
sb.append(asXmlElement("deletedbyinference", dataInfo.getDeletedbyinference() + "", null, null)); |
|
191 |
sb.append(asXmlElement("trust", dataInfo.getTrust() + "", null, null)); |
|
192 |
sb.append(asXmlElement("inferenceprovenance", dataInfo.getInferenceprovenance() + "", null, null)); |
|
193 |
sb.append(asXmlElement("provenanceaction", null, dataInfo.getProvenanceaction(), null)); |
|
194 |
sb.append("</datainfo>"); |
|
195 |
|
|
196 |
return sb.toString(); |
|
197 |
} |
|
198 |
|
|
199 |
/**
 * Serializes an entity's fields into XML metadata elements.
 *
 * @param decoder      decoded entity (main entity or a cached relation target)
 * @param filter       include-filter on field names
 * @param defaults     whether to emit default values for absent fields
 * @param expandingRel true when rendering a related entity rather than the main one;
 *                     suppresses the Result/Project specific extras below
 * @return mutable list of XML fragments (callers append to it)
 */
private List<String> decodeType(final OafEntityDecoder decoder, final Predicate<String> filter, final boolean defaults, final boolean expandingRel) {

    final List<String> metadata = Lists.newArrayList();
    metadata.addAll(listFields(decoder.getMetadata(), filter, defaults, expandingRel));
    metadata.addAll(listFields(decoder.getOafEntity(), filter, defaults, expandingRel));

    // results additionally expose the best access right computed across their instances
    if ((decoder.getEntity() instanceof Result) && !expandingRel) {
        metadata.add(asXmlElement("bestaccessright", "", getBestAccessright(), null));

        metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel));
    }
    if ((decoder.getEntity() instanceof Project) && !expandingRel) {
        metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel));
    }

    return metadata;
}
|
216 |
|
|
217 |
/**
 * Picks the "best" access right across all instances of the main Result entity,
 * starting from UNKNOWN and keeping any instance ranked better by LicenseComparator
 * (comparator semantics defined elsewhere — compare() > 0 means the candidate wins).
 */
private Qualifier getBestAccessright() {
    Qualifier bestAccessRight = getQualifier("UNKNOWN", "not available", "dnet:access_modes");
    final LicenseComparator lc = new LicenseComparator();
    // assumes mainEntity is a Result — callers guard on the entity type
    for (final Instance instance : ((Result) mainEntity.decodeEntity().getEntity()).getInstanceList()) {
        if (lc.compare(bestAccessRight, instance.getAccessright()) > 0) {
            bestAccessRight = instance.getAccessright();
        }
    }
    return bestAccessRight;
}
|
227 |
|
|
228 |
/**
 * Builds a protobuf Qualifier.
 * NOTE(review): both schemeid and schemename are populated from {@code schemename},
 * and no schemeid parameter exists — looks intentional but worth confirming.
 */
public Qualifier getQualifier(final String classid, final String classname, final String schemename) {
    return Qualifier.newBuilder().setClassid(classid).setClassname(classname).setSchemeid(schemename).setSchemename(schemename).build();
}
|
231 |
|
|
232 |
/**
 * Renders each registered relation of the main entity as an XML fragment.
 *
 * Side effects: increments the per-subRelType counters (plus _inferred/_collected/_claimed
 * provenance breakdowns) that later feed {@code countersAsXml()}.
 *
 * @return XML fragments, one per relation touching the main entity
 */
private List<String> listRelations() {

    final List<String> rels = Lists.newArrayList();

    for (final OafDecoder decoder : this.relations) {

        final OafRel rel = decoder.getOafRel();
        final OafEntity cachedTarget = rel.getCachedTarget();
        final OafRelDecoder relDecoder = OafRelDecoder.decode(rel);

        // if (!relDecoder.getRelType().equals(RelType.personResult) || relDecoder.getRelTargetId().equals(key)) {
        // only render relations that actually touch the main entity
        if (relDecoder.getRelSourceId().equals(key) || relDecoder.getRelTargetId().equals(key)) {

            final List<String> metadata = Lists.newArrayList();
            final TypeProtos.Type targetType = relDecoder.getTargetType(mainEntity.getEntity().getType());
            //final Set<String> relFilter = entityConfigTable.getFilter(targetType, relDecoder.getRelDescriptor());
            metadata.addAll(listFields(relDecoder.getSubRel(), entityConfigTable.getIncludeFilter(targetType, relDecoder.getRelDescriptor()), false, true));

            String semanticclass = "";
            String semanticscheme = "";

            final RelDescriptor relDescriptor = relDecoder.getRelDescriptor();

            // when the target entity payload is cached inside the relation, inline its fields
            if ((cachedTarget != null) && cachedTarget.isInitialized()) {

                //final Set<String> filter = entityConfigTable.getFilter(targetType, relDescriptor);
                final OafEntityDecoder d = OafEntityDecoder.decode(cachedTarget);
                metadata.addAll(decodeType(d, entityConfigTable.getIncludeFilter(targetType, relDescriptor), relDefaults, true));
                // cached Result targets also contribute their instance-level fields
                if (d.getType().equals(Type.result)) {
                    for(Instance i : cachedTarget.getResult().getInstanceList()) {
                        final List<String> fields = listFields(i, entityConfigTable.getIncludeFilter(targetType, relDecoder.getRelDescriptor()), false, true);
                        metadata.addAll(fields);
                    }
                }
            }

            final RelMetadata relMetadata = relDecoder.getRelMetadata();
            // debug
            if (relMetadata == null) {
                // System.err.println(this);
                // fall back when the relation carries no semantics block
                semanticclass = semanticscheme = "UNKNOWN";
            } else {
                // relation semantics are rendered from the main entity's point of view, hence the inverse class
                semanticclass = relClasses.getInverse(relMetadata.getSemantics().getClassname());
                semanticscheme = relMetadata.getSemantics().getSchemename();
            }

            final String rd = relDescriptor.getSubRelType().toString();
            incrementCounter(rd);

            // provenance breakdown: inferred / collected (sysimport:crosswalk*) / claimed (user:*)
            final DataInfo info = decoder.getOaf().getDataInfo();
            if (info.getInferred()) {
                incrementCounter(rd + "_inferred");
            } else if(StringUtils.startsWith(info.getProvenanceaction().getClassid(), "sysimport:crosswalk")) {
                incrementCounter(rd + "_collected");
            } else if(StringUtils.startsWith(info.getProvenanceaction().getClassid(), "user:")) {
                incrementCounter(rd + "_claimed");
            }

            final LinkDescriptor ld = entityConfigTable.getDescriptor(relDecoder.getTargetType(mainEntity.getEntity().getType()), relDescriptor);

            // asymmetric links point at the target; symmetric (or unconfigured) ones at the source
            final String relId = (ld != null) && !ld.isSymmetric() ? relDecoder.getRelTargetId() : relDecoder.getRelSourceId();

            rels.add(templateFactory.getRel(targetType, relId, Sets.newHashSet(metadata), semanticclass, semanticscheme, info.getInferred(), info.getTrust(),
                    info.getInferenceprovenance(), info.getProvenanceaction().getClassid()));
        }
    }
    return rels;
}
|
300 |
|
|
301 |
// ////////////////////////////////// |
|
302 |
|
|
303 |
private List<String> listChildren() { |
|
304 |
|
|
305 |
final List<String> children = Lists.newArrayList(); |
|
306 |
for (final OafDecoder decoder : this.children) { |
|
307 |
final OafEntity cachedTarget = decoder.getOafRel().getCachedTarget(); |
|
308 |
addChildren(children, cachedTarget, decoder.getRelDescriptor()); |
|
309 |
} |
Also available in: Unified diff
[maven-release-plugin] copy for tag dnet-mapreduce-jobs-1.1.1-PROD