Project

General

Profile

1
package eu.dnetlib.data.mapreduce.util.dao;
2

    
3
import java.nio.ByteBuffer;
4
import java.util.*;
5
import java.util.Map.Entry;
6
import java.util.function.Function;
7
import java.util.function.Predicate;
8
import java.util.stream.Collectors;
9
import java.util.stream.Stream;
10

    
11
import com.google.protobuf.InvalidProtocolBufferException;
12
import eu.dnetlib.data.graph.model.DNGFDecoder;
13
import eu.dnetlib.data.graph.model.DNGFRelDecoder;
14
import eu.dnetlib.data.graph.model.DNGFRowKeyDecoder;
15
import eu.dnetlib.data.graph.utils.RelDescriptor;
16
import eu.dnetlib.data.mapreduce.JobParams;
17
import eu.dnetlib.data.proto.*;
18
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
19
import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
20
import eu.dnetlib.data.proto.DNGFProtos.DNGFRel.Builder;
21
import eu.dnetlib.data.proto.DliFieldTypeProtos;
22
import eu.dnetlib.data.proto.DliProtos;
23
import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
24
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
25
import eu.dnetlib.data.proto.TypeProtos.Type;
26
import eu.dnetlib.data.proto.WDSFieldTypeProtos;
27
import eu.dnetlib.data.proto.WdsDatasetProtos;
28
import eu.dnetlib.data.proto.WdsPublicationProtos;
29
import eu.dnetlib.data.transform.Ontologies;
30
import org.apache.commons.lang3.StringUtils;
31
import org.apache.hadoop.hbase.client.Put;
32
import org.apache.hadoop.hbase.client.Result;
33
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34
import org.apache.hadoop.hbase.util.Bytes;
35

    
36
import static eu.dnetlib.data.graph.utils.RelDescriptor.ONTOLOGY_SEPARATOR;
37
import static eu.dnetlib.data.graph.utils.RelDescriptor.QUALIFIER_SEPARATOR;
38

    
39
/**
40
 * Created by claudio on 16/01/2017.
41
 */
42
public class HBaseTableDAO {
43

    
44
	public static final String ROOT = "dedup_wf";
45

    
46
	public static byte[] cfMetadataByte() {
47
		return Bytes.toBytes(cfMetadata());
48
	}
49

    
50
	public static String cfMetadata() {
51
		return ColumnFamily.metadata.toString();
52
	}
53

    
54
	public static byte[] cfRelsByte() {
55
		return Bytes.toBytes(cfRels());
56
	}
57

    
58
	public static String cfRels() {
59
		return ColumnFamily.rels.toString();
60
	}
61

    
62
	public static RelDescriptor parseQualifier(final byte[] qualifier) {
63
		return parseQualifier(new String(qualifier));
64
	}
65

    
66
	private static RelDescriptor parseQualifier(final String qualifier) {
67
		return new RelDescriptor(qualifier);
68
	}
69

    
70
	public static String newId(final String id, final String dedupRun) {
71
		if ((dedupRun == null) || (dedupRun.length() != 3)) throw new IllegalArgumentException("wrong dedupRun param");
72

    
73
		return id.replaceFirst("\\|.*\\:\\:", dedupPrefix(dedupRun));
74
	}
75

    
76
	public static byte[] newIdBytes(final String s, final String dedupRun) {
77
		return newId(s, dedupRun).getBytes();
78
	}
79

    
80
	public static byte[] newIdBytes(final ByteBuffer b, final String dedupRun) {
81
		return newId(new String(b.array()), dedupRun).getBytes();
82
	}
83

    
84
	public static String dedupPrefix(final String dedupRun) {
85
		return "|" + ROOT + "_" + dedupRun + "::";
86
	}
87

    
88
	public static boolean isRoot(final String s) {
89
		return s.contains(ROOT);
90
	}
91

    
92
	public static DNGFRel.Builder getDedup(final String from, final String to, final String relQualifier) {
93
		final RelDescriptor rd = new RelDescriptor(relQualifier);
94
		final Builder rel =
95
				DNGFRel.newBuilder()
96
						.setRelType(Qualifier.newBuilder()
97
								.setClassid(rd.getTermCode()).setClassname(rd.getTermCode())
98
								.setSchemeid(rd.getOntologyCode()).setSchemename(rd.getOntologyCode()))
99
						.setSource(from).setSourceType(DNGFRowKeyDecoder.decode(from).getType())
100
						.setTarget(to).setTargetType(DNGFRowKeyDecoder.decode(to).getType())
101
						.setChild(false);
102
		return rel;
103
	}
104

    
105
	public static boolean isRoot(final ImmutableBytesWritable s) {
106
		return isRoot(s.copyBytes());
107
	}
108

    
109
	public static boolean isRoot(final byte[] s) {
110
		return isRoot(new String(s));
111
	}
112

    
113
	public static String getDedupQualifier_merges(final Type type, final String targetId) {
114
		return getDedupQualifier_merges(type) + QUALIFIER_SEPARATOR + targetId;
115
	}
116

    
117
	public static byte[] getDedupQualifier_mergesBytes(final Type type, final String targetId) {
118
		return Bytes.toBytes(getDedupQualifier_merges(type, targetId));
119
	}
120

    
121
	public static String getDedupQualifier_merges(final Type type) {
122
		return getDedupRelType(type) + ONTOLOGY_SEPARATOR + "merges";
123
	}
124

    
125
	public static String getDedupQualifier_mergedIn(final Type type) {
126
		return getDedupRelType(type) + ONTOLOGY_SEPARATOR + "isMergedIn";
127
	}
128

    
129
	public static String getDedupQualifier_mergedIn(final Type type, final String targetId) {
130
		return getDedupQualifier_mergedIn(type) + QUALIFIER_SEPARATOR + targetId;
131
	}
132

    
133
	public static String getDedupQualifier_merges(final String type) {
134
		return getDedupQualifier_merges(Type.valueOf(type));
135
	}
136

    
137
	public static byte[] getDedupQualifier_mergesBytes(final Type type) {
138
		return Bytes.toBytes(getDedupQualifier_merges(type));
139
	}
140

    
141
	public static byte[] getDedupQualifier_mergesBytes(final String type) {
142
		return getDedupQualifier_mergesBytes(Type.valueOf(type));
143
	}
144

    
145
	public static byte[] getDedupQualifier_mergesBytes(final String type, final String targetId) {
146
		return getDedupQualifier_mergesBytes(Type.valueOf(type), targetId);
147
	}
148

    
149
	public static String getDedupQualifier_mergedIn(final String type) {
150
		return getDedupQualifier_mergedIn(Type.valueOf(type));
151
	}
152

    
153
	public static byte[] getDedupQualifier_mergedInBytes(final Type type) {
154
		return Bytes.toBytes(getDedupQualifier_mergedIn(type));
155
	}
156

    
157
	public static byte[] getDedupQualifier_mergedInBytes(final Type type, final String targetId) {
158
		return Bytes.toBytes(getDedupQualifier_mergedIn(type, targetId));
159
	}
160

    
161
	public static byte[] getDedupQualifier_mergedInBytes(final String type) {
162
		return getDedupQualifier_mergedInBytes(Type.valueOf(type));
163
	}
164

    
165
	public static String getSimilarityQualifier(final Type type) {
166
		return getDedupRelType(type) + ONTOLOGY_SEPARATOR + "isSimilarTo";
167
	}
168

    
169
	public static String getSimilarityQualifier(final Type type, final String targetId) {
170
		return getSimilarityQualifier(type) + QUALIFIER_SEPARATOR + targetId;
171
	}
172

    
173
	public static String getSimilarityQualifier(final String type) {
174
		return getSimilarityQualifier(Type.valueOf(type));
175
	}
176

    
177
	public static byte[] getSimilarityQualifierBytes(final Type type) {
178
		return Bytes.toBytes(getSimilarityQualifier(type));
179
	}
180

    
181
	public static byte[] getSimilarityQualifierBytes(final Type type, final String targetId) {
182
		return Bytes.toBytes(getSimilarityQualifier(type, targetId));
183
	}
184

    
185
    public static String getInverseRelation(final DNGF.Builder inputRel, final Ontologies ontologies) {
186

    
187
        return getInverseRelation(DNGFRelDecoder.decode(inputRel.getRel()).getRelDescriptor(), ontologies);
188
    }
189

    
190

    
191
    public static String getInverseRelation(final RelDescriptor inputRel, final Ontologies ontologies) {
192
        try {
193
            final String inverseRelation = ontologies.inverseOf(inputRel);
194
            if (StringUtils.isBlank(inverseRelation)) {
195
                return "unknown";
196
            }
197
            return inverseRelation;
198
        } catch (Throwable e) {
199
            return "unknown";
200
        }
201
    }
202

    
203
	public static byte[] getSimilarityQualifierBytes(final String type) {
204
		return getSimilarityQualifierBytes(Type.valueOf(type));
205
	}
206

    
207
	public static String getRelTypeString(final Type type) {
208
		return getDedupRelType(type).toString();
209
	}
210

    
211
	public static List<String> getTargetIds(final Result result, final String qualifier) {
212

    
213
		final Map<byte[], byte[]> rels = result.getFamilyMap(cfRelsByte());
214
		if (rels == null) {
215
			return new ArrayList<>();
216
		}
217
		return getTargetIds(rels, qualifier);
218
	}
219

    
220
	public static List<String> getTargetIds(final Map<byte[], byte[]> rels, final String semantics) {
221
		return rels.keySet().stream()
222
				.map(b -> new String(b))
223
				.filter(s -> s.contains(semantics))
224
				.map(s -> StringUtils.substringAfter(s, QUALIFIER_SEPARATOR))
225
				.collect(Collectors.toList());
226
	}
227

    
228
	public static byte[] getMetadataB(final Result value, final Type type) {
229
		return value.getValue(cfMetadataByte(), Bytes.toBytes(type.toString()));
230
	}
231

    
232
	public static DNGF getMetadata(final Result value, final Type type) throws InvalidProtocolBufferException {
233
		final byte[] body = getMetadataB(value, type);
234
		return body != null ? parseProto(body) : null;
235
	}
236

    
237
	public static String getDedupRelType(final Type type) {
238
		switch (type) {
239
		case organization:
240
			return "organization_organization";
241
		case person:
242
			return "person_person";
243
		case publication:
244
			return "publication_publication";
245
		case dataset:
246
			return "dataset_dataset";
247
            case unknown:
248
                return "unknown_unknown";
249

    
250
		default:
251
			throw new IllegalArgumentException("Deduplication not supported for entity type: " + type);
252
		}
253
	}
254

    
255
	public static Put asPutByCollectedFrom(final DNGF dngf) {
256
		switch (dngf.getKind()) {
257
		case entity:
258
			return asPut(dngf, sumHashes(dngf.getEntity().getCollectedfromList()).longValue());
259
		case relation:
260
			return asPut(dngf, sumHashes(dngf.getRel().getCollectedfromList()).longValue());
261
		default:
262
			throw new IllegalArgumentException("invalid kind");
263
		}
264
	}
265

    
266
	private static Integer sumHashes(final List<KeyValue> cfs) {
267
		return cfs.stream()
268
				.map(kv -> kv.getValue())
269
				.map(s -> s.hashCode())
270
                .map(h -> Math.abs(h))
271
                .collect(Collectors.summingInt(Integer::intValue));
272
	}
273

    
274
	public static Put asPut(final DNGF dngf, final Long ts) {
275
		switch (dngf.getKind()) {
276
		case entity:
277

    
278
			final Put entity = getPut(dngf.getEntity().getId(), ts);
279
			return entity.add(cfMetadataByte(), Bytes.toBytes(dngf.getEntity().getType().toString()), dngf.toByteArray());
280
		case relation:
281
			final DNGFRel rel = dngf.getRel();
282
			final Put putRel = getPut(rel.getSource(), ts);
283

    
284
			final Qualifier relType = rel.getRelType();
285
			final String qualifier = relType.getSchemeid() + ONTOLOGY_SEPARATOR + relType.getClassid() + QUALIFIER_SEPARATOR + rel.getTarget();
286

    
287
			return putRel.add(cfRelsByte(), Bytes.toBytes(qualifier), dngf.toByteArray());
288
		default:
289
			throw new IllegalArgumentException("invalid kind");
290
		}
291
	}
292

    
293
	private static Put getPut(final String rowkey, final Long ts) {
294
		final Put put = ts != null ? new Put(Bytes.toBytes(rowkey), ts) : new Put(Bytes.toBytes(rowkey));
295
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
296
		return put;
297
	}
298

    
299
	public static Put asPut(final DNGF dngf) {
300
		return asPut(dngf, null);
301
	}
302

    
303
	public static byte[] rowKey(final DNGF dngf) {
304
		switch (dngf.getKind()) {
305
		case entity:
306
			return dngf.getEntity().getId().getBytes();
307
		case relation:
308
			final DNGFRel rel = dngf.getRel();
309
			return rel.getSource().getBytes();
310
		default:
311
			throw new IllegalArgumentException("invalid kind");
312
		}
313
	}
314

    
315
	public static Map<String, Map<Long, DNGF>> relVersions(final Result value, final String... filter) {
316
		final NavigableMap<byte[], NavigableMap<Long, byte[]>> map = value.getMap().get(cfRelsByte());
317
		return filterMapKeys(map, filter)
318
				.collect(Collectors.toMap(
319
						key -> key,
320
						key -> map.get(key.getBytes()).entrySet()
321
								.stream()
322
								.collect(Collectors.toMap(
323
										Entry::getKey,
324
										inner -> parseProto(inner.getValue()))
325
								)));
326
	}
327

    
328
	public static Map<String, DNGF> rel(final Result value, final String... filter) {
329
		final NavigableMap<byte[], byte[]> map = value.getFamilyMap(cfRelsByte());
330
		return filterMapKeys(map, filter)
331
				.collect(Collectors.toMap(
332
						key -> key,
333
						key -> parseProto(map.get(key.getBytes()))));
334
	}
335

    
336
	private static Stream<String> filterMapKeys(final Map<byte[], ?> map, final String... filter) {
337
		return map.keySet().stream()
338
				.map(Bytes::toString)
339
				.filter(asFilter(filter));
340
	}
341

    
342
	private static Predicate<String> asFilter(final String... filters) {
343
		return qualifier -> Arrays.asList(filters).stream().allMatch(filter -> !qualifier.contains(filter));
344
	}
345

    
346
	public static DNGF parseProto(final byte[] value) {
347
		final DNGFDecoder d = DNGFDecoder.decode(value, DliFieldTypeProtos.completionStatus,
348
				DliProtos.completionStatus, DliProtos.resolvedfrom, DliProtos.typedIdentifier,
349
				WdsDatasetProtos.WdsDataset.geolocation, WdsDatasetProtos.WdsDataset.otherRels, WdsPublicationProtos.WdsPublication.projects,
350
				WdsDatasetProtos.WdsDataset.projects, WDSFieldTypeProtos.identifierType,WDSFieldTypeProtos.identifierValue, WDSFieldTypeProtos.relationSemantic
351
		);
352
		return d.getDNGF();
353
	}
354

    
355
	public static Function<DNGFDecoder, String> idDecoder() {
356
		return input -> input.getEntityId();
357
	}
358

    
359
	public static ImmutableBytesWritable ibw(final String targetId) {
360
		return new ImmutableBytesWritable(Bytes.toBytes(targetId));
361
	}
362

    
363
    public enum ColumnFamily {metadata, rels}
364

    
365
}
    (1-1/1)