Revision 49029
Added by Claudio Atzori over 6 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/preprocess/ExportFullnameMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.preprocess;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.TypeProtos.Type;

/**
 * builds map {merged author -> anchorId}
 *
 * Emits the fullname of every person row under a single constant key, so that
 * all fullnames are funnelled to one reducer for export.
 *
 * @author claudio
 *
 */
public class ExportFullnameMapper extends TableMapper<ImmutableBytesWritable, Text> {

	private ImmutableBytesWritable outKey;

	private Text outValue;

	@Override
	protected void setup(final Context context) {
		// constant key "1": groups every fullname under the same reducer
		outKey = new ImmutableBytesWritable(Bytes.toBytes("1"));
		outValue = new Text();
	}

	/**
	 * Decodes the row key; person rows have their body decoded and the
	 * person fullname emitted, any other entity type is counted and skipped.
	 */
	@Override
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {

		final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(rowkey.copyBytes());

		if (!Type.person.equals(rkd.getType())) {
			context.getCounter(rkd.getType().toString(), "skipped").increment(1);
			// FIX: the original fell through and tried to decode a non-person row as a person
			return;
		}

		final byte[] body = value.getValue(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B);

		// FIX: guard against rows lacking the body column (OafDecoder.decode would fail on null)
		if (body == null) {
			context.getCounter(Type.person.toString(), "missing body").increment(1);
			return;
		}

		final OafDecoder d = OafDecoder.decode(body);

		final String fullname = d.getEntity().getPerson().getMetadata().getFullname().getValue();

		outValue.set(fullname);

		context.write(outKey, outValue);
	}

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/preprocess/ExportFullnameReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.preprocess;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Writes each collected fullname out as the record key, paired with an
 * empty value (one fullname per output line).
 */
public class ExportFullnameReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> {

	// single reusable empty value, avoids allocating one Text per record
	private Text emptyValue;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {

		emptyValue = new Text("");
	}

	@Override
	protected void reduce(final ImmutableBytesWritable key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {

		final Iterator<Text> it = values.iterator();
		while (it.hasNext()) {
			context.write(it.next(), emptyValue);
		}
	}

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup; |
|
2 |
|
|
3 |
public class FindPersonCoauthorsReducer { |
|
4 |
|
|
5 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.http.message.BasicNameValuePair;

import com.google.common.collect.Lists;
import com.google.gson.Gson;

import eu.dnetlib.data.mapreduce.hbase.VolatileColumnFamily;
import eu.dnetlib.data.proto.RelTypeProtos.RelType;

/**
 * For every row carrying dedup-person candidate columns, posts the list of
 * coauthor ids (keys of the personResult column family) to an external HTTP
 * service, once per candidate.
 */
public class FindPersonCoauthorsMapper extends TableMapper<Text, Text> {

	// fallback endpoint, used when the job configuration does not override it
	private static final String DEFAULT_URL = "http://146.48.87.97:8888/addData";

	private final HttpClient client = new DefaultHttpClient();

	private String url;

	@Override
	protected void setup(final Context context) {
		// FIX: honour the job configuration (previously hard-coded), keeping the old URL as default
		url = context.getConfiguration().get("dedup.person.coauthors.service.url", DEFAULT_URL);
	}

	@Override
	protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
		final NavigableMap<byte[], byte[]> candidates = row.getFamilyMap(Bytes.toBytes(VolatileColumnFamily.dedupPerson.toString()));
		if ((candidates == null) || candidates.isEmpty()) return;

		// FIX: getFamilyMap may return null when the row has no personResult columns
		final NavigableMap<byte[], byte[]> personResults = row.getFamilyMap(Bytes.toBytes(RelType.personResult.toString()));

		final List<String> coauthors = Lists.newArrayList();
		if (personResults != null) {
			for (final byte[] b : personResults.keySet()) {
				coauthors.add(Bytes.toString(b));
			}
		}

		for (final byte[] candidate : candidates.keySet()) {
			emit(context, Bytes.toString(candidate), coauthors);
		}
	}

	/**
	 * POSTs {id, data=jsonCoauthors} to the service as form fields; failures are
	 * only counted, never propagated, so a flaky service doesn't kill the job.
	 */
	private void emit(final Context context, final String candidate, final List<String> coauthors) {
		try {
			final HttpPost request = new HttpPost(url);
			// FIX: the original stuffed "id"/"data" into HttpParams (client/protocol
			// configuration) so the POST body was empty; send them as form fields instead
			final List<NameValuePair> form = Lists.newArrayList();
			form.add(new BasicNameValuePair("id", candidate));
			form.add(new BasicNameValuePair("data", new Gson().toJson(coauthors)));
			request.setEntity(new UrlEncodedFormEntity(form, "UTF-8"));
			final HttpResponse response = client.execute(request);
			context.getCounter("HTTP call", "code " + response.getStatusLine().getStatusCode()).increment(1);
			// release the connection for reuse
			EntityUtils.consume(response.getEntity());
		} catch (final Exception e) {
			context.getCounter("HTTP call", "Exception " + e.getClass()).increment(1);
		}
	}
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsPersonMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.InvalidProtocolBufferException;
import com.googlecode.protobuf.format.JsonFormat;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.PersonProtos.Person;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.gt.GTAuthor;

/**
 * Groups person rows that belong to the same similarity mesh: each row found in
 * a similarity column family is emitted under the mesh's minimum row key (the
 * future dedup root), serialized as a {@link GTAuthor}.
 */
public class DedupFindRootsPersonMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

	private DedupConfig dedupConf;

	private ImmutableBytesWritable outKey;

	private ImmutableBytesWritable outValue;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());

		outKey = new ImmutableBytesWritable();
		outValue = new ImmutableBytesWritable();
	}

	@Override
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {

		final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(rowkey.copyBytes());

		if (!Type.person.equals(rkd.getType())) {
			context.getCounter(rkd.getType().toString(), "skipped").increment(1);
			// FIX: the original fell through and processed non-person rows anyway
			return;
		}

		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
		final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));

		if ((similarRels != null) && !similarRels.isEmpty()) {
			// the grouping key is the smallest id in the mesh, qualified with the dedup run
			final ByteBuffer min = findMin(rowkey.copyBytes(), similarRels.keySet());

			final byte[] groupingKey = DedupUtils.newIdBytes(min, dedupConf.getWf().getDedupRun());

			final byte[] body = value.getValue(Bytes.toBytes(dedupConf.getWf().getEntityType()), DedupUtils.BODY_B);

			// FIX: guard against a missing body column before decoding
			if (body == null) {
				context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
				return;
			}

			final GTAuthor gta = asGTA(context, rowkey, body);

			emitBody(context, groupingKey, gta);
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
		}
	}

	/**
	 * Decodes the Oaf body into a GTAuthor, stamping the row key as the id of
	 * both the wrapper and the inner author.
	 */
	private GTAuthor asGTA(final Context context, final ImmutableBytesWritable rowkey, final byte[] input) {

		final OafDecoder decoder = OafDecoder.decode(input);
		final Oaf oaf = decoder.getOaf();

		final Person person = oaf.getEntity().getPerson();

		final GTAuthor gta = GTAuthor.fromOafJson(new JsonFormat().printToString(person));
		final String id = new String(rowkey.copyBytes());
		gta.setId(id);
		gta.getAuthor().setId(id);
		return gta;
	}

	/** Returns the lexicographically smallest among {@code key} and {@code keys}. */
	private ByteBuffer findMin(final byte[] key, final Iterable<byte[]> keys) {
		ByteBuffer bb = ByteBuffer.wrap(key);
		for (final byte[] q : keys) {
			final ByteBuffer iq = ByteBuffer.wrap(q);
			if (bb.compareTo(iq) > 0) {
				bb = iq;
			}
		}
		return bb;
	}

	/** Emits the serialized author under the mesh grouping key. */
	private void emitBody(final Context context, final byte[] row, final GTAuthor gta) throws InvalidProtocolBufferException, IOException, InterruptedException {

		outKey.set(row);
		outValue.set(toOafByteArray(gta));

		context.write(outKey, outValue);
		context.getCounter(dedupConf.getWf().getEntityType(), "in").increment(1);
	}

	public byte[] toOafByteArray(final GTAuthor gta) {
		// NOTE: serializes the GTAuthor's string form rather than an Oaf protobuf;
		// the reducer parses it back via GTAuthor.fromJson
		return Bytes.toBytes(gta.toString());
	}

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/SimpleDedupPersonMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.Person;

/**
 * Groups non-root person rows by the hash of their normalized fullname: the
 * emitted key is the pace Person hash, the value is the raw Oaf body.
 */
public class SimpleDedupPersonMapper extends TableMapper<Text, ImmutableBytesWritable> {

	private DedupConfig dedupConf;

	private Text hashKey;

	private ImmutableBytesWritable bodyValue;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		hashKey = new Text();
		bodyValue = new ImmutableBytesWritable();
	}

	@Override
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {

		// dedup roots were produced by a previous run: ignore them
		if (DedupUtils.isRoot(new String(keyIn.copyBytes()))) {
			context.getCounter(dedupConf.getWf().getEntityType(), "roots skipped").increment(1);
			return;
		}

		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);

		if (body == null) {
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
			return;
		}

		try {
			final OafDecoder decoder = OafDecoder.decode(body);

			// hash over the fullname only (non-aggressive normalization)
			final String hash = new Person(decoder.getEntity().getPerson().getMetadata().getFullname().getValue(), false).hash();

			hashKey.set(hash);
			bodyValue.set(body);
			context.write(hashKey, bodyValue);

		} catch (final Throwable e) {
			// deliberately broad: one malformed row must not kill the job
			System.out.println("GOT EX " + e);
			e.printStackTrace(System.err);
			context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
		}
	}

}
|
74 | 0 |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindDedupCandidatePersonsMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.model.PersonComparatorUtils;

/**
 * Emits each person (id, fullname, result ids) under every ngram of the
 * fullname, so that candidate duplicates collide on the same reducer key.
 */
public class FindDedupCandidatePersonsMapper extends TableMapper<Text, Text> {

	private static final byte[] PERSON_CF = Type.person.toString().getBytes();
	private static final byte[] PERSONRESULT_CF = "personResult".getBytes();

	@Override
	protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
		final String id = Bytes.toString(rowkey.get());
		final String fullname = extractFullname(row);
		final Set<String> resultIds = extractResultIds(row);

		// rows without a body (no fullname) are silently dropped
		if (fullname != null) {
			final Text text = (new DedupPersonBean(id, fullname, resultIds)).toText();
			for (final String k : PersonComparatorUtils.getNgramsForPerson(fullname)) {
				context.write(new Text(k), text);
			}
		}
	}

	/** Result ids linked to this person, or an empty set when none exist. */
	private Set<String> extractResultIds(final Result row) {
		final NavigableMap<byte[], byte[]> family = row.getFamilyMap(PERSONRESULT_CF);
		// FIX: getFamilyMap returns null for rows without personResult columns (previously NPEd)
		if (family == null) return Sets.newHashSet();
		return Sets.newHashSet(Iterables.transform(family.keySet(), new Function<byte[], String>() {

			@Override
			public String apply(final byte[] b) {
				return Bytes.toString(b);
			}
		}));
	}

	/** Fullname from the decoded body, or null when the body column is missing. */
	private String extractFullname(final Result row) {
		byte[] body = row.getValue(PERSON_CF, DedupUtils.BODY_B);
		if (body == null) return null;
		return OafDecoder.decode(body).getEntity().getPerson().getMetadata().getFullname().getValue();
	}
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsPersonReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.DedupProtos.Dedup;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.gt.Authors;
import eu.dnetlib.pace.model.gt.CoAuthors;
import eu.dnetlib.pace.model.gt.GTAuthor;
import eu.dnetlib.pace.model.gt.GTAuthorMapper;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Collapses each group of duplicate person rows into a single dedup root:
 * merges their authors and coauthors, deletes the duplicate rows, and writes
 * one new root row whose id is derived from the merged author set.
 */
public class DedupFindRootsPersonReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

	private DedupConfig dedupConf;

	private ImmutableBytesWritable outKey;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		// FIX: the log message claimed "mapper" but this is the reducer
		System.out.println("dedup findRoots reducer\nwf conf: " + dedupConf.toString());

		outKey = new ImmutableBytesWritable();
	}

	@Override
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
			InterruptedException {

		final Authors aas = new Authors();
		final CoAuthors cas = new CoAuthors();

		for (final GTAuthor a : asGTA(context, values)) {
			// accumulate either the author's already-merged set or the author itself
			if (a.hasMerged()) {
				aas.addAll(a.getMerged());
			} else {
				aas.add(a.getAuthor());
			}
			if (a.hasCoAuthors()) {
				cas.addAll(a.getCoAuthors());
			}

			// duplicate rows are removed; they are replaced by the single root below
			final byte[] row = Bytes.toBytes(a.getId());
			final Delete delete = new Delete(row);
			outKey.set(row);
			context.write(outKey, delete);
			context.getCounter(dedupConf.getWf().getEntityType(), "deleted").increment(1);
		}

		// root id is a function of the merged author set, so reruns are stable
		final String rootId = hashCodeString(aas);
		final GTAuthor gta = new GTAuthor(rootId, aas, cas, true);

		final Put put = new Put(Bytes.toBytes(gta.getId()));
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
		put.add(Bytes.toBytes(dedupConf.getWf().getEntityType()), DedupUtils.BODY_B, toOafByteArray(gta));

		outKey.set(Bytes.toBytes(gta.getId()));
		context.write(outKey, put);

		context.getCounter(dedupConf.getWf().getEntityType(), "out").increment(1);
	}

	/** Lazily deserializes each incoming value back into a GTAuthor. */
	private Iterable<GTAuthor> asGTA(final Context context, final Iterable<ImmutableBytesWritable> values) {

		return Iterables.transform(values, new Function<ImmutableBytesWritable, GTAuthor>() {

			@Override
			public GTAuthor apply(final ImmutableBytesWritable input) {
				return GTAuthor.fromJson(new String(input.copyBytes()));
			}
		});
	}

	public byte[] toOafByteArray(final GTAuthor gta) {
		final Oaf oaf = new GTAuthorMapper().map(gta);
		return oaf.toByteArray();
	}

	/** Row key for the root, derived from the author-set hash code. */
	protected String hashCodeString(final Authors ag) {
		return getRowKey(String.valueOf(ag.hashCode()));
	}

	protected String getRowKey(final String s) {
		return AbstractDNetXsltFunctions.oafId(Type.person.toString(), "dedup_wf_001", s);
	}

	/**
	 * Builds a serialized dedup relation Oaf between two row keys.
	 * Currently unused at runtime: kept (with emitDedupRel below) for the
	 * dedup-rel emission that may be re-enabled in this reducer.
	 */
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
		final OafRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
		final Oaf oaf =
				Oaf.newBuilder()
						.setKind(Kind.relation)
						.setLastupdatetimestamp(System.currentTimeMillis())
						.setDataInfo(
								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
										dedupConf.getWf().getConfigurationId())).setRel(oafRel)
						.build();
		return oaf.toByteArray();
	}

	/** Writes a single dedup relation cell under the given column family. */
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws
			IOException, InterruptedException {
		final Put put = new Put(from).add(cf, to, value);
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
		context.write(new ImmutableBytesWritable(from), put);
	}

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/JoinPersonGroupReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.io.IOException;

import java.util.List;
import java.util.Set;

import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Serializes each person group as one CSV record. Groups are keyed by a
 * normalized person hash; only hashes listed in "hash.values.csv" pass
 * (or all of them when the list contains "ALL"), and groups outside the
 * configured entry thresholds are dropped.
 */
public class JoinPersonGroupReducer extends Reducer<Text, Text, Text, Text> {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(JoinPersonGroupReducer.class); // NOPMD by marko on 11/24/08 5:02 PM

	private final static int MIN_ENTRIES_THRESHOLD = 1;

	private final static int MAX_ENTRIES_THRESHOLD = Integer.MAX_VALUE;

	private final static int MAX_FEATURES_THRESHOLD = Integer.MAX_VALUE;

	// histogram buckets above 10.
	// NOTE(review): labels say "[lo, hi)" but the checks are lower-exclusive /
	// upper-inclusive, i.e. "(lo, hi]"; labels kept as-is to preserve counter names.
	private static final int[] BUCKET_UPPER = { 20, 30, 40, 50, 70, 100, 150, 200 };
	private static final String[] BUCKET_LABEL = {
			"[10, 20)", "[20, 30)", "[30, 40)", "[40, 50)", "[50, 70)", "[70, 100)", "[100, 150)", "[150, 200)" };

	// output key is deliberately always the empty string
	private Text emptyKey;

	private Text csvValue;

	private int minEntriesThreshold;

	private int maxEntriesThreshold;

	private int maxFeaturesThreshold;

	private final Set<String> knownHashValues = Sets.newHashSet();

	private boolean passAll = false;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		super.setup(context);
		emptyKey = new Text("");
		csvValue = new Text();

		minEntriesThreshold = context.getConfiguration().getInt("min.entries.threshold", MIN_ENTRIES_THRESHOLD);
		maxEntriesThreshold = context.getConfiguration().getInt("max.entries.threshold", MAX_ENTRIES_THRESHOLD);
		maxFeaturesThreshold = context.getConfiguration().getInt("max.features.threshold", MAX_FEATURES_THRESHOLD);

		final String hashCsv = context.getConfiguration().get("hash.values.csv", "");

		log.info("hash csv: " + hashCsv);
		passAll = hashCsv.contains("ALL");

		Iterables.addAll(knownHashValues, Splitter.on(",").omitEmptyStrings().trimResults().split(hashCsv));
	}

	@Override
	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {

		final CsvSerialiser serialiser = new CsvSerialiser(maxEntriesThreshold, maxFeaturesThreshold);
		final String outKey = key.toString().replaceAll("[^a-zA-Z ]", "").toLowerCase();

		// filter on the configured hash whitelist unless ALL was requested
		if (!passAll && !knownHashValues.contains(outKey)) {
			return;
		}

		if (StringUtils.isBlank(outKey)) {
			context.getCounter("person", "blank key").increment(1);
			return;
		}

		final List<CsvEntry> entries = Lists.newArrayList();
		for (final Text t : values) {
			entries.add(CsvEntry.fromJson(t.toString()));
		}

		trackPersonInfo(entries.size(), context, "person");

		if ((entries.size() < minEntriesThreshold) || (entries.size() > maxEntriesThreshold)) {
			return;
		}

		if (!passAll) {
			context.getCounter("person hash", outKey).increment(entries.size());
		}

		csvValue.set(serialiser.asCSV(entries));
		context.write(emptyKey, csvValue);

		context.getCounter("person", "csv").increment(1);
	}

	/** Increments a size-histogram counter: exact value up to 10, bucketed above. */
	private void trackPersonInfo(final int count, final Context context, final String counterName) {

		if (count <= 0) {
			return;
		}
		if (count <= 10) {
			context.getCounter(counterName, count + "").increment(1);
			return;
		}
		for (int i = 0; i < BUCKET_UPPER.length; i++) {
			if (count <= BUCKET_UPPER[i]) {
				context.getCounter(counterName, BUCKET_LABEL[i]).increment(1);
				return;
			}
		}
		context.getCounter(counterName, "[200, *)").increment(1);
	}

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/JoinPersonGroupMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.io.StringReader; |
|
5 |
import java.util.List; |
|
6 |
import java.util.Set; |
|
7 |
|
|
8 |
import com.google.common.base.Splitter; |
|
9 |
import com.google.common.collect.Iterables; |
|
10 |
import com.google.common.collect.Lists; |
|
11 |
import com.google.common.collect.Sets; |
|
12 |
import eu.dnetlib.data.mapreduce.JobParams; |
|
13 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
14 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
15 |
import eu.dnetlib.data.proto.PersonProtos; |
|
16 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
17 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
|
18 |
import eu.dnetlib.pace.config.DedupConfig; |
|
19 |
import eu.dnetlib.pace.model.Person; |
|
20 |
import org.apache.commons.lang.StringUtils; |
|
21 |
import org.apache.commons.lang.math.RandomUtils; |
|
22 |
import org.apache.hadoop.hbase.client.Put; |
|
23 |
import org.apache.hadoop.hbase.client.Result; |
|
24 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
25 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
26 |
import org.apache.hadoop.io.Text; |
|
27 |
import org.apache.hadoop.mapreduce.Mapper; |
|
28 |
import org.dom4j.Document; |
|
29 |
import org.dom4j.Element; |
|
30 |
import org.dom4j.io.SAXReader; |
|
31 |
|
|
32 |
public class JoinPersonGroupMapper extends Mapper<Text, Text, Text, Text> { |
|
33 |
|
|
34 |
private static final String SUBJECT_PREFIX = "subject."; |
|
35 |
private static final String COAUTHOR_PREFIX = "coauthor."; |
|
36 |
|
|
37 |
public static final String PERSON = "person"; |
|
38 |
|
|
39 |
private static final int MAX_TOKENS = 5; |
|
40 |
private static final int MIN_FEATURES = 10; |
|
41 |
|
|
42 |
private Text outKey; |
|
43 |
private Text outValue; |
|
44 |
|
|
45 |
private SubjectParser sp; |
|
46 |
|
|
47 |
@Override |
|
48 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
49 |
outKey = new Text(); |
|
50 |
outValue = new Text(); |
|
51 |
|
|
52 |
sp = new SubjectParser(); |
|
53 |
} |
|
54 |
|
|
55 |
@Override |
|
56 |
protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException { |
|
57 |
// System.out.println("got key: " + new String(keyIn.copyBytes())); |
|
58 |
|
|
59 |
final SAXReader r = new SAXReader(); |
|
60 |
try { |
|
61 |
final Document doc = r.read(new StringReader(value.toString())); |
|
62 |
final SubjectsMap sm = sp.parse(doc); |
|
63 |
|
|
64 |
final CsvEntry entry = new CsvEntry(); |
|
65 |
for(Subjects subs : sm.values()) { |
|
66 |
for(String subject : subs) { |
|
67 |
final String s = SUBJECT_PREFIX + cleanup(subject); |
|
68 |
entry.addFeature("\"" + s + "\""); |
|
69 |
} |
|
70 |
} |
|
71 |
|
|
72 |
final List<Person> authors = getAuthors(doc); |
|
73 |
final String title = getTitle(doc); |
|
74 |
final String pubId = getId(doc); |
|
75 |
|
|
76 |
for(Person p1 : authors) { |
|
77 |
|
|
78 |
context.getCounter(PERSON, "accurate " + p1.isAccurate()).increment(1); |
|
79 |
final Set<String> hashes = getOutKeys(p1); |
|
80 |
context.getCounter(PERSON, String.format("accurate %s keys", p1.isAccurate())).increment(hashes.size()); |
|
81 |
for(String s1 : hashes) { |
|
82 |
//final String s1 = normalize(p1); |
|
83 |
final CsvEntry c = new CsvEntry(s1, entry.getFeatures()); |
|
84 |
for (Person p2 : authors) { |
|
85 |
final String s2 = normalize(p2.getSurnameString()); |
|
86 |
if (p1.isAccurate() && p2.isAccurate()) { |
|
87 |
if (!p1.getSurnameString().equalsIgnoreCase(p2.getSurnameString())) { |
|
88 |
c.addFeature("\"" + COAUTHOR_PREFIX + s2.replaceAll("\"", "").replaceAll("\\s+", "_") + "\""); |
|
89 |
} |
|
90 |
} |
|
91 |
} |
|
92 |
|
|
93 |
final String prefix = StringUtils.substringBefore(pubId, "::"); |
|
94 |
final String originalId = StringUtils.substringAfter(pubId, "::"); |
|
95 |
|
|
96 |
c.setId(getId(prefix, originalId, p1.getOriginal())); |
|
97 |
c.setOriginalName(p1.getOriginal()); |
|
98 |
c.setTitle(title); |
|
99 |
|
|
100 |
c.getFeatures().remove(s1); |
|
101 |
|
|
102 |
if (s1.length() <= 3) { |
|
103 |
context.getCounter(PERSON, "key size <= 3").increment(1); |
|
104 |
return; |
|
105 |
} |
|
106 |
|
|
107 |
if(c.getFeatures().size() < MIN_FEATURES) { |
|
108 |
context.getCounter(PERSON, "features < " + MIN_FEATURES).increment(1); |
|
109 |
return; |
|
110 |
} |
|
111 |
|
|
112 |
outKey.set(s1); |
|
113 |
outValue.set(c.toString()); |
|
114 |
|
|
115 |
context.write(outKey, outValue); |
|
116 |
} |
|
117 |
} |
|
118 |
|
|
119 |
} catch (final Throwable e) { |
|
120 |
System.out.println("GOT EX " + e); |
|
121 |
e.printStackTrace(System.err); |
|
122 |
context.getCounter(PERSON, e.getClass().toString()).increment(1); |
|
123 |
} |
|
124 |
} |
|
125 |
|
|
126 |
protected String getId(final String nsPrefix, final String originalId, final String name) { |
|
127 |
|
|
128 |
final String localId = name.replaceAll("\\s+", " ").trim(); |
|
129 |
|
|
130 |
// person id doesn't depend on the publication id |
|
131 |
// return AbstractDNetXsltFunctions.oafId(Type.person.toString(), prefix, localId); |
|
132 |
|
|
133 |
// person id depends on the publication id and the person name |
|
134 |
return AbstractDNetXsltFunctions.oafId(Type.person.toString(), nsPrefix, originalId + "::" + localId); |
|
135 |
} |
|
136 |
|
|
137 |
private String cleanup(final String s) { |
|
138 |
return s.replaceAll(" ", "_").replaceAll("\\.", "_").replaceAll("\"", ""); |
|
139 |
} |
|
140 |
|
|
141 |
private String getId(final Document doc) { |
|
142 |
return doc.valueOf("//*[local-name() = 'objIdentifier']/text()"); |
|
143 |
} |
|
144 |
|
|
145 |
private List<Person> getAuthors(final Document doc) { |
|
146 |
final List creatorNodes = doc.selectNodes("//*[local-name() = 'creator']"); |
|
147 |
final List<Person> authors = Lists.newArrayList(); |
|
148 |
|
|
149 |
for(int i = 0; i<creatorNodes.size(); i++) { |
|
150 |
final Element e = (Element) creatorNodes.get(i); |
|
151 |
authors.add(new Person(e.getText(), false)); |
|
152 |
} |
|
153 |
return authors; |
|
154 |
} |
|
155 |
|
|
156 |
private String getTitle(final Document doc) { |
|
157 |
final List titleNodes = doc.selectNodes("//*[local-name() = 'title']"); |
|
158 |
if (titleNodes != null && titleNodes.size() > 0) { |
|
159 |
final Element titleNode = (Element) titleNodes.get(0); |
|
160 |
|
|
161 |
return titleNode.getText().replaceAll(",", ""); |
|
162 |
} |
|
163 |
return ""; |
|
164 |
} |
|
165 |
|
|
166 |
private Set<String> getOutKeys(final Person p1) { |
|
167 |
final Set<String> hashes = Sets.newHashSet(); |
|
168 |
if (p1.isAccurate()) { |
|
169 |
for(String name : p1.getName()) { |
|
170 |
hashes.add(normalize(p1.getSurnameString() + firstLC(name))); |
|
171 |
} |
|
172 |
} else { |
|
173 |
final String s = normalize(p1.getOriginal()); |
|
174 |
for (final String token1 : tokens(s)) { |
|
175 |
for (final String token2 : tokens(s)) { |
|
176 |
if (!token1.equals(token2)) { |
|
177 |
hashes.add(firstLC(token1) + token2); |
|
178 |
} |
|
179 |
} |
|
180 |
} |
|
181 |
} |
|
182 |
return hashes; |
|
183 |
} |
|
184 |
|
|
185 |
private String normalize(final Person p) { |
|
186 |
|
|
187 |
final String s = p.getSurnameString() + firstLC(p.getNameString()); |
|
188 |
return normalize(s); |
|
189 |
} |
|
190 |
|
|
191 |
private String normalize(final String s) { |
|
192 |
return s.replaceAll("[^a-zA-Z ]", "").toLowerCase().trim(); |
|
193 |
} |
|
194 |
|
|
195 |
private Iterable<String> tokens(final String s) { |
|
196 |
return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), MAX_TOKENS); |
|
197 |
} |
|
198 |
|
|
199 |
private String firstLC(final String s) { |
|
200 |
return StringUtils.substring(s, 0, 1).toLowerCase(); |
|
201 |
} |
|
202 |
|
|
203 |
} |
|
204 | 0 |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/AnchorStatsMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.nio.charset.Charset; |
|
5 |
import java.util.List; |
|
6 |
|
|
7 |
import com.google.common.base.Function; |
|
8 |
import com.google.common.base.Joiner; |
|
9 |
import com.google.common.collect.Iterables; |
|
10 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
11 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
12 |
import eu.dnetlib.data.proto.FieldTypeProtos.StringField; |
|
13 |
import eu.dnetlib.data.proto.PersonProtos; |
|
14 |
import org.apache.commons.lang.StringUtils; |
|
15 |
import org.apache.hadoop.hbase.client.Result; |
|
16 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
17 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
18 |
import org.apache.hadoop.io.NullWritable; |
|
19 |
|
|
20 |
/** |
|
21 |
* builds map {merged author -> anchorId} |
|
22 |
* |
|
23 |
* @author claudio |
|
24 |
* |
|
25 |
*/ |
|
26 |
public class AnchorStatsMapper extends TableMapper<NullWritable, NullWritable> { |
|
27 |
|
|
28 |
@Override |
|
29 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
30 |
|
|
31 |
|
|
32 |
final byte[] body = value.getValue("person".getBytes(), DedupUtils.BODY_B); |
|
33 |
|
|
34 |
if (body != null) { |
|
35 |
try { |
|
36 |
final OafDecoder decoder = OafDecoder.decode(body); |
|
37 |
|
|
38 |
final PersonProtos.Person p = decoder.getEntity().getPerson(); |
|
39 |
|
|
40 |
if (!p.getAnchor()) { |
|
41 |
context.getCounter("person", "not anchor").increment(1); |
|
42 |
return; |
|
43 |
} |
|
44 |
|
|
45 |
trackPersonInfo(p.getMergedpersonCount(), context, "person merged"); |
|
46 |
trackPersonInfo(p.getCoauthorCount(), context, "person coauthors"); |
|
47 |
|
|
48 |
} catch (final Throwable e) { |
|
49 |
System.out.println("GOT EX " + e); |
|
50 |
//e.printStackTrace(System.err); |
|
51 |
context.getCounter("error", e.getClass().toString()).increment(1); |
|
52 |
} |
|
53 |
} else { |
|
54 |
context.getCounter("person", "missing body").increment(1); |
|
55 |
} |
|
56 |
} |
|
57 |
|
|
58 |
private void trackPersonInfo(final int count, final Context context, final String counterName) { |
|
59 |
|
|
60 |
if (count > 0 && count <= 10) { |
|
61 |
context.getCounter(counterName, count + "").increment(1); |
|
62 |
return; |
|
63 |
} |
|
64 |
|
|
65 |
if (count > 10 && count <= 20) { |
|
66 |
context.getCounter(counterName, "[10, 20)").increment(1); |
|
67 |
return; |
|
68 |
} |
|
69 |
|
|
70 |
if (count > 20 && count <= 30) { |
|
71 |
context.getCounter(counterName, "[20, 30)").increment(1); |
|
72 |
return; |
|
73 |
} |
|
74 |
|
|
75 |
if (count > 30 && count <= 40) { |
|
76 |
context.getCounter(counterName, "[30, 40)").increment(1); |
|
77 |
return; |
|
78 |
} |
|
79 |
|
|
80 |
if (count > 40 && count <= 50) { |
|
81 |
context.getCounter(counterName, "[40, 50)").increment(1); |
|
82 |
return; |
|
83 |
} |
|
84 |
|
|
85 |
if (count > 50 && count <= 70) { |
|
86 |
context.getCounter(counterName, "[50, 70)").increment(1); |
|
87 |
return; |
|
88 |
} |
|
89 |
|
|
90 |
if (count > 70 && count <= 100) { |
|
91 |
context.getCounter(counterName, "[70, 100)").increment(1); |
|
92 |
return; |
|
93 |
} |
|
94 |
|
|
95 |
if (count > 100) { |
|
96 |
context.getCounter(counterName, "[100, *)").increment(1); |
|
97 |
return; |
|
98 |
} |
|
99 |
|
|
100 |
} |
|
101 |
|
|
102 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/CoAuthorUpdateMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.gt; |
|
2 |
|
|
3 |
import java.io.BufferedReader; |
|
4 |
import java.io.IOException; |
|
5 |
import java.io.InputStreamReader; |
|
6 |
import java.util.List; |
|
7 |
import java.util.Map; |
|
8 |
|
|
9 |
import eu.dnetlib.data.mapreduce.JobParams; |
|
10 |
import org.apache.commons.lang.StringUtils; |
|
11 |
import org.apache.hadoop.conf.Configuration; |
|
12 |
import org.apache.hadoop.fs.FileSystem; |
|
13 |
import org.apache.hadoop.fs.Path; |
|
14 |
import org.apache.hadoop.hbase.client.Put; |
|
15 |
import org.apache.hadoop.hbase.client.Result; |
|
16 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
17 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
18 |
import org.apache.hadoop.hbase.util.Bytes; |
|
19 |
|
|
20 |
import com.google.common.collect.Maps; |
|
21 |
|
|
22 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
23 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
24 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
25 |
import eu.dnetlib.data.proto.PersonProtos.Person.CoAuthor; |
|
26 |
import eu.dnetlib.data.proto.PersonProtos.Person.CoAuthor.Builder; |
|
27 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
28 |
|
|
29 |
public class CoAuthorUpdateMapper extends TableMapper<ImmutableBytesWritable, Put> { |
|
30 |
|
|
31 |
private Map<String, String> mergedToAnchor; |
|
32 |
|
|
33 |
@Override |
|
34 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
35 |
|
|
36 |
mergedToAnchor = loadMap(context.getConfiguration()); |
|
37 |
|
|
38 |
System.out.println("anchor map size: " + mergedToAnchor.size()); |
|
39 |
} |
|
40 |
|
|
41 |
@Override |
|
42 |
protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException { |
|
43 |
|
|
44 |
final byte[] body = value.getValue(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B); |
|
45 |
|
|
46 |
final OafDecoder d = OafDecoder.decode(body); |
|
47 |
|
|
48 |
final Oaf.Builder oafBuilder = Oaf.newBuilder(d.getOaf()); |
|
49 |
|
|
50 |
final List<Builder> coAuthors = oafBuilder.getEntityBuilder().getPersonBuilder().getCoauthorBuilderList(); |
|
51 |
|
|
52 |
for (final Builder cb : coAuthors) { |
|
53 |
|
|
54 |
final String newAnchorId = mergedToAnchor.get(cb.getId()); |
|
55 |
if (newAnchorId != null) { |
|
56 |
context.getCounter("anchor", "hit").increment(1); |
|
57 |
|
|
58 |
if (!cb.getAnchorId().equals(newAnchorId)) { |
|
59 |
cb.setAnchorId(newAnchorId); |
|
60 |
context.getCounter("anchor", "updated").increment(1); |
|
61 |
} |
|
62 |
} else { |
|
63 |
context.getCounter("anchor", "miss").increment(1); |
|
64 |
} |
|
65 |
} |
|
66 |
|
|
67 |
final Map<String, CoAuthor> coAuthorSet = Maps.newHashMap(); |
|
68 |
|
|
69 |
for (final Builder cb : coAuthors) { |
|
70 |
coAuthorSet.put(cb.hasAnchorId() ? cb.getAnchorId() : cb.getId(), cb.build()); |
|
71 |
} |
|
72 |
|
|
73 |
oafBuilder.getEntityBuilder().getPersonBuilder().clearCoauthor(); |
|
74 |
oafBuilder.getEntityBuilder().getPersonBuilder().addAllCoauthor(coAuthorSet.values()); |
|
75 |
|
|
76 |
final Put put = new Put(key.copyBytes()); |
|
77 |
put.setWriteToWAL(JobParams.WRITE_TO_WAL); |
|
78 |
put.add(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B, oafBuilder.build().toByteArray()); |
|
79 |
|
|
80 |
context.write(key, put); |
|
81 |
|
|
82 |
} |
|
83 |
|
|
84 |
private Map<String, String> loadMap(final Configuration conf) throws IOException { |
|
85 |
final Map<String, String> map = Maps.newHashMap(); |
|
86 |
final String filePath = conf.get("mapred.output.dir") + "/part-r-00000"; |
|
87 |
if (StringUtils.isBlank(filePath)) throw new IllegalArgumentException("missing 'mapred.output.dir'"); |
|
88 |
|
|
89 |
final Path path = new Path(filePath); |
|
90 |
final FileSystem fs = FileSystem.get(conf); |
|
91 |
final BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); |
|
92 |
|
|
93 |
String line = br.readLine(); |
|
94 |
while (line != null) { |
|
95 |
final String[] split = line.split("="); |
|
96 |
|
|
97 |
map.put(split[0], split[1]); |
|
98 |
|
|
99 |
line = br.readLine(); |
|
100 |
} |
|
101 |
|
|
102 |
return map; |
|
103 |
} |
|
104 |
|
|
105 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/BuildMergedAnchorMapMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.gt; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import org.apache.hadoop.hbase.client.Result; |
|
7 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
8 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
9 |
import org.apache.hadoop.hbase.util.Bytes; |
|
10 |
import org.apache.hadoop.io.Text; |
|
11 |
|
|
12 |
import com.google.common.collect.Maps; |
|
13 |
import com.google.common.reflect.TypeToken; |
|
14 |
import com.google.gson.Gson; |
|
15 |
|
|
16 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
17 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
18 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
19 |
import eu.dnetlib.data.proto.PersonProtos.Person.MergedPerson; |
|
20 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
21 |
|
|
22 |
/** |
|
23 |
* builds map {merged author -> anchorId} |
|
24 |
* |
|
25 |
* @author claudio |
|
26 |
* |
|
27 |
*/ |
|
28 |
public class BuildMergedAnchorMapMapper extends TableMapper<ImmutableBytesWritable, Text> { |
|
29 |
|
|
30 |
private ImmutableBytesWritable outKey; |
|
31 |
|
|
32 |
private Text outValue; |
|
33 |
|
|
34 |
@SuppressWarnings("serial") |
|
35 |
final java.lang.reflect.Type token = new TypeToken<Map<String, String>>() {}.getType(); |
|
36 |
|
|
37 |
private Gson gson; |
|
38 |
|
|
39 |
@Override |
|
40 |
protected void setup(final Context context) { |
|
41 |
outKey = new ImmutableBytesWritable(Bytes.toBytes("1")); |
|
42 |
outValue = new Text(); |
|
43 |
|
|
44 |
gson = new Gson(); |
|
45 |
} |
|
46 |
|
|
47 |
@Override |
|
48 |
protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException { |
|
49 |
|
|
50 |
final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(rowkey.copyBytes()); |
|
51 |
|
|
52 |
if (!Type.person.equals(rkd.getType())) { |
|
53 |
context.getCounter(rkd.getType().toString(), "skipped").increment(1); |
|
54 |
} |
|
55 |
|
|
56 |
final byte[] body = value.getValue(Bytes.toBytes(Type.person.toString()), DedupUtils.BODY_B); |
|
57 |
|
|
58 |
final OafDecoder d = OafDecoder.decode(body); |
|
59 |
|
|
60 |
final String anchorId = d.getEntity().getId(); |
|
61 |
final Map<String, String> map = Maps.newHashMap(); |
|
62 |
|
|
63 |
for (final MergedPerson p : d.getEntity().getPerson().getMergedpersonList()) { |
|
64 |
map.put(p.getId(), anchorId); |
|
65 |
} |
|
66 |
|
|
67 |
outValue.set(gson.toJson(map, token)); |
|
68 |
|
|
69 |
context.write(outKey, outValue); |
|
70 |
|
|
71 |
} |
|
72 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/GTCleanerMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.gt; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import org.apache.hadoop.hbase.client.Delete; |
|
7 |
import org.apache.hadoop.hbase.client.Result; |
|
8 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
9 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
10 |
import org.apache.hadoop.hbase.util.Bytes; |
|
11 |
import org.apache.hadoop.mapreduce.Counter; |
|
12 |
|
|
13 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
14 |
|
|
15 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
16 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
17 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
|
18 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
19 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
20 |
import eu.dnetlib.data.proto.PersonProtos.Person; |
|
21 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
22 |
|
|
23 |
/** |
|
24 |
* Removes the Non-root rows |
|
25 |
* |
|
26 |
* |
|
27 |
* @author claudio |
|
28 |
* |
|
29 |
*/ |
|
30 |
public class GTCleanerMapper extends TableMapper<ImmutableBytesWritable, Delete> { |
|
31 |
|
|
32 |
@Override |
|
33 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
34 |
|
|
35 |
final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes()); |
|
36 |
|
|
37 |
final Type type = keyDecoder.getType(); |
|
38 |
|
|
39 |
if (!type.equals(Type.person)) { |
|
40 |
incrementCounter(context, "wrong entity type", type.toString(), 1); |
|
41 |
return; |
|
42 |
} |
|
43 |
|
|
44 |
final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString())); |
|
45 |
final byte[] bodyB = map.get(DedupUtils.BODY_B); |
|
46 |
if (bodyB == null) { |
|
47 |
incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
48 |
return; |
|
49 |
} |
|
50 |
|
|
51 |
final Oaf oaf = decodeProto(context, bodyB); |
|
52 |
|
|
53 |
if (!isValid(oaf)) { |
|
54 |
incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
55 |
return; |
|
56 |
} |
|
57 |
|
|
58 |
if (mergedSize(oaf, 0) || mergedSize(oaf, 1)) { |
|
59 |
context.write(keyIn, new Delete(keyIn.copyBytes())); |
|
60 |
incrementCounter(context, Kind.entity.toString(), "deleted", 1); |
|
61 |
} |
|
62 |
|
|
63 |
} |
|
64 |
|
|
65 |
private boolean mergedSize(final Oaf oaf, final int size) { |
|
66 |
final OafEntity entity = oaf.getEntity(); |
|
67 |
|
|
68 |
if (entity == null) return false; |
|
69 |
|
|
70 |
final Person person = entity.getPerson(); |
|
71 |
|
|
72 |
return (person.getMergedpersonList() != null) && (person.getMergedpersonList().size() == size); |
|
73 |
} |
|
74 |
|
|
75 |
private boolean isValid(final Oaf oaf) { |
|
76 |
return (oaf != null) && oaf.isInitialized(); |
|
77 |
} |
|
78 |
|
|
79 |
private Oaf decodeProto(final Context context, final byte[] body) { |
|
80 |
try { |
|
81 |
return Oaf.parseFrom(body); |
|
82 |
} catch (final InvalidProtocolBufferException e) { |
|
83 |
e.printStackTrace(System.err); |
|
84 |
context.getCounter("decodeProto", e.getClass().getName()).increment(1); |
|
85 |
} |
|
86 |
return null; |
|
87 |
} |
|
88 |
|
|
89 |
private void incrementCounter(final Context context, final String k, final String t, final int n) { |
|
90 |
getCounter(context, k, t).increment(n); |
|
91 |
} |
|
92 |
|
|
93 |
private Counter getCounter(final Context context, final String k, final String t) { |
|
94 |
return context.getCounter(k, t); |
|
95 |
} |
|
96 |
|
|
97 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/RootPersonExportMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.gt; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import org.apache.hadoop.hbase.client.Result; |
|
7 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
8 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
9 |
import org.apache.hadoop.hbase.util.Bytes; |
|
10 |
import org.apache.hadoop.io.Text; |
|
11 |
import org.apache.hadoop.mapreduce.Counter; |
|
12 |
|
|
13 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
14 |
import com.googlecode.protobuf.format.JsonFormat; |
|
15 |
|
|
16 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
17 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
18 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
|
19 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
20 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
21 |
import eu.dnetlib.data.proto.PersonProtos.Person; |
|
22 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
23 |
|
|
24 |
public class RootPersonExportMapper extends TableMapper<Text, Text> { |
|
25 |
|
|
26 |
private Text outKey; |
|
27 |
|
|
28 |
private Text outValue; |
|
29 |
|
|
30 |
@Override |
|
31 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
32 |
outKey = new Text(""); |
|
33 |
outValue = new Text(); |
|
34 |
} |
|
35 |
|
|
36 |
@Override |
|
37 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
38 |
|
|
39 |
final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes()); |
|
40 |
|
|
41 |
final Type type = keyDecoder.getType(); |
|
42 |
// if (!DedupUtils.isRoot(keyDecoder.getId())) { |
|
43 |
// incrementCounter(context, "not root id", type.toString(), 1); |
|
44 |
// return; |
|
45 |
// } |
|
46 |
|
|
47 |
if (!type.equals(Type.person)) { |
|
48 |
incrementCounter(context, "wrong entity type", type.toString(), 1); |
|
49 |
return; |
|
50 |
} |
|
51 |
|
|
52 |
final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString())); |
|
53 |
final byte[] bodyB = map.get(DedupUtils.BODY_B); |
|
54 |
if (bodyB == null) { |
|
55 |
incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
56 |
return; |
|
57 |
} |
|
58 |
|
|
59 |
final Oaf oaf = decodeProto(context, bodyB); |
|
60 |
|
|
61 |
if (!isValid(oaf)) { |
|
62 |
incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
63 |
return; |
|
64 |
} |
|
65 |
|
|
66 |
if (mergedSize(oaf, 0)) { |
|
67 |
incrementCounter(context, "merge size = 0", type.toString(), 1); |
|
68 |
return; |
|
69 |
} |
|
70 |
if (mergedSize(oaf, 1)) { |
|
71 |
incrementCounter(context, "merge size = 1", type.toString(), 1); |
|
72 |
return; |
|
73 |
} |
|
74 |
|
|
75 |
emit(new String(keyIn.copyBytes()), context, oaf); |
|
76 |
incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1); |
|
77 |
} |
|
78 |
|
|
79 |
private boolean mergedSize(final Oaf oaf, final int size) { |
|
80 |
final OafEntity entity = oaf.getEntity(); |
|
81 |
|
|
82 |
if (entity == null) return false; |
|
83 |
|
|
84 |
final Person person = entity.getPerson(); |
|
85 |
|
|
86 |
return (person.getMergedpersonList() != null) && (person.getMergedpersonList().size() == size); |
|
87 |
} |
|
88 |
|
|
89 |
private boolean isValid(final Oaf oaf) { |
|
90 |
return (oaf != null) && oaf.isInitialized(); |
|
91 |
} |
|
92 |
|
|
93 |
private Oaf decodeProto(final Context context, final byte[] body) { |
|
94 |
try { |
|
95 |
return Oaf.parseFrom(body); |
|
96 |
} catch (final InvalidProtocolBufferException e) { |
|
97 |
e.printStackTrace(System.err); |
|
98 |
context.getCounter("decodeProto", e.getClass().getName()).increment(1); |
|
99 |
} |
|
100 |
return null; |
|
101 |
} |
|
102 |
|
|
103 |
private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException { |
|
104 |
// outKey.set(key); |
|
105 |
outValue.set(new JsonFormat().printToString(oaf)); |
|
106 |
|
|
107 |
context.write(outKey, outValue); |
|
108 |
} |
|
109 |
|
|
110 |
private void incrementCounter(final Context context, final String k, final String t, final int n) { |
|
111 |
getCounter(context, k, t).increment(n); |
|
112 |
} |
|
113 |
|
|
114 |
private Counter getCounter(final Context context, final String k, final String t) { |
|
115 |
return context.getCounter(k, t); |
|
116 |
} |
|
117 |
|
|
118 |
private String getEntityType(final Oaf oaf, final Type type) { |
|
119 |
switch (type) { |
|
120 |
case result: |
|
121 |
return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid(); |
|
122 |
default: |
|
123 |
return type.toString(); |
|
124 |
} |
|
125 |
} |
|
126 |
|
|
127 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/BuildMergedAnchorMapReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.gt; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
import java.util.Map.Entry; |
Also available in: Unified diff
getting rid of person entities