Revision 49096
Added by Claudio Atzori over 6 years ago
modules/dnet-mapreduce-jobs/trunk/src/test/java/eu/dnetlib/data/mapreduce/hbase/broker/EventFactoryTest.java | ||
---|---|---|
42 | 42 |
.setResulttype(q("publication", "dnet:result_typologies"))) |
43 | 43 |
.addInstance(Instance.newBuilder() |
44 | 44 |
.setHostedby(kv("456", "PubMed")) |
45 |
.setLicence(q("OPEN", "dnet:licenses"))
|
|
45 |
.setAccessright(q("OPEN", "dnet:licenses"))
|
|
46 | 46 |
.addUrl("http://456"))) |
47 | 47 |
.build(); |
48 | 48 |
|
... | ... | |
54 | 54 |
.addInstance( |
55 | 55 |
Instance.newBuilder() |
56 | 56 |
.setHostedby(kv("123", "Puma")) |
57 |
.setLicence(q("CLOSED", "dnet:licenses"))
|
|
57 |
.setAccessright(q("CLOSED", "dnet:licenses"))
|
|
58 | 58 |
.addUrl("http://123"))) |
59 | 59 |
.build(); |
60 | 60 |
|
modules/dnet-mapreduce-jobs/trunk/src/test/resources/eu/dnetlib/data/transform/recordDatasetPUB.xml | ||
---|---|---|
78 | 78 |
<oaf:identifier identifierType="DOI">10.4119/unibi/2693180</oaf:identifier> |
79 | 79 |
<oaf:dateAccepted>2014-01-01</oaf:dateAccepted> |
80 | 80 |
<oaf:accessrights>OPEN</oaf:accessrights> |
81 |
<oaf:license>https://creativecommons.org/licenses/by/4.0/</oaf:license> |
|
81 | 82 |
<oaf:language>eng</oaf:language> |
82 | 83 |
<oaf:hostedBy id="re3data_____::r3d100010750" name="PUB Data Publications"/> |
83 | 84 |
<oaf:collectedFrom id="re3data_____::r3d100010750" name="PUB Data Publications"/> |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupReducer.java | ||
---|---|---|
70 | 70 |
|
71 | 71 |
final Set<String> seen = new HashSet<String>(); |
72 | 72 |
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize(); |
73 |
int count = 0; |
|
73 |
|
|
74 | 74 |
boolean logged = false; |
75 | 75 |
|
76 | 76 |
for (final ImmutableBytesWritable i : values) { |
77 |
count++; |
|
78 |
|
|
79 | 77 |
if (queue.size() <= queueMaxSize) { |
80 | 78 |
final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes()); |
81 | 79 |
final String id = doc.getIdentifier(); |
... | ... | |
95 | 93 |
} |
96 | 94 |
} |
97 | 95 |
|
98 |
log.info(String.format("cluster key '%s' size '%s'", key, count)); |
|
99 |
|
|
100 | 96 |
return queue; |
101 | 97 |
} |
102 | 98 |
|
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/OAVersionEventFactory.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.broker; |
2 | 2 |
|
3 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
4 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey; |
|
5 |
|
|
6 | 3 |
import java.io.IOException; |
7 | 4 |
import java.util.List; |
8 | 5 |
import java.util.Set; |
6 |
import java.util.stream.Collectors; |
|
9 | 7 |
|
10 |
import org.apache.commons.lang.StringUtils; |
|
11 |
import org.apache.hadoop.io.Text; |
|
12 |
import org.apache.hadoop.mapreduce.Reducer.Context; |
|
13 |
import org.dom4j.DocumentException; |
|
14 |
|
|
15 | 8 |
import com.google.common.base.Predicate; |
16 | 9 |
import com.google.common.collect.Iterables; |
17 |
import com.google.common.collect.Lists; |
|
18 |
|
|
19 | 10 |
import eu.dnetlib.broker.objects.OpenAireEventPayload; |
20 | 11 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory; |
21 | 12 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory; |
22 | 13 |
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage; |
23 | 14 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
24 | 15 |
import eu.dnetlib.data.proto.ResultProtos.Result.Instance; |
16 |
import org.apache.commons.lang.StringUtils; |
|
17 |
import org.apache.hadoop.io.Text; |
|
18 |
import org.apache.hadoop.mapreduce.Reducer.Context; |
|
19 |
import org.dom4j.DocumentException; |
|
25 | 20 |
|
21 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
22 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey; |
|
23 |
|
|
26 | 24 |
/** |
27 | 25 |
* Created by claudio on 26/07/16. |
28 | 26 |
*/ |
... | ... | |
59 | 57 |
private void doProcessOAVersion(final Context context, final Oaf current, final Oaf other, final Topic topic, final float trust) |
60 | 58 |
throws IOException, InterruptedException, DocumentException { |
61 | 59 |
final Oaf.Builder prototype = Oaf.newBuilder(current); |
62 |
final Iterable<Instance> i = Iterables.filter(other.getEntity().getResult().getInstanceList(), new Predicate<Instance>() { |
|
63 | 60 |
|
64 |
@Override |
|
65 |
public boolean apply(final Instance i) { |
|
66 |
return "OPEN".equalsIgnoreCase(i.getLicence().getClassid()); |
|
67 |
} |
|
68 |
}); |
|
69 |
prototype.getEntityBuilder().getResultBuilder().addAllInstance(i); |
|
61 |
final List<Instance> instances = |
|
62 |
other.getEntity().getResult().getInstanceList().stream() |
|
63 |
.filter(i ->"OPEN".equalsIgnoreCase(i.getAccessright().getClassid())) |
|
64 |
.collect(Collectors.toList()); |
|
65 |
prototype.getEntityBuilder().getResultBuilder().addAllInstance(instances); |
|
70 | 66 |
|
71 | 67 |
final Oaf oaf = prototype.build(); |
72 | 68 |
|
73 | 69 |
final OpenAireEventPayload payload = |
74 |
HighlightFactory.highlightEnrichOa(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), other.getEntity(), trust), Lists.newArrayList(i));
|
|
70 |
HighlightFactory.highlightEnrichOa(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), other.getEntity(), trust), instances);
|
|
75 | 71 |
final EventMessage event = asEvent(oaf.getEntity(), topic, payload, other.getEntity(), trust); |
76 | 72 |
|
77 | 73 |
context.write(tKey, new Text(event.toString())); |
... | ... | |
79 | 75 |
} |
80 | 76 |
|
81 | 77 |
private boolean hasAccess(final Oaf oaf, final String access, final boolean strict) { |
82 |
final Predicate<Instance> p = new Predicate<Instance>() { |
|
83 |
|
|
84 |
@Override |
|
85 |
public boolean apply(final Instance i) { |
|
86 |
return access.equalsIgnoreCase(i.getLicence().getClassid()); |
|
87 |
} |
|
88 |
}; |
|
78 |
final Predicate<Instance> p = i -> access.equalsIgnoreCase(i.getAccessright().getClassid()); |
|
89 | 79 |
final List<Instance> i = oaf.getEntity().getResult().getInstanceList(); |
90 | 80 |
return strict ? Iterables.all(i, p) : Iterables.any(i, p); |
91 | 81 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/mapping/ProtoMapping.java | ||
---|---|---|
3 | 3 |
import java.io.StringReader; |
4 | 4 |
import java.util.List; |
5 | 5 |
import java.util.Map; |
6 |
import java.util.Set;
|
|
6 |
import java.util.stream.Collectors;
|
|
7 | 7 |
|
8 |
import com.google.common.base.Function; |
|
9 | 8 |
import com.google.common.collect.Iterables; |
10 |
import com.google.common.collect.Lists; |
|
11 | 9 |
import com.google.common.collect.Maps; |
12 |
import com.google.common.collect.Sets; |
|
13 | 10 |
import eu.dnetlib.broker.objects.*; |
14 | 11 |
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty; |
15 | 12 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
... | ... | |
31 | 28 |
public abstract class ProtoMapping { |
32 | 29 |
|
33 | 30 |
protected static List<Instance> mapInstances(final List<Result.Instance> in) { |
34 |
final Set<Instance> instances = Sets.newHashSet(Iterables.transform(in, new Function<Result.Instance, Instance>() { |
|
35 |
|
|
36 |
@Override |
|
37 |
public Instance apply(final Result.Instance i) { |
|
38 |
return new Instance() |
|
31 |
return in.stream() |
|
32 |
.map(i -> new Instance() |
|
39 | 33 |
.setHostedby(getValue(i.getHostedby())) |
40 | 34 |
.setInstancetype(getValue(i.getInstancetype())) |
41 |
.setLicense(getKey(i.getLicence())) |
|
42 |
.setUrl(Iterables.getFirst(i.getUrlList(), "")); |
|
43 |
} |
|
44 |
})); |
|
45 |
return Lists.newArrayList(instances); |
|
35 |
.setLicense(getKey(i.getAccessright())) |
|
36 |
.setUrl(Iterables.getFirst(i.getUrlList(), ""))) |
|
37 |
.collect(Collectors.toList()); |
|
46 | 38 |
} |
47 | 39 |
|
48 | 40 |
protected static List<Pid> mapPids(final List<StructuredProperty> sp) { |
49 |
return Lists.newArrayList(Iterables.transform(sp, new Function<StructuredProperty, Pid>() { |
|
50 |
|
|
51 |
@Override |
|
52 |
public Pid apply(final StructuredProperty sp) { |
|
53 |
return new Pid().setType(sp.getQualifier().getClassid()).setValue(sp.getValue()); |
|
54 |
} |
|
55 |
})); |
|
41 |
return sp.stream() |
|
42 |
.map(s -> new Pid().setType(s.getQualifier().getClassid()).setValue(s.getValue())) |
|
43 |
.collect(Collectors.toList()); |
|
56 | 44 |
} |
57 | 45 |
|
58 | 46 |
protected static Journal mapJournal(final ResultProtos.Result.Journal j) { |
... | ... | |
64 | 52 |
} |
65 | 53 |
|
66 | 54 |
protected static List<ExternalReference> mapExternalRefs(final List<Result.ExternalReference> ext) { |
67 |
return Lists.newArrayList(Iterables.transform(ext, new Function<Result.ExternalReference, ExternalReference>() { |
|
68 |
|
|
69 |
@Override |
|
70 |
public ExternalReference apply(final Result.ExternalReference e) { |
|
71 |
return new ExternalReference() |
|
55 |
return ext.stream() |
|
56 |
.map(e -> new ExternalReference() |
|
72 | 57 |
.setUrl(e.getUrl()) |
73 | 58 |
.setType(getKey(e.getQualifier())) |
74 | 59 |
.setRefidentifier(e.getRefidentifier()) |
75 |
.setSitename(e.getSitename()); |
|
76 |
} |
|
77 |
})); |
|
60 |
.setSitename(e.getSitename())) |
|
61 |
.collect(Collectors.toList()); |
|
78 | 62 |
} |
79 | 63 |
|
80 | 64 |
protected static final List<Project> mapRelatedProjects(final OafEntity entity) { |
... | ... | |
85 | 69 |
projectMap.put(p.getId(), Oaf.newBuilder(rel).build()); |
86 | 70 |
} |
87 | 71 |
|
88 |
return Lists.transform(Lists.newArrayList(projectMap.values()), getProjectMappingFunction()); |
|
72 |
return projectMap.values().stream() |
|
73 |
.map(o -> mapRelatedProject(o.getRel().getCachedOafTarget().getEntity().getProject())) |
|
74 |
.collect(Collectors.toList()); |
|
89 | 75 |
} |
90 | 76 |
|
91 | 77 |
protected static final Project mapRelatedProject(final ProjectProtos.Project project) { |
... | ... | |
111 | 97 |
return p; |
112 | 98 |
} |
113 | 99 |
|
114 |
private static Function<Oaf, Project> getProjectMappingFunction() { |
|
115 |
return new Function<Oaf, Project>() { |
|
116 |
|
|
117 |
@Override |
|
118 |
public Project apply(final Oaf oafRel) { |
|
119 |
return mapRelatedProject(oafRel.getRel().getCachedOafTarget().getEntity().getProject()); |
|
120 |
} |
|
121 |
}; |
|
122 |
} |
|
123 |
|
|
124 | 100 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/IndexFeedMapper.java | ||
---|---|---|
7 | 7 |
import java.util.Map.Entry; |
8 | 8 |
import java.util.zip.GZIPOutputStream; |
9 | 9 |
|
10 |
import com.google.common.collect.Lists; |
|
11 |
import eu.dnetlib.data.mapreduce.JobParams; |
|
12 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
13 |
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory; |
|
10 | 14 |
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer; |
11 | 15 |
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer.Mode; |
16 |
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory; |
|
17 |
import eu.dnetlib.miscutils.datetime.HumanTime; |
|
18 |
import eu.dnetlib.miscutils.functional.xml.ApplyXslt; |
|
12 | 19 |
import org.apache.commons.codec.binary.Base64; |
13 | 20 |
import org.apache.commons.lang.exception.ExceptionUtils; |
14 | 21 |
import org.apache.commons.logging.Log; |
... | ... | |
22 | 29 |
import org.apache.solr.client.solrj.response.UpdateResponse; |
23 | 30 |
import org.apache.solr.common.SolrInputDocument; |
24 | 31 |
|
25 |
import com.google.common.collect.Lists; |
|
26 |
|
|
27 |
import eu.dnetlib.data.mapreduce.JobParams; |
|
28 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
29 |
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory; |
|
30 |
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory; |
|
31 |
import eu.dnetlib.miscutils.datetime.HumanTime; |
|
32 |
import eu.dnetlib.miscutils.functional.xml.ApplyXslt; |
|
33 |
|
|
34 | 32 |
public class IndexFeedMapper extends Mapper<Text, Text, Text, Text> { |
35 | 33 |
|
36 | 34 |
private static final Log log = LogFactory.getLog(IndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM |
35 |
public static final String DNET_RESULT = "dnetResult"; |
|
37 | 36 |
|
38 | 37 |
private InputDocumentFactory documentFactory; |
39 | 38 |
|
... | ... | |
59 | 58 |
|
60 | 59 |
private final static int MAX_FEED_RETRIES = 10; |
61 | 60 |
|
61 |
private boolean compress = false; |
|
62 |
|
|
62 | 63 |
@Override |
63 | 64 |
protected void setup(final Context context) throws IOException, InterruptedException { |
64 | 65 |
|
... | ... | |
72 | 73 |
buffer = Lists.newArrayList(); |
73 | 74 |
simulation = Boolean.parseBoolean(context.getConfiguration().get(JobParams.INDEX_FEED_SIMULATION_MODE)); |
74 | 75 |
|
76 |
compress = context.getConfiguration().getBoolean(JobParams.INDEX_FEED_COMPRESS_RESULT, false); |
|
77 |
|
|
75 | 78 |
final String xslt = new String(Base64.decodeBase64(context.getConfiguration().get(JobParams.INDEX_XSLT))); |
76 | 79 |
|
77 | 80 |
log.info("got xslt: \n" + xslt); |
... | ... | |
130 | 133 |
|
131 | 134 |
try { |
132 | 135 |
indexRecord = dmfToRecord.evaluate(value.toString()); |
133 |
doc = documentFactory.parseDocument(version, indexRecord, dsId, "dnetResult", new ResultTransformer(Mode.base64) { |
|
134 |
@Override |
|
135 |
public String apply(final String s) { |
|
136 | 136 |
|
137 |
return org.apache.solr.common.util.Base64.byteArrayToBase64(zip(s)); |
|
138 |
} |
|
139 |
}); |
|
137 |
if (compress) { |
|
138 |
doc = documentFactory.parseDocument(version, indexRecord, dsId, DNET_RESULT, new ResultTransformer(Mode.base64) { |
|
139 |
@Override |
|
140 |
public String apply(final String s) { |
|
141 |
|
|
142 |
return org.apache.solr.common.util.Base64.byteArrayToBase64(zip(s)); |
|
143 |
} |
|
144 |
}); |
|
145 |
} else { |
|
146 |
doc = documentFactory.parseDocument(version, indexRecord, dsId, DNET_RESULT); |
|
147 |
} |
|
148 |
|
|
140 | 149 |
if ((doc == null) || doc.isEmpty()) throw new EmptySolrDocumentException(); |
141 | 150 |
|
142 | 151 |
} catch (final Throwable e) { |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactory.java | ||
---|---|---|
26 | 26 |
import eu.dnetlib.data.proto.FieldTypeProtos.*; |
27 | 27 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
28 | 28 |
import eu.dnetlib.data.proto.OafProtos.OafRel; |
29 |
import eu.dnetlib.data.proto.PersonProtos.Person; |
|
30 | 29 |
import eu.dnetlib.data.proto.ProjectProtos.Project; |
31 | 30 |
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata; |
32 | 31 |
import eu.dnetlib.data.proto.ResultProtos.Result; |
... | ... | |
69 | 68 |
protected Transformer transformer; |
70 | 69 |
|
71 | 70 |
protected static Predicate<String> instanceFilter = new Predicate<String>() { |
72 |
final Set<String> instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "licence", "collectedfrom", "dateofacceptance");
|
|
71 |
final Set<String> instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance");
|
|
73 | 72 |
@Override |
74 | 73 |
public boolean apply(final String s) { |
75 | 74 |
return instanceFieldFilter.contains(s); |
... | ... | |
204 | 203 |
metadata.addAll(listFields(decoder.getOafEntity(), filter, defaults, expandingRel)); |
205 | 204 |
|
206 | 205 |
if ((decoder.getEntity() instanceof Result) && !expandingRel) { |
207 |
metadata.add(asXmlElement("bestlicense", "", getBestLicense(), null));
|
|
206 |
metadata.add(asXmlElement("bestaccessright", "", getBestAccessright(), null));
|
|
208 | 207 |
|
209 | 208 |
metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel)); |
210 | 209 |
} |
211 |
if ((decoder.getEntity() instanceof Person) && !expandingRel) { |
|
212 |
metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel)); |
|
213 |
} |
|
214 | 210 |
if ((decoder.getEntity() instanceof Project) && !expandingRel) { |
215 | 211 |
metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel)); |
216 | 212 |
} |
... | ... | |
218 | 214 |
return metadata; |
219 | 215 |
} |
220 | 216 |
|
221 |
private Qualifier getBestLicense() {
|
|
222 |
Qualifier bestLicense = getQualifier("UNKNOWN", "not available", "dnet:access_modes");
|
|
217 |
private Qualifier getBestAccessright() {
|
|
218 |
Qualifier bestAccessRight = getQualifier("UNKNOWN", "not available", "dnet:access_modes");
|
|
223 | 219 |
final LicenseComparator lc = new LicenseComparator(); |
224 | 220 |
for (final Instance instance : ((Result) mainEntity.decodeEntity().getEntity()).getInstanceList()) { |
225 |
if (lc.compare(bestLicense, instance.getLicence()) > 0) {
|
|
226 |
bestLicense = instance.getLicence();
|
|
221 |
if (lc.compare(bestAccessRight, instance.getAccessright()) > 0) {
|
|
222 |
bestAccessRight = instance.getAccessright();
|
|
227 | 223 |
} |
228 | 224 |
} |
229 |
return bestLicense;
|
|
225 |
return bestAccessRight;
|
|
230 | 226 |
} |
231 | 227 |
|
232 | 228 |
public Qualifier getQualifier(final String classid, final String classname, final String schemename) { |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/OafHbaseUtils.java | ||
---|---|---|
8 | 8 |
import com.google.common.collect.Lists; |
9 | 9 |
import com.google.common.collect.Sets; |
10 | 10 |
import eu.dnetlib.data.proto.FieldTypeProtos.*; |
11 |
import eu.dnetlib.data.proto.PersonProtos.Person; |
|
12 | 11 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
13 | 12 |
|
14 | 13 |
import com.google.common.base.Function; |
... | ... | |
89 | 88 |
if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue(); |
90 | 89 |
if (t instanceof KeyValue) return ((KeyValue) t).getValue(); |
91 | 90 |
if (t instanceof String) return (String) t; |
92 |
if (t instanceof Person) return ((Person) t).getMetadata().getFullname().getValue(); |
|
93 | 91 |
if (t instanceof StringField) return ((StringField) t).getValue(); |
94 | 92 |
if (t instanceof Qualifier) return ((Qualifier) t).getClassname(); |
95 | 93 |
if (t instanceof Author) return ((Author) t).getFullname(); |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/JobParams.java | ||
---|---|---|
45 | 45 |
public static final String INDEX_ROTTEN_FILE = "index.solr.rotten.records"; |
46 | 46 |
public static final String INDEX_LOCAL_FEEDING = "index.solr.local.feeding"; |
47 | 47 |
public static final String INDEX_FEED_SIMULATION_MODE = "index.solr.sim.mode"; |
48 |
public static final String INDEX_FEED_COMPRESS_RESULT = "index.solr.compress.result"; |
|
48 | 49 |
|
49 | 50 |
public static final String INDEX_ADD_THRESHOLD = "index.add.threshold"; |
50 | 51 |
public static final String INDEX_XSLT = "index.xslt"; |
Also available in: Unified diff
fixing mapping for license vs accessright #3128, cleanup