Project

General

Profile

« Previous | Next » 

Revision 53176

DedupBuildRoots[mapper|reducer] merged implementation from beta branch

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsMapper.java
5 5
import java.util.HashSet;
6 6
import java.util.Map;
7 7
import java.util.Set;
8
import java.util.stream.Collectors;
8 9

  
9
import com.google.common.base.Function;
10
import com.google.common.collect.Iterables;
11
import com.google.common.collect.Sets;
12 10
import com.google.protobuf.InvalidProtocolBufferException;
13 11
import eu.dnetlib.data.mapreduce.JobParams;
14 12
import eu.dnetlib.data.mapreduce.util.DedupUtils;
15
import eu.dnetlib.data.mapreduce.util.OafDecoder;
16 13
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
17 14
import eu.dnetlib.data.proto.KindProtos.Kind;
18 15
import eu.dnetlib.data.proto.OafProtos.Oaf;
19
import eu.dnetlib.data.proto.OafProtos.OafEntity;
20 16
import eu.dnetlib.data.proto.TypeProtos.Type;
21 17
import eu.dnetlib.data.transform.OafUtils;
22 18
import eu.dnetlib.pace.config.DedupConfig;
......
51 47
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
52 48
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
53 49

  
54
		// TODO: remove this hack - here because we don't want to dedup datasets
55
		if (checkDataset(value)) return;
50
		if (isInvalid(value)) {
51
			context.getCounter(dedupConf.getWf().getEntityType(), "not valid").increment(1);
52
			return;
53
		}
56 54

  
57 55
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getWf().getEntityType())));
58 56

  
......
116 114
	}
117 115

  
118 116
	private HashSet<String> getStrings(final Map<byte[], byte[]> mergedIn) {
119
		return Sets.newHashSet(Iterables.transform(mergedIn.keySet(), new Function<byte[], String>() {
120

  
121
			@Override
122
			public String apply(final byte[] input) {
123
				return new String(input);
124
			}
125
		}));
117
		return mergedIn.keySet().stream()
118
				.map(b -> new String(b))
119
				.collect(Collectors.toCollection(HashSet::new));
126 120
	}
127 121

  
128
	private boolean checkDataset(final Result value) {
122
	private boolean isInvalid(final Result value) {
129 123
		final Map<byte[], byte[]> bodyMap = value.getFamilyMap(dedupConf.getWf().getEntityType().getBytes());
130 124

  
131 125
		if ((bodyMap == null) || bodyMap.isEmpty()) return true;
......
134 128

  
135 129
		if (bodyB == null) return true;
136 130

  
137
		final OafEntity entity = OafDecoder.decode(bodyB).getEntity();
138

  
139
		if (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("dataset")) return true;
140

  
141 131
		return false;
142 132
	}
143 133

  
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java
12 12
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13 13
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
14 14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16 15
import eu.dnetlib.data.transform.OafEntityMerger;
17 16
import eu.dnetlib.pace.config.DedupConfig;
18 17
import org.apache.commons.lang.StringUtils;
18
import org.apache.hadoop.hbase.client.Durability;
19 19
import org.apache.hadoop.hbase.client.Put;
20
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
20 21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
21 22
import org.apache.hadoop.hbase.mapreduce.TableReducer;
23
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
22 24
import org.apache.hadoop.hbase.util.Bytes;
23 25
import org.apache.hadoop.io.Text;
24 26

  
......
59 61

  
60 62
		for (final Oaf oaf : toOaf(values)) {
61 63
			switch (oaf.getKind()) {
62
			case entity:
63
				entities.add(oaf);
64
				break;
65
			case relation:
66
				handleRels(context, rowkey, oaf, true);
67
				break;
68
			default:
69
				break;
64
				case entity:
65
					entities.add(oaf);
66
					break;
67
				case relation:
68
					handleRels(context, rowkey, oaf);
69
					break;
70
				default:
71
					break;
70 72
			}
71 73
		}
72 74

  
......
78 80
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
79 81
		}
80 82

  
81
		emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
83
		emit(builder.getEntity().getType().toString(),
84
				context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
82 85

  
83
		// add person rels TODO: remove this hack
84
		// context.getCounter("hack", "personResult out").increment(personMap.size());
85

  
86 86
	}
87 87

  
88 88
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
89 89
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
90 90
	}
91 91

  
92
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf, final boolean hack) throws IOException, InterruptedException {
92
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {
93 93

  
94
		if (hack && checkHack(new String(rowkey), oaf)) {
95
			context.getCounter("hack", "personResult in").increment(1);
96
		} else {
94
		// emit relation from the root to the related entities
95
		OafDecoder decoder = rootToEntity(rowkey, oaf);
96
		emit("emit relation from the root to the related entities",
97
				context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
97 98

  
98
			// emit relation from the root to the related entities
99
			OafDecoder decoder = rootToEntity(rowkey, oaf);
100
			emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
99
		// emit relation from the related entities to the root
100
		decoder = entityToRoot(rowkey, oaf);
101
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
102
		emit("emit relation from the related entities to the root",
103
				context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
101 104

  
102
			// emit relation from the related entities to the root
103
			decoder = entityToRoot(rowkey, oaf);
104
			final byte[] revKey = Bytes.toBytes(decoder.relSourceId());
105
			emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
106
		}
107 105
		// mark relation from the related entities to the duplicate as deleted
108
		OafDecoder decoder = markDeleted(oaf, true);
109
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
110
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
106
		decoder = markDeleted(oaf, true);
107
		revKey = Bytes.toBytes(decoder.relSourceId());
108
		emit("mark relation from the related entities to the duplicate as deleted",
109
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
111 110

  
112 111
		// mark relation from the related entities to the duplicate as deleted
113 112
		decoder = markDeleted(oaf, false);
114 113
		revKey = Bytes.toBytes(decoder.relSourceId());
115
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
114
		emit("mark relation from the related entities to the duplicate as deleted",
115
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
116 116
	}
117 117

  
118
	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
119
			throws IOException, InterruptedException {
118
	private void emit(final String msg, final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label) {
119

  
120 120
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
121
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
122
		context.write(new ImmutableBytesWritable(rowkey), put);
123
		context.getCounter(family, label).increment(1);
124
	}
121
		put.setDurability(Durability.SKIP_WAL);
125 122

  
126
	// /////////////////
127

  
128
	public boolean checkHack(final String root, final Oaf oaf) {
129

  
130
		boolean res;
131
		if (dedupConf.getWf().getEntityType().equals(Type.result.toString()) && !md5matches(root, oaf.getRel().getSource())) {
132

  
133
			res = true;
134
		} else {
135
			res = false;
123
		try {
124
			context.write(new ImmutableBytesWritable(rowkey), put);
125
		} catch (Throwable e) {
126
			System.err.println(
127
					String.format("%s, rowkey %s, family %s, qualifier %s",
128
							msg, new String(rowkey), family, qualifier));
129
			throw new RuntimeException(e);
136 130
		}
137 131

  
138
		// if (root.equals("50|dedup_wf_001::92f6197ea6f16ae554755aced832fb6f")) {
139
		// System.out.println("##################");
140
		// System.out.println("root  : " + root);
141
		// System.out.println("source: " + oaf.getRel().getSource());
142
		// System.out.println("ckeck:  " + res);
143
		// }
144

  
145
		return res;
132
		context.getCounter(family, label).increment(1);
146 133
	}
147 134

  
148
	private boolean md5matches(final String id1, final String id2) {
149
		return id1.replaceAll("^.*\\:\\:", "").equals(id2.replaceAll("^.*\\:\\:", ""));
150
	}
135
	// /////////////////
151 136

  
152 137
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
153 138
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
......
168 153
		final Oaf.Builder builder = Oaf.newBuilder(rel);
169 154
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
170 155
		switch (patchKind) {
171
		case rootToEntity:
172
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
173
			builder.getRelBuilder().setSource(id);
174
			break;
156
			case rootToEntity:
157
				// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
158
				builder.getRelBuilder().setSource(id);
159
				break;
175 160

  
176
		case entityToRoot:
177
			final String relClass = rel.getRel().getRelClass();
161
			case entityToRoot:
162
				final String relClass = rel.getRel().getRelClass();
178 163
			/*
179 164
			if(StringUtils.isBlank(relClass)) {
180 165
				throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
181 166
			}
182 167
			*/
183
			final String inverse = relClasses.getInverse(relClass);
184
			if(StringUtils.isBlank(inverse)) {
185
				throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
186
			}
187
			builder.setRel(decoder.setClassId(inverse));
188
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
189
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
190
			builder.getRelBuilder().setTarget(id);
191
			break;
168
				final String inverse = relClasses.getInverse(relClass);
169
				if(StringUtils.isBlank(inverse)) {
170
					throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
171
				}
172
				builder.setRel(decoder.setClassId(inverse));
173
				// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
174
				builder.getRelBuilder().setSource(builder.getRel().getTarget());
175
				builder.getRelBuilder().setTarget(id);
176
				break;
192 177

  
193
		default:
194
			break;
178
			default:
179
				throw new IllegalStateException(String.format("case not implemented for %s", patchKind.toString()));
195 180
		}
196 181

  
197 182
		return OafDecoder.decode(builder.build());

Also available in: Unified diff