Revision 45419

Work in progress: adapting the MapReduce jobs to the updated graph domain version.
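
The file changes below follow one pattern: the dedup MapReduce jobs stop using eu.dnetlib.data.mapreduce.util.DedupUtils and call the new eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO instead, and DNGFDecoder moves from eu.dnetlib.data.mapreduce.util to eu.dnetlib.data.graph.model. A rough map of the substitutions visible in this diff (only what appears in the changed files, not a complete description of the new DAO):

    DedupUtils.BODY_S                            ->  HBaseTableDAO.cfMetadata()
    DedupUtils.BODY_B                            ->  HBaseTableDAO.cfMetadataByte()
    DedupUtils.isRoot(key)                       ->  HBaseTableDAO.isRoot(key)
    DedupUtils.newIdBytes(buf, dedupRun)         ->  HBaseTableDAO.newIdBytes(buf, dedupRun)
    DedupUtils.getDedupCF_mergesBytes(type)      ->  HBaseTableDAO.getDedupQualifier_mergesBytes(type)
    DedupUtils.getDedupCF_mergedInBytes(type)    ->  HBaseTableDAO.getDedupQualifier_mergedInBytes(type)
    DedupUtils.getSimilarityCFBytes(type)        ->  HBaseTableDAO.getSimilarityQualifierBytes(type)
    DedupUtils.getDedup(conf, from, to, relName) ->  HBaseTableDAO.getDedup(from, to, relClass), where relClass is now a plain String such as "merges" or "isMergedIn"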

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
public class FindPersonCoauthorsReducer {
4

  
5
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsMapper.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup;
2
//
3
//import java.io.IOException;
4
//import java.util.List;
5
//import java.util.NavigableMap;
6
//import java.util.stream.Collectors;
7
//
8
//import org.apache.hadoop.hbase.client.Result;
9
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
//import org.apache.hadoop.hbase.util.Bytes;
12
//import org.apache.hadoop.io.Text;
13
//import org.apache.http.HttpResponse;
14
//import org.apache.http.client.HttpClient;
15
//import org.apache.http.client.methods.HttpPost;
16
//import org.apache.http.client.methods.HttpUriRequest;
17
//import org.apache.http.impl.client.DefaultHttpClient;
18
//import org.apache.http.params.BasicHttpParams;
19
//import org.apache.http.params.HttpParams;
20
//
21
//import com.google.common.collect.Lists;
22
//import com.google.gson.Gson;
23
//
24
//import eu.dnetlib.data.mapreduce.hbase.VolatileColumnFamily;
25
//import eu.dnetlib.data.proto.RelTypeProtos.RelType;
26
//
27
//public class FindPersonCoauthorsMapper extends TableMapper<Text, Text> {
28
//
29
//	private final HttpClient client = new DefaultHttpClient();
30
//
31
//	private final String url = "http://146.48.87.97:8888/addData";
32
//
33
//	@Override
34
//	protected void setup(final Context context) {
35
//		// url = context.getConfiguration().get("dedup.person.coauthors.service.url");
36
//	}
37
//
38
//	@Override
39
//	protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
40
//		final NavigableMap<byte[], byte[]> candidates = row.getFamilyMap(Bytes.toBytes(VolatileColumnFamily.dedupPerson.toString()));
41
//		if ((candidates == null) || candidates.isEmpty()) return;
42
//
43
//		final List<String> coauthors = Lists.newArrayList();
44
//		final byte[] family = Bytes.toBytes(RelType.personPublication.toString());
45
//		coauthors.addAll(row.getFamilyMap(family).keySet().stream().map(Bytes::toString).collect(Collectors.toList()));
46
//
47
//		for (final byte[] candidate : candidates.keySet()) {
48
//			emit(context, Bytes.toString(candidate), coauthors);
49
//		}
50
//	}
51
//
52
//	private void emit(final Context context, final String candidate, final List<String> coauthors) {
53
//		try {
54
//			final HttpUriRequest request = new HttpPost(url);
55
//			final HttpParams params = new BasicHttpParams();
56
//			params.setParameter("id", candidate);
57
//			params.setParameter("data", (new Gson().toJson(coauthors)));
58
//			request.setParams(params);
59
//			final HttpResponse response = client.execute(request);
60
//			context.getCounter("HTTP call", "code " + response.getStatusLine().getStatusCode()).increment(1);
61
//		} catch (final Exception e) {
62
//			context.getCounter("HTTP call", "Exception " + e.getClass()).increment(1);
63
//		}
64
//	}
65
//}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/DedupUtils.java
1
//package eu.dnetlib.data.mapreduce.util;
2
//
3
import java.nio.ByteBuffer;
4

  
5
import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
6
import eu.dnetlib.data.proto.DNGFProtos.DNGFRel.Builder;
7
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
8

  
9
import eu.dnetlib.data.proto.TypeProtos.Type;
10
import eu.dnetlib.pace.config.DedupConfig;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.util.Bytes;
13

  
14
public class DedupUtils {
15

  
16
	public static final String CF_SEPARATOR = "_";
17

  
18
	public static final String ROOT = "dedup_wf";
19

  
20
	public static final String BODY_S = "body";
21

  
22
	public static final byte[] BODY_B = Bytes.toBytes(BODY_S);
23

  
24
	public static String dedupPrefix(final String dedupRun) {
25
		return "|" + ROOT + "_" + dedupRun + "::";
26
	}
27

  
28
	public static String newId(final String id, final String dedupRun) {
29
		if ((dedupRun == null) || (dedupRun.length() != 3)) throw new IllegalArgumentException("wrong dedupRun param");
30

  
31
		return id.replaceFirst("\\|.*\\:\\:", dedupPrefix(dedupRun));
32
	}
33

  
34
	public static byte[] newIdBytes(final String s, final String dedupRun) {
35
		return newId(s, dedupRun).getBytes();
36
	}
37

  
38
	public static byte[] newIdBytes(final ByteBuffer b, final String dedupRun) {
39
		return newId(new String(b.array()), dedupRun).getBytes();
40
	}
41

  
42
	public static boolean isRoot(final String s) {
43
		return s.contains(ROOT);
44
	}
45

  
46
	public static boolean isRoot(final ImmutableBytesWritable s) {
47
		return isRoot(s.copyBytes());
48
	}
49

  
50
	public static boolean isRoot(final byte[] s) {
51
		return isRoot(new String(s));
52
	}
53

  
54
	public static String getDedupCF_merges(final Type type) {
55
		return getRelType(type) + CF_SEPARATOR + "dedup" + CF_SEPARATOR + "merges";
56
	}
57

  
58
	public static String getDedupCF_merges(final String type) {
59
		return getDedupCF_merges(Type.valueOf(type));
60
	}
61

  
62
	public static byte[] getDedupCF_mergesBytes(final Type type) {
63
		return Bytes.toBytes(getDedupCF_merges(type));
64
	}
65

  
66
	public static byte[] getDedupCF_mergesBytes(final String type) {
67
		return getDedupCF_mergesBytes(Type.valueOf(type));
68
	}
69

  
70
	public static String getDedupCF_mergedIn(final Type type) {
71
		return getRelType(type) + CF_SEPARATOR + "dedup" + CF_SEPARATOR + "isMergedIn";
72
	}
73

  
74
	public static String getDedupCF_mergedIn(final String type) {
75
		return getDedupCF_mergedIn(Type.valueOf(type));
76
	}
77

  
78
	public static byte[] getDedupCF_mergedInBytes(final Type type) {
79
		return Bytes.toBytes(getDedupCF_mergedIn(type));
80
	}
81

  
82
	public static byte[] getDedupCF_mergedInBytes(final String type) {
83
		return getDedupCF_mergedInBytes(Type.valueOf(type));
84
	}
85

  
86
	public static String getSimilarityCF(final Type type) {
87
		return getRelType(type) + CF_SEPARATOR + "dedupSimilarity" + CF_SEPARATOR + "isSimilarTo";
88
	}
89

  
90
	public static String getSimilarityCF(final String type) {
91
		return getSimilarityCF(Type.valueOf(type));
92
	}
93

  
94
	public static byte[] getSimilarityCFBytes(final Type type) {
95
		return Bytes.toBytes(getSimilarityCF(type));
96
	}
97

  
98
	public static byte[] getSimilarityCFBytes(final String type) {
99
		return getSimilarityCFBytes(Type.valueOf(type));
100
	}
101

  
102
	public static String getRelTypeString(final Type type) {
103
		return getRelType(type).toString();
104
	}
105

  
106
	public static String getRelType(final Type type) {
107
		switch (type) {
108
		case organization:
109
			return "organizationOrganization";
110
		case person:
111
			return "personPerson";
112
		case publication:
113
			return "publicationPublication";
114
		case dataset:
115
			return "datasetDataset";
116
		default:
117
			throw new IllegalArgumentException("Deduplication not supported for entity type: " + type);
118
		}
119
	}
120

  
121
	public static ColumnFamily decodeCF(final byte[] b) {
122
		final String[] s = new String(b).split(CF_SEPARATOR);
123
		return new DedupUtils().getCF(RelType.valueOf(s[0]), SubRelType.valueOf(s[1]));
124
	}
125

  
126
	private ColumnFamily getCF(final RelType relType, final SubRelType subRelType) {
127
		return new ColumnFamily(relType, subRelType);
128
	}
129

  
130
	public static DNGFRel.Builder getDedup(final DedupConfig dedupConf, final String from, final String to, final String relClass) {
131
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
132

  
133
		RelDescriptor descriptor = new RelDescriptor(relClass);
134

  
135
		final Builder oafRel =
136
				DNGFRel.newBuilder()
137
						.setRelType(Qualifier.newBuilder()
138
															.setClassid(descriptor.getTermCode()).setClassname(descriptor.getTermCode())
139
															.setSchemeid(descriptor.getOntologyCode()).setSchemename(descriptor.getOntologyCode()))
140
						.setSource(new String(from)).setSourceType(DNGFRowKeyDecoder.decode(new String(from)).getType())
141
						.setTarget(new String(to)).setTargetType(DNGFRowKeyDecoder.decode(new String(to)).getType())
142
						.setChild(false);
143
		switch (type) {
144
		case organization:
145
			oafRel.setOrganizationOrganization(OrganizationOrganization.newBuilder().setDedup(
146
					DedupUtils.dedup(relClass, "dnet:organization_organization_relations")));
147
			break;
148
		case person:
149
			oafRel.setPersonPerson(PersonPerson.newBuilder().setDedup(DedupUtils.dedup(relClass, "dnet:person_person_relations")));
150
			break;
151
		case publication:
152
			oafRel.setPublicationPublication(PublicationPublication.newBuilder().setDedup(DedupUtils.dedup(relClass, "dnet:publication_publication_relations")));
153
			break;
154
		case dataset:
155
			oafRel.setDatasetDataset(DatasetDataset.newBuilder().setDedup(DedupUtils.dedup(relClass, "dnet:dataset_dataset_relations")));
156
			break;
157
		default:
158
			throw new IllegalArgumentException("Deduplication not supported for entity type: " + dedupConf.getWf().getEntityType());
159
		}
160
		return oafRel;
161
	}
162

  
163
	private static Dedup.Builder dedup(final Dedup.RelName relClass, final String scheme) {
164
		return Dedup.newBuilder().setRelMetadata(
165
				RelMetadata.newBuilder().setSemantics(
166
						Qualifier.newBuilder().setClassid(relClass.toString()).setClassname(relClass.toString()).setSchemeid(scheme).setSchemename(scheme)));
167
	}
168

  
169
	class ColumnFamily {
170

  
171
		private final RelType relType;
172
		private final SubRelType subRelType;
173

  
174
		public ColumnFamily(final RelType relType, final SubRelType subRelType) {
175
			this.relType = relType;
176
			this.subRelType = subRelType;
177
		}
178

  
179
		@Override
180
		public String toString() {
181
			return getRelType() + CF_SEPARATOR + getSubRelType();
182
		}
183

  
184
		public RelType getRelType() {
185
			return relType;
186
		}
187

  
188
		public SubRelType getSubRelType() {
189
			return subRelType;
190
		}
191

  
192
	}
193

  
194
}
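
For reference, the helper class above encodes the dedup column families and root identifiers as plain strings. With the conventions defined in this class, the methods produce values like the following (the entity id in the example is invented for illustration):

    getRelType(Type.publication)           ->  "publicationPublication"
    getDedupCF_merges(Type.publication)    ->  "publicationPublication_dedup_merges"
    getDedupCF_mergedIn(Type.publication)  ->  "publicationPublication_dedup_isMergedIn"
    getSimilarityCF(Type.publication)      ->  "publicationPublication_dedupSimilarity_isSimilarTo"
    dedupPrefix("001")                     ->  "|dedup_wf_001::"
    newId("50|doajarticles::0123456789abcdef", "001")
                                           ->  "50|dedup_wf_001::0123456789abcdef"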
modules/dnet-mapreduce-jobs/trunk/src/test/java/eu/dnetlib/data/transform/HBaseReadTest.java
1
package eu.dnetlib.data.transform;
2

  
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.concurrent.atomic.AtomicInteger;
6

  
7
import eu.dnetlib.data.graph.model.DNGFDecoder;
8
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
9
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset.GeoLocation;
10
import org.apache.commons.io.IOUtils;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
13
import org.apache.hadoop.conf.Configuration;
14
import org.apache.hadoop.hbase.client.*;
15
import org.apache.hadoop.hbase.util.Bytes;
16
import org.junit.*;
17
import org.springframework.core.io.ClassPathResource;
18
import org.springframework.core.io.Resource;
19

  
20
/**
21
 * Created by claudio on 05/09/16.
22
 */
23
public class HBaseReadTest {
24

  
25
	private static final String TABLE_NAME = "db_wds";
26
	private Resource confIn = new ClassPathResource("eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties");
27

  
28
	private HTable table;
29

  
30
	@Before
31
	public void setUp() throws IOException {
32

  
33
		final Configuration conf = new Configuration();
34

  
35
		for(final String line : IOUtils.readLines(confIn.getInputStream())) {
36
			System.out.println("line = " + line);
37
			if (!line.trim().isEmpty() && !line.startsWith("#")) {
38
				final String[] split = line.split("=");
39
				conf.set(split[0].trim(), split[1].trim());
40
			}
41
		}
42

  
43
		table = new HTable(conf, Bytes.toBytes(TABLE_NAME));
44
	}
45

  
46
	@Ignore
47
	@Test
48
	public void testReadGeoLocations() throws IOException {
49

  
50
		final Scan scan = new Scan();
51
		scan.addColumn(Bytes.toBytes("dataset"), Bytes.toBytes("body"));
52

  
53
		final ResultScanner rs = table.getScanner(scan);
54

  
55
		System.out.println("start iteration");
56

  
57
		final DescriptiveStatistics statN = new DescriptiveStatistics();
58
		final AtomicInteger invalid = new AtomicInteger(0);
59
		rs.forEach(r -> {
60
			final byte[] b = r.getValue(Bytes.toBytes("dataset"), Bytes.toBytes("body"));
61
			final DNGFDecoder d = DNGFDecoder.decode(b, WdsDataset.geolocation);
62
			final List<GeoLocation> geoList = d.getDNGF().getEntity().getDataset().getMetadata().getExtension(WdsDataset.geolocation);
63
			geoList.forEach(g -> g.getBoxList().forEach(box -> {
64
				if (StringUtils.isNotBlank(box)) {
65
					final String[] split = box.trim().split(" ");
66
					try {
67
						statN.addValue(split.length);
68
						Assert.assertTrue("bad number of coordinates", split.length == 4);
69

  
70
						// Rect(minX=-180.0,maxX=180.0,minY=-90.0,maxY=90.0)
71

  
72
						Assert.assertTrue("minX=-180", Double.parseDouble(split[1]) >= -180.0);
73
						Assert.assertTrue("maxX= 180", Double.parseDouble(split[3]) <=  180.0);
74
						Assert.assertTrue("minY= -90", Double.parseDouble(split[0]) >= -90.0);
75
						Assert.assertTrue("maxY=  90", Double.parseDouble(split[2]) <=  90.0);
76

  
77
						//maxY must be >= minY: 90.0 to -90.0
78
						Assert.assertTrue("maxY must be >= minY", Double.parseDouble(split[2]) >= Double.parseDouble(split[0]));
79

  
80
						//maxY must be >= minY: 90.0 to -90.0
81
						Assert.assertTrue("maxX must be >= minX", Double.parseDouble(split[3]) >= Double.parseDouble(split[1]));
82
					} catch (AssertionError e) {
83
						invalid.set(invalid.get() + 1);
84
						//System.err.println(String.format("document %s has %s coordinates: %s", d.getDNGF().getEntity().getId(), split.length, e.getMessage()));
85
						//throw e;
86
					}
87
				}
88
			}));
89
		});
90

  
91
		rs.close();
92

  
93
		System.out.println(String.format("stat N: %s", statN));
94
		System.out.println(String.format("invalid N: %s", invalid.get()));
95
	}
96

  
97

  
98
	@After
99
	public void tearDown() throws IOException {
100
		table.close();
101
	}
102
}
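
A note on the bounding box format checked by testReadGeoLocations above: the assertion order implies each box value is four whitespace-separated numbers in the order minY minX maxY maxX (latitudes in positions 0 and 2, longitudes in positions 1 and 3). A value that would pass all checks, with invented coordinates:

    "-33.865 151.209 -33.847 151.230"
     minY = -33.865 (>= -90), minX = 151.209 (>= -180), maxY = -33.847 (>= minY, <= 90), maxX = 151.230 (>= minX, <= 180)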
modules/dnet-mapreduce-jobs/trunk/src/test/resources/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties
1
dnet.clustername				=	DM
2

  
3
#CORE-SITE
4
fs.defaultFS					=	hdfs://nmis-hadoop-cluster
5

  
6
hadoop.security.authentication	=	simple
7
hadoop.security.auth_to_local	=	DEFAULT
8

  
9
hadoop.rpc.socket.factory.class.default	=	org.apache.hadoop.net.StandardSocketFactory
10

  
11
#HBASE-SITE
12
hbase.rootdir					=	hdfs://nmis-hadoop-cluster/hbase
13

  
14
hbase.security.authentication	=	simple
15
zookeeper.znode.rootserver		=	root-region-server
16
hbase.zookeeper.quorum			=	quorum1.t.hadoop.research-infrastructures.eu,quorum2.t.hadoop.research-infrastructures.eu,quorum3.t.hadoop.research-infrastructures.eu,quorum4.t.hadoop.research-infrastructures.eu,jobtracker.t.hadoop.research-infrastructures.eu
17
hbase.zookeeper.property.clientPort	=	2182
18
hbase.zookeeper.client.port		=	2182
19
zookeeper.znode.parent			=	/hbase
20

  
21
#HDFS-SITE
22
dfs.replication					=	2
23
dfs.nameservices						=	nmis-hadoop-cluster
24
dfs.ha.namenodes.nmis-hadoop-cluster	=	nn1,nn2
25

  
26
dfs.namenode.rpc-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:8020
27
dfs.namenode.http-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:50070
28
dfs.namenode.rpc-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:8020
29
dfs.namenode.http-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:50070
30

  
31
dfs.client.failover.proxy.provider.nmis-hadoop-cluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
32

  
33
#MAPRED-SITE
34
mapred.job.tracker					=	nmis-hadoop-jt
35
mapred.jobtrackers.nmis-hadoop-jt	=	jt1,jt2
36

  
37
mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt1 = jobtracker.t.hadoop.research-infrastructures.eu:8021
38
mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt2 = quorum4.t.hadoop.research-infrastructures.eu:8022
39

  
40
mapred.client.failover.proxy.provider.nmis-hadoop-jt = org.apache.hadoop.mapred.ConfiguredFailoverProxyProvider
41

  
42
mapred.mapper.new-api			=	true
43
mapred.reducer.new-api			=	true
44

  
45
#OOZIE SERVER
46
oozie.service.loc				=	http://oozie.t.hadoop.research-infrastructures.eu:11000/oozie
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/PublicationAnalysisMapper.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2
//
3
//import java.io.IOException;
4
//import java.util.List;
5
//
6
//import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
7
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
//import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
9
//import eu.dnetlib.data.proto.PublicationProtos.Publication;
10
//import org.apache.commons.lang3.StringUtils;
11
//import org.apache.hadoop.hbase.client.Result;
12
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
14
//import org.apache.hadoop.io.NullWritable;
15
//
16
///**
17
// * Created by claudio on 22/04/16.
18
// */
19
//public class PublicationAnalysisMapper extends TableMapper<NullWritable, NullWritable> {
20
//
21
//	public static final String RESULT = "result";
22
//	private static final int MAX_DESCRIPTIONS = 50;
23
//
24
//	@Override
25
//	protected void setup(final Context context) throws IOException, InterruptedException {
26
//		super.setup(context);
27
//	}
28
//
29
//	@Override
30
//	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
31
//
32
//		if (new String(key.copyBytes()).contains("dedup_wf")) {
33
//			context.getCounter(RESULT, "roots").increment(1);
34
//			return;
35
//		}
36
//
37
//		final byte[] body = value.getValue(RESULT.getBytes(), DedupUtils.BODY_B);
38
//		if (body == null) {
39
//			context.getCounter(RESULT, "missing body").increment(1);
40
//			return;
41
//		}
42
//		final DNGFDecoder decoder = DNGFDecoder.decode(body);
43
//		final Publication publication = decoder.getEntity().getPublication();
44
//		if (publication.getMetadata().getResulttype().getClassid().equals("dataset")) {
45
//			context.getCounter(RESULT, "dataset").increment(1);
46
//			return;
47
//		} else {
48
//			context.getCounter(RESULT, "publication").increment(1);
49
//		}
50
//
51
//		if (publication.getMetadata().getDescriptionCount() > MAX_DESCRIPTIONS) {
52
//			context.getCounter(RESULT, "abstracts > " + MAX_DESCRIPTIONS).increment(1);
53
//		} else {
54
//			context.getCounter(RESULT, "abstracts: " + publication.getMetadata().getDescriptionCount()).increment(1);
55
//		}
56
//
57
//		final List<StringField> descList = publication.getMetadata().getDescriptionList();
58
//
59
//		boolean empty = true;
60
//		for(StringField desc : descList) {
61
//			empty = empty && StringUtils.isBlank(desc.getValue());
62
//		}
63
//
64
//		context.getCounter(RESULT, "empty abstract: " + empty).increment(1);
65
//	}
66
//
67
//	@Override
68
//	protected void cleanup(final Context context) throws IOException, InterruptedException {
69
//		super.cleanup(context);
70
//	}
71
//}
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.IOException;
4
import java.util.List;
5

  
6
import eu.dnetlib.data.graph.model.DNGFDecoder;
7
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
8
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
9
import eu.dnetlib.data.proto.PublicationProtos.Publication;
10
import org.apache.commons.lang3.StringUtils;
11
import org.apache.hadoop.hbase.client.Result;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableMapper;
14
import org.apache.hadoop.io.NullWritable;
15

  
16
/**
17
 * Created by claudio on 22/04/16.
18
 */
19
public class PublicationAnalysisMapper extends TableMapper<NullWritable, NullWritable> {
20

  
21
	public static final String RESULT = "result";
22
	private static final int MAX_DESCRIPTIONS = 50;
23

  
24
	@Override
25
	protected void setup(final Context context) throws IOException, InterruptedException {
26
		super.setup(context);
27
	}
28

  
29
	@Override
30
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
31

  
32
		if (new String(key.copyBytes()).contains("dedup_wf")) {
33
			context.getCounter(RESULT, "roots").increment(1);
34
			return;
35
		}
36

  
37
		final byte[] body = value.getValue(RESULT.getBytes(), HBaseTableDAO.cfMetadataByte());
38
		if (body == null) {
39
			context.getCounter(RESULT, "missing body").increment(1);
40
			return;
41
		}
42
		final DNGFDecoder decoder = DNGFDecoder.decode(body);
43
		final Publication publication = decoder.getEntity().getPublication();
44
		if (publication.getMetadata().getResulttype().getClassid().equals("dataset")) {
45
			context.getCounter(RESULT, "dataset").increment(1);
46
			return;
47
		} else {
48
			context.getCounter(RESULT, "publication").increment(1);
49
		}
50

  
51
		if (publication.getMetadata().getDescriptionCount() > MAX_DESCRIPTIONS) {
52
			context.getCounter(RESULT, "abstracts > " + MAX_DESCRIPTIONS).increment(1);
53
		} else {
54
			context.getCounter(RESULT, "abstracts: " + publication.getMetadata().getDescriptionCount()).increment(1);
55
		}
56

  
57
		final List<StringField> descList = publication.getMetadata().getDescriptionList();
58

  
59
		boolean empty = true;
60
		for(StringField desc : descList) {
61
			empty = empty && StringUtils.isBlank(desc.getValue());
62
		}
63

  
64
		context.getCounter(RESULT, "empty abstract: " + empty).increment(1);
65
	}
66

  
67
	@Override
68
	protected void cleanup(final Context context) throws IOException, InterruptedException {
69
		super.cleanup(context);
70
	}
71
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMarkDeletedEntityMapper.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup;
2
//
3
//import java.io.IOException;
4
//import java.util.Map;
5
//
6
//import eu.dnetlib.data.mapreduce.JobParams;
7
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
//import eu.dnetlib.data.proto.DNGFProtos.DNGF;
9
//import eu.dnetlib.data.proto.DNGFProtos.DNGFRel.Builder;
10
//import eu.dnetlib.data.proto.KindProtos.Kind;
11
//import eu.dnetlib.data.proto.SubRelProtos.Dedup;
12
//import eu.dnetlib.data.proto.TypeProtos.Type;
13
//import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
14
//import eu.dnetlib.pace.config.DedupConfig;
15
//import org.apache.commons.logging.Log;
16
//import org.apache.commons.logging.LogFactory;
17
//import org.apache.hadoop.hbase.client.Put;
18
//import org.apache.hadoop.hbase.client.Result;
19
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
21
//import org.apache.hadoop.hbase.util.Bytes;
22
//
23
//public class DedupMarkDeletedEntityMapper extends TableMapper<ImmutableBytesWritable, Put> {
24
//
25
//	private static final Log log = LogFactory.getLog(DedupMarkDeletedEntityMapper.class);
26
//
27
//	private DedupConfig dedupConf;
28
//
29
//	@Override
30
//	protected void setup(final Context context) throws IOException, InterruptedException {
31
//		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
32
//		log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
33
//	}
34
//
35
//	@Override
36
//	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
37
//		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
38
//
39
//		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
40
//		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(type));
41
//
42
//		if ((mergedIn != null) && !mergedIn.isEmpty()) {
43
//
44
//			final byte[] row = rowkey.copyBytes();
45
//
46
//			// marks the original body deleted
47
//			emitBody(context, row, value.getValue(Bytes.toBytes(type.toString()), DedupUtils.BODY_B));
48
//
49
//		} else {
50
//			context.getCounter(type.toString(), "row not merged").increment(1);
51
//		}
52
//	}
53
//
54
//
55
//	private void emitBody(final Context context, final byte[] row, final byte[] body) throws IOException, InterruptedException {
56
//		final String type = dedupConf.getWf().getEntityType();
57
//		if (body == null) {
58
//			context.getCounter(type, "missing body").increment(1);
59
//			System.err.println("missing body: " + new String(row));
60
//			return;
61
//		}
62
//		final DNGF prototype = DNGF.parseFrom(body);
63
//
64
//		if (prototype.getDataInfo().getDeletedbyinference()) {
65
//			context.getCounter(type, "bodies already deleted").increment(1);
66
//		} else {
67
//			final DNGF.Builder oafRoot = DNGF.newBuilder(prototype);
68
//			oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
69
//			final byte[] family = Bytes.toBytes(type);
70
//			final Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
71
//			put.setWriteToWAL(JobParams.WRITE_TO_WAL);
72
//			context.write(new ImmutableBytesWritable(row), put);
73
//			context.getCounter(type, "bodies marked deleted").increment(1);
74
//		}
75
//	}
76
//
77
//	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
78
//		final Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
79
//		final DNGF oaf =
80
//				DNGF.newBuilder()
81
//						.setKind(Kind.relation)
82
//						.setLastupdatetimestamp(System.currentTimeMillis())
83
//						.setDataInfo(
84
//								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
85
//										dedupConf.getWf().getConfigurationId())).setRel(oafRel)
86
//				.build();
87
//		return oaf.toByteArray();
88
//	}
89
//
90
//}
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import eu.dnetlib.data.mapreduce.JobParams;
7
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
8
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
9
import eu.dnetlib.data.proto.TypeProtos.Type;
10
import eu.dnetlib.pace.config.DedupConfig;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.hbase.client.Put;
14
import org.apache.hadoop.hbase.client.Result;
15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.mapreduce.TableMapper;
17
import org.apache.hadoop.hbase.util.Bytes;
18

  
19
public class DedupMarkDeletedEntityMapper extends TableMapper<ImmutableBytesWritable, Put> {
20

  
21
	private static final Log log = LogFactory.getLog(DedupMarkDeletedEntityMapper.class);
22

  
23
	private DedupConfig dedupConf;
24

  
25
	@Override
26
	protected void setup(final Context context) throws IOException, InterruptedException {
27
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
28
		log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
29
	}
30

  
31
	@Override
32
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
33
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
34

  
35
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
36
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(HBaseTableDAO.getDedupQualifier_mergedInBytes(type));
37

  
38
		if ((mergedIn != null) && !mergedIn.isEmpty()) {
39

  
40
			final byte[] row = rowkey.copyBytes();
41

  
42
			// marks the original body deleted
43
			emitBody(context, row, value.getValue(Bytes.toBytes(type.toString()), HBaseTableDAO.cfMetadataByte()));
44

  
45
		} else {
46
			context.getCounter(type.toString(), "row not merged").increment(1);
47
		}
48
	}
49

  
50

  
51
	private void emitBody(final Context context, final byte[] row, final byte[] body) throws IOException, InterruptedException {
52
		final String type = dedupConf.getWf().getEntityType();
53
		if (body == null) {
54
			context.getCounter(type, "missing body").increment(1);
55
			System.err.println("missing body: " + new String(row));
56
			return;
57
		}
58
		final DNGF prototype = DNGF.parseFrom(body);
59

  
60
		if (prototype.getDataInfo().getDeletedbyinference()) {
61
			context.getCounter(type, "bodies already deleted").increment(1);
62
		} else {
63
			final DNGF.Builder oafRoot = DNGF.newBuilder(prototype);
64
			oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
65
			final byte[] family = Bytes.toBytes(type);
66
			final Put put = new Put(row).add(family, HBaseTableDAO.cfMetadataByte(), oafRoot.build().toByteArray());
67
			put.setWriteToWAL(JobParams.WRITE_TO_WAL);
68
			context.write(new ImmutableBytesWritable(row), put);
69
			context.getCounter(type, "bodies marked deleted").increment(1);
70
		}
71
	}
72

  
73
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/UpdateMerger.java
6 6
import java.util.Map.Entry;
7 7

  
8 8
import com.google.common.collect.Maps;
9
import eu.dnetlib.data.graph.model.DNGFDecoder;
10
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
9 11
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
10 12
import org.apache.hadoop.hbase.util.Bytes;
11 13
import org.apache.hadoop.mapreduce.Mapper.Context;
......
39 41
	private static DNGF doMerge(final Context context, final Map<String, byte[]> map)
40 42
			throws InvalidProtocolBufferException {
41 43

  
42
		final byte[] value = map.get(DedupUtils.BODY_S);
44
		final byte[] value = map.get(HBaseTableDAO.cfMetadata());
43 45
		if (value == null) return null;
44 46

  
45 47
		DNGF.Builder builder = DNGF.newBuilder(DNGFDecoder.decode(value, WdsDataset.geolocation).getDNGF());
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/AbstractRecordFactory.java
10 10
import eu.dnetlib.data.mapreduce.hbase.index.config.ContextMapper;
11 11
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
12 12
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
13
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
14 13
import eu.dnetlib.data.proto.DNGFProtos;
15 14
import eu.dnetlib.data.proto.FieldTypeProtos;
16 15
import eu.dnetlib.data.proto.PublicationProtos;
......
34 33
    protected List<DNGFDecoder> children = Lists.newLinkedList();
35 34
    protected EntityConfigTable entityConfigTable;
36 35
    protected ContextMapper contextMapper;
37
    protected RelClasses relClasses;
36

  
38 37
    protected Map<String, Integer> counters = Maps.newHashMap();
39 38
    protected boolean entityDefaults;
40 39
    protected boolean relDefaults;
......
49 48
        this.relDefaults = relDefaults;
50 49
        this.childDefaults = childDefaults;
51 50
        this.ontologies = ontologies;
52

  
53 51
    }
54 52

  
55 53
    public Map<String, Integer> getRelCounters() {
56 54
        return relCounters;
57 55
    }
58 56

  
59
    public RelClasses getRelClasses() {
60
        return relClasses;
61
    }
62

  
63 57
    public void setMainEntity(final DNGFDecoder mainEntity) {
64 58
        this.mainEntity = mainEntity;
65 59
        this.key = mainEntity.decodeEntity().getId();
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupDeleteSimRelMapper.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup;
2
//
3
//import java.io.IOException;
4
//import java.util.Map;
5
//
6
//import org.apache.hadoop.hbase.client.Delete;
7
//import org.apache.hadoop.hbase.client.Result;
8
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
//import org.apache.hadoop.io.Writable;
11
//
12
//import eu.dnetlib.data.mapreduce.JobParams;
13
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
14
//import eu.dnetlib.data.proto.TypeProtos.Type;
15
//import eu.dnetlib.pace.config.DedupConfig;
16
//
17
//public class DedupDeleteSimRelMapper extends TableMapper<ImmutableBytesWritable, Writable> {
18
//
19
//	private DedupConfig dedupConf;
20
//
21
//	private ImmutableBytesWritable outKey;
22
//
23
//	@Override
24
//	protected void setup(final Context context) throws IOException, InterruptedException {
25
//		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
26
//		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
27
//
28
//		outKey = new ImmutableBytesWritable();
29
//	}
30
//
31
//	@Override
32
//	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
33
//		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
34
//
35
//		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
36
//
37
//		final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
38
//
39
//		if ((similarRels != null) && !similarRels.isEmpty()) {
40
//
41
//			final byte[] row = rowkey.copyBytes();
42
//			final Delete delete = new Delete(row);
43
//
44
//			for (final byte[] q : similarRels.keySet()) {
45
//				delete.deleteColumns(DedupUtils.getSimilarityCFBytes(type), q);
46
//			}
47
//
48
//			outKey.set(row);
49
//			context.write(outKey, delete);
50
//			context.getCounter(dedupConf.getWf().getEntityType(), "similarity deleted").increment(similarRels.size());
51
//
52
//		} else {
53
//			context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
54
//		}
55
//	}
56
//}
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
7
import org.apache.hadoop.hbase.client.Delete;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.io.Writable;
12

  
13
import eu.dnetlib.data.mapreduce.JobParams;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import eu.dnetlib.pace.config.DedupConfig;
16

  
17
public class DedupDeleteSimRelMapper extends TableMapper<ImmutableBytesWritable, Writable> {
18

  
19
	private DedupConfig dedupConf;
20

  
21
	private ImmutableBytesWritable outKey;
22

  
23
	@Override
24
	protected void setup(final Context context) throws IOException, InterruptedException {
25
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
26
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
27

  
28
		outKey = new ImmutableBytesWritable();
29
	}
30

  
31
	@Override
32
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
33
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
34

  
35
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
36

  
37
		final Map<byte[], byte[]> similarRels = value.getFamilyMap(HBaseTableDAO.getSimilarityQualifierBytes(type));
38

  
39
		if ((similarRels != null) && !similarRels.isEmpty()) {
40

  
41
			final byte[] row = rowkey.copyBytes();
42
			final Delete delete = new Delete(row);
43

  
44
			for (final byte[] q : similarRels.keySet()) {
45
				delete.deleteColumns(HBaseTableDAO.getSimilarityQualifierBytes(type), q);
46
			}
47

  
48
			outKey.set(row);
49
			context.write(outKey, delete);
50
			context.getCounter(dedupConf.getWf().getEntityType(), "similarity deleted").increment(similarRels.size());
51

  
52
		} else {
53
			context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
54
		}
55
	}
56
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2
//
3
//import java.io.IOException;
4
//import java.nio.ByteBuffer;
5
//import java.util.Set;
6
//
7
//import com.google.common.collect.Sets;
8
//import eu.dnetlib.data.mapreduce.JobParams;
9
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
10
//import eu.dnetlib.data.proto.DNGFProtos.DNGF;
11
//import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
12
//import eu.dnetlib.data.proto.KindProtos.Kind;
13
//import eu.dnetlib.data.proto.SubRelProtos.Dedup;
14
//import eu.dnetlib.data.proto.TypeProtos.Type;
15
//import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
16
//import eu.dnetlib.pace.config.DedupConfig;
17
//import org.apache.commons.logging.Log;
18
//import org.apache.commons.logging.LogFactory;
19
//import org.apache.hadoop.hbase.client.Put;
20
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
21
//import org.apache.hadoop.hbase.mapreduce.TableReducer;
22
//import org.apache.hadoop.hbase.util.Bytes;
23
//import org.apache.hadoop.io.Text;
24
//
25
///**
26
// * Created by claudio on 15/10/15.
27
// */
28
//public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {
29
//
30
//	private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);
31
//
32
//	private DedupConfig dedupConf;
33
//
34
//	private byte[] cfMergedIn;
35
//
36
//	private byte[] cfMerges;
37
//
38
//	@Override
39
//	protected void setup(final Context context) {
40
//
41
//		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
42
//		log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
43
//
44
//		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
45
//		cfMergedIn = DedupUtils.getDedupCF_mergedInBytes(type);
46
//		cfMerges = DedupUtils.getDedupCF_mergesBytes(type);
47
//	}
48
//
49
//	@Override
50
//	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
51
//
52
//		final Set<String> set = Sets.newHashSet();
53
//
54
//		for(VertexWritable v : values) {
55
//			for(Text t : v.getEdges()) {
56
//				set.add(t.toString());
57
//			}
58
//		}
59
//
60
//		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
61
//
62
//		for(String q : set) {
63
//			final byte[] qb = Bytes.toBytes(q);
64
//			emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
65
//			emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
66
//
67
//			context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
68
//		}
69
//
70
//	}
71
//
72
//	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
73
//			InterruptedException {
74
//		final Put put = new Put(from).add(cf, to, value);
75
//		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
76
//		context.write(new ImmutableBytesWritable(from), put);
77
//	}
78
//
79
//	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
80
//		final DNGFRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
81
//		final DNGF oaf =
82
//				DNGF.newBuilder()
83
//						.setKind(Kind.relation)
84
//						.setLastupdatetimestamp(System.currentTimeMillis())
85
//						.setDataInfo(
86
//								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
87
//										dedupConf.getWf().getConfigurationId())).setRel(oafRel)
88
//						.build();
89
//		return oaf.toByteArray();
90
//	}
91
//
92
//}
1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

  
3
import java.io.IOException;
4
import java.nio.ByteBuffer;
5
import java.util.Set;
6

  
7
import com.google.common.collect.Sets;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
10
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
11
import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
12
import eu.dnetlib.data.proto.KindProtos.Kind;
13
import eu.dnetlib.data.proto.TypeProtos.Type;
14
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
15
import eu.dnetlib.pace.config.DedupConfig;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.apache.hadoop.hbase.client.Put;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableReducer;
21
import org.apache.hadoop.hbase.util.Bytes;
22
import org.apache.hadoop.io.Text;
23

  
24
/**
25
 * Created by claudio on 15/10/15.
26
 */
27
public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {
28

  
29
	private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);
30

  
31
	private DedupConfig dedupConf;
32

  
33
	private byte[] cfMergedIn;
34

  
35
	private byte[] cfMerges;
36

  
37
	@Override
38
	protected void setup(final Context context) {
39

  
40
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
41
		log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
42

  
43
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
44
		cfMergedIn = HBaseTableDAO.getDedupQualifier_mergedInBytes(type);
45
		cfMerges = HBaseTableDAO.getDedupQualifier_mergesBytes(type);
46
	}
47

  
48
	@Override
49
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
50

  
51
		final Set<String> set = Sets.newHashSet();
52

  
53
		for(VertexWritable v : values) {
54
			for(Text t : v.getEdges()) {
55
				set.add(t.toString());
56
			}
57
		}
58

  
59
		final byte[] root = HBaseTableDAO.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
60

  
61
		for(String q : set) {
62
			final byte[] qb = Bytes.toBytes(q);
63
			emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, "isMergedIn"));
64
			emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, "merges"));
65

  
66
			context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
67
		}
68

  
69
	}
70

  
71
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
72
			InterruptedException {
73
		final Put put = new Put(from).add(cf, to, value);
74
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
75
		context.write(new ImmutableBytesWritable(from), put);
76
	}
77

  
78
	private byte[] buildRel(final byte[] from, final byte[] to, final String relClass) {
79
		final DNGFRel.Builder oafRel = HBaseTableDAO.getDedup(new String(from), new String(to), relClass);
80
		final DNGF oaf =
81
				DNGF.newBuilder()
82
						.setKind(Kind.relation)
83
						.setLastupdatetimestamp(System.currentTimeMillis())
84
						.setDataInfo(
85
								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
86
										dedupConf.getWf().getConfigurationId())).setRel(oafRel)
87
						.build();
88
		return oaf.toByteArray();
89
	}
90

  
91
}
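
To make the reducer's write pattern concrete: assuming a connected component keyed by "50|source_______::aaa" with members "50|source_______::bbb" and "50|source_______::ccc", dedupRun "001", and HBaseTableDAO.newIdBytes behaving like the DedupUtils.newId shown earlier in this diff (all ids invented for illustration), the reduce call produces:

    root = "50|dedup_wf_001::aaa"
    Put on row "50|source_______::bbb": family cfMergedIn, qualifier root,                    value = serialized DNGF "isMergedIn" relation
    Put on row root:                    family cfMerges,   qualifier "50|source_______::bbb", value = serialized DNGF "merges" relation
    ... and the same pair of Puts for "50|source_______::ccc"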
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup;
2
//
3
//import java.io.IOException;
4
//import java.util.Collection;
5
//import java.util.List;
6
//import java.util.Map;
7
//
8
//import com.google.common.collect.Maps;
9
//import eu.dnetlib.data.mapreduce.JobParams;
10
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
//import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
12
//import eu.dnetlib.data.proto.DNGFProtos.DNGFEntity;
13
//import eu.dnetlib.data.proto.TypeProtos.Type;
14
//import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
15
//import eu.dnetlib.pace.config.DedupConfig;
16
//import eu.dnetlib.pace.model.MapDocument;
17
//import eu.dnetlib.pace.model.ProtoDocumentBuilder;
18
//import org.apache.commons.logging.Log;
19
//import org.apache.commons.logging.LogFactory;
20
//import org.apache.hadoop.hbase.client.Result;
21
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
23
//import org.apache.hadoop.hbase.util.Bytes;
24
//import org.apache.hadoop.io.Text;
25
//
26
//public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {
27
//
28
//	private static final Log log = LogFactory.getLog(DedupMapper.class);
29
//
30
//	private DedupConfig dedupConf;
31
//
32
//	private Map<String, List<String>> blackListMap = Maps.newHashMap();
33
//
34
//	private Text outKey;
35
//
36
//	private ImmutableBytesWritable ibw;
37
//
38
//	@Override
39
//	protected void setup(final Context context) throws IOException, InterruptedException {
40
//
41
//		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);
42
//
43
//		log.info("pace conf strings");
44
//		log.info("pace conf: " + dedupConfJson);
45
//
46
//		dedupConf = DedupConfig.load(dedupConfJson);
47
//
48
//		blackListMap = dedupConf.getPace().getBlacklists();
49
//
50
//		outKey = new Text();
51
//		ibw = new ImmutableBytesWritable();
52
//
53
//		log.info("pace conf");
54
//		log.info("entity type: " + dedupConf.getWf().getEntityType());
55
//		log.info("clustering: " + dedupConf.getPace().getClustering());
56
//		log.info("conditions: " + dedupConf.getPace().getConditions());
57
//		log.info("fields: " + dedupConf.getPace().getModel());
58
//		log.info("blacklists: " + blackListMap);
59
//		log.info("wf conf: " + dedupConf.toString());
60
//	}
61
//
62
//	@Override
63
//	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
64
//		// log.info("got key: " + new String(keyIn.copyBytes()));
65
//
66
//		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
67
//
68
//		if (body != null) {
69
//
70
//			final DNGFDecoder decoder = DNGFDecoder.decode(body);
71
//			if (decoder.getDNGF().getDataInfo().getDeletedbyinference()) {
72
//				context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1);
73
//				return;
74
//			}
75
//
76
//			final DNGFEntity entity = decoder.getEntity();
77
//
78
//			context.getCounter(entity.getType().toString(), "decoded").increment(1);
79
//
80
//			if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) {
81
//
82
//				// GeneratedMessage metadata = DNGFEntityDecoder.decode(entity).getEntity();
83
//				final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
84
//				emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
85
//			}
86
//		} else {
87
//			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
88
//		}
89
//	}
90
//
91
//	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
92
//		for (final String ngram : ngrams) {
93
//			outKey.set(ngram);
94
//			ibw.set(doc.toByteArray());
95
//			context.write(outKey, ibw);
96
//		}
97
//	}
98
//
99
//}
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.List;
6
import java.util.Map;
7

  
8
import com.google.common.collect.Maps;
9
import eu.dnetlib.data.graph.model.DNGFDecoder;
10
import eu.dnetlib.data.mapreduce.JobParams;
11
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
12
import eu.dnetlib.data.proto.DNGFProtos.DNGFEntity;
13
import eu.dnetlib.data.proto.TypeProtos.Type;
14
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
15
import eu.dnetlib.pace.config.DedupConfig;
16
import eu.dnetlib.pace.model.MapDocument;
17
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.apache.hadoop.hbase.client.Result;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableMapper;
23
import org.apache.hadoop.hbase.util.Bytes;
24
import org.apache.hadoop.io.Text;
25

  
26
public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {
27

  
28
	private static final Log log = LogFactory.getLog(DedupMapper.class);
29

  
30
	private DedupConfig dedupConf;
31

  
32
	private Map<String, List<String>> blackListMap = Maps.newHashMap();
33

  
34
	private Text outKey;
35

  
36
	private ImmutableBytesWritable ibw;
37

  
38
	@Override
39
	protected void setup(final Context context) throws IOException, InterruptedException {
40

  
41
		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);
42

  
43
		log.info("pace conf strings");
44
		log.info("pace conf: " + dedupConfJson);
45

  
46
		dedupConf = DedupConfig.load(dedupConfJson);
47

  
48
		blackListMap = dedupConf.getPace().getBlacklists();
49

  
50
		outKey = new Text();
51
		ibw = new ImmutableBytesWritable();
52

  
53
		log.info("pace conf");
54
		log.info("entity type: " + dedupConf.getWf().getEntityType());
55
		log.info("clustering: " + dedupConf.getPace().getClustering());
56
		log.info("conditions: " + dedupConf.getPace().getConditions());
57
		log.info("fields: " + dedupConf.getPace().getModel());
58
		log.info("blacklists: " + blackListMap);
59
		log.info("wf conf: " + dedupConf.toString());
60
	}
61

  
62
	@Override
63
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
64
		// log.info("got key: " + new String(keyIn.copyBytes()));
65

  
66
		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), HBaseTableDAO.cfMetadataByte());
67

  
68
		if (body != null) {
69

  
70
			final DNGFDecoder decoder = DNGFDecoder.decode(body);
71
			if (decoder.getDNGF().getDataInfo().getDeletedbyinference()) {
72
				context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1);
73
				return;
74
			}
75

  
76
			final DNGFEntity entity = decoder.getEntity();
77

  
78
			context.getCounter(entity.getType().toString(), "decoded").increment(1);
79

  
80
			if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) {
81

  
82
				// GeneratedMessage metadata = DNGFEntityDecoder.decode(entity).getEntity();
83
				final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
84
				emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
85
			}
86
		} else {
87
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
88
		}
89
	}
90

  
91
	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
92
		for (final String ngram : ngrams) {
93
			outKey.set(ngram);
94
			ibw.set(doc.toByteArray());
95
			context.write(outKey, ibw);
96
		}
97
	}
98

  
99
}
modules/dnet-mapreduce-jobs/trunk/pom.xml
314 314
		</dependency>
315 315

  
316 316
		<dependency>
317
			<groupId>org.apache.commons</groupId>
318
			<artifactId>commons-lang3</artifactId>
319
			<version>${commons.lang.version}</version>
320
		</dependency>
321
		<dependency>
317 322
			<groupId>com.typesafe</groupId>
318 323
			<artifactId>config</artifactId>
319 324
			<version>1.3.0</version>
......
324 329
			<artifactId>mongo-java-driver</artifactId>
325 330
			<version>${mongodb.driver.version}</version>
326 331
		</dependency>
327
                <dependency>
332
        <dependency>
328 333
			<groupId>org.elasticsearch</groupId>
329 334
			<artifactId>elasticsearch-hadoop-mr</artifactId>
330 335
			<version>2.0.2</version>
......
335 340
			<version>1.8.5</version>
336 341
			<scope>test</scope>
337 342
		</dependency>
338

  
339

  
340 343
		<dependency>
344
			<groupId>org.apache.zookeeper</groupId>
345
			<artifactId>zookeeper</artifactId>
346
			<version>3.4.5-cdh4.3.0</version>
347
			<scope>test</scope>
348
		</dependency>
349
		<dependency>
341 350
			<groupId>org.json</groupId>
342 351
			<artifactId>json</artifactId>
343 352
			<version>20160212</version>
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/SimpleDedupPersonMapper.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup;
2
//
3
//import java.io.IOException;
4
//
5
//import org.apache.hadoop.hbase.client.Result;
6
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
7
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
8
//import org.apache.hadoop.io.Text;
9
//
10
//import eu.dnetlib.data.mapreduce.JobParams;
11
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
12
//import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
13
//import eu.dnetlib.pace.config.DedupConfig;
14
//import eu.dnetlib.pace.model.Person;
15
//
16
//public class SimpleDedupPersonMapper extends TableMapper<Text, ImmutableBytesWritable> {
17
//
18
//	private DedupConfig dedupConf;
19
//
20
//	private Text rowKey;
21
//
22
//	private ImmutableBytesWritable ibw;
23
//
24
//	@Override
25
//	protected void setup(final Context context) throws IOException, InterruptedException {
26
//		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
27
//		rowKey = new Text();
28
//		ibw = new ImmutableBytesWritable();
29
//	}
30
//
31
//	@Override
32
//	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
33
//		// System.out.println("got key: " + new String(keyIn.copyBytes()));
34
//
35
//		if (DedupUtils.isRoot(new String(keyIn.copyBytes()))) {
36
//			context.getCounter(dedupConf.getWf().getEntityType(), "roots skipped").increment(1);
37
//			return;
38
//		}
39
//		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
40
//
41
//		if (body != null) {
42
//			try {
43
//				final DNGFDecoder decoder = DNGFDecoder.decode(body);
44
//
45
//				final String hash = new Person(decoder.getEntity().getPerson().getMetadata().getFullname().getValue(), false).hash();
46
//				// String hash = new Person(getPersonName(decoder), true).hash();
47
//
48
//				rowKey.set(hash);
49
//				ibw.set(body);
50
//				context.write(rowKey, ibw);
51
//
52
//			} catch (final Throwable e) {
53
//				System.out.println("GOT EX " + e);
54
//				e.printStackTrace(System.err);
55
//				context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
56
//			}
57
//		} else {
58
//			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
59
//		}
60
//	}
61
//
62
//	// private String getPersonName(DNGFDecoder decoder) {
63
//	// Metadata m = decoder.getEntity().getPerson().getMetadata();
64
//	// String secondnames = Joiner.on(" ").join(m.getSecondnamesList());
65
//	//
66
//	// return isValid(m.getFullname()) ? m.getFullname() : (secondnames + ", " + m.getFirstname());
67
//	// }
68
//
69
//	// private boolean isValid(String fullname) {
70
//	// return fullname != null && !fullname.isEmpty();
71
//	// }
72
//
73
//}
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4

  
5
import eu.dnetlib.data.graph.model.DNGFDecoder;
6
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.io.Text;
11

  
12
import eu.dnetlib.data.mapreduce.JobParams;
13
import eu.dnetlib.pace.config.DedupConfig;
14
import eu.dnetlib.pace.model.Person;
15

  
16
public class SimpleDedupPersonMapper extends TableMapper<Text, ImmutableBytesWritable> {
17

  
18
	private DedupConfig dedupConf;
19

  
20
	private Text rowKey;
21

  
22
	private ImmutableBytesWritable ibw;
23

  
24
	@Override
25
	protected void setup(final Context context) throws IOException, InterruptedException {
26
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
27
		rowKey = new Text();
28
		ibw = new ImmutableBytesWritable();
29
	}
30

  
31
	@Override
32
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
33
		// System.out.println("got key: " + new String(keyIn.copyBytes()));
34

  
35
		if (HBaseTableDAO.isRoot(new String(keyIn.copyBytes()))) {
36
			context.getCounter(dedupConf.getWf().getEntityType(), "roots skipped").increment(1);
37
			return;
38
		}
39
		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), HBaseTableDAO.cfMetadataByte());
40

  
41
		if (body != null) {
42
			try {
43
				final DNGFDecoder decoder = DNGFDecoder.decode(body);
44

  
45
				final String hash = new Person(decoder.getEntity().getPerson().getMetadata().getFullname().getValue(), false).hash();
46
				// String hash = new Person(getPersonName(decoder), true).hash();
47

  
48
				rowKey.set(hash);
49
				ibw.set(body);
50
				context.write(rowKey, ibw);
51

  
52
			} catch (final Throwable e) {
53
				System.out.println("GOT EX " + e);
54
				e.printStackTrace(System.err);
55
				context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
56
			}
57
		} else {
58
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
59
		}
60
	}
... This diff was truncated because it exceeds the maximum size that can be displayed.
