Project

General

Profile

« Previous | Next » 

Revision 49029

getting rid of person entities

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/preprocess/ExportFullnameMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.preprocess;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.hbase.client.Result;
6
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
7
import org.apache.hadoop.hbase.mapreduce.TableMapper;
8
import org.apache.hadoop.hbase.util.Bytes;
9
import org.apache.hadoop.io.Text;
10

  
11
import eu.dnetlib.data.mapreduce.util.DedupUtils;
12
import eu.dnetlib.data.mapreduce.util.OafDecoder;
13
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15

  
16
/**
17
 * builds map {merged author -> anchorId}
18
 *
19
 * @author claudio
20
 *
21
 */
22
public class ExportFullnameMapper extends TableMapper<ImmutableBytesWritable, Text> {
23

  
24
	private ImmutableBytesWritable outKey;
25

  
26
	private Text outValue;
27

  
28
	@Override
29
	protected void setup(final Context context) {
30
		outKey = new ImmutableBytesWritable(Bytes.toBytes("1"));
31
		outValue = new Text();
32
	}
33

  
34
	@Override
35
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
36

  
37
		final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(rowkey.copyBytes());
38

  
39
		if (!Type.person.equals(rkd.getType())) {
40
			context.getCounter(rkd.getType().toString(), "skipped").increment(1);
41
		}
42

  
43
		final byte[] body = value.getValue(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B);
44

  
45
		final OafDecoder d = OafDecoder.decode(body);
46

  
47
		final String fullname = d.getEntity().getPerson().getMetadata().getFullname().getValue();
48

  
49
		outValue.set(fullname);
50

  
51
		context.write(outKey, outValue);
52
	}
53

  
54
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/preprocess/ExportFullnameReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.preprocess;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
6
import org.apache.hadoop.io.Text;
7
import org.apache.hadoop.mapreduce.Reducer;
8

  
9
public class ExportFullnameReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> {
10

  
11
	private Text outValue;
12

  
13
	@Override
14
	protected void setup(final Context context) throws IOException, InterruptedException {
15

  
16
		outValue = new Text("");
17
	}
18

  
19
	@Override
20
	protected void reduce(final ImmutableBytesWritable key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
21

  
22
		for (final Text fullname : values) {
23
			context.write(fullname, outValue);
24

  
25
		}
26
	}
27

  
28
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
/**
 * Reducer side of the person coauthor discovery job.
 *
 * NOTE(review): intentionally empty placeholder — the corresponding mapper
 * (FindPersonCoauthorsMapper) POSTs its results to an external HTTP service
 * instead of emitting key/value pairs, so there is currently nothing to reduce.
 */
public class FindPersonCoauthorsReducer {

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.NavigableMap;
6

  
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.io.Text;
12
import org.apache.http.HttpResponse;
13
import org.apache.http.client.HttpClient;
14
import org.apache.http.client.methods.HttpPost;
15
import org.apache.http.client.methods.HttpUriRequest;
16
import org.apache.http.impl.client.DefaultHttpClient;
17
import org.apache.http.params.BasicHttpParams;
18
import org.apache.http.params.HttpParams;
19

  
20
import com.google.common.collect.Lists;
21
import com.google.gson.Gson;
22

  
23
import eu.dnetlib.data.mapreduce.hbase.VolatileColumnFamily;
24
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
25

  
26
public class FindPersonCoauthorsMapper extends TableMapper<Text, Text> {
27

  
28
	private final HttpClient client = new DefaultHttpClient();
29

  
30
	private final String url = "http://146.48.87.97:8888/addData";
31

  
32
	@Override
33
	protected void setup(final Context context) {
34
		// url = context.getConfiguration().get("dedup.person.coauthors.service.url");
35
	}
36

  
37
	@Override
38
	protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
39
		final NavigableMap<byte[], byte[]> candidates = row.getFamilyMap(Bytes.toBytes(VolatileColumnFamily.dedupPerson.toString()));
40
		if ((candidates == null) || candidates.isEmpty()) return;
41

  
42
		final List<String> coauthors = Lists.newArrayList();
43
		for (final byte[] b : row.getFamilyMap(Bytes.toBytes(RelType.personResult.toString())).keySet()) {
44
			coauthors.add(Bytes.toString(b));
45
		}
46

  
47
		for (final byte[] candidate : candidates.keySet()) {
48
			emit(context, Bytes.toString(candidate), coauthors);
49
		}
50
	}
51

  
52
	private void emit(final Context context, final String candidate, final List<String> coauthors) {
53
		try {
54
			final HttpUriRequest request = new HttpPost(url);
55
			final HttpParams params = new BasicHttpParams();
56
			params.setParameter("id", candidate);
57
			params.setParameter("data", (new Gson().toJson(coauthors)));
58
			request.setParams(params);
59
			final HttpResponse response = client.execute(request);
60
			context.getCounter("HTTP call", "code " + response.getStatusLine().getStatusCode()).increment(1);
61
		} catch (final Exception e) {
62
			context.getCounter("HTTP call", "Exception " + e.getClass()).increment(1);
63
		}
64
	}
65
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsPersonMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.nio.ByteBuffer;
5
import java.util.Map;
6

  
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.hbase.util.Bytes;
11

  
12
import com.google.protobuf.InvalidProtocolBufferException;
13
import com.googlecode.protobuf.format.JsonFormat;
14

  
15
import eu.dnetlib.data.mapreduce.JobParams;
16
import eu.dnetlib.data.mapreduce.util.DedupUtils;
17
import eu.dnetlib.data.mapreduce.util.OafDecoder;
18
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
19
import eu.dnetlib.data.proto.OafProtos.Oaf;
20
import eu.dnetlib.data.proto.PersonProtos.Person;
21
import eu.dnetlib.data.proto.TypeProtos.Type;
22
import eu.dnetlib.pace.config.DedupConfig;
23
import eu.dnetlib.pace.model.gt.GTAuthor;
24

  
25
public class DedupFindRootsPersonMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
26

  
27
	private DedupConfig dedupConf;
28

  
29
	private ImmutableBytesWritable outKey;
30

  
31
	private ImmutableBytesWritable outValue;
32

  
33
	@Override
34
	protected void setup(final Context context) throws IOException, InterruptedException {
35
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
36
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
37

  
38
		outKey = new ImmutableBytesWritable();
39
		outValue = new ImmutableBytesWritable();
40
	}
41

  
42
	@Override
43
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
44
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
45

  
46
		final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(rowkey.copyBytes());
47

  
48
		if (!Type.person.equals(rkd.getType())) {
49
			context.getCounter(rkd.getType().toString(), "skipped").increment(1);
50
		}
51

  
52
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
53
		final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
54

  
55
		if ((similarRels != null) && !similarRels.isEmpty()) {
56
			final ByteBuffer min = findMin(rowkey.copyBytes(), similarRels.keySet());
57

  
58
			final byte[] groupingKey = DedupUtils.newIdBytes(min, dedupConf.getWf().getDedupRun());
59

  
60
			final GTAuthor gta = asGTA(context, rowkey, value.getValue(Bytes.toBytes(dedupConf.getWf().getEntityType()), DedupUtils.BODY_B));
61

  
62
			emitBody(context, groupingKey, gta);
63
		} else {
64
			context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
65
		}
66
	}
67

  
68
	private GTAuthor asGTA(final Context context, final ImmutableBytesWritable rowkey, final byte[] input) {
69

  
70
		final OafDecoder decoder = OafDecoder.decode(input);
71
		final Oaf oaf = decoder.getOaf();
72

  
73
		final Person person = oaf.getEntity().getPerson();
74

  
75
		final GTAuthor gta = GTAuthor.fromOafJson(new JsonFormat().printToString(person));
76
		final String id = new String(rowkey.copyBytes());
77
		gta.setId(id);
78
		gta.getAuthor().setId(id);
79
		return gta;
80
	}
81

  
82
	private ByteBuffer findMin(final byte[] key, final Iterable<byte[]> keys) {
83
		ByteBuffer bb = ByteBuffer.wrap(key);
84
		for (final byte[] q : keys) {
85
			final ByteBuffer iq = ByteBuffer.wrap(q);
86
			if (bb.compareTo(iq) > 0) {
87
				bb = iq;
88
			}
89
		}
90
		return bb;
91
	}
92

  
93
	private void emitBody(final Context context, final byte[] row, final GTAuthor gta) throws InvalidProtocolBufferException, IOException, InterruptedException {
94

  
95
		outKey.set(row);
96
		outValue.set(toOafByteArray(gta));
97

  
98
		context.write(outKey, outValue);
99
		context.getCounter(dedupConf.getWf().getEntityType(), "in").increment(1);
100
	}
101

  
102
	public byte[] toOafByteArray(final GTAuthor gta) {
103
		// final Oaf oaf = new GTAuthorMapper().map(gta);
104
		// return oaf.toByteArray();
105
		return Bytes.toBytes(gta.toString());
106
	}
107

  
108
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/SimpleDedupPersonMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.hbase.client.Result;
6
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
7
import org.apache.hadoop.hbase.mapreduce.TableMapper;
8
import org.apache.hadoop.io.Text;
9

  
10
import eu.dnetlib.data.mapreduce.JobParams;
11
import eu.dnetlib.data.mapreduce.util.DedupUtils;
12
import eu.dnetlib.data.mapreduce.util.OafDecoder;
13
import eu.dnetlib.pace.config.DedupConfig;
14
import eu.dnetlib.pace.model.Person;
15

  
16
public class SimpleDedupPersonMapper extends TableMapper<Text, ImmutableBytesWritable> {
17

  
18
	private DedupConfig dedupConf;
19

  
20
	private Text rowKey;
21

  
22
	private ImmutableBytesWritable ibw;
23

  
24
	@Override
25
	protected void setup(final Context context) throws IOException, InterruptedException {
26
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
27
		rowKey = new Text();
28
		ibw = new ImmutableBytesWritable();
29
	}
30

  
31
	@Override
32
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
33
		// System.out.println("got key: " + new String(keyIn.copyBytes()));
34

  
35
		if (DedupUtils.isRoot(new String(keyIn.copyBytes()))) {
36
			context.getCounter(dedupConf.getWf().getEntityType(), "roots skipped").increment(1);
37
			return;
38
		}
39
		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
40

  
41
		if (body != null) {
42
			try {
43
				final OafDecoder decoder = OafDecoder.decode(body);
44

  
45
				final String hash = new Person(decoder.getEntity().getPerson().getMetadata().getFullname().getValue(), false).hash();
46
				// String hash = new Person(getPersonName(decoder), true).hash();
47

  
48
				rowKey.set(hash);
49
				ibw.set(body);
50
				context.write(rowKey, ibw);
51

  
52
			} catch (final Throwable e) {
53
				System.out.println("GOT EX " + e);
54
				e.printStackTrace(System.err);
55
				context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
56
			}
57
		} else {
58
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
59
		}
60
	}
61

  
62
	// private String getPersonName(OafDecoder decoder) {
63
	// Metadata m = decoder.getEntity().getPerson().getMetadata();
64
	// String secondnames = Joiner.on(" ").join(m.getSecondnamesList());
65
	//
66
	// return isValid(m.getFullname()) ? m.getFullname() : (secondnames + ", " + m.getFirstname());
67
	// }
68

  
69
	// private boolean isValid(String fullname) {
70
	// return fullname != null && !fullname.isEmpty();
71
	// }
72

  
73
}
74 0

  
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindDedupCandidatePersonsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Set;
5

  
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11

  
12
import com.google.common.base.Function;
13
import com.google.common.collect.Iterables;
14
import com.google.common.collect.Sets;
15

  
16
import eu.dnetlib.data.mapreduce.util.DedupUtils;
17
import eu.dnetlib.data.mapreduce.util.OafDecoder;
18
import eu.dnetlib.data.proto.TypeProtos.Type;
19
import eu.dnetlib.pace.model.PersonComparatorUtils;
20

  
21
public class FindDedupCandidatePersonsMapper extends TableMapper<Text, Text> {
22

  
23
	private static final byte[] PERSON_CF = Type.person.toString().getBytes();
24
	private static final byte[] PERSONRESULT_CF = "personResult".getBytes();
25

  
26
	@Override
27
	protected void setup(final Context context) {
28

  
29
	}
30

  
31
	@Override
32
	protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
33
		String id = Bytes.toString(rowkey.get());
34
		String fullname = extractFullname(row);
35
		Set<String> resultIds = extractResultIds(row);
36

  
37
		if (fullname != null) {
38
			Text text = (new DedupPersonBean(id, fullname, resultIds)).toText();
39
			for (String k : PersonComparatorUtils.getNgramsForPerson(fullname)) {
40
				context.write(new Text(k), text);
41
			}
42
		}
43
	}
44

  
45
	private Set<String> extractResultIds(final Result row) {
46
		return Sets.newHashSet(Iterables.transform(row.getFamilyMap(PERSONRESULT_CF).keySet(), new Function<byte[], String>() {
47

  
48
			@Override
49
			public String apply(final byte[] b) {
50
				return Bytes.toString(b);
51
			}
52
		}));
53
	}
54

  
55
	private String extractFullname(final Result row) {
56
		byte[] body = row.getValue(PERSON_CF, DedupUtils.BODY_B);
57
		if (body == null) return null;
58
		return OafDecoder.decode(body).getEntity().getPerson().getMetadata().getFullname().getValue();
59
	}
60
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsPersonReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4

  
5
import com.google.common.base.Function;
6
import com.google.common.collect.Iterables;
7
import eu.dnetlib.data.mapreduce.JobParams;
8
import eu.dnetlib.data.mapreduce.util.DedupUtils;
9
import eu.dnetlib.data.proto.DedupProtos.Dedup;
10
import eu.dnetlib.data.proto.KindProtos.Kind;
11
import eu.dnetlib.data.proto.OafProtos.Oaf;
12
import eu.dnetlib.data.proto.OafProtos.OafRel;
13
import eu.dnetlib.data.proto.TypeProtos.Type;
14
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
15
import eu.dnetlib.pace.config.DedupConfig;
16
import eu.dnetlib.pace.model.gt.Authors;
17
import eu.dnetlib.pace.model.gt.CoAuthors;
18
import eu.dnetlib.pace.model.gt.GTAuthor;
19
import eu.dnetlib.pace.model.gt.GTAuthorMapper;
20
import org.apache.hadoop.hbase.client.Delete;
21
import org.apache.hadoop.hbase.client.Put;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
import org.apache.hadoop.hbase.mapreduce.TableReducer;
24
import org.apache.hadoop.hbase.util.Bytes;
25

  
26
/**
 * Merges the person rows grouped under the same root key: accumulates all
 * merged authors and coauthors from the incoming GTAuthor payloads, deletes
 * each original row, and writes one merged GTAuthor body under a newly minted
 * dedup root id derived from the merged-author set's hash code.
 */
public class DedupFindRootsPersonReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

	private DedupConfig dedupConf;

	// reusable output key for both Delete and Put emissions
	private ImmutableBytesWritable outKey;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		// NOTE(review): message says "mapper" but this is the reducer — likely a
		// copy-paste; left unchanged since it is runtime output.
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());

		outKey = new ImmutableBytesWritable();
	}

	@Override
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
			InterruptedException {

		final Authors aas = new Authors();
		final CoAuthors cas = new CoAuthors();
		// final Set<String> dupIds = Sets.newHashSet();

		// accumulate authors/coauthors and delete each original row as we go
		for (final GTAuthor a : asGTA(context, values)) {
			if (a.hasMerged()) {
				aas.addAll(a.getMerged());
			} else {
				aas.add(a.getAuthor());
			}
			if (a.hasCoAuthors()) {
				cas.addAll(a.getCoAuthors());
			}

			// dupIds.add(a.getId());

			final byte[] row = Bytes.toBytes(a.getId());
			final Delete delete = new Delete(row);
			outKey.set(row);
			context.write(outKey, delete);
			context.getCounter(dedupConf.getWf().getEntityType(), "deleted").increment(1);
		}

		// if (aas.isEmpty())
		// throw new IllegalArgumentException("empty merged author set, grouping key: " + new String(key.copyBytes()) + ", dupIds: " +
		// dupIds);

		// new root id derived from the merged author set; 'true' flags it as anchor
		final String rootId = hashCodeString(aas);
		final GTAuthor gta = new GTAuthor(rootId, aas, cas, true);

		// merge-relation emission currently disabled:
		// for (final String id : dupIds) {
		// final byte[] row = Bytes.toBytes(id);
		// final byte[] root = Bytes.toBytes(rootId);
		// emitDedupRel(context, DedupUtils.getDedupCF_mergedInBytes(Type.person), row, root, buildRel(row, root,
		// Dedup.RelName.isMergedIn));
		// emitDedupRel(context, DedupUtils.getDedupCF_mergesBytes(Type.person), root, row, buildRel(root, row, Dedup.RelName.merges));
		//
		// context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
		// }

		// write the merged body under the new root row
		final Put put = new Put(Bytes.toBytes(gta.getId()));
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
		put.add(Bytes.toBytes(dedupConf.getWf().getEntityType()), DedupUtils.BODY_B, toOafByteArray(gta));

		outKey.set(Bytes.toBytes(gta.getId()));
		context.write(outKey, put);

		context.getCounter(dedupConf.getWf().getEntityType(), "out").increment(1);
	}

	/** Lazily decodes each serialized JSON value into a GTAuthor. */
	private Iterable<GTAuthor> asGTA(final Context context, final Iterable<ImmutableBytesWritable> values) {

		return Iterables.transform(values, new Function<ImmutableBytesWritable, GTAuthor>() {

			@Override
			public GTAuthor apply(final ImmutableBytesWritable input) {
				return GTAuthor.fromJson(new String(input.copyBytes()));
			}
		});
	}

	/** Serializes the merged author as an Oaf protobuf byte array. */
	public byte[] toOafByteArray(final GTAuthor gta) {
		final Oaf oaf = new GTAuthorMapper().map(gta);
		return oaf.toByteArray();
	}

	// root id seed: hash code of the merged author set
	protected String hashCodeString(final Authors ag) {
		return getRowKey(String.valueOf(ag.hashCode()));
	}

	// builds a person oaf id in the "dedup_wf_001" namespace
	protected String getRowKey(final String s) {
		return AbstractDNetXsltFunctions.oafId(Type.person.toString(), "dedup_wf_001", s);
	}

	// currently unused: referenced only by the commented-out relation block in
	// reduce(); kept for a future re-enable
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
		final OafRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
		final Oaf oaf =
				Oaf.newBuilder()
						.setKind(Kind.relation)
						.setLastupdatetimestamp(System.currentTimeMillis())
						.setDataInfo(
								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
										dedupConf.getWf().getConfigurationId())).setRel(oafRel)
						.build();
		return oaf.toByteArray();
	}

	// currently unused: companion of buildRel(), see the commented-out block in reduce()
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws
			IOException, InterruptedException {
		final Put put = new Put(from).add(cf, to, value);
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
		context.write(new ImmutableBytesWritable(from), put);
	}

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/JoinPersonGroupReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.IOException;
4

  
5
import java.util.List;
6
import java.util.Set;
7

  
8
import com.google.common.base.Function;
9
import com.google.common.base.Splitter;
10
import com.google.common.collect.Iterables;
11
import com.google.common.collect.Lists;
12

  
13
import com.google.common.collect.Sets;
14
import org.apache.commons.lang.StringUtils;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17

  
18
import org.apache.hadoop.io.Text;
19
import org.apache.hadoop.mapreduce.Reducer;
20

  
21
public class JoinPersonGroupReducer extends Reducer<Text, Text, Text, Text> {
22

  
23
	/**
24
	 * logger.
25
	 */
26
	private static final Log log = LogFactory.getLog(JoinPersonGroupReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
27

  
28
	private Text tKey;
29
	private Text tValue;
30

  
31
	private final static int MIN_ENTRIES_THRESHOLD = 1;
32
	private int minEntriesThreshold;
33

  
34
	private final static int MAX_ENTRIES_THRESHOLD = Integer.MAX_VALUE;
35
	private int maxEntriesThreshold;
36

  
37
	private final static int MAX_FEATURES_THRESHOLD = Integer.MAX_VALUE;
38
	private int maxFeaturesThreshold;
39

  
40
	private Set<String> knownHashValues = Sets.newHashSet();
41

  
42
	private boolean passAll = false;
43

  
44
	@Override
45
	protected void setup(final Context context) throws IOException, InterruptedException {
46
		super.setup(context);
47
		tKey = new Text("");
48
		tValue = new Text();
49

  
50
		minEntriesThreshold = context.getConfiguration().getInt("min.entries.threshold", MIN_ENTRIES_THRESHOLD);
51
		maxEntriesThreshold = context.getConfiguration().getInt("max.entries.threshold", MAX_ENTRIES_THRESHOLD);
52
		maxFeaturesThreshold = context.getConfiguration().getInt("max.features.threshold", MAX_FEATURES_THRESHOLD);
53

  
54
		final String hashCsv = context.getConfiguration().get("hash.values.csv", "");
55

  
56
		log.info("hash csv: " + hashCsv);
57
		if (hashCsv.contains("ALL")) {
58
			passAll = true;
59
		}
60

  
61
		for(String hash : Splitter.on(",").omitEmptyStrings().trimResults().split(hashCsv)) {
62
			knownHashValues.add(hash);
63
		}
64

  
65
	}
66

  
67
	@Override
68
	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
69

  
70
		final CsvSerialiser csvSerialiser = new CsvSerialiser(maxEntriesThreshold, maxFeaturesThreshold);
71
		final String outKey = key.toString().replaceAll("[^a-zA-Z ]", "").toLowerCase();
72

  
73
		if (!passAll && !knownHashValues.contains(outKey)) {
74
			return;
75
		}
76

  
77
		if (StringUtils.isBlank(outKey)) {
78
			context.getCounter("person", "blank key").increment(1);
79
			return;
80
		}
81

  
82
		final List<CsvEntry> entries = Lists.newArrayList(Iterables.transform(values, new Function<Text, CsvEntry>() {
83

  
84
			@Override
85
			public CsvEntry apply(final Text t) {
86
				return CsvEntry.fromJson(t.toString());
87
			}
88
		}));
89

  
90
		trackPersonInfo(entries.size(), context, "person");
91

  
92
		if (entries.size() < minEntriesThreshold || entries.size() > maxEntriesThreshold) {
93
			return;
94
		}
95

  
96
		if (!passAll) {
97
			context.getCounter("person hash", outKey).increment(entries.size());
98
		}
99

  
100
		//tKey.set(outKey);
101
		tValue.set(csvSerialiser.asCSV(entries));
102
		context.write(tKey, tValue);
103

  
104
		context.getCounter("person", "csv").increment(1);
105
	}
106

  
107
	private void trackPersonInfo(final int count, final Context context, final String counterName) {
108

  
109
		if (count > 0 && count <= 10) {
110
			context.getCounter(counterName, count + "").increment(1);
111
			return;
112
		}
113

  
114
		if (count > 10 && count <= 20) {
115
			context.getCounter(counterName, "[10, 20)").increment(1);
116
			return;
117
		}
118

  
119
		if (count > 20 && count <= 30) {
120
			context.getCounter(counterName, "[20, 30)").increment(1);
121
			return;
122
		}
123

  
124
		if (count > 30 && count <= 40) {
125
			context.getCounter(counterName, "[30, 40)").increment(1);
126
			return;
127
		}
128

  
129
		if (count > 40 && count <= 50) {
130
			context.getCounter(counterName, "[40, 50)").increment(1);
131
			return;
132
		}
133

  
134
		if (count > 50 && count <= 70) {
135
			context.getCounter(counterName, "[50, 70)").increment(1);
136
			return;
137
		}
138

  
139
		if (count > 70 && count <= 100) {
140
			context.getCounter(counterName, "[70, 100)").increment(1);
141
			return;
142
		}
143

  
144
		if (count > 100 && count <= 150) {
145
			context.getCounter(counterName, "[100, 150)").increment(1);
146
			return;
147
		}
148

  
149
		if (count > 150 && count <= 200) {
150
			context.getCounter(counterName, "[150, 200)").increment(1);
151
			return;
152
		}
153

  
154
		if (count > 200) {
155
			context.getCounter(counterName, "[200, *)").increment(1);
156
			return;
157
		}
158
	}
159

  
160
	@Override
161
	public void cleanup(final Context context) throws IOException, InterruptedException {
162
		super.cleanup(context);
163
	}
164

  
165
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/JoinPersonGroupMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.List;
6
import java.util.Set;
7

  
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.data.mapreduce.JobParams;
13
import eu.dnetlib.data.mapreduce.util.DedupUtils;
14
import eu.dnetlib.data.mapreduce.util.OafDecoder;
15
import eu.dnetlib.data.proto.PersonProtos;
16
import eu.dnetlib.data.proto.TypeProtos.Type;
17
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
18
import eu.dnetlib.pace.config.DedupConfig;
19
import eu.dnetlib.pace.model.Person;
20
import org.apache.commons.lang.StringUtils;
21
import org.apache.commons.lang.math.RandomUtils;
22
import org.apache.hadoop.hbase.client.Put;
23
import org.apache.hadoop.hbase.client.Result;
24
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
25
import org.apache.hadoop.hbase.mapreduce.TableMapper;
26
import org.apache.hadoop.io.Text;
27
import org.apache.hadoop.mapreduce.Mapper;
28
import org.dom4j.Document;
29
import org.dom4j.Element;
30
import org.dom4j.io.SAXReader;
31

  
32
public class JoinPersonGroupMapper extends Mapper<Text, Text, Text, Text> {
33

  
34
	private static final String SUBJECT_PREFIX = "subject.";
35
	private static final String COAUTHOR_PREFIX = "coauthor.";
36

  
37
	public static final String PERSON = "person";
38

  
39
	private static final int MAX_TOKENS = 5;
40
	private static final int MIN_FEATURES = 10;
41

  
42
	private Text outKey;
43
	private Text outValue;
44

  
45
	private SubjectParser sp;
46

  
47
	@Override
48
	protected void setup(final Context context) throws IOException, InterruptedException {
49
		outKey = new Text();
50
		outValue = new Text();
51

  
52
		sp = new SubjectParser();
53
	}
54

  
55
	@Override
56
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
57
		// System.out.println("got key: " + new String(keyIn.copyBytes()));
58

  
59
		final SAXReader r = new SAXReader();
60
		try {
61
			final Document doc = r.read(new StringReader(value.toString()));
62
			final SubjectsMap sm = sp.parse(doc);
63

  
64
			final CsvEntry entry = new CsvEntry();
65
			for(Subjects subs : sm.values()) {
66
				for(String subject : subs) {
67
					final String s = SUBJECT_PREFIX + cleanup(subject);
68
					entry.addFeature("\"" + s + "\"");
69
				}
70
			}
71

  
72
			final List<Person> authors = getAuthors(doc);
73
			final String title = getTitle(doc);
74
			final String pubId = getId(doc);
75

  
76
			for(Person p1 : authors) {
77

  
78
				context.getCounter(PERSON, "accurate " + p1.isAccurate()).increment(1);
79
				final Set<String> hashes = getOutKeys(p1);
80
				context.getCounter(PERSON, String.format("accurate %s keys", p1.isAccurate())).increment(hashes.size());
81
				for(String s1 : hashes) {
82
					//final String s1 = normalize(p1);
83
					final CsvEntry c = new CsvEntry(s1, entry.getFeatures());
84
					for (Person p2 : authors) {
85
						final String s2 = normalize(p2.getSurnameString());
86
						if (p1.isAccurate() && p2.isAccurate()) {
87
							if (!p1.getSurnameString().equalsIgnoreCase(p2.getSurnameString())) {
88
								c.addFeature("\"" + COAUTHOR_PREFIX + s2.replaceAll("\"", "").replaceAll("\\s+", "_") + "\"");
89
							}
90
						}
91
					}
92

  
93
					final String prefix = StringUtils.substringBefore(pubId, "::");
94
					final String originalId = StringUtils.substringAfter(pubId, "::");
95

  
96
					c.setId(getId(prefix, originalId, p1.getOriginal()));
97
					c.setOriginalName(p1.getOriginal());
98
					c.setTitle(title);
99

  
100
					c.getFeatures().remove(s1);
101

  
102
					if (s1.length() <= 3) {
103
						context.getCounter(PERSON, "key size <= 3").increment(1);
104
						return;
105
					}
106

  
107
					if(c.getFeatures().size() < MIN_FEATURES) {
108
						context.getCounter(PERSON, "features < " + MIN_FEATURES).increment(1);
109
						return;
110
					}
111

  
112
					outKey.set(s1);
113
					outValue.set(c.toString());
114

  
115
					context.write(outKey, outValue);
116
				}
117
			}
118

  
119
		} catch (final Throwable e) {
120
			System.out.println("GOT EX " + e);
121
			e.printStackTrace(System.err);
122
			context.getCounter(PERSON, e.getClass().toString()).increment(1);
123
		}
124
	}
125

  
126
	protected String getId(final String nsPrefix, final String originalId, final String name) {
127

  
128
		final String localId = name.replaceAll("\\s+", " ").trim();
129

  
130
		// person id doesn't depend on the publication id
131
		// return AbstractDNetXsltFunctions.oafId(Type.person.toString(), prefix, localId);
132

  
133
		// person id depends on the publication id and the person name
134
		return AbstractDNetXsltFunctions.oafId(Type.person.toString(), nsPrefix, originalId + "::" + localId);
135
	}
136

  
137
	private String cleanup(final String s) {
138
		return s.replaceAll(" ", "_").replaceAll("\\.", "_").replaceAll("\"", "");
139
	}
140

  
141
	private String getId(final Document doc) {
142
		return doc.valueOf("//*[local-name() = 'objIdentifier']/text()");
143
	}
144

  
145
	private List<Person> getAuthors(final Document doc) {
146
		final List creatorNodes = doc.selectNodes("//*[local-name() = 'creator']");
147
		final List<Person> authors = Lists.newArrayList();
148

  
149
		for(int i = 0; i<creatorNodes.size(); i++) {
150
			final Element e = (Element) creatorNodes.get(i);
151
			authors.add(new Person(e.getText(), false));
152
		}
153
		return authors;
154
	}
155

  
156
	private String getTitle(final Document doc) {
157
		final List titleNodes = doc.selectNodes("//*[local-name() = 'title']");
158
		if (titleNodes != null && titleNodes.size() > 0) {
159
			final Element titleNode = (Element) titleNodes.get(0);
160

  
161
			return titleNode.getText().replaceAll(",", "");
162
		}
163
		return "";
164
	}
165

  
166
	private Set<String> getOutKeys(final Person p1) {
167
		final Set<String> hashes = Sets.newHashSet();
168
		if (p1.isAccurate()) {
169
			for(String name : p1.getName()) {
170
				hashes.add(normalize(p1.getSurnameString() + firstLC(name)));
171
			}
172
		} else {
173
			final String s = normalize(p1.getOriginal());
174
			for (final String token1 : tokens(s)) {
175
				for (final String token2 : tokens(s)) {
176
					if (!token1.equals(token2)) {
177
						hashes.add(firstLC(token1) + token2);
178
					}
179
				}
180
			}
181
		}
182
		return hashes;
183
	}
184

  
185
	private String normalize(final Person p) {
186

  
187
		final String s = p.getSurnameString() + firstLC(p.getNameString());
188
		return normalize(s);
189
	}
190

  
191
	private String normalize(final String s) {
192
		return s.replaceAll("[^a-zA-Z ]", "").toLowerCase().trim();
193
	}
194

  
195
	private Iterable<String> tokens(final String s) {
196
		return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), MAX_TOKENS);
197
	}
198

  
199
	private String firstLC(final String s) {
200
		return StringUtils.substring(s, 0, 1).toLowerCase();
201
	}
202

  
203
}
204 0

  
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/AnchorStatsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.List;
6

  
7
import com.google.common.base.Function;
8
import com.google.common.base.Joiner;
9
import com.google.common.collect.Iterables;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
13
import eu.dnetlib.data.proto.PersonProtos;
14
import org.apache.commons.lang.StringUtils;
15
import org.apache.hadoop.hbase.client.Result;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18
import org.apache.hadoop.io.NullWritable;
19

  
20
/**
21
 * builds map {merged author -> anchorId}
22
 *
23
 * @author claudio
24
 *
25
 */
26
public class AnchorStatsMapper extends TableMapper<NullWritable, NullWritable> {
27

  
28
	@Override
29
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
30

  
31

  
32
		final byte[] body = value.getValue("person".getBytes(), DedupUtils.BODY_B);
33

  
34
		if (body != null) {
35
			try {
36
				final OafDecoder decoder = OafDecoder.decode(body);
37

  
38
				final PersonProtos.Person p = decoder.getEntity().getPerson();
39

  
40
				if (!p.getAnchor()) {
41
					context.getCounter("person", "not anchor").increment(1);
42
					return;
43
				}
44

  
45
				trackPersonInfo(p.getMergedpersonCount(), context, "person merged");
46
				trackPersonInfo(p.getCoauthorCount(), context, "person coauthors");
47

  
48
			} catch (final Throwable e) {
49
				System.out.println("GOT EX " + e);
50
				//e.printStackTrace(System.err);
51
				context.getCounter("error", e.getClass().toString()).increment(1);
52
			}
53
		} else {
54
			context.getCounter("person", "missing body").increment(1);
55
		}
56
	}
57

  
58
	private void trackPersonInfo(final int count, final Context context, final String counterName) {
59

  
60
		if (count > 0 && count <= 10) {
61
			context.getCounter(counterName, count + "").increment(1);
62
			return;
63
		}
64

  
65
		if (count > 10 && count <= 20) {
66
			context.getCounter(counterName, "[10, 20)").increment(1);
67
			return;
68
		}
69

  
70
		if (count > 20 && count <= 30) {
71
			context.getCounter(counterName, "[20, 30)").increment(1);
72
			return;
73
		}
74

  
75
		if (count > 30 && count <= 40) {
76
			context.getCounter(counterName, "[30, 40)").increment(1);
77
			return;
78
		}
79

  
80
		if (count > 40 && count <= 50) {
81
			context.getCounter(counterName, "[40, 50)").increment(1);
82
			return;
83
		}
84

  
85
		if (count > 50 && count <= 70) {
86
			context.getCounter(counterName, "[50, 70)").increment(1);
87
			return;
88
		}
89

  
90
		if (count > 70 && count <= 100) {
91
			context.getCounter(counterName, "[70, 100)").increment(1);
92
			return;
93
		}
94

  
95
		if (count > 100) {
96
			context.getCounter(counterName, "[100, *)").increment(1);
97
			return;
98
		}
99

  
100
	}
101

  
102
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/CoAuthorUpdateMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2

  
3
import java.io.BufferedReader;
4
import java.io.IOException;
5
import java.io.InputStreamReader;
6
import java.util.List;
7
import java.util.Map;
8

  
9
import eu.dnetlib.data.mapreduce.JobParams;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.hadoop.conf.Configuration;
12
import org.apache.hadoop.fs.FileSystem;
13
import org.apache.hadoop.fs.Path;
14
import org.apache.hadoop.hbase.client.Put;
15
import org.apache.hadoop.hbase.client.Result;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18
import org.apache.hadoop.hbase.util.Bytes;
19

  
20
import com.google.common.collect.Maps;
21

  
22
import eu.dnetlib.data.mapreduce.util.DedupUtils;
23
import eu.dnetlib.data.mapreduce.util.OafDecoder;
24
import eu.dnetlib.data.proto.OafProtos.Oaf;
25
import eu.dnetlib.data.proto.PersonProtos.Person.CoAuthor;
26
import eu.dnetlib.data.proto.PersonProtos.Person.CoAuthor.Builder;
27
import eu.dnetlib.data.proto.TypeProtos.Type;
28

  
29
public class CoAuthorUpdateMapper extends TableMapper<ImmutableBytesWritable, Put> {
30

  
31
	private Map<String, String> mergedToAnchor;
32

  
33
	@Override
34
	protected void setup(final Context context) throws IOException, InterruptedException {
35

  
36
		mergedToAnchor = loadMap(context.getConfiguration());
37

  
38
		System.out.println("anchor map size: " + mergedToAnchor.size());
39
	}
40

  
41
	@Override
42
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
43

  
44
		final byte[] body = value.getValue(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B);
45

  
46
		final OafDecoder d = OafDecoder.decode(body);
47

  
48
		final Oaf.Builder oafBuilder = Oaf.newBuilder(d.getOaf());
49

  
50
		final List<Builder> coAuthors = oafBuilder.getEntityBuilder().getPersonBuilder().getCoauthorBuilderList();
51

  
52
		for (final Builder cb : coAuthors) {
53

  
54
			final String newAnchorId = mergedToAnchor.get(cb.getId());
55
			if (newAnchorId != null) {
56
				context.getCounter("anchor", "hit").increment(1);
57

  
58
				if (!cb.getAnchorId().equals(newAnchorId)) {
59
					cb.setAnchorId(newAnchorId);
60
					context.getCounter("anchor", "updated").increment(1);
61
				}
62
			} else {
63
				context.getCounter("anchor", "miss").increment(1);
64
			}
65
		}
66

  
67
		final Map<String, CoAuthor> coAuthorSet = Maps.newHashMap();
68

  
69
		for (final Builder cb : coAuthors) {
70
			coAuthorSet.put(cb.hasAnchorId() ? cb.getAnchorId() : cb.getId(), cb.build());
71
		}
72

  
73
		oafBuilder.getEntityBuilder().getPersonBuilder().clearCoauthor();
74
		oafBuilder.getEntityBuilder().getPersonBuilder().addAllCoauthor(coAuthorSet.values());
75

  
76
		final Put put = new Put(key.copyBytes());
77
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
78
		put.add(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B, oafBuilder.build().toByteArray());
79

  
80
		context.write(key, put);
81

  
82
	}
83

  
84
	private Map<String, String> loadMap(final Configuration conf) throws IOException {
85
		final Map<String, String> map = Maps.newHashMap();
86
		final String filePath = conf.get("mapred.output.dir") + "/part-r-00000";
87
		if (StringUtils.isBlank(filePath)) throw new IllegalArgumentException("missing 'mapred.output.dir'");
88

  
89
		final Path path = new Path(filePath);
90
		final FileSystem fs = FileSystem.get(conf);
91
		final BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
92

  
93
		String line = br.readLine();
94
		while (line != null) {
95
			final String[] split = line.split("=");
96

  
97
			map.put(split[0], split[1]);
98

  
99
			line = br.readLine();
100
		}
101

  
102
		return map;
103
	}
104

  
105
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/BuildMergedAnchorMapMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11

  
12
import com.google.common.collect.Maps;
13
import com.google.common.reflect.TypeToken;
14
import com.google.gson.Gson;
15

  
16
import eu.dnetlib.data.mapreduce.util.DedupUtils;
17
import eu.dnetlib.data.mapreduce.util.OafDecoder;
18
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
19
import eu.dnetlib.data.proto.PersonProtos.Person.MergedPerson;
20
import eu.dnetlib.data.proto.TypeProtos.Type;
21

  
22
/**
23
 * builds map {merged author -> anchorId}
24
 *
25
 * @author claudio
26
 *
27
 */
28
public class BuildMergedAnchorMapMapper extends TableMapper<ImmutableBytesWritable, Text> {
29

  
30
	private ImmutableBytesWritable outKey;
31

  
32
	private Text outValue;
33

  
34
	@SuppressWarnings("serial")
35
	final java.lang.reflect.Type token = new TypeToken<Map<String, String>>() {}.getType();
36

  
37
	private Gson gson;
38

  
39
	@Override
40
	protected void setup(final Context context) {
41
		outKey = new ImmutableBytesWritable(Bytes.toBytes("1"));
42
		outValue = new Text();
43

  
44
		gson = new Gson();
45
	}
46

  
47
	@Override
48
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
49

  
50
		final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(rowkey.copyBytes());
51

  
52
		if (!Type.person.equals(rkd.getType())) {
53
			context.getCounter(rkd.getType().toString(), "skipped").increment(1);
54
		}
55

  
56
		final byte[] body = value.getValue(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B);
57

  
58
		final OafDecoder d = OafDecoder.decode(body);
59

  
60
		final String anchorId = d.getEntity().getId();
61
		final Map<String, String> map = Maps.newHashMap();
62

  
63
		for (final MergedPerson p : d.getEntity().getPerson().getMergedpersonList()) {
64
			map.put(p.getId(), anchorId);
65
		}
66

  
67
		outValue.set(gson.toJson(map, token));
68

  
69
		context.write(outKey, outValue);
70

  
71
	}
72
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/GTCleanerMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import org.apache.hadoop.hbase.client.Delete;
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.mapreduce.Counter;
12

  
13
import com.google.protobuf.InvalidProtocolBufferException;
14

  
15
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
17
import eu.dnetlib.data.proto.KindProtos.Kind;
18
import eu.dnetlib.data.proto.OafProtos.Oaf;
19
import eu.dnetlib.data.proto.OafProtos.OafEntity;
20
import eu.dnetlib.data.proto.PersonProtos.Person;
21
import eu.dnetlib.data.proto.TypeProtos.Type;
22

  
23
/**
24
 * Removes the Non-root rows
25
 *
26
 *
27
 * @author claudio
28
 *
29
 */
30
public class GTCleanerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
31

  
32
	@Override
33
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
34

  
35
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
36

  
37
		final Type type = keyDecoder.getType();
38

  
39
		if (!type.equals(Type.person)) {
40
			incrementCounter(context, "wrong entity type", type.toString(), 1);
41
			return;
42
		}
43

  
44
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
45
		final byte[] bodyB = map.get(DedupUtils.BODY_B);
46
		if (bodyB == null) {
47
			incrementCounter(context, "missing body (map)", type.toString(), 1);
48
			return;
49
		}
50

  
51
		final Oaf oaf = decodeProto(context, bodyB);
52

  
53
		if (!isValid(oaf)) {
54
			incrementCounter(context, "missing body (map)", type.toString(), 1);
55
			return;
56
		}
57

  
58
		if (mergedSize(oaf, 0) || mergedSize(oaf, 1)) {
59
			context.write(keyIn, new Delete(keyIn.copyBytes()));
60
			incrementCounter(context, Kind.entity.toString(), "deleted", 1);
61
		}
62

  
63
	}
64

  
65
	private boolean mergedSize(final Oaf oaf, final int size) {
66
		final OafEntity entity = oaf.getEntity();
67

  
68
		if (entity == null) return false;
69

  
70
		final Person person = entity.getPerson();
71

  
72
		return (person.getMergedpersonList() != null) && (person.getMergedpersonList().size() == size);
73
	}
74

  
75
	private boolean isValid(final Oaf oaf) {
76
		return (oaf != null) && oaf.isInitialized();
77
	}
78

  
79
	private Oaf decodeProto(final Context context, final byte[] body) {
80
		try {
81
			return Oaf.parseFrom(body);
82
		} catch (final InvalidProtocolBufferException e) {
83
			e.printStackTrace(System.err);
84
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
85
		}
86
		return null;
87
	}
88

  
89
	private void incrementCounter(final Context context, final String k, final String t, final int n) {
90
		getCounter(context, k, t).increment(n);
91
	}
92

  
93
	private Counter getCounter(final Context context, final String k, final String t) {
94
		return context.getCounter(k, t);
95
	}
96

  
97
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/RootPersonExportMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Counter;
12

  
13
import com.google.protobuf.InvalidProtocolBufferException;
14
import com.googlecode.protobuf.format.JsonFormat;
15

  
16
import eu.dnetlib.data.mapreduce.util.DedupUtils;
17
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
18
import eu.dnetlib.data.proto.KindProtos.Kind;
19
import eu.dnetlib.data.proto.OafProtos.Oaf;
20
import eu.dnetlib.data.proto.OafProtos.OafEntity;
21
import eu.dnetlib.data.proto.PersonProtos.Person;
22
import eu.dnetlib.data.proto.TypeProtos.Type;
23

  
24
public class RootPersonExportMapper extends TableMapper<Text, Text> {
25

  
26
	private Text outKey;
27

  
28
	private Text outValue;
29

  
30
	@Override
31
	protected void setup(final Context context) throws IOException, InterruptedException {
32
		outKey = new Text("");
33
		outValue = new Text();
34
	}
35

  
36
	@Override
37
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
38

  
39
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
40

  
41
		final Type type = keyDecoder.getType();
42
		// if (!DedupUtils.isRoot(keyDecoder.getId())) {
43
		// incrementCounter(context, "not root id", type.toString(), 1);
44
		// return;
45
		// }
46

  
47
		if (!type.equals(Type.person)) {
48
			incrementCounter(context, "wrong entity type", type.toString(), 1);
49
			return;
50
		}
51

  
52
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
53
		final byte[] bodyB = map.get(DedupUtils.BODY_B);
54
		if (bodyB == null) {
55
			incrementCounter(context, "missing body (map)", type.toString(), 1);
56
			return;
57
		}
58

  
59
		final Oaf oaf = decodeProto(context, bodyB);
60

  
61
		if (!isValid(oaf)) {
62
			incrementCounter(context, "missing body (map)", type.toString(), 1);
63
			return;
64
		}
65

  
66
		if (mergedSize(oaf, 0)) {
67
			incrementCounter(context, "merge size = 0", type.toString(), 1);
68
			return;
69
		}
70
		if (mergedSize(oaf, 1)) {
71
			incrementCounter(context, "merge size = 1", type.toString(), 1);
72
			return;
73
		}
74

  
75
		emit(new String(keyIn.copyBytes()), context, oaf);
76
		incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1);
77
	}
78

  
79
	private boolean mergedSize(final Oaf oaf, final int size) {
80
		final OafEntity entity = oaf.getEntity();
81

  
82
		if (entity == null) return false;
83

  
84
		final Person person = entity.getPerson();
85

  
86
		return (person.getMergedpersonList() != null) && (person.getMergedpersonList().size() == size);
87
	}
88

  
89
	private boolean isValid(final Oaf oaf) {
90
		return (oaf != null) && oaf.isInitialized();
91
	}
92

  
93
	private Oaf decodeProto(final Context context, final byte[] body) {
94
		try {
95
			return Oaf.parseFrom(body);
96
		} catch (final InvalidProtocolBufferException e) {
97
			e.printStackTrace(System.err);
98
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
99
		}
100
		return null;
101
	}
102

  
103
	private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
104
		// outKey.set(key);
105
		outValue.set(new JsonFormat().printToString(oaf));
106

  
107
		context.write(outKey, outValue);
108
	}
109

  
110
	private void incrementCounter(final Context context, final String k, final String t, final int n) {
111
		getCounter(context, k, t).increment(n);
112
	}
113

  
114
	private Counter getCounter(final Context context, final String k, final String t) {
115
		return context.getCounter(k, t);
116
	}
117

  
118
	private String getEntityType(final Oaf oaf, final Type type) {
119
		switch (type) {
120
		case result:
121
			return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
122
		default:
123
			return type.toString();
124
		}
125
	}
126

  
127
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/BuildMergedAnchorMapReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff