Revision 53799
Added by Claudio Atzori almost 6 years ago
modules/dnet-mapreduce-jobs/trunk/deploy.info

{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-mapreduce-jobs/trunk/", "deploy_repository": "dnet45-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots", "name": "dnet-mapreduce-jobs"}
modules/dnet-mapreduce-jobs/trunk/src/main/resources/META-INF/services/javax.xml.transform.TransformerFactory

net.sf.saxon.TransformerFactoryImpl
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/hbase/dedup/blacklist/title_blacklist.txt

^(Corpus Oral Dialectal \(COD\)\.).*$
^(Kiri Karl Morgensternile).*$
^(\[Eksliibris Aleksandr).*\]$
^(Kiri A\. de Vignolles).*$
^(2 kirja Karl Morgensternile).*$
^(Pirita kloostri idaosa arheoloogilised).*$
^(Kiri tundmatule).*$
^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$
^(Eksliibris Nikolai Birukovile).*$
^(Eksliibris Nikolai Issakovile).*$
^(\[Eksliibris Aleksandr).*$
^(WHP Cruise Summary Information of section).*$
^(Measurement of the top quark\-pair production cross section with ATLAS in pp collisions at).*$
^(Measurement of the spin\-dependent structure function).*
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/entity.st

<oaf:$name$>
$metadata:{$it$}$
<rels>
$rels:{$it$}$
</rels>
<children>
$children:{$it$}$
</children>
</oaf:$name$>
$inference:{$it$}$
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/record.st

<?xml version="1.0"?>
<record>
<result xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<header>
<dri:objIdentifier>$id$</dri:objIdentifier>
<dri:dateOfCollection>$dateofcollection$</dri:dateOfCollection>
<dri:dateOfTransformation>$dateoftransformation$</dri:dateOfTransformation>
<counters>
$counters:{$it$}$
</counters>
</header>
<metadata>
<oaf:entity xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xsi:schemaLocation="http://namespace.openaire.eu/oaf $schemaLocation$">
$it$
</oaf:entity>
</metadata>
</result>
</record>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/childresult.st

<result>
<dri:objIdentifier>$objIdentifier$</dri:objIdentifier>
$metadata:{$it$}$
</result>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/rel.st

<rel inferred="$inferred$" trust="$trust$" inferenceprovenance="$inferenceprovenance$" provenanceaction="$provenanceaction$">
<to class="$class$" scheme="$scheme$" type="$type$">$objIdentifier$</to>
$metadata:{$it$}$
</rel>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/child.st

<$name$$if(hasId)$ objidentifier="$id$"$else$$endif$>
$metadata:{$it$}$
</$name$>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/instance.st

<instance id="$instanceId$">
$metadata:{$it$}$
$webresources:{$it$}$
</instance>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/webresource.st

<webresource>
<url>$identifier$</url>
</webresource>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/log4j.properties

### Root Level ###
log4j.rootLogger=WARN, CONSOLE

### Configuration for the CONSOLE appender ###
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c - %m%n

### Application Level ###
log4j.logger.eu.dnetlib=INFO
log4j.logger.eu.dnetlib.enabling.is.sn=INFO
log4j.logger.org.apache.cxf.interceptor=FATAL
log4j.logger.org.apache.cxf.ws.addressing.ContextUtils=FATAL
log4j.logger.eu.dnetlib.enabling.tools.AbstractBaseService=INFO
log4j.logger.eu.dnetlib.enabling.inspector=DEBUG
log4j.logger.eu.dnetlib.xml.database.LoggingTrigger=WARN
log4j.logger.eu.dnetlib.enabling.tools.registration.ServiceRegistrator=INFO
log4j.logger.eu.dnetlib.enabling.inspector=FATAL
log4j.logger.eu.dnetlib.enabling.inspector.SubscriptionController=DEBUG
log4j.logger.eu.dnetlib.springutils.stringtemplate.StringTemplateViewResolver=FATAL
log4j.logger.eu.dnetlib.enabling.is.sn.SynchronousNotificationSenderImpl=WARN
log4j.logger.eu.dnetlib.enabling.tools.LocalServiceResolverImpl=WARN
log4j.logger.eu.dnetlib.enabling.is.sn.NotificationInvokerImpl=WARN
log4j.logger.eu.dnetlib.data.collective=INFO
log4j.logger.eu.dnetlib.data.hadoop.utils.ScanFactory=DEBUG
log4j.logger.org.apache.xerces.parsers.SAXParser=OFF
log4j.logger.eu.dnetlib.conf.PropertyFetcher=WARN
#log4j.logger.eu.dnetlib.data.transform.XsltRowTransformerFactory=DEBUG

log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceImpl=OFF
log4j.logger.eu.dnetlib.enabling.datasources.DatasourceManagerClients=FATAL
log4j.logger.eu.dnetlib.data.mdstore.modular.mongodb.utils.MetadataCheckJob=DEBUG
log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceCore=WARN
log4j.logger.eu.dnetlib.xml.database.exist.ExistDatabase=WARN
log4j.logger.eu.dnetlib.enabling.is.store.AbstractContentInitializer=FATAL

log4j.logger.org.apache.hadoop.hbase.mapreduce.TableInputFormatBase=FATAL
log4j.logger.eu.dnetlib.data.mdstore.modular.plugin.CreatorExtractor=DEBUG

### Spring ###
log4j.logger.org.springframework=ERROR
log4j.logger.org.apache=DEBUG
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/bulktag/BulkTaggingMapper.java

package eu.dnetlib.data.mapreduce.hbase.bulktag;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import eu.dnetlib.data.bulktag.CommunityConfiguration;
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.ResultProtos;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {

    private CommunityConfiguration cc;

    private ResultTagger tagger;
    private boolean enabled;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);

        final String conf = context.getConfiguration().get("tagging.conf");
        enabled = context.getConfiguration().getBoolean("tagging.enabled", false);
        if (StringUtils.isBlank(conf)) {
            throw new IllegalArgumentException("missing bulk tagging configuration");
        }
        System.out.println("conf = " + conf);
        cc = CommunityConfigurationFactory.fromJson(conf);
        tagger = new ResultTagger();
        tagger.setTrust(context.getConfiguration().get("bulktagging.trust", "0.85"));
    }

    @Override
    protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {

        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));

        final byte[] body = resultMap.get(Bytes.toBytes("body"));

        if (body != null) {
            context.getCounter("Bulk Tagging", "not null body ").increment(1);

            final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
            if (oaf == null) {
                //context.getCounter("In mapper", " null oaf ").increment(1);
                return;
            }

            long tagged = oaf.getEntity().getResult().getMetadata().getContextList().stream()
                    .flatMap(c -> c.getDataInfoList().stream())
                    .map(FieldTypeProtos.DataInfo::getInferenceprovenance)
                    .filter(infProv -> "bulktagging".equals(infProv))
                    .count();
            context.getCounter("Bulk Tagging", " bulktagged ").increment(tagged);

            final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray());

            if (tagged > 0) {
                if (enabled)
                    context.write(key, put);
                context.getCounter("Bulk Tagging", " write op ").increment(1);
            }

        } else {
            context.getCounter("Bulk Tagging", " null body ").increment(1);
        }
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/bulktag/ResultTagger.java

package eu.dnetlib.data.mapreduce.hbase.bulktag;

import com.google.common.base.Functions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.bulktag.CommunityConfiguration;
import eu.dnetlib.data.bulktag.Pair;
import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.ResultProtos;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.mapreduce.Mapper;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Created by miriam on 02/08/2018.
 */
public class ResultTagger {

    private final static String DATA_INFO_TYPE = "bulktagging";
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
    private final static String CLASS_ID = "bulktagging::community";
    private final static String SCHEMA_ID = "dnet:provenanceActions";
    private final static String COUNTER_GROUP = "Bulk Tagging";

    private String trust = "0.8";

    public OafProtos.Oaf enrichContext(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context) {

        //context.getCounter(COUNTER_GROUP, "to enrich").increment(1);
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);

        if (oaf.getDataInfo().getDeletedbyinference()) {
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
            return null;
        }
        //context.getCounter(COUNTER_GROUP, "not deleted by inference").increment(1);

        final List<ResultProtos.Result.Context> contextList = oaf.getEntity().getResult().getMetadata().getContextList();

        if (contextList.size() > 0) {
            context.getCounter(COUNTER_GROUP, "exist context list").increment(1);
        } else {
            context.getCounter(COUNTER_GROUP, "not exist context list").increment(1);
        }
        //communities contains all the communities to be added as context for the result
        final Set<String> communities = new HashSet<>();

        oaf.getEntity().getResult().getMetadata().getSubjectList().stream()
                .map(subject -> subject.getValue())
                .filter(StringUtils::isNotBlank)
                .map(String::toLowerCase)
                .map(String::trim)
                .collect(Collectors.toCollection(HashSet::new))
                .forEach(s -> communities.addAll(conf.getCommunityForSubjectValue(s)));

        oaf.getEntity().getResult().getInstanceList()
                .stream()
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
                .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
                .map(s -> StringUtils.substringAfter(s, "|"))
                .collect(Collectors.toCollection(HashSet::new))
                .forEach(dsId -> communities.addAll(conf.getCommunityForDatasourceValue(dsId)));

        //TODO: add code for Zenodo Communities

        if (communities.isEmpty()) {
            context.getCounter(COUNTER_GROUP, "list of communities empty").increment(1);
        } else {
            context.getCounter(COUNTER_GROUP, "list of communities has values!").increment(1);
        }

        final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();

        final Map<String, ResultProtos.Result.Context.Builder> cBuilders = Maps.newHashMap();
        mBuilder.getContextBuilderList().forEach(cBuilder -> {
            cBuilders.put(cBuilder.getId(), cBuilder);
        });

        for (String contextId : communities) {

            final ResultProtos.Result.Context.Builder cBuilder = cBuilders.get(contextId);
            if (cBuilder != null) {

                if (!cBuilder.getDataInfoBuilderList().stream()
                        .map(di -> di.getInferenceprovenance())
                        .anyMatch(s -> DATA_INFO_TYPE.equals(s))) {

                    cBuilder.addDataInfo(buildDataInfo());
                    context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
                } else {
                    context.getCounter(COUNTER_GROUP, "provenance already bulk tagged").increment(1);
                }
            } else {
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
                mBuilder.addContext(buildContext(contextId));
            }

        }

        return builder.build();
    }

    private ResultProtos.Result.Context buildContext(final String contextId) {
        return ResultProtos.Result.Context.newBuilder()
                .setId(contextId)
                .addDataInfo(buildDataInfo())
                .build();
    }

    private FieldTypeProtos.DataInfo buildDataInfo() {
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
                .setInferred(true)
                .setProvenanceaction(
                        FieldTypeProtos.Qualifier.newBuilder()
                                .setClassid(CLASS_ID)
                                .setClassname("Bulk Tagging for Communities")
                                .setSchemeid(SCHEMA_ID)
                                .setSchemename(SCHEMA_NAME))
                .setInferenceprovenance(DATA_INFO_TYPE)
                .setTrust(trust);
        return builder
                .build();
    }

    public void setTrust(String s) {
        trust = s;
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsReducer.java

package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ExportSimplifiedRecordsReducer extends Reducer<Text, Text, Text, Text> {

    private Text keyOut;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        keyOut = new Text("");
    }

    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
        for (final Text v : values) {
            //keyOut.set(key.toString() + "@@@");
            context.write(keyOut, v);
        }
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper.java

package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

import com.google.common.base.Joiner;
import com.googlecode.protobuf.format.JsonFormat;

import eu.dnetlib.data.mapreduce.util.OafDecoder;

/**
 * Exports Oaf objects as their json serialization.
 *
 * @author claudio
 */
public class ExportInformationSpaceMapper extends TableMapper<Text, Text> {

    /**
     * logger.
     */
    private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper.class); // NOPMD by marko on 11/24/08 5:02 PM

    private static final String SEPARATOR = "@";

    private Text keyOut;

    private Text valueOut;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);

        keyOut = new Text();
        valueOut = new Text();
    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
        try {
            byte[] rowKey = keyIn.copyBytes();
            Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();

            for (byte[] cf : row.keySet()) {

                for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {

                    emit(rowKey, cf, q.getKey(), q.getValue(), context);
                }
            }
        } catch (final Throwable e) {
            log.error("error exporting the following record from HBase: " + value.toString(), e);
            context.getCounter("error", e.getClass().getName()).increment(1);
            throw new RuntimeException(e);
        }
    }

    private void emit(final byte[] rowKey, final byte[] cf, final byte[] q, final byte[] value, final Context context) throws IOException, InterruptedException {

        keyOut.set(Joiner.on(SEPARATOR).join(new String(rowKey), new String(cf), new String(q)));

        if ((value == null) || (value.length == 0)) {
            valueOut.set("");
        } else {
            valueOut.set(new JsonFormat().printToString(OafDecoder.decode(value).getOaf()));
        }
        context.write(keyOut, valueOut);
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportResultIdentifiersMapper.java

package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.proto.OafProtos.OafEntity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/**
 * Exports the result identifiers as json.
 *
 * @author claudio
 */
public class ExportResultIdentifiersMapper extends TableMapper<Text, Text> {

    /**
     * logger.
     */
    private static final Log log = LogFactory.getLog(ExportResultIdentifiersMapper.class); // NOPMD by marko on 11/24/08 5:02 PM

    private static final String CF = "result";

    private Text keyOut;

    private Text valueOut;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);

        keyOut = new Text("");
        valueOut = new Text();
    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
        try {
            final byte[] body = value.getValue(Bytes.toBytes(CF), DedupUtils.BODY_B);

            if (body == null) {
                context.getCounter(CF, "missing body").increment(1);
                return;
            }

            final OpenaireEntityId id = new OpenaireEntityId();
            final OafDecoder d = OafDecoder.decode(body);

            id.setDeleted(d.getOaf().getDataInfo().getDeletedbyinference());
            id.setId(d.getEntityId());

            final List<OafEntity> childrenList = d.getEntity().getChildrenList();
            if (childrenList != null && !childrenList.isEmpty()) {
                final ArrayList<String> mergedIds = Lists.newArrayList(Iterables.transform(childrenList, new Function<OafEntity, String>() {
                    @Override
                    public String apply(final OafEntity oafEntity) {
                        return oafEntity.getId();
                    }
                }));
                Collections.sort(mergedIds);
                id.setMergedIds(mergedIds);
            }

            valueOut.set(id.toString());
            context.write(keyOut, valueOut);

        } catch (final Throwable e) {
            log.error("error exporting the following record from HBase: " + value.toString(), e);
            context.getCounter("error", e.getClass().getName()).increment(1);
            throw new RuntimeException(e);
        }
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/OpenaireEntityId.java

package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.util.List;

import com.google.gson.Gson;

/**
 * Created by claudio on 27/01/16.
 */
public class OpenaireEntityId {

    private String id;

    private List<String> mergedIds;

    private boolean deleted;

    public OpenaireEntityId() {
    }

    public OpenaireEntityId(final String id, final List<String> mergedIds, final boolean deleted) {
        this.id = id;
        this.mergedIds = mergedIds;
        this.deleted = deleted;
    }

    public String getId() {
        return id;
    }

    public void setId(final String id) {
        this.id = id;
    }

    public List<String> getMergedIds() {
        return mergedIds;
    }

    public void setMergedIds(final List<String> mergedIds) {
        this.mergedIds = mergedIds;
    }

    public boolean isDeleted() {
        return deleted;
    }

    public void setDeleted(final boolean deleted) {
        this.deleted = deleted;
    }

    @Override
    public String toString() {
        return new Gson().toJson(this);
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsMapper.java

package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.io.IOException;

import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ExportSimplifiedRecordsMapper extends Mapper<Text, Text, Text, Text> {

    private static final Log log = LogFactory.getLog(ExportSimplifiedRecordsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM

    private ApplyXslt recordSummarizer;

    private Text valueOut;

    private Text keyOut;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {

        final String xslt = new String(Base64.decodeBase64(context.getConfiguration().get("xslt")));

        log.info("got xslt: \n" + xslt);

        recordSummarizer = new ApplyXslt(xslt);
        valueOut = new Text();
        keyOut = new Text("");
    }

    @Override
    protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {

        final String summary = recordSummarizer.evaluate(value.toString());
        if (StringUtils.isNotBlank(summary)) {
            keyOut.set(StringUtils.substringAfter(key.toString(), "::"));
            valueOut.set(summary.replaceAll("\n", "").replaceAll("\t", ""));
            context.write(keyOut, valueOut);
        }
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/VolatileColumnFamily.java

package eu.dnetlib.data.mapreduce.hbase;

public enum VolatileColumnFamily {

    dedup, dedupPerson; // instance is here to remove the old protos

    public static boolean isVolatile(final String columnName) {
        try {
            return VolatileColumnFamily.valueOf(columnName) != null;
        } catch (final Throwable e) {
            return false;
        }
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupDeleteRelMapper.java

package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.Map;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Writable;

public class DedupDeleteRelMapper extends TableMapper<ImmutableBytesWritable, Writable> {

    private DedupConfig dedupConf;

    private ImmutableBytesWritable outKey;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());

        outKey = new ImmutableBytesWritable();
    }

    @Override
    protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
        // System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));

        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());

        deleteRels(rowkey, context, value, DedupUtils.getSimilarityCFBytes(type));
        deleteRels(rowkey, context, value, DedupUtils.getDedupCF_mergedInBytes(type));
        deleteRels(rowkey, context, value, DedupUtils.getDedupCF_mergesBytes(type));
    }

    private void deleteRels(final ImmutableBytesWritable rowkey, final Context context, final Result value, final byte[] cf)
            throws IOException, InterruptedException {

        final Map<byte[], byte[]> rels = value.getFamilyMap(cf);

        if ((rels != null) && !rels.isEmpty()) {

            final byte[] row = rowkey.copyBytes();
            final Delete delete = new Delete(row);
            delete.setWriteToWAL(JobParams.WRITE_TO_WAL);

            delete.deleteFamily(cf);

            outKey.set(row);
            context.write(outKey, delete);
            context.getCounter(dedupConf.getWf().getEntityType(), new String(cf) + " deleted").increment(rels.size());
        }
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsMapper.java

package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.DedupProtos.Dedup;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class DedupFindRootsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private DedupConfig dedupConf;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
    }

    @Override
    protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
        // System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));

        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
        final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));

        if ((similarRels != null) && !similarRels.isEmpty()) {
            final ByteBuffer min = findMin(ByteBuffer.wrap(rowkey.get()), similarRels.keySet());

            final byte[] row = rowkey.copyBytes();
            final byte[] root = DedupUtils.newIdBytes(min, dedupConf.getWf().getDedupRun());

            // System.out.println("Found root: " + new String(root));

            emitDedupRel(context, DedupUtils.getDedupCF_mergedInBytes(type), row, root, buildRel(row, root, Dedup.RelName.isMergedIn));
            emitDedupRel(context, DedupUtils.getDedupCF_mergesBytes(type), root, row, buildRel(root, row, Dedup.RelName.merges));

            context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);

            // marks the original body deleted
            emitBody(context, row, value.getValue(Bytes.toBytes(dedupConf.getWf().getEntityType()), DedupUtils.BODY_B));

        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
        }
    }

    private ByteBuffer findMin(ByteBuffer min, final Iterable<byte[]> keys) {
        for (final byte[] q : keys) {
            final ByteBuffer iq = ByteBuffer.wrap(q);
            if (min.compareTo(iq) > 0) {
                min = iq;
            }
        }
        return min;
    }

    private void emitBody(final Context context, final byte[] row, final byte[] body) throws InvalidProtocolBufferException, IOException, InterruptedException {
        if (body == null) {
            context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
            System.err.println("missing body: " + new String(row));
            return;
        }
        final Oaf prototype = Oaf.parseFrom(body);

        if (prototype.getDataInfo().getDeletedbyinference()) {
            context.getCounter(dedupConf.getWf().getEntityType(), "bodies already deleted").increment(1);
        } else {
            final Oaf.Builder oafRoot = Oaf.newBuilder(prototype);
            oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
            final byte[] family = Bytes.toBytes(dedupConf.getWf().getEntityType());
            final Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
            put.setWriteToWAL(JobParams.WRITE_TO_WAL);
            context.write(new ImmutableBytesWritable(row), put);
            context.getCounter(dedupConf.getWf().getEntityType(), "bodies marked deleted").increment(1);
        }
    }

    private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
        final Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
        final Oaf oaf =
                Oaf.newBuilder()
                        .setKind(Kind.relation)
                        .setLastupdatetimestamp(System.currentTimeMillis())
                        .setDataInfo(
                                AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
                                        dedupConf.getWf().getConfigurationId())).setRel(oafRel)
                        .build();
        return oaf.toByteArray();
    }

    private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
            InterruptedException {
        final Put put = new Put(from).add(cf, to, value);
        put.setWriteToWAL(JobParams.WRITE_TO_WAL);
        context.write(new ImmutableBytesWritable(from), put);
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/CsvSerialiser.java

package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.io.StringWriter;
import java.util.List;
import java.util.Set;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;

/**
 * Created by claudio on 26/04/16.
 */
public class CsvSerialiser {

    private final static int MAX_FEATURES = 1000;
    private final static int MAX_ROWS = 5000;

    private int maxRows = MAX_ROWS;
    private int maxFeatures = MAX_FEATURES;

    public CsvSerialiser() {
    }

    public CsvSerialiser(int maxRows, int maxFeatures) {
        this.maxRows = maxRows;
        this.maxFeatures = maxFeatures;
    }

    public String asCSV(final List<CsvEntry> list) {
        final Set<String> features = Sets.newLinkedHashSet();

        for (CsvEntry e : Iterables.limit(list, maxRows)) {
            features.addAll(e.getFeatures());
        }

        final List<String> cappedFeatures = Lists.newLinkedList(Iterables.limit(features, maxFeatures));
        //context.getCounter("person", "features " + Iterables.size(cappedFeatures)).increment(1);

        final StringWriter csv = new StringWriter();
        csv.append("\"k\",");
        csv.append(Joiner.on(",").join(cappedFeatures));
        csv.append(",\"id\",\"name\",\"title\"\n");
        for (CsvEntry e : Iterables.limit(list, maxRows)) {

            boolean hasZero = false;
            boolean hasOne = false;

            final StringWriter line = new StringWriter();
            line.append(e.getKey() + ",");
            for (String f : cappedFeatures) {
                if (e.getFeatures().contains(f)) {
                    line.append("1,");
                    hasOne = true;
                } else {
                    line.append("0,");
                    hasZero = true;
                }
            }
            line.append("\"" + e.getId() + "\",");
            line.append("\"" + e.getOriginalName() + "\",");
            line.append("\"" + e.getTitle() + "\"");

            if (hasZero && hasOne) {
                csv.append(line.toString() + "\n");
            }
            //csv.append(StringUtils.substringBeforeLast(line.toString(), ",") + "\n");
        }

        return csv.toString();
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/SubjectsMap.java

package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;

/**
 * Created by claudio on 07/03/16.
 */
public class SubjectsMap extends HashMap<String, Subjects> {

    private static final Log log = LogFactory.getLog(SubjectsMap.class);

    public SubjectsMap mergeFrom(SubjectsMap sm) {

        if (sm != null) {
            for (Entry<String, Subjects> e : sm.entrySet()) {
                if (!this.containsKey(e.getKey())) {
                    Subjects sub = new Subjects();

                    sub.addAll(e.getValue());

                    this.put(e.getKey(), sub);
                } else {
                    for (String s : e.getValue()) {
                        final Subjects subjects = this.get(e.getKey());
                        subjects.add(s);
                    }
                }
            }
        }

        return this;
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/PublicationAnalysisMapper.java

package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.io.IOException;
import java.util.List;

import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
import eu.dnetlib.data.proto.ResultProtos;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;

/**
 * Created by claudio on 22/04/16.
 */
public class PublicationAnalysisMapper extends TableMapper<NullWritable, NullWritable> {

    public static final String RESULT = "result";
    private static final int MAX_DESCRIPTIONS = 50;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {

        if (new String(key.copyBytes()).contains("dedup_wf")) {
            context.getCounter(RESULT, "roots").increment(1);
            return;
        }

        final byte[] body = value.getValue(RESULT.getBytes(), DedupUtils.BODY_B);
        if (body == null) {
            context.getCounter(RESULT, "missing body").increment(1);
            return;
        }
        final OafDecoder decoder = OafDecoder.decode(body);
        final ResultProtos.Result result = decoder.getEntity().getResult();
        if (result.getMetadata().getResulttype().getClassid().equals("dataset")) {
            context.getCounter(RESULT, "dataset").increment(1);
            return;
        } else {
            context.getCounter(RESULT, "publication").increment(1);
        }

        if (result.getMetadata().getDescriptionCount() > MAX_DESCRIPTIONS) {
            context.getCounter(RESULT, "abstracts > " + MAX_DESCRIPTIONS).increment(1);
        } else {
            context.getCounter(RESULT, "abstracts: " + result.getMetadata().getDescriptionCount()).increment(1);
        }

        final List<StringField> descList = result.getMetadata().getDescriptionList();

        boolean empty = true;
        for (StringField desc : descList) {
            empty = empty && StringUtils.isBlank(desc.getValue());
        }

        context.getCounter(RESULT, "empty abstract: " + empty).increment(1);
    }

    @Override
    protected void cleanup(final Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/Subjects.java

package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.util.HashSet;

import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;

/**
 * Created by claudio on 07/03/16.
 */
public class Subjects extends HashSet<String> {

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/SubjectParser.java

package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.util.List;

import com.google.common.base.Splitter;
import org.apache.commons.lang.StringUtils;
import org.dom4j.Element;

/**
 * Created by claudio on 25/03/16.
 */
public class SubjectParser {

    public static final String REGEX_SUBJECT = "^(info:eu-repo)\\/(classification)\\/([a-zA-Z]*)\\/(.*)$";
    private static final int MIN_LENGTH = 5;

    public SubjectsMap parse(final org.dom4j.Document doc) {

        final List subjectNodes = doc.selectNodes("//*[local-name() = 'subject']");
        final SubjectsMap subjectMap = new SubjectsMap();

        for (int i = 0; i < subjectNodes.size(); i++) {
            final Element e = (Element) subjectNodes.get(i);
            final String subject = e.getText();

            final String type = guessType(subject);
            if (!subjectMap.containsKey(type)) {
                subjectMap.put(type, new Subjects());
            }

            if (StringUtils.isNotBlank(type)) {
                if ("keyword".equals(type)) {
                    final Splitter splitter = Splitter.on(",").trimResults().omitEmptyStrings();
                    for (String token : splitter.split(subject)) {
                        final String value = token.replaceAll("[^a-zA-Z ]", "").toLowerCase();
                        if (value.length() >= MIN_LENGTH) {
                            subjectMap.get(type).add(value);
                        }
                    }
                } else {
                    String token = subject.replaceFirst(REGEX_SUBJECT, "$4");

                    if (StringUtils.isNotBlank(token)) {
                        final String value = token.replaceAll("[^a-zA-Z ]", "").toLowerCase();
                        if (value.length() >= MIN_LENGTH) {
                            subjectMap.get(type).add(value);
                        }
                    }
                }
            }
        }

        return subjectMap;
    }

    private String guessType(final String subject) {
        if (subject.startsWith("info:eu-repo")) {
            final String s = subject.replaceAll(REGEX_SUBJECT, "$3");
            return s;
        } else {
            return "keyword";
        }
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/CsvEntry.java

package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.util.Set;

import com.google.common.collect.Sets;
import com.google.gson.Gson;

/**
 * Created by claudio on 20/04/16.
 */
public class CsvEntry {

    private String key;

    private Set<String> features = Sets.newLinkedHashSet();

    private String title;

    private String id;

    private String originalName;

    public CsvEntry() {
    }

    public CsvEntry(final String key, final Set<String> features) {
        this.key = key;
        this.features = features;
    }

    public CsvEntry(final Set<String> features) {
        this.features = features;
    }

    public void addFeature(final String f) {
        getFeatures().add(f);
    }

    public Set<String> getFeatures() {
        return features;
    }

    public void setFeatures(final Set<String> features) {
        this.features = features;
    }

    public static CsvEntry fromJson(final String json) {
        return new Gson().fromJson(json, CsvEntry.class);
    }

    public String getKey() {
        return key;
    }

    public void setKey(final String key) {
        this.key = key;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(final String title) {
        this.title = title;
    }

    public String getId() {
        return id;
    }

    public void setId(final String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return new Gson().toJson(this);
    }

    @Override
    public boolean equals(final Object o) {
        return (o instanceof CsvEntry) && ((CsvEntry) o).getFeatures().equals(getFeatures());
    }

    public String getOriginalName() {
        return originalName;
    }

    public void setOriginalName(final String originalName) {
        this.originalName = originalName;
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsMapper.java

package eu.dnetlib.data.mapreduce.hbase.dedup.cc;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Created by claudio on 15/10/15.
 */
public class ConnectedComponentsMapper extends Mapper<Text, VertexWritable, Text, VertexWritable> {

    @Override
    protected void map(Text key, VertexWritable value, Context context) throws IOException, InterruptedException {

        context.write(value.getVertexId(), value);

    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java

package eu.dnetlib.data.mapreduce.hbase.dedup.cc;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;

import com.google.common.collect.Sets;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.DedupProtos.Dedup;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/**
 * Created by claudio on 15/10/15.
 */
public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {

    private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);

    private DedupConfig dedupConf;

    private byte[] cfMergedIn;

    private byte[] cfMerges;

    @Override
    protected void setup(final Context context) {

        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());

        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
        cfMergedIn = DedupUtils.getDedupCF_mergedInBytes(type);
        cfMerges = DedupUtils.getDedupCF_mergesBytes(type);
    }

    @Override
    protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {

        final Set<String> set = Sets.newHashSet();

        for (VertexWritable v : values) {
            for (Text t : v.getEdges()) {
                set.add(t.toString());
            }
        }

        final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());

        for (String q : set) {
            final byte[] qb = Bytes.toBytes(q);
            emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
            emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));

            context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
        }

    }

    private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
            InterruptedException {
        final Put put = new Put(from).add(cf, to, value);
        put.setWriteToWAL(JobParams.WRITE_TO_WAL);
        context.write(new ImmutableBytesWritable(from), put);
    }

    private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {

        final OafRel.Builder oafRef = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
        final Oaf oaf = DedupUtils.buildRel(dedupConf, oafRef, 0.8).build();

        return oaf.toByteArray();
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/MindistSearchMapper.java

package eu.dnetlib.data.mapreduce.hbase.dedup.cc;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Created by claudio on 14/10/15.
 */
public class MindistSearchMapper extends Mapper<Text, VertexWritable, Text, VertexWritable> {

    private static final Log log = LogFactory.getLog(MindistSearchMapper.class);

    private boolean debug = false;

    @Override
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        super.setup(context);

        debug = context.getConfiguration().getBoolean("mindist_DEBUG", false);
        log.info("debug mode: " + debug);
    }

    @Override
    protected void map(Text key, VertexWritable value, Context context) throws IOException, InterruptedException {

        emit(key, value, context);
        if (value.isActivated()) {
            VertexWritable vertex = new VertexWritable();
            for (Text edge : value.getEdges()) {
                if (!edge.toString().equals(value.getVertexId().toString())) {
                    vertex.setVertexId(value.getVertexId());
                    vertex.setEdges(null);
                    emit(edge, vertex, context);
                }
            }
        }
    }

    private void emit(final Text key, final VertexWritable vertex, final Context context) throws IOException, InterruptedException {
        context.write(key, vertex);
        if (debug) {
            log.info(vertex.toJSON());
        }
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/HBaseToSimilarityGraphMapper.java

package eu.dnetlib.data.mapreduce.hbase.dedup.cc;

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

/**
 * Created by claudio on 14/10/15.
 */
public class HBaseToSimilarityGraphMapper extends TableMapper<Text, VertexWritable> {

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {

        final VertexWritable vertex = new VertexWritable();
drop it, import from master branch will follow