Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dli;
2

    
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.Iterator;
6
import java.util.LinkedList;
7
import java.util.concurrent.atomic.AtomicInteger;
8

    
9
import com.google.common.base.Splitter;
10
import com.google.common.collect.Lists;
11
import com.google.protobuf.InvalidProtocolBufferException;
12
import com.googlecode.protobuf.format.JsonFormat;
13
import eu.dnetlib.data.mapreduce.JobParams;
14
import eu.dnetlib.data.mapreduce.hbase.dli.kv.DliKey;
15
import eu.dnetlib.data.proto.dli.ScholixObjectProtos;
16
import eu.dnetlib.data.proto.dli.Scholix2ObjectProtos;
17
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
18
import eu.dnetlib.dli.proto.ScholixVersion;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.io.Text;
23
import org.apache.hadoop.mapreduce.Reducer;
24

    
25
/**
26
 * Created by claudio on 13/03/2017.
27
 */
28
public class PrepareScholixDataReducer extends Reducer<DliKey, ImmutableBytesWritable, Text, Text> {
29

    
30
	private static final Log log = LogFactory.getLog(PrepareScholixDataReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
31

    
32
	private Text outKey;
33

    
34
	private Text outValue;
35
	private ScholixVersion scholixVersion ;
36

    
37

    
38
	@Override
39
	protected void setup(final Context context) throws IOException, InterruptedException {
40
		outKey = new Text("");
41
		outValue = new Text();
42
		scholixVersion = ScholixVersion.valueOf(context.getConfiguration().get(JobParams.SCHOLIXVERSION));
43

    
44
	}
45

    
46
	@Override
47
	protected void reduce(DliKey key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
48

    
49
		//System.out.println("reducing key = " + key);
50

    
51
		final Iterator<ImmutableBytesWritable> it = values.iterator();
52
		final ImmutableBytesWritable first = it.next();
53
		final AtomicInteger groupSize = new AtomicInteger(1);
54

    
55
		switch (scholixVersion) {
56
			case v1: {
57
				final ScholixObjectProtos.Scholix source = parsev1(key, first).build();
58

    
59
				if (!source.hasSource()) {
60
					context.getCounter("scholix", "missing source").increment(1);
61
					return;
62
				}
63

    
64
				it.forEachRemaining(t -> {
65
					groupSize.incrementAndGet();
66
					final ScholixObjectProtos.Scholix.Builder rel = parsev1(key, t);
67
					if (rel.hasTarget()) {
68
						rel.setSource(source.getSource());
69
						rel.setIdentifier(generateScholixIdentifier(rel));
70
						emit(context, JsonFormat.printToString(rel.build()));
71
					} else {
72
						context.getCounter("scholix", "missing target").increment(1);
73
					}
74
				});
75
				break;
76
			}
77
			case v2:{
78
				final Scholix2ObjectProtos.Scholix source = parsev2(key, first).build();
79

    
80
				if (!source.hasSource()) {
81
					context.getCounter("scholix", "missing source").increment(1);
82
					return;
83
				}
84
				it.forEachRemaining(t -> {
85
					groupSize.incrementAndGet();
86
					final Scholix2ObjectProtos.Scholix.Builder rel = parsev2(key, t);
87
					if (rel.hasTarget()) {
88
						rel.setSource(source.getSource());
89
						emit(context, JsonFormat.printToString(rel.build()));
90
					} else {
91
						context.getCounter("scholix", "missing target").increment(1);
92
					}
93
				});
94
				break;
95
			}
96
		}
97
		groupSizeCounter(context, groupSize.get(),
98
				"1,1",
99
				"1,10",
100
				"10,20",
101
				"20,100",
102
				"100,200",
103
				"200,500",
104
				"500,1000",
105
				"1000,2000",
106
				"2000,5000",
107
				"5000,10000",
108
				"10000,20000",
109
				"20000,*");
110

    
111

    
112

    
113
	}
114

    
115
	private void groupSizeCounter(final Context context, final int groupSize, final String... groups) {
116
		Arrays.asList(groups).forEach(g -> {
117
			final LinkedList<String> i = Lists.newLinkedList(Splitter.on(",").split(g));
118
			int min = Integer.parseInt(i.getFirst());
119
			int max = i.getLast().equals("*") ? Integer.MAX_VALUE : Integer.parseInt(i.getLast());
120
			groupSizeCounter(context, groupSize, min, max);
121
		});
122

    
123
	}
124

    
125
	private void groupSizeCounter(final Context context, final int groupSize, Integer min, Integer max) {
126
		if (groupSize > min & groupSize <= max) {
127
			context.getCounter("scholix groups", String.format("group size (%s,%s)", min, max)).increment(1);
128
		}
129
	}
130

    
131
	private ScholixObjectProtos.Scholix.Builder parsev1(DliKey key, final ImmutableBytesWritable value) {
132
		try {
133
			return ScholixObjectProtos.Scholix.newBuilder(ScholixObjectProtos.Scholix.parseFrom(value.copyBytes()));
134
		} catch (InvalidProtocolBufferException e) {
135
			throw new IllegalArgumentException(String.format("cannot parse Scholix, keytype '%s', id '%s'", key.getKeyType(), key.getId()));
136
		}
137
	}
138

    
139
	private String generateScholixIdentifier(ScholixObjectProtos.Scholix.Builder scholix) {
140
		final String relname = scholix.getRelationship().getName();
141
		final String sourceId = scholix.getSource().getDnetIdentifier();
142
		final String targetId = scholix.getTarget().getDnetIdentifier();
143
		return AbstractDNetXsltFunctions.md5(String.format("%s::%s::%s",sourceId,relname,targetId));
144
	}
145

    
146

    
147
	private Scholix2ObjectProtos.Scholix.Builder parsev2(DliKey key, final ImmutableBytesWritable value) {
148
		try {
149
			return Scholix2ObjectProtos.Scholix.newBuilder(Scholix2ObjectProtos.Scholix.parseFrom(value.copyBytes()));
150
		} catch (InvalidProtocolBufferException e) {
151
			throw new IllegalArgumentException(String.format("cannot parse Scholix, keytype '%s', id '%s'", key.getKeyType(), key.getId()));
152
		}
153
	}
154

    
155
	private void emit(final Context context, final String data) {
156
		outValue.set(data.getBytes());
157
		try {
158
			context.write(outKey, outValue);
159
		} catch (Exception e) {
160
			e.printStackTrace();
161
			throw new RuntimeException(e);
162
		}
163
	}
164

    
165
}
(2-2/4)