Project

General

Profile

1
package eu.dnetlib.data.mapreduce.util;
2

    
3
import java.util.*;
4
import java.util.function.Predicate;
5
import java.util.stream.Collectors;
6
import java.util.stream.Stream;
7

    
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.data.proto.FieldTypeProtos.*;
13
import static eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils.*;
14

    
15
import eu.dnetlib.data.proto.OafProtos;
16
import eu.dnetlib.data.proto.TypeProtos;
17
import eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.hadoop.hbase.client.Durability;
20
import org.apache.hadoop.hbase.client.Put;
21
import org.apache.hadoop.hbase.client.Result;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23

    
24
import com.google.common.base.Function;
25
import com.google.protobuf.InvalidProtocolBufferException;
26

    
27
import eu.dnetlib.data.proto.OafProtos.Oaf;
28
import eu.dnetlib.data.transform.OafUtils;
29
import org.apache.hadoop.hbase.util.Bytes;
30
import org.apache.hadoop.mapreduce.Mapper;
31
import org.apache.hadoop.mapreduce.Reducer;
32

    
33

    
34

    
35
public class OafHbaseUtils extends OafUtils {
36

    
37
	public static OafDecoder decode(final ImmutableBytesWritable oaf) {
38
		return new OafDecoder(oaf.copyBytes());
39
	}
40

    
41
	public static Function<ImmutableBytesWritable, OafDecoder> decoder() {
42
		return input -> OafDecoder.decode(input.copyBytes());
43
	}
44

    
45
	public static Iterable<Oaf> asOaf(final Iterable<ImmutableBytesWritable> in) {
46
		return Iterables.transform(in, oafDecoder());
47
	}
48

    
49
	public static Function<ImmutableBytesWritable, Oaf> oafDecoder() {
50
		return input -> parse(input);
51
	}
52

    
53
	public static Oaf parse(final ImmutableBytesWritable input) {
54
		try {
55
			return Oaf.parseFrom(input.copyBytes());
56
		} catch (final InvalidProtocolBufferException e) {
57
			throw new IllegalArgumentException(e);
58
		}
59
	}
60

    
61
	public static <T> String getValue(T t) {
62
		return mapValue(t);
63
	}
64

    
65
	public static <T> String getKey(T t) {
66
		return mapKey(t);
67
	}
68

    
69
	public static <T> String getValue(Iterable<T> ts) {
70
		return Iterables.getFirst(listValues(ts), "");
71
	}
72

    
73
	public static <T> Set<String> hashSetValues(Iterable<T> ts) {
74
		return Sets.newHashSet(Iterables.transform(ts, t -> mapValue(t)));
75
	}
76

    
77
	public static <T> List<String> listValues(Iterable<T> ts) {
78
		return Lists.newArrayList(Iterables.transform(ts, t -> mapValue(t)));
79
	}
80

    
81
	public static <T> List<String> listObjects(Iterable<T> ts) {
82
		return Lists.newArrayList(Iterables.transform(ts, t -> mapObject(t)));
83
	}
84

    
85
	public static <T> String getKey(Iterable<T> ts) {
86
		return Iterables.getFirst(listKeys(ts), "");
87
	}
88

    
89
	public static <T> List<String> listKeys(Iterable<T> ts) {
90
		return Lists.newArrayList(Iterables.transform(ts, t -> mapKey(t)));
91
	}
92

    
93
	public static <T> Set<String> hashSetKeys(Iterable<T> ts) {
94
		return Sets.newHashSet(Iterables.transform(ts, t -> mapKey(t)));
95
	}
96

    
97
	private static <T> String mapKey(final T t) {
98
		if (t instanceof KeyValue) return ((KeyValue) t).getKey();
99
		if (t instanceof String) return (String) t;
100
		if (t instanceof Qualifier) return ((Qualifier) t).getClassid();
101

    
102
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
103
	}
104

    
105
	public static <T> String mapValue(final T t) {
106
		if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue();
107
		if (t instanceof KeyValue) return ((KeyValue) t).getValue();
108
		if (t instanceof String) return (String) t;
109
		if (t instanceof StringField) return ((StringField) t).getValue();
110
		if (t instanceof Qualifier) return ((Qualifier) t).getClassname();
111
		if (t instanceof Author) {
112
			Author a = (Author) t;
113
			if (a.getPidCount() == 0) {
114
				return a.getFullname();
115
			} else {
116
				return a.getFullname() + " " + listObjects(a.getPidList());
117
			}
118
		}
119

    
120
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
121
	}
122

    
123
	public static <T> String mapObject(final T t) {
124
		if (t instanceof KeyValue) {
125
			KeyValue kv = (KeyValue) t;
126
			return kv.getKey() + ":" + kv.getValue();
127
		}
128
		if (t instanceof String) return (String) t;
129

    
130
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
131
	}
132

    
133
	public static List<String> getPropertyValues(final Reducer.Context context, final String name) {
134
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
135
	}
136

    
137
	public static List<String> getPropertyValues(final Mapper.Context context, final String name) {
138
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
139
	}
140

    
141
	private static List<String> doGetPropertyValues(final String s) {
142
		return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(s));
143
	}
144

    
145
	public static List<Oaf> rel(final Result value) {
146
		return value.list().stream()
147
				.filter(kv -> {
148
					final String q = new String(kv.getQualifier());
149
					return q.matches(OafRowKeyDecoder.ID_REGEX);
150
				})
151
				.filter(kv -> kv.getValue() != null && kv.getValue().length > 0)
152
				.map(kv -> parseProto(kv.getValue()))
153
				.collect(Collectors.toList());
154
	}
155

    
156
	public static Oaf parseProto(final byte[] value) {
157
		final OafDecoder d = OafDecoder.decode(value);
158
		return d.getOaf();
159
	}
160

    
161
	public static Put asPut(final Oaf oaf) {
162
		switch (oaf.getKind()) {
163
			case entity:
164

    
165
				final Put entity = getPut(oaf.getEntity().getId());
166
				byte[] cf = Bytes.toBytes(oaf.getEntity().getType().toString());
167
				return entity.add(cf, DedupUtils.BODY_B, oaf.toByteArray());
168
			case relation:
169
				final OafProtos.OafRel rel = oaf.getRel();
170
				final Put putRel = getPut(rel.getSource());
171

    
172
				final OafRelDecoder relDecoder = OafRelDecoder.decode(rel);
173

    
174
				final byte[] cfRel = Bytes.toBytes(relDecoder.getCFQ());
175
				final byte[] qualifier = Bytes.toBytes(rel.getTarget());
176

    
177
				return putRel.add(cfRel, qualifier, oaf.toByteArray());
178
			default:
179
				throw new IllegalArgumentException("invalid kind");
180
		}
181
	}
182

    
183
	private static Put getPut(final String rowkey) {
184
		final Put put = new Put(Bytes.toBytes(rowkey));
185
		put.setDurability(Durability.SKIP_WAL);
186
		return put;
187
	}
188

    
189
	public static byte[] getBodyB(final Result value, final TypeProtos.Type type) {
190
		return value.getValue(Bytes.toBytes(type.toString()), DedupUtils.BODY_B);
191
	}
192

    
193
	public static Oaf getBody(final Result value, final TypeProtos.Type type) throws InvalidProtocolBufferException {
194
		final byte[] body = getBodyB(value, type);
195
		return body != null ? parseProto(body) : null;
196
	}
197

    
198
}
(3-3/8)