Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.base.Function;
7
import com.google.common.collect.Iterables;
8
import com.google.common.collect.Lists;
9
import eu.dnetlib.data.mapreduce.JobParams;
10
import eu.dnetlib.data.mapreduce.util.OafDecoder;
11
import eu.dnetlib.data.proto.OafProtos.Oaf;
12
import eu.dnetlib.data.proto.PersonProtos.Person;
13
import eu.dnetlib.data.proto.PersonProtos.Person.CoAuthor;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
16
import eu.dnetlib.miscutils.maps.ConcurrentSizedMap;
17
import org.apache.commons.lang.StringUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.apache.hadoop.hbase.client.Put;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableReducer;
23
import org.apache.hadoop.hbase.util.Bytes;
24

    
25
public class CoAuthorReducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
26

    
27
	private static final Log log = LogFactory.getLog(CoAuthorReducer.class);
28

    
29
	private static int MAX_COAUTHORS = 50;
30

    
31
	private int max_coauthors = MAX_COAUTHORS;
32

    
33
	@Override
34
	protected void setup(final Context context) throws IOException, InterruptedException {
35
		max_coauthors = Integer.parseInt(context.getConfiguration().get("max.coauthors"));
36
	}
37

    
38
	@Override
39
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
40
			InterruptedException {
41

    
42
		final byte[] keyBytes = key.copyBytes();
43
		final String rowKey = new String(keyBytes);
44

    
45
		log.info("Reducing key: '" + rowKey + "'");
46

    
47
		final Function<Person, CoAuthor> personToCoAuthor = new Function<Person, CoAuthor>() {
48

    
49
			@Override
50
			public CoAuthor apply(final Person p) {
51
				final CoAuthor.Builder cb = CoAuthor.newBuilder();
52
				cb.setId(getId(rowKey, p.getMetadata().getFullname().getValue().trim()));
53
				cb.getMetadataBuilder().mergeFrom(p.getMetadata());
54
				return cb.build();
55
			}
56
		};
57

    
58
		final Oaf.Builder out = Oaf.newBuilder();
59
		final ConcurrentSizedMap<String, CoAuthor> coAuthors = new ConcurrentSizedMap<String, CoAuthor>();
60
		coAuthors.setQueueSize(max_coauthors);
61

    
62
		for (final ImmutableBytesWritable ibw : values) {
63
			final OafDecoder d = OafDecoder.decode(ibw.copyBytes());
64

    
65
			final Type type = d.getEntity().getType();
66
			switch (type) {
67
			case person:
68
				out.mergeFrom(d.getOaf());
69
				out.getEntityBuilder().getPersonBuilder().clearCoauthor();
70
				context.getCounter("coauthor", "merge").increment(1);
71
				break;
72
			case result:
73
				final List<Person> authorList = d.getEntity().getResult().getAuthorList();
74
				for (final CoAuthor coauthor : Lists.newArrayList(Iterables.transform(authorList, personToCoAuthor))) {
75
					coAuthors.put(coauthor.getId(), incrementCount(coAuthors, coauthor));
76
				}
77
				break;
78
			case datasource:
79
			case organization:
80
			case project:
81
			default:
82
				context.getCounter("coauthor", "skipped entity type (reduce)").increment(1);
83
				break;
84
			}
85
		}
86

    
87
		coAuthors.remove(rowKey); // remove himself
88
		out.getEntityBuilder().getPersonBuilder().addAllCoauthor(coAuthors.values());
89

    
90
		final Put put = new Put(keyBytes).add(Bytes.toBytes(Type.person.toString()), Bytes.toBytes("body"), out.build().toByteArray());
91
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
92
		context.write(key, put);
93
	}
94

    
95
	private CoAuthor incrementCount(final ConcurrentSizedMap<String, CoAuthor> coAuthors, final CoAuthor coauthor) {
96
		if (coAuthors.containsKey(coauthor.getId())) {
97
			final CoAuthor.Builder c = CoAuthor.newBuilder(coAuthors.get(coauthor.getId()));
98
			c.setCoauthoredpapers(c.getCoauthoredpapers() + 1);
99
			return c.build();
100
		}
101
		return coauthor;
102
	}
103

    
104
	private String getId(final String resultId, final String name) {
105
		final String prefix = StringUtils.substringBefore(StringUtils.substringAfter(resultId, "|"), "::");
106
		// final String id = StringUtils.substringAfter(resultId, "|") + "::" + name.replaceAll("\\s+", " ").trim();
107
		final String id = name.replaceAll("\\s+", " ").trim();
108
		return AbstractDNetXsltFunctions.oafId(Type.person.toString(), prefix, id);
109
	}
110

    
111
}
(3-3/7)