Revision 53799

drop it, import from master branch will follow

View differences:

modules/dnet-mapreduce-jobs/trunk/deploy.info
1
{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-mapreduce-jobs/trunk/", "deploy_repository": "dnet45-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots", "name": "dnet-mapreduce-jobs"}
modules/dnet-mapreduce-jobs/trunk/src/main/resources/META-INF/services/javax.xml.transform.TransformerFactory
1
net.sf.saxon.TransformerFactoryImpl
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/hbase/dedup/blacklist/title_blacklist.txt
1
^(Corpus Oral Dialectal \(COD\)\.).*$
2
^(Kiri Karl Morgensternile).*$
3
^(\[Eksliibris Aleksandr).*\]$
4
^(Kiri A\. de Vignolles).*$
5
^(2 kirja Karl Morgensternile).*$
6
^(Pirita kloostri idaosa arheoloogilised).*$
7
^(Kiri tundmatule).*$
8
^(Kiri Jenaer Allgemeine Literaturzeitung toimetusele).*$
9
^(Eksliibris Nikolai Birukovile).*$
10
^(Eksliibris Nikolai Issakovile).*$
11
^(\[Eksliibris Aleksandr).*$
12
^(WHP Cruise Summary Information of section).*$
13
^(Measurement of the top quark\-pair production cross section with ATLAS in pp collisions at).*$
14
^(Measurement of the spin\-dependent structure function).*
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/entity.st
1
<oaf:$name$>
2
	$metadata:{$it$}$
3
  <rels>
4
    $rels:{$it$}$
5
  </rels>
6
  <children>
7
	$children:{$it$}$
8
  </children>
9
</oaf:$name$>
10
$inference:{$it$}$
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/record.st
1
<?xml version="1.0"?>
2
<record>
3
  <result xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
4
    <header>
5
      <dri:objIdentifier>$id$</dri:objIdentifier>
6
      <dri:dateOfCollection>$dateofcollection$</dri:dateOfCollection>
7
      <dri:dateOfTransformation>$dateoftransformation$</dri:dateOfTransformation>
8
      <counters>
9
	  $counters:{$it$}$
10
	  </counters>
11
    </header>
12
    <metadata>
13
      <oaf:entity xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
14
		    xmlns:oaf="http://namespace.openaire.eu/oaf" 
15
		    xsi:schemaLocation="http://namespace.openaire.eu/oaf $schemaLocation$">
16
		$it$
17
      </oaf:entity>
18
    </metadata>
19
  </result>
20
</record>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/childresult.st
1
<result>
2
    <dri:objIdentifier>$objIdentifier$</dri:objIdentifier>
3
	$metadata:{$it$}$
4
</result>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/rel.st
1
<rel inferred="$inferred$" trust="$trust$" inferenceprovenance="$inferenceprovenance$" provenanceaction="$provenanceaction$">
2
  <to class="$class$" scheme="$scheme$" type="$type$">$objIdentifier$</to>
3
  $metadata:{$it$}$
4
</rel>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/child.st
1
<$name$$if(hasId)$ objidentifier="$id$"$else$$endif$>
2
	$metadata:{$it$}$
3
</$name$>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/instance.st
1
<instance id="$instanceId$">
2
    $metadata:{$it$}$
3
	$webresources:{$it$}$
4
</instance>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/eu/dnetlib/data/mapreduce/util/webresource.st
1
<webresource>
2
  <url>$identifier$</url>
3
</webresource>
modules/dnet-mapreduce-jobs/trunk/src/main/resources/log4j.properties
1
### Root Level ###
2
log4j.rootLogger=WARN, CONSOLE
3

  
4
### Configuration for the CONSOLE appender ###
5
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
6
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
7
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c - %m%n
8

  
9
### Application Level ###
10
log4j.logger.eu.dnetlib=INFO
11
log4j.logger.eu.dnetlib.enabling.is.sn=INFO
12
log4j.logger.org.apache.cxf.interceptor=FATAL
13
log4j.logger.org.apache.cxf.ws.addressing.ContextUtils=FATAL
14
log4j.logger.eu.dnetlib.enabling.tools.AbstractBaseService=INFO
15
log4j.logger.eu.dnetlib.enabling.inspector=DEBUG
16
log4j.logger.eu.dnetlib.xml.database.LoggingTrigger=WARN
17
log4j.logger.eu.dnetlib.enabling.tools.registration.ServiceRegistrator=INFO
18
log4j.logger.eu.dnetlib.enabling.inspector=FATAL
19
log4j.logger.eu.dnetlib.enabling.inspector.SubscriptionController=DEBUG
20
log4j.logger.eu.dnetlib.springutils.stringtemplate.StringTemplateViewResolver=FATAL
21
log4j.logger.eu.dnetlib.enabling.is.sn.SynchronousNotificationSenderImpl=WARN
22
log4j.logger.eu.dnetlib.enabling.tools.LocalServiceResolverImpl=WARN
23
log4j.logger.eu.dnetlib.enabling.is.sn.NotificationInvokerImpl=WARN
24
log4j.logger.eu.dnetlib.data.collective=INFO
25
log4j.logger.eu.dnetlib.data.hadoop.utils.ScanFactory=DEBUG
26
log4j.logger.org.apache.xerces.parsers.SAXParser=OFF
27
log4j.logger.eu.dnetlib.conf.PropertyFetcher=WARN
28
#log4j.logger.eu.dnetlib.data.transform.XsltRowTransformerFactory=DEBUG
29

  
30
log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceImpl=OFF
31
log4j.logger.eu.dnetlib.enabling.datasources.DatasourceManagerClients=FATAL
32
log4j.logger.eu.dnetlib.data.mdstore.modular.mongodb.utils.MetadataCheckJob=DEBUG
33
log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceCore=WARN
34
log4j.logger.eu.dnetlib.xml.database.exist.ExistDatabase=WARN
35
log4j.logger.eu.dnetlib.enabling.is.store.AbstractContentInitializer=FATAL
36

  
37
log4j.logger.org.apache.hadoop.hbase.mapreduce.TableInputFormatBase=FATAL
38
log4j.logger.eu.dnetlib.data.mdstore.modular.plugin.CreatorExtractor=DEBUG
39

  
40
### Spring ###
41
log4j.logger.org.springframework=ERROR
42
log4j.logger.org.apache=DEBUG
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/bulktag/BulkTaggingMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

  
3
import com.google.common.base.Splitter;
4
import com.google.common.collect.Lists;
5
import eu.dnetlib.data.bulktag.CommunityConfiguration;
6
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
7
import eu.dnetlib.data.proto.FieldTypeProtos;
8
import eu.dnetlib.data.proto.OafProtos.Oaf;
9
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.hadoop.hbase.client.Put;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.hbase.util.Bytes;
16
import org.apache.hadoop.io.Writable;
17

  
18
import java.io.IOException;
19
import java.util.List;
20
import java.util.Map;
21

  
22
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {
23

  
24
	private CommunityConfiguration cc;
25

  
26
	private ResultTagger tagger;
27
	private boolean enabled;
28

  
29
	@Override
30
	protected void setup(final Context context) throws IOException, InterruptedException {
31
		super.setup(context);
32

  
33
		final String conf = context.getConfiguration().get("tagging.conf");
34
		enabled = context.getConfiguration().getBoolean("tagging.enabled",false);
35
		if (StringUtils.isBlank(conf)) {
36
			throw new IllegalArgumentException("missing bulk tagging configuration");
37
		}
38
		System.out.println("conf = " + conf);
39
		cc = CommunityConfigurationFactory.fromJson(conf);
40
		tagger = new ResultTagger();
41
		tagger.setTrust(context.getConfiguration().get("bulktagging.trust", "0.85"));
42
	}
43

  
44
	@Override
45
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
46

  
47
		final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
48

  
49
		final byte[] body = resultMap.get(Bytes.toBytes("body"));
50

  
51
		if (body != null) {
52
			context.getCounter("Bulk Tagging", "not null body ").increment(1);
53

  
54
			final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
55
			if (oaf == null) {
56
				//context.getCounter("In mapper", " null oaf ").increment(1);
57
				return;
58
			}
59

  
60
			long tagged = oaf.getEntity().getResult().getMetadata().getContextList().stream()
61
					.flatMap(c -> c.getDataInfoList().stream())
62
					.map(FieldTypeProtos.DataInfo::getInferenceprovenance)
63
					.filter(infProv -> "bulktagging".equals(infProv))
64
					.count();
65
			context.getCounter("Bulk Tagging", " bulktagged ").increment(tagged);
66

  
67

  
68
			final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray());
69

  
70
			if(tagged > 0){
71
				if (enabled)
72
					context.write(key, put);
73
				context.getCounter("Bulk Tagging", " write op ").increment(1);
74
			}
75

  
76
		}
77
		else{
78
			context.getCounter("Bulk Tagging", " null body ").increment(1);
79
		}
80

  
81
	}
82

  
83

  
84
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/bulktag/ResultTagger.java
1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

  
3
import com.google.common.base.Functions;
4
import com.google.common.collect.Maps;
5
import com.google.common.collect.Sets;
6
import eu.dnetlib.data.bulktag.CommunityConfiguration;
7
import eu.dnetlib.data.bulktag.Pair;
8
import eu.dnetlib.data.proto.FieldTypeProtos;
9
import eu.dnetlib.data.proto.OafProtos;
10
import eu.dnetlib.data.proto.ResultProtos;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.hadoop.mapreduce.Mapper;
13

  
14
import java.util.*;
15
import java.util.stream.Collectors;
16
import java.util.stream.Stream;
17

  
18
/**
19
 * Created by miriam on 02/08/2018.
20
 */
21
public class ResultTagger {
22
    private final static String DATA_INFO_TYPE = "bulktagging";
23
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
24
    private final static String CLASS_ID = "bulktagging::community";
25
    private final static String SCHEMA_ID = "dnet:provenanceActions";
26
    private final static String COUNTER_GROUP = "Bulk Tagging";
27

  
28
    private String trust = "0.8";
29

  
30

  
31
    public OafProtos.Oaf enrichContext(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context) {
32

  
33
        //context.getCounter(COUNTER_GROUP, "to enrich").increment(1);
34
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
35

  
36

  
37
        if(oaf.getDataInfo().getDeletedbyinference()){
38
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
39
            return null;
40
        }
41
        //context.getCounter(COUNTER_GROUP, "not deleted by inference").increment(1);
42

  
43
        final List<ResultProtos.Result.Context> contextList = oaf.getEntity().getResult().getMetadata().getContextList();
44

  
45
        if(contextList.size()>0){
46
            context.getCounter(COUNTER_GROUP, "exist context list").increment(1);
47
        }else{
48
            context.getCounter(COUNTER_GROUP, "not exist context list").increment(1);
49
        }
50
        //communities contains all the communities to be added as context for the result
51
        final Set<String> communities = new HashSet<>();
52

  
53
        oaf.getEntity().getResult().getMetadata().getSubjectList().stream()
54
                .map(subject -> subject.getValue())
55
                .filter(StringUtils::isNotBlank)
56
                .map(String::toLowerCase)
57
                .map(String::trim)
58
                .collect(Collectors.toCollection(HashSet::new))
59
                .forEach(s -> communities.addAll(conf.getCommunityForSubjectValue(s)));
60

  
61
        oaf.getEntity().getResult().getInstanceList()
62
                .stream()
63
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
64
                .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
65
                .map(s -> StringUtils.substringAfter(s, "|"))
66
                .collect(Collectors.toCollection(HashSet::new))
67
                .forEach(dsId -> communities.addAll(conf.getCommunityForDatasourceValue(dsId)));
68

  
69
        //TODO: add code for Zenodo Communities
70

  
71
        if(communities.isEmpty()){
72
            context.getCounter(COUNTER_GROUP, "list of communities empty").increment(1);
73
        }else{
74
            context.getCounter(COUNTER_GROUP, "list of communities has values!").increment(1);
75
        }
76

  
77
        final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
78

  
79
        final Map<String, ResultProtos.Result.Context.Builder> cBuilders = Maps.newHashMap();
80
        mBuilder.getContextBuilderList().forEach(cBuilder -> {
81
            cBuilders.put(cBuilder.getId(), cBuilder);
82
        });
83

  
84
        for(String contextId:communities){
85

  
86
            final ResultProtos.Result.Context.Builder cBuilder = cBuilders.get(contextId);
87
            if (cBuilder != null) {
88

  
89
                if (!cBuilder.getDataInfoBuilderList().stream()
90
                        .map(di -> di.getInferenceprovenance())
91
                        .anyMatch(s -> DATA_INFO_TYPE.equals(s))) {
92

  
93
                    cBuilder.addDataInfo(buildDataInfo());
94
                    context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
95
                } else {
96
                    context.getCounter(COUNTER_GROUP, "provenance already bulk tagged").increment(1);
97
                }
98
            } else {
99
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
100
                mBuilder.addContext(buildContext(contextId));
101
            }
102

  
103
        }
104

  
105
        return builder.build();
106
    }
107

  
108
    private ResultProtos.Result.Context buildContext(final String contextId) {
109
        return ResultProtos.Result.Context.newBuilder()
110
                .setId(contextId)
111
                .addDataInfo(buildDataInfo())
112
                .build();
113
    }
114

  
115
    private FieldTypeProtos.DataInfo buildDataInfo() {
116
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
117
                .setInferred(true)
118
                .setProvenanceaction(
119
                        FieldTypeProtos.Qualifier.newBuilder()
120
                                .setClassid(CLASS_ID)
121
                                .setClassname("Bulk Tagging for Communities")
122
                                .setSchemeid(SCHEMA_ID)
123
                                .setSchemename(SCHEMA_NAME))
124
                .setInferenceprovenance(DATA_INFO_TYPE)
125
                .setTrust(trust);
126
        return builder
127
                .build();
128
    }
129

  
130

  
131
    public void setTrust(String s) {
132
        trust = s;
133
    }
134
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Reducer;
7

  
8
public class ExportSimplifiedRecordsReducer extends Reducer<Text, Text, Text, Text> {
9

  
10
	private Text keyOut;
11

  
12
	@Override
13
	protected void setup(final Context context) throws IOException, InterruptedException {
14
		keyOut = new Text("");
15
	}
16

  
17
	@Override
18
	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
19
		for(final Text v : values) {
20
			//keyOut.set(key.toString() + "@@@");
21
			context.write(keyOut, v);
22
		}
23
	}
24

  
25
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6
import java.util.NavigableMap;
7

  
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.io.Text;
14

  
15
import com.google.common.base.Joiner;
16
import com.googlecode.protobuf.format.JsonFormat;
17

  
18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
19

  
20
/**
21
 * Exports Oaf objects as their json serialization.
22
 *
23
 * @author claudio
24
 *
25
 */
26
public class ExportInformationSpaceMapper extends TableMapper<Text, Text> {
27

  
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
32

  
33
	private static final String SEPARATOR = "@";
34

  
35
	private Text keyOut;
36

  
37
	private Text valueOut;
38

  
39
	@Override
40
	protected void setup(final Context context) throws IOException, InterruptedException {
41
		super.setup(context);
42

  
43
		keyOut = new Text();
44
		valueOut = new Text();
45
	}
46

  
47
	@Override
48
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
49
		try {
50
			byte[] rowKey = keyIn.copyBytes();
51
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
52

  
53
			for (byte[] cf : row.keySet()) {
54

  
55
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
56

  
57
					emit(rowKey, cf, q.getKey(), q.getValue(), context);
58
				}
59
			}
60
		} catch (final Throwable e) {
61
			log.error("error exporting the following record from HBase: " + value.toString(), e);
62
			context.getCounter("error", e.getClass().getName()).increment(1);
63
			throw new RuntimeException(e);
64
		}
65
	}
66

  
67
	private void emit(final byte[] rowKey, final byte[] cf, final byte[] q, final byte[] value, final Context context) throws IOException, InterruptedException {
68

  
69
		keyOut.set(Joiner.on(SEPARATOR).join(new String(rowKey), new String(cf), new String(q)));
70

  
71
		if ((value == null) || (value.length == 0)) {
72
			valueOut.set("");
73
		} else {
74
			valueOut.set(new JsonFormat().printToString(OafDecoder.decode(value).getOaf()));
75
		}
76
		context.write(keyOut, valueOut);
77
	}
78

  
79
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportResultIdentifiersMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

  
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.Collections;
6
import java.util.List;
7

  
8
import com.google.common.base.Function;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import eu.dnetlib.data.mapreduce.util.DedupUtils;
12
import eu.dnetlib.data.mapreduce.util.OafDecoder;
13
import eu.dnetlib.data.proto.OafProtos.OafEntity;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.apache.hadoop.hbase.client.Result;
17
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
18
import org.apache.hadoop.hbase.mapreduce.TableMapper;
19
import org.apache.hadoop.hbase.util.Bytes;
20
import org.apache.hadoop.io.Text;
21

  
22
/**
23
 * Exports the result identifiers as json.
24
 *
25
 * @author claudio
26
 */
27
public class ExportResultIdentifiersMapper extends TableMapper<Text, Text> {
28

  
29
	/**
30
	 * logger.
31
	 */
32
	private static final Log log = LogFactory.getLog(ExportResultIdentifiersMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
33

  
34
	private static final String CF = "result";
35

  
36
	private Text keyOut;
37

  
38
	private Text valueOut;
39

  
40
	@Override
41
	protected void setup(final Context context) throws IOException, InterruptedException {
42
		super.setup(context);
43

  
44
		keyOut = new Text("");
45
		valueOut = new Text();
46
	}
47

  
48
	@Override
49
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
50
		try {
51
			final byte[] body = value.getValue(Bytes.toBytes(CF), DedupUtils.BODY_B);
52

  
53
			if (body == null) {
54
				context.getCounter(CF, "missing body").increment(1);
55
				return;
56
			}
57

  
58
			final OpenaireEntityId id = new OpenaireEntityId();
59
			final OafDecoder d = OafDecoder.decode(body);
60

  
61
			id.setDeleted(d.getOaf().getDataInfo().getDeletedbyinference());
62
			id.setId(d.getEntityId());
63

  
64
			final List<OafEntity> childrenList = d.getEntity().getChildrenList();
65
			if (childrenList != null && !childrenList.isEmpty()) {
66
				final ArrayList<String> mergedIds = Lists.newArrayList(Iterables.transform(childrenList, new Function<OafEntity, String>() {
67
					@Override
68
					public String apply(final OafEntity oafEntity) {
69
						return oafEntity.getId();
70
					}
71
				}));
72
				Collections.sort(mergedIds);
73
				id.setMergedIds(mergedIds);
74
			}
75

  
76
			valueOut.set(id.toString());
77
			context.write(keyOut, valueOut);
78

  
79
		} catch (final Throwable e) {
80
			log.error("error exporting the following record from HBase: " + value.toString(), e);
81
			context.getCounter("error", e.getClass().getName()).increment(1);
82
			throw new RuntimeException(e);
83
		}
84
	}
85

  
86
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/OpenaireEntityId.java
1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

  
3
import java.util.List;
4

  
5
import com.google.gson.Gson;
6

  
7
/**
8
 * Created by claudio on 27/01/16.
9
 */
10
public class OpenaireEntityId {
11

  
12
	private String id;
13

  
14
	private List<String> mergedIds;
15

  
16
	private boolean deleted;
17

  
18
	public OpenaireEntityId() {
19
	}
20

  
21
	public OpenaireEntityId(final String id, final List<String> mergedIds, final boolean deleted) {
22
		this.id = id;
23
		this.mergedIds = mergedIds;
24
		this.deleted = deleted;
25
	}
26

  
27
	public String getId() {
28
		return id;
29
	}
30

  
31
	public void setId(final String id) {
32
		this.id = id;
33
	}
34

  
35
	public List<String> getMergedIds() {
36
		return mergedIds;
37
	}
38

  
39
	public void setMergedIds(final List<String> mergedIds) {
40
		this.mergedIds = mergedIds;
41
	}
42

  
43
	public boolean isDeleted() {
44
		return deleted;
45
	}
46

  
47
	public void setDeleted(final boolean deleted) {
48
		this.deleted = deleted;
49
	}
50

  
51
	@Override
52
	public String toString() {
53
		return new Gson().toJson(this);
54
	}
55
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

  
3
import java.io.IOException;
4

  
5
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
6
import org.apache.commons.codec.binary.Base64;
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Mapper;
12

  
13
public class ExportSimplifiedRecordsMapper extends Mapper<Text, Text, Text, Text> {
14

  
15
	private static final Log log = LogFactory.getLog(ExportSimplifiedRecordsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
16

  
17
	private ApplyXslt recordSummarizer;
18

  
19
	private Text valueOut;
20

  
21
	private Text keyOut;
22

  
23
	@Override
24
	protected void setup(final Context context) throws IOException, InterruptedException {
25

  
26
		final String xslt = new String(Base64.decodeBase64(context.getConfiguration().get("xslt")));
27

  
28
		log.info("got xslt: \n" + xslt);
29

  
30
		recordSummarizer = new ApplyXslt(xslt);
31
		valueOut = new Text();
32
		keyOut = new Text("");
33
	}
34

  
35
	@Override
36
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
37

  
38
		final String summary = recordSummarizer.evaluate(value.toString());
39
		if (StringUtils.isNotBlank(summary)) {
40
		    keyOut.set(StringUtils.substringAfter(key.toString(), "::"));
41
			valueOut.set(summary.replaceAll("\n","").replaceAll("\t",""));
42
			context.write(keyOut, valueOut);
43
		}
44
	}
45

  
46
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/VolatileColumnFamily.java
1
package eu.dnetlib.data.mapreduce.hbase;
2

  
3
public enum VolatileColumnFamily {
4

  
5
	dedup, dedupPerson; // instance is here to remove the old protos
6

  
7
	public static boolean isVolatile(final String columnName) {
8
		try {
9
			return VolatileColumnFamily.valueOf(columnName) != null;
10
		} catch (final Throwable e) {
11
			return false;
12
		}
13
	}
14
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupDeleteRelMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import eu.dnetlib.data.mapreduce.JobParams;
7
import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
import eu.dnetlib.data.proto.TypeProtos.Type;
9
import eu.dnetlib.pace.config.DedupConfig;
10
import org.apache.hadoop.hbase.client.Delete;
11
import org.apache.hadoop.hbase.client.Result;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableMapper;
14
import org.apache.hadoop.io.Writable;
15

  
16
public class DedupDeleteRelMapper extends TableMapper<ImmutableBytesWritable, Writable> {
17

  
18
	private DedupConfig dedupConf;
19

  
20
	private ImmutableBytesWritable outKey;
21

  
22
	@Override
23
	protected void setup(final Context context) throws IOException, InterruptedException {
24
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
25
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
26

  
27
		outKey = new ImmutableBytesWritable();
28
	}
29

  
30
	@Override
31
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
32
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
33

  
34
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
35

  
36
		deleteRels(rowkey, context, value, DedupUtils.getSimilarityCFBytes(type));
37
		deleteRels(rowkey, context, value, DedupUtils.getDedupCF_mergedInBytes(type));
38
		deleteRels(rowkey, context, value, DedupUtils.getDedupCF_mergesBytes(type));
39
	}
40

  
41
	private void deleteRels(final ImmutableBytesWritable rowkey, final Context context, final Result value, final byte[] cf)
42
			throws IOException, InterruptedException {
43

  
44
		final Map<byte[], byte[]> rels = value.getFamilyMap(cf);
45

  
46
		if ((rels != null) && !rels.isEmpty()) {
47

  
48
			final byte[] row = rowkey.copyBytes();
49
			final Delete delete = new Delete(row);
50
			delete.setWriteToWAL(JobParams.WRITE_TO_WAL);
51

  
52
			delete.deleteFamily(cf);
53

  
54
			outKey.set(row);
55
			context.write(outKey, delete);
56
			context.getCounter(dedupConf.getWf().getEntityType(), new String(cf) + " deleted").increment(rels.size());
57
		}
58
	}
59
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.nio.ByteBuffer;
5
import java.util.Map;
6

  
7
import com.google.protobuf.InvalidProtocolBufferException;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.util.DedupUtils;
10
import eu.dnetlib.data.proto.DedupProtos.Dedup;
11
import eu.dnetlib.data.proto.KindProtos.Kind;
12
import eu.dnetlib.data.proto.OafProtos.Oaf;
13
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
16
import eu.dnetlib.pace.config.DedupConfig;
17
import org.apache.hadoop.hbase.client.Put;
18
import org.apache.hadoop.hbase.client.Result;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableMapper;
21
import org.apache.hadoop.hbase.util.Bytes;
22

  
23
public class DedupFindRootsMapper extends TableMapper<ImmutableBytesWritable, Put> {
24

  
25
	private DedupConfig dedupConf;
26

  
27
	@Override
28
	protected void setup(final Context context) throws IOException, InterruptedException {
29
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
30
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
31
	}
32

  
33
	@Override
34
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
35
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
36

  
37
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
38
		final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
39

  
40
		if ((similarRels != null) && !similarRels.isEmpty()) {
41
			final ByteBuffer min = findMin(ByteBuffer.wrap(rowkey.get()), similarRels.keySet());
42

  
43
			final byte[] row = rowkey.copyBytes();
44
			final byte[] root = DedupUtils.newIdBytes(min, dedupConf.getWf().getDedupRun());
45

  
46
			// System.out.println("Found root: " + new String(root));
47

  
48
			emitDedupRel(context, DedupUtils.getDedupCF_mergedInBytes(type), row, root, buildRel(row, root, Dedup.RelName.isMergedIn));
49
			emitDedupRel(context, DedupUtils.getDedupCF_mergesBytes(type), root, row, buildRel(root, row, Dedup.RelName.merges));
50

  
51
			context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
52

  
53
			// marks the original body deleted
54
			emitBody(context, row, value.getValue(Bytes.toBytes(dedupConf.getWf().getEntityType()), DedupUtils.BODY_B));
55

  
56
		} else {
57
			context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
58
		}
59
	}
60

  
61
	private ByteBuffer findMin(ByteBuffer min, final Iterable<byte[]> keys) {
62
		for (final byte[] q : keys) {
63
			final ByteBuffer iq = ByteBuffer.wrap(q);
64
			if (min.compareTo(iq) > 0) {
65
				min = iq;
66
			}
67
		}
68
		return min;
69
	}
70

  
71
	private void emitBody(final Context context, final byte[] row, final byte[] body) throws InvalidProtocolBufferException, IOException, InterruptedException {
72
		if (body == null) {
73
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
74
			System.err.println("missing body: " + new String(row));
75
			return;
76
		}
77
		final Oaf prototype = Oaf.parseFrom(body);
78

  
79
		if (prototype.getDataInfo().getDeletedbyinference()) {
80
			context.getCounter(dedupConf.getWf().getEntityType(), "bodies already deleted").increment(1);
81
		} else {
82
			final Oaf.Builder oafRoot = Oaf.newBuilder(prototype);
83
			oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
84
			final byte[] family = Bytes.toBytes(dedupConf.getWf().getEntityType());
85
			final Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
86
			put.setWriteToWAL(JobParams.WRITE_TO_WAL);
87
			context.write(new ImmutableBytesWritable(row), put);
88
			context.getCounter(dedupConf.getWf().getEntityType(), "bodies marked deleted").increment(1);
89
		}
90
	}
91

  
92
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
93
		final Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
94
		final Oaf oaf =
95
				Oaf.newBuilder()
96
						.setKind(Kind.relation)
97
						.setLastupdatetimestamp(System.currentTimeMillis())
98
						.setDataInfo(
99
								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
100
										dedupConf.getWf().getConfigurationId())).setRel(oafRel)
101
				.build();
102
		return oaf.toByteArray();
103
	}
104

  
105
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
106
			InterruptedException {
107
		final Put put = new Put(from).add(cf, to, value);
108
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
109
		context.write(new ImmutableBytesWritable(from), put);
110
	}
111

  
112
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/CsvSerialiser.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.StringWriter;
4
import java.util.List;
5
import java.util.Set;
6

  
7
import com.google.common.base.Joiner;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Sets;
11
import org.apache.commons.lang.StringUtils;
12

  
13
/**
14
 * Created by claudio on 26/04/16.
15
 */
16
public class CsvSerialiser {
17

  
18
	private final static int MAX_FEATURES = 1000;
19
	private final static int MAX_ROWS = 5000;
20

  
21
	private int maxRows = MAX_ROWS;
22
	private int maxFeatures = MAX_FEATURES;
23

  
24
	public CsvSerialiser() {
25
	}
26

  
27
	public CsvSerialiser(int maxRows, int maxFeatures) {
28
		this.maxRows = maxRows;
29
		this.maxFeatures = maxFeatures;
30
	}
31

  
32
	public String asCSV(final List<CsvEntry> list) {
33
		final Set<String> features = Sets.newLinkedHashSet();
34

  
35
		for(CsvEntry e : Iterables.limit(list, maxRows)) {
36
			features.addAll(e.getFeatures());
37
		}
38

  
39
		final List<String> cappedFeatures = Lists.newLinkedList(Iterables.limit(features, maxFeatures));
40
		//context.getCounter("person", "features " + Iterables.size(cappedFeatures)).increment(1);
41

  
42
		final StringWriter csv = new StringWriter();
43
		csv.append("\"k\",");
44
		csv.append(Joiner.on(",").join(cappedFeatures));
45
		csv.append(",\"id\",\"name\",\"title\"\n");
46
		for(CsvEntry e : Iterables.limit(list, maxRows)) {
47

  
48
			boolean hasZero = false;
49
			boolean hasOne = false;
50

  
51
			final StringWriter line = new StringWriter();
52
			line.append(e.getKey()+",");
53
			for(String f : cappedFeatures) {
54
				if(e.getFeatures().contains(f)) {
55
					line.append("1,");
56
					hasOne = true;
57
				} else {
58
					line.append("0,");
59
					hasZero = true;
60
				}
61
			}
62
			line.append("\""+e.getId()+"\",");
63
			line.append("\""+e.getOriginalName()+"\",");
64
			line.append("\""+e.getTitle()+"\"");
65

  
66
			if (hasZero && hasOne) {
67
				csv.append(line.toString() + "\n");
68
			}
69
			//csv.append(StringUtils.substringBeforeLast(line.toString(), ",")  + "\n");
70
		}
71

  
72
		return csv.toString();
73
	}
74

  
75
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/SubjectsMap.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.HashMap;
4
import java.util.Map.Entry;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.bson.BsonDocument;
9
import org.bson.BsonDocumentWrapper;
10
import org.bson.codecs.configuration.CodecRegistry;
11
import org.bson.conversions.Bson;
12

  
13
/**
14
 * Created by claudio on 07/03/16.
15
 */
16
public class SubjectsMap extends HashMap<String, Subjects> {
17

  
18
	private static final Log log = LogFactory.getLog(SubjectsMap.class);
19

  
20
	public SubjectsMap mergeFrom(SubjectsMap sm) {
21

  
22
		if (sm != null) {
23
			for (Entry<String, Subjects> e : sm.entrySet()) {
24
				if (!this.containsKey(e.getKey())) {
25
					Subjects sub = new Subjects();
26

  
27
					sub.addAll(e.getValue());
28

  
29
					this.put(e.getKey(), sub);
30
				} else {
31
					for (String s : e.getValue()) {
32
						final Subjects subjects = this.get(e.getKey());
33
						subjects.add(s);
34
					}
35
				}
36
			}
37
		}
38

  
39
		return this;
40
	}
41

  
42
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/PublicationAnalysisMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.IOException;
4
import java.util.List;
5

  
6
import eu.dnetlib.data.mapreduce.util.DedupUtils;
7
import eu.dnetlib.data.mapreduce.util.OafDecoder;
8
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
9
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.hadoop.hbase.client.Result;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableMapper;
14
import org.apache.hadoop.io.NullWritable;
15

  
16
/**
17
 * Created by claudio on 22/04/16.
18
 */
19
public class PublicationAnalysisMapper extends TableMapper<NullWritable, NullWritable> {
20

  
21
	public static final String RESULT = "result";
22
	private static final int MAX_DESCRIPTIONS = 50;
23

  
24
	@Override
25
	protected void setup(final Context context) throws IOException, InterruptedException {
26
		super.setup(context);
27
	}
28

  
29
	@Override
30
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
31

  
32
		if (new String(key.copyBytes()).contains("dedup_wf")) {
33
			context.getCounter(RESULT, "roots").increment(1);
34
			return;
35
		}
36

  
37
		final byte[] body = value.getValue(RESULT.getBytes(), DedupUtils.BODY_B);
38
		if (body == null) {
39
			context.getCounter(RESULT, "missing body").increment(1);
40
			return;
41
		}
42
		final OafDecoder decoder = OafDecoder.decode(body);
43
		final ResultProtos.Result result = decoder.getEntity().getResult();
44
		if (result.getMetadata().getResulttype().getClassid().equals("dataset")) {
45
			context.getCounter(RESULT, "dataset").increment(1);
46
			return;
47
		} else {
48
			context.getCounter(RESULT, "publication").increment(1);
49
		}
50

  
51
		if (result.getMetadata().getDescriptionCount() > MAX_DESCRIPTIONS) {
52
			context.getCounter(RESULT, "abstracts > " + MAX_DESCRIPTIONS).increment(1);
53
		} else {
54
			context.getCounter(RESULT, "abstracts: " + result.getMetadata().getDescriptionCount()).increment(1);
55
		}
56

  
57
		final List<StringField> descList = result.getMetadata().getDescriptionList();
58

  
59
		boolean empty = true;
60
		for(StringField desc : descList) {
61
			empty = empty && StringUtils.isBlank(desc.getValue());
62
		}
63

  
64
		context.getCounter(RESULT, "empty abstract: " + empty).increment(1);
65
	}
66

  
67
	@Override
68
	protected void cleanup(final Context context) throws IOException, InterruptedException {
69
		super.cleanup(context);
70
	}
71
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/Subjects.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.HashSet;
4

  
5
import org.bson.BsonDocument;
6
import org.bson.BsonDocumentWrapper;
7
import org.bson.codecs.configuration.CodecRegistry;
8
import org.bson.conversions.Bson;
9

  
10
/**
11
 * Created by claudio on 07/03/16.
12
 */
13
public class Subjects extends HashSet<String> {
14

  
15

  
16
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/SubjectParser.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.List;
4

  
5
import com.google.common.base.Splitter;
6
import org.apache.commons.lang.StringUtils;
7
import org.dom4j.Element;
8

  
9
/**
10
 * Created by claudio on 25/03/16.
11
 */
12
public class SubjectParser {
13

  
14
	public static final String REGEX_SUBJECT = "^(info:eu-repo)\\/(classification)\\/([a-zA-Z]*)\\/(.*)$";
15
	private static final int MIN_LENGTH = 5;
16

  
17
	public SubjectsMap parse(final org.dom4j.Document doc) {
18

  
19
		final List subjectNodes = doc.selectNodes("//*[local-name() = 'subject']");
20
		final SubjectsMap subjectMap = new SubjectsMap();
21

  
22
		for(int i = 0; i<subjectNodes.size(); i++) {
23
			final Element e = (Element) subjectNodes.get(i);
24
			final String subject = e.getText();
25

  
26
			final String type = guessType(subject);
27
			if (!subjectMap.containsKey(type)) {
28
				subjectMap.put(type, new Subjects());
29
			}
30

  
31
			if (StringUtils.isNotBlank(type)) {
32
				if ("keyword".equals(type)) {
33
					final Splitter splitter = Splitter.on(",").trimResults().omitEmptyStrings();
34
					for (String token : splitter.split(subject)) {
35
						final String value = token.replaceAll("[^a-zA-Z ]", "").toLowerCase();
36
						if (value.length() >= MIN_LENGTH) {
37
							subjectMap.get(type).add(value);
38
						}
39
					}
40
				} else {
41
					String token = subject.replaceFirst(REGEX_SUBJECT, "$4");
42

  
43
					if (StringUtils.isNotBlank(token)) {
44
						final String value = token.replaceAll("[^a-zA-Z ]", "").toLowerCase();
45
						if (value.length() >= MIN_LENGTH) {
46
							subjectMap.get(type).add(value);
47
						}
48
					}
49
				}
50
			}
51
		}
52

  
53
		return subjectMap;
54
	}
55

  
56
	private String guessType(final String subject) {
57
		if (subject.startsWith("info:eu-repo")) {
58
			final String s = subject.replaceAll(REGEX_SUBJECT, "$3");
59
			return s;
60
		} else {
61
			return "keyword";
62
		}
63
	}
64
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/CsvEntry.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.Set;
4

  
5
import com.google.common.collect.Sets;
6
import com.google.gson.Gson;
7

  
8
/**
9
 * Created by claudio on 20/04/16.
10
 */
11
public class CsvEntry {
12

  
13
	private String key;
14

  
15
	private Set<String> features = Sets.newLinkedHashSet();
16

  
17
	private String title;
18

  
19
	private String id;
20

  
21
	private String originalName;
22

  
23
	public CsvEntry() {
24
	}
25

  
26
	public CsvEntry(final String key, final Set<String> features) {
27
		this.key = key;
28
		this.features = features;
29
	}
30

  
31
	public CsvEntry(final Set<String> features) {
32
		this.features = features;
33
	}
34

  
35
	public void addFeature(final String f) {
36
		getFeatures().add(f);
37
	}
38

  
39
	public Set<String> getFeatures() {
40
		return features;
41
	}
42

  
43
	public void setFeatures(final Set<String> features) {
44
		this.features = features;
45
	}
46

  
47
	public static CsvEntry fromJson(final String json) {
48
		return new Gson().fromJson(json, CsvEntry.class);
49
	}
50

  
51
	public String getKey() {
52
		return key;
53
	}
54

  
55
	public void setKey(final String key) {
56
		this.key = key;
57
	}
58

  
59
	public String getTitle() {
60
		return title;
61
	}
62

  
63
	public void setTitle(final String title) {
64
		this.title = title;
65
	}
66

  
67
	public String getId() {
68
		return id;
69
	}
70

  
71
	public void setId(final String id) {
72
		this.id = id;
73
	}
74

  
75
	@Override
76
	public String toString() {
77
		return new Gson().toJson(this);
78
	}
79

  
80
	@Override
81
	public boolean equals(final Object o) {
82
		return (o instanceof CsvEntry) && ((CsvEntry) o).getFeatures().equals(getFeatures());
83
	}
84

  
85
	public String getOriginalName() {
86
		return originalName;
87
	}
88

  
89
	public void setOriginalName(final String originalName) {
90
		this.originalName = originalName;
91
	}
92

  
93
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Mapper;
7

  
8
/**
9
 * Created by claudio on 15/10/15.
10
 */
11
public class ConnectedComponentsMapper extends Mapper<Text, VertexWritable, Text, VertexWritable> {
12

  
13

  
14
	@Override
15
	protected void map(Text key, VertexWritable value, Context context) throws IOException, InterruptedException {
16

  
17
		context.write(value.getVertexId(), value);
18

  
19
	}
20

  
21
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

  
3
import java.io.IOException;
4
import java.nio.ByteBuffer;
5
import java.util.Set;
6

  
7
import com.google.common.collect.Sets;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.util.DedupUtils;
10
import eu.dnetlib.data.proto.DedupProtos.Dedup;
11
import eu.dnetlib.data.proto.KindProtos.Kind;
12
import eu.dnetlib.data.proto.OafProtos.Oaf;
13
import eu.dnetlib.data.proto.OafProtos.OafRel;
14
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
17
import eu.dnetlib.pace.config.DedupConfig;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.apache.hadoop.hbase.client.Put;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableReducer;
23
import org.apache.hadoop.hbase.util.Bytes;
24
import org.apache.hadoop.io.Text;
25

  
26
/**
27
 * Created by claudio on 15/10/15.
28
 */
29
public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {
30

  
31
	private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);
32

  
33
	private DedupConfig dedupConf;
34

  
35
	private byte[] cfMergedIn;
36

  
37
	private byte[] cfMerges;
38

  
39
	@Override
40
	protected void setup(final Context context) {
41

  
42
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
43
		log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
44

  
45
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
46
		cfMergedIn = DedupUtils.getDedupCF_mergedInBytes(type);
47
		cfMerges = DedupUtils.getDedupCF_mergesBytes(type);
48
	}
49

  
50
	@Override
51
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
52

  
53
		final Set<String> set = Sets.newHashSet();
54

  
55
		for(VertexWritable v : values) {
56
			for(Text t : v.getEdges()) {
57
				set.add(t.toString());
58
			}
59
		}
60

  
61
		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
62

  
63
		for(String q : set) {
64
			final byte[] qb = Bytes.toBytes(q);
65
			emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
66
			emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
67

  
68
			context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
69
		}
70

  
71
	}
72

  
73
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
74
			InterruptedException {
75
		final Put put = new Put(from).add(cf, to, value);
76
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
77
		context.write(new ImmutableBytesWritable(from), put);
78
	}
79

  
80
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
81

  
82
		final OafRel.Builder oafRef = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
83
		final Oaf oaf = DedupUtils.buildRel(dedupConf, oafRef, 0.8).build();
84

  
85
		return oaf.toByteArray();
86
	}
87

  
88
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/MindistSearchMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Mapper;
9

  
10
/**
11
 * Created by claudio on 14/10/15.
12
 */
13
public class MindistSearchMapper extends Mapper<Text, VertexWritable, Text, VertexWritable> {
14

  
15
	private static final Log log = LogFactory.getLog(MindistSearchMapper.class);
16

  
17
	private boolean debug = false;
18

  
19
	@Override
20
	protected void setup(Mapper.Context context) throws IOException, InterruptedException {
21
		super.setup(context);
22

  
23
		debug = context.getConfiguration().getBoolean("mindist_DEBUG", false);
24
		log.info("debug mode: " + debug);
25
	}
26

  
27
	@Override
28
	protected void map(Text key, VertexWritable value, Context context) throws IOException, InterruptedException {
29

  
30
		emit(key, value, context);
31
		if (value.isActivated()) {
32
			VertexWritable vertex = new VertexWritable();
33
			for (Text edge : value.getEdges()) {
34
				if (!edge.toString().equals(value.getVertexId().toString())) {
35
					vertex.setVertexId(value.getVertexId());
36
					vertex.setEdges(null);
37
					emit(edge, vertex, context);
38
				}
39
			}
40
		}
41
	}
42

  
43
	private void emit(final Text key, final VertexWritable vertex, final Context context) throws IOException, InterruptedException {
44
		context.write(key, vertex);
45
		if (debug) {
46
			log.info(vertex.toJSON());
47
		}
48
	}
49

  
50
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/HBaseToSimilarityGraphMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.hbase.KeyValue;
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.io.Text;
10

  
11
/**
12
 * Created by claudio on 14/10/15.
13
 */
14
public class HBaseToSimilarityGraphMapper extends TableMapper<Text, VertexWritable> {
15

  
16
	@Override
17
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
18

  
19
		final VertexWritable vertex = new VertexWritable();
... This diff was truncated because it exceeds the maximum size that can be displayed.