Project

General

Profile

« Previous | Next » 

Revision 34901

temporary commit

View differences:

modules/dnet-mapreduce-jobs/branches/offlineDedup/install.sh
2 2

  
3 3
mvn clean install -DskipTests=true;
4 4
rm -rf ~/.m2/repository/eu/dnetlib/dnet-mapreduce-jobs-assembly;
5
mvn assembly:assembly -DskipTests=true && mvn install:install-file -Dfile=target/dnet-mapreduce-jobs-0.0.6.3-SNAPSHOT-jar-with-dependencies.jar -DgroupId=eu.dnetlib -DartifactId=dnet-mapreduce-jobs-assembly -Dversion=0.0.6.3-SNAPSHOT -Dpackaging=jar
5
mvn assembly:assembly -DskipTests=true && mvn install:install-file -Dfile=target/dnet-mapreduce-jobs-0.0.6.4-SNAPSHOT-jar-with-dependencies.jar -DgroupId=eu.dnetlib -DartifactId=dnet-mapreduce-jobs-assembly -Dversion=0.0.6.4-SNAPSHOT -Dpackaging=jar
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/test/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactoryTest.java
45 45

  
46 46
	@Test
47 47
	public void testJsonProtobuf() {
48
		final OafDecoder decoder = OafTest.embed(OafTest.getResult("id"), Kind.entity);
48
		final String id = "id";
49
		final OafDecoder decoder = OafTest.embed(OafTest.getResult(id), Kind.entity);
50

  
51
		builder.setMainEntity(decoder);
52

  
49 53
		final String json = JsonFormat.printToString(decoder.getOaf());
50 54
		System.out.println(json);
51 55
		System.out.println("json size: " + json.length());
......
54 58
		final String base64String = Base64.encodeBase64String(decoder.getOaf().toByteArray());
55 59
		System.out.println("base64 size: " + base64String.length());
56 60

  
57
		System.out.println("decoded " + JsonFormat.printToString(OafDecoder.decode(Base64.decodeBase64(base64String)).getOaf()));
61
		// System.out.println("decoded " + JsonFormat.printToString(OafDecoder.decode(Base64.decodeBase64(base64String)).getOaf()));
62

  
63
		final String xml = builder.build().replaceAll("\\n", " ").replaceAll("\\s+", " ").replaceAll("> <", "><");
64
		System.out.println("xml size: " + xml.length());
65
		System.out.println(xml.replaceAll("\\n", " ").replaceAll("\\s+", " ").replaceAll("> <", "><"));
58 66
	}
59 67

  
60 68
	@Test
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/test/resources/eu/dnetlib/data/transform/record.xml
32 32
    <dc:source>TDX (Tesis Doctorals en Xarxa)</dc:source>
33 33
    <dc:subject>Ciències Humanes</dc:subject>
34 34
    <dc:subject>82 - Literatura</dc:subject>
35
    <dc:format>dc.format</dc:format>
35 36
    <dr:CobjCategory>0006</dr:CobjCategory>
36 37
    <dr:CobjCategory>0000</dr:CobjCategory>
37 38
    <dr:CobjIdentifier>urn:isbn:9788469416310</dr:CobjIdentifier>
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2 2

  
3 3
import java.io.IOException;
4
import java.util.ArrayList;
4 5
import java.util.Collection;
6
import java.util.List;
5 7
import java.util.Map;
6 8

  
9
import org.apache.commons.collections.MapUtils;
7 10
import org.apache.hadoop.hbase.client.Result;
8 11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9 12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10 13
import org.apache.hadoop.hbase.util.Bytes;
11 14
import org.apache.hadoop.io.Text;
12 15

  
16
import com.google.common.base.Function;
17
import com.google.common.collect.Iterables;
18
import com.google.common.collect.Lists;
13 19
import com.google.common.collect.Maps;
14 20

  
21
import eu.dnetlib.data.mapreduce.JobParams;
15 22
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16 23
import eu.dnetlib.data.mapreduce.util.OafDecoder;
17
import eu.dnetlib.data.proto.OafProtos.OafEntity;
18 24
import eu.dnetlib.data.proto.TypeProtos.Type;
19 25
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
20 26
import eu.dnetlib.pace.config.Config;
21 27
import eu.dnetlib.pace.config.DynConf;
22 28
import eu.dnetlib.pace.model.MapDocument;
23 29
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
30
import eu.dnetlib.pace.model.RootMapDocument;
24 31
import eu.dnetlib.pace.util.DedupConfig;
25 32
import eu.dnetlib.pace.util.DedupConfigLoader;
26 33

  
......
39 46
	@Override
40 47
	protected void setup(final Context context) throws IOException, InterruptedException {
41 48

  
42
		paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
43
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
49
		paceConf = DynConf.load(context.getConfiguration().get(JobParams.PACE_CONF));
50
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
44 51
		blackListMap = paceConf.blacklists();
45 52

  
46 53
		outKey = new Text();
......
59 66
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
60 67
		// System.out.println("got key: " + new String(keyIn.copyBytes()));
61 68

  
62
		byte[] body = result.getValue(dedupConf.getEntityType().getBytes(), DedupUtils.BODY_B);
69
		final byte[] body = result.getValue(dedupConf.getEntityType().getBytes(), DedupUtils.BODY_B);
63 70

  
64 71
		if (body != null) {
65 72

  
66
			final OafEntity entity = OafDecoder.decode(body).getEntity();
73
			final OafDecoder decoder = OafDecoder.decode(body);
67 74

  
68
			context.getCounter(entity.getType().toString(), "decoded").increment(1);
75
			context.getCounter(dedupConf.getEntityType(), "decoded").increment(1);
69 76

  
70
			if (entity.getType().equals(Type.valueOf(dedupConf.getEntityType()))) {
77
			if (decoder.getEntity().getType().equals(Type.valueOf(dedupConf.getEntityType()))) {
71 78

  
79
				if (decoder.getOaf().getDataInfo().getDeletedbyinference()) {
80
					context.getCounter(dedupConf.getEntityType(), "deleted by inference").increment(1);
81
					return;
82
				}
83
				if (decoder.getOaf().getDataInfo().getInferred()) {
84
					context.getCounter(dedupConf.getEntityType(), "inferred").increment(1);
85
				}
86

  
72 87
				// TODO: remove this hack - here because we don't want to dedup datasets
73
				if (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("dataset")) { return; }
88
				if (isDataset(decoder)) return;
74 89

  
75 90
				// GeneratedMessage metadata = OafEntityDecoder.decode(entity).getEntity();
76
				MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, paceConf.fields());
77
				emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, paceConf, blackListMap));
91
				final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), decoder.getEntity(), paceConf.fields());
92

  
93
				final List<String> merges = getMergedIds(result);
94

  
95
				emitNGrams(context, new RootMapDocument(doc, merges), BlacklistAwareClusteringCombiner.filterAndCombine(doc, paceConf, blackListMap));
78 96
			}
79 97
		} else {
80 98
			context.getCounter(dedupConf.getEntityType(), "missing body").increment(1);
81 99
		}
82 100
	}
83 101

  
84
	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
85
		for (String ngram : ngrams) {
102
	private List<String> getMergedIds(final Result result) {
103

  
104
		final Map<byte[], byte[]> mergesMap = result.getFamilyMap(DedupUtils.getDedupCF_mergesBytes(dedupConf.getEntityType()));
105
		return MapUtils.isEmpty(mergesMap) ? new ArrayList<String>() : Lists.newArrayList(Iterables.transform(mergesMap.keySet(),
106
				new Function<byte[], String>() {
107

  
108
					@Override
109
					public String apply(final byte[] id) {
110

  
111
						return Bytes.toString(id);
112
					}
113
				}));
114
	}
115

  
116
	private boolean isDataset(final OafDecoder decoder) {
117
		return decoder.getEntity().getType().equals(Type.result)
118
				&& decoder.getEntity().getResult().getMetadata().getResulttype().getClassid().equals("dataset");
119
	}
120

  
121
	private void emitNGrams(final Context context, final RootMapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
122
		if (doc.hasMerges()) {
123
			context.getCounter(dedupConf.getEntityType(), "merged").increment(doc.getMerges().size());
124
		}
125
		for (final String ngram : ngrams) {
86 126
			outKey.set(ngram);
87 127
			ibw.set(doc.toByteArray());
88 128
			context.write(outKey, ibw);
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupGrouperMapper.java
12 12
import org.apache.hadoop.hbase.util.Bytes;
13 13
import org.apache.hadoop.io.Text;
14 14

  
15
import eu.dnetlib.data.mapreduce.JobParams;
15 16
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16 17
import eu.dnetlib.data.proto.TypeProtos.Type;
17 18
import eu.dnetlib.pace.util.DedupConfig;
......
19 20

  
20 21
public class DedupGrouperMapper extends TableMapper<Text, Put> {
21 22

  
22
	private static final boolean WRITE_TO_WAL = false;
23

  
24 23
	public static final String COUNTER_GROUP = "dedup.grouper";
25 24

  
26 25
	public static final String COUNTER_NAME = "written.rels";
......
33 32
	protected void setup(final Context context) throws IOException, InterruptedException {
34 33
		rowKey = new Text();
35 34

  
36
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
35
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
37 36
	}
38 37

  
39 38
	@Override
40 39
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
41 40

  
41
		// if (DedupUtils.isRoot(keyIn)) {
42
		// context.getCounter(COUNTER_GROUP, "skipped roots").increment(1);
43
		// return;
44
		// }
45

  
42 46
		final List<KeyValue> kvList = value.list();
43
		//System.out.println("Grouper mapping " + kvList.size() + " rels for key: " + new String(keyIn.copyBytes()));
44 47

  
45
		for (KeyValue n : kvList) {
46
			for (KeyValue j : kvList) {
48
		for (final KeyValue n : kvList) {
49
			for (final KeyValue j : kvList) {
47 50

  
48
				byte[] nq = n.getQualifier();
49
				byte[] jq = j.getQualifier();
51
				final byte[] nq = n.getQualifier();
52
				final byte[] jq = j.getQualifier();
50 53

  
51 54
				if (!Arrays.equals(nq, jq)) {
52 55

  
53
					Put put = new Put(nq).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), jq, Bytes.toBytes(""));
54
					put.setWriteToWAL(WRITE_TO_WAL);
56
					final Put put = new Put(nq).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), jq, Bytes.toBytes(""));
57
					put.setWriteToWAL(JobParams.WRITE_TO_WAL);
55 58
					rowKey.set(nq);
56 59
					context.write(rowKey, put);
57 60

  
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupReducer.java
17 17

  
18 18
import com.google.common.collect.Lists;
19 19

  
20
import eu.dnetlib.data.mapreduce.JobParams;
20 21
import eu.dnetlib.data.mapreduce.util.DedupUtils;
21 22
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
22 23
import eu.dnetlib.data.proto.TypeProtos.Type;
......
25 26
import eu.dnetlib.pace.config.DynConf;
26 27
import eu.dnetlib.pace.distance.PaceDocumentDistance;
27 28
import eu.dnetlib.pace.model.FieldList;
28
import eu.dnetlib.pace.model.MapDocument;
29 29
import eu.dnetlib.pace.model.MapDocumentComparator;
30
import eu.dnetlib.pace.model.MapDocumentSerializer;
30
import eu.dnetlib.pace.model.RootMapDocument;
31
import eu.dnetlib.pace.model.RootMapDocumentSerializer;
31 32
import eu.dnetlib.pace.util.DedupConfig;
32 33
import eu.dnetlib.pace.util.DedupConfigLoader;
33 34

  
34 35
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
35 36

  
36
	private static final boolean WRITE_TO_WAL = false;
37
	// private static final int LIMIT = 2000;
38
	// private static final int FIELD_LIMIT = 10;
39
	// private static final int WINDOW_SIZE = 200;
40

  
41 37
	private Config paceConf;
42 38
	private DedupConfig dedupConf;
43 39

  
......
45 41

  
46 42
	@Override
47 43
	protected void setup(final Context context) throws IOException, InterruptedException {
48
		paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
49
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
44
		paceConf = DynConf.load(context.getConfiguration().get(JobParams.PACE_CONF));
45
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
50 46
		ibw = new ImmutableBytesWritable();
51 47

  
52 48
		System.out.println("dedup reduce phase \npace conf: " + paceConf.fields() + "\nwf conf: " + dedupConf.toString());
......
54 50

  
55 51
	@Override
56 52
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
57
		System.out.println("\nReducing key: '" + key + "'");
58 53

  
59
		final Queue<MapDocument> q = prepare(context, key, values);
54
		final Queue<RootMapDocument> q = prepare(context, key, values);
60 55
		switch (Type.valueOf(dedupConf.getEntityType())) {
61 56
		case person:
62 57
			process(q, context);
......
72 67
		}
73 68
	}
74 69

  
75
	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
76
		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getOrderField()));
70
	private Queue<RootMapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
71
		final Queue<RootMapDocument> queue = new PriorityQueue<RootMapDocument>(100, new MapDocumentComparator(dedupConf.getOrderField()));
77 72

  
78 73
		final Set<String> seen = new HashSet<String>();
79 74

  
80
		for (ImmutableBytesWritable i : values) {
81
			MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
82
			String id = doc.getIdentifier();
75
		for (final ImmutableBytesWritable i : values) {
76
			final RootMapDocument doc = RootMapDocumentSerializer.decode(i.copyBytes());
77
			final String id = doc.getIdentifier();
83 78

  
84 79
			if (!seen.contains(id)) {
85 80
				seen.add(id);
......
89 84
			if (queue.size() > dedupConf.getQueueMaxSize()) {
90 85
				// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
91 86
				context.getCounter("ngram size > " + dedupConf.getQueueMaxSize(), "N").increment(1);
92
				System.out.println("breaking out after limit (" + dedupConf.getQueueMaxSize() + ") for ngram '" + key);
87
				// System.out.println("breaking out after limit (" + dedupConf.getQueueMaxSize() + ") for ngram '" + key);
93 88
				break;
94 89
			}
95 90
		}
......
97 92
		return queue;
98 93
	}
99 94

  
100
	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
101
		final Queue<MapDocument> q = new LinkedList<MapDocument>();
95
	private Queue<RootMapDocument> simplifyQueue(final Queue<RootMapDocument> queue, final String ngram, final Context context) {
96
		final Queue<RootMapDocument> q = new LinkedList<RootMapDocument>();
102 97

  
103 98
		String fieldRef = "";
104
		List<MapDocument> tempResults = Lists.newArrayList();
99
		final List<RootMapDocument> tempResults = Lists.newArrayList();
105 100

  
106 101
		while (!queue.isEmpty()) {
107
			MapDocument result = queue.remove();
102
			final RootMapDocument result = queue.remove();
108 103

  
109 104
			if (!result.values(dedupConf.getOrderField()).isEmpty()) {
110
				String field = NGramUtils.cleanupForOrdering(result.values(dedupConf.getOrderField()).stringValue());
105
				final String field = NGramUtils.cleanupForOrdering(result.values(dedupConf.getOrderField()).stringValue());
111 106
				if (field.equals(fieldRef)) {
112 107
					tempResults.add(result);
113 108
				} else {
......
125 120
		return q;
126 121
	}
127 122

  
128
	private void populateSimplifiedQueue(final Queue<MapDocument> q,
129
			final List<MapDocument> tempResults,
123
	private void populateSimplifiedQueue(final Queue<RootMapDocument> q,
124
			final List<RootMapDocument> tempResults,
130 125
			final Context context,
131 126
			final String fieldRef,
132 127
			final String ngram) {
......
134 129
			q.addAll(tempResults);
135 130
		} else {
136 131
			context.getCounter(dedupConf.getEntityType(), "Skipped records for count(" + dedupConf.getOrderField() + ") >= " + dedupConf.getGroupMaxSize())
137
			.increment(tempResults.size());
132
					.increment(tempResults.size());
138 133
			System.out.println("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
139 134
		}
140 135
	}
141 136

  
142
	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
137
	private void process(final Queue<RootMapDocument> queue, final Context context) throws IOException, InterruptedException {
143 138

  
144 139
		final PaceDocumentDistance algo = new PaceDocumentDistance();
145 140

  
146 141
		while (!queue.isEmpty()) {
147 142

  
148
			final MapDocument pivot = queue.remove();
143
			final RootMapDocument pivot = queue.remove();
149 144
			final String idPivot = pivot.getIdentifier();
150 145

  
151 146
			final FieldList fieldsPivot = pivot.values(dedupConf.getOrderField());
152
			final String fieldPivot = fieldsPivot == null || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
147
			final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
153 148

  
154 149
			if (fieldPivot != null) {
155 150
				// System.out.println(idPivot + " --> " + fieldPivot);
156 151

  
157 152
				int i = 0;
158
				for (MapDocument curr : queue) {
153
				for (final RootMapDocument curr : queue) {
159 154
					final String idCurr = curr.getIdentifier();
160 155

  
161 156
					if (mustSkip(idCurr)) {
......
168 163
					}
169 164

  
170 165
					final FieldList fieldsCurr = curr.values(dedupConf.getOrderField());
171
					final String fieldCurr = fieldsCurr == null || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
166
					final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
172 167

  
173
					if (!idCurr.equals(idPivot) && fieldCurr != null) {
168
					if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
174 169

  
175
						double d = algo.between(pivot, curr, paceConf);
170
						final double d = algo.between(pivot, curr, paceConf);
176 171

  
177 172
						if (d >= dedupConf.getThreshold()) {
178 173
							writeSimilarity(context, idPivot, idCurr);
179 174
							context.getCounter(dedupConf.getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
175

  
176
							handleRoot(context, pivot);
177
							handleRoot(context, curr);
180 178
						} else {
181 179
							context.getCounter(dedupConf.getEntityType(), "d < " + dedupConf.getThreshold()).increment(1);
182 180
						}
......
187 185
		}
188 186
	}
189 187

  
188
	private void handleRoot(final Context context, final RootMapDocument doc) throws IOException, InterruptedException {
189
		if (DedupUtils.isRoot(doc.getIdentifier())) {
190
			context.getCounter(dedupConf.getEntityType(), "root " + SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
191
			for (final String mergedId : doc.getMerges()) {
192
				writeSimilarity(context, doc.getIdentifier(), mergedId);
193
			}
194
		}
195
	}
196

  
190 197
	private boolean mustSkip(final String idPivot) {
191 198
		return dedupConf.getSkipList().contains(getNsPrefix(idPivot));
192 199
	}
......
196 203
	}
197 204

  
198 205
	private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
199
		byte[] rowKey = Bytes.toBytes(idPivot);
200
		byte[] target = Bytes.toBytes(id);
206
		final byte[] rowKey = Bytes.toBytes(idPivot);
207
		final byte[] target = Bytes.toBytes(id);
201 208

  
202 209
		emitRel(context, rowKey, target);
203 210
		emitRel(context, target, rowKey);
204 211
	}
205 212

  
206 213
	private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
207
		Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), to, Bytes.toBytes(""));
208
		put.setWriteToWAL(WRITE_TO_WAL);
214
		final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), to, Bytes.toBytes(""));
215
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
209 216
		ibw.set(from);
210 217
		context.write(ibw, put);
211 218
	}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupRootsToCsvMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.UUID;
8

  
9
import org.apache.commons.collections.MapUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.hbase.util.Bytes;
16

  
17
import com.google.common.collect.Lists;
18
import com.googlecode.protobuf.format.JsonFormat;
19

  
20
import eu.dnetlib.data.mapreduce.JobParams;
21
import eu.dnetlib.data.mapreduce.util.DedupUtils;
22
import eu.dnetlib.data.mapreduce.util.OafDecoder;
23
import eu.dnetlib.pace.util.DedupConfig;
24
import eu.dnetlib.pace.util.DedupConfigLoader;
25

  
26
public class DedupRootsToCsvMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
27

  
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(DedupRootsToCsvMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
32

  
33
	private DedupConfig dedupConf;
34

  
35
	private ImmutableBytesWritable key;
36

  
37
	private ImmutableBytesWritable value;
38

  
39
	@Override
40
	protected void setup(final Context context) {
41
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
42
		System.out.println("dedup buildRoots mapper\nwf conf: " + dedupConf.toString());
43
		key = new ImmutableBytesWritable();
44
		value = new ImmutableBytesWritable();
45
	}
46

  
47
	@Override
48
	protected void map(final ImmutableBytesWritable rowkey, final Result result, final Context context) throws IOException, InterruptedException {
49

  
50
		// if (!DedupUtils.isRoot(rowkey)) {
51
		// context.getCounter(dedupConf.getEntityType(), "not root, skipped").increment(1);
52
		// return;
53
		// }
54

  
55
		final Map<byte[], byte[]> entityCf = result.getFamilyMap(Bytes.toBytes(dedupConf.getEntityType()));
56
		if (MapUtils.isEmpty(entityCf) && (entityCf.get(DedupUtils.BODY_B) == null)) {
57
			context.getCounter(dedupConf.getEntityType(), "missing body").increment(1);
58
			return;
59
		}
60

  
61
		final byte[] body = entityCf.get(DedupUtils.BODY_B);
62

  
63
		final String joaf = JsonFormat.printToString(OafDecoder.decode(body).getOaf());
64
		final List<String> mergedIds = Lists.newArrayList();
65

  
66
		final Map<byte[], byte[]> mergedIn = result.getFamilyMap(DedupUtils.getDedupCF_mergesBytes(dedupConf.getEntityType()));
67
		if (MapUtils.isNotEmpty(mergedIn)) {
68
			for (final byte[] q : mergedIn.keySet()) {
69
				final String id = new String(q, Charset.forName("UTF-8"));
70

  
71
				// log.info("rowkey: '" + new String(rowkey.copyBytes(), Charset.forName("UTF-8")) + "' mergedId: '" + id + "'\n\n\n");
72

  
73
				mergedIds.add(id);
74
			}
75
		}
76

  
77
		if (mergedIds.isEmpty()) {
78
			context.getCounter(dedupConf.getEntityType(), "root with no merged ids").increment(1);
79
			// return;
80
		}
81

  
82
		final RootEntity re = new RootEntity(joaf, mergedIds);
83

  
84
		key.set(Bytes.toBytes(UUID.randomUUID().toString()));
85
		value.set(Bytes.toBytes(re.toString()));
86
		context.write(key, value);
87
		context.getCounter(dedupConf.getEntityType(), "root entity").increment(1);
88
	}
89

  
90
}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsMapper.java
13 13
import com.google.common.collect.Iterables;
14 14
import com.google.protobuf.InvalidProtocolBufferException;
15 15

  
16
import eu.dnetlib.data.mapreduce.JobParams;
16 17
import eu.dnetlib.data.mapreduce.util.DedupUtils;
17 18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
18 19
import eu.dnetlib.data.mapreduce.util.OafUtils;
......
35 36

  
36 37
	@Override
37 38
	protected void setup(final Context context) {
38
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
39
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
39 40
		System.out.println("dedup buildRoots mapper\nwf conf: " + dedupConf.toString());
40 41

  
41 42
		entityNames = OafUtils.entities();
......
44 45

  
45 46
	@Override
46 47
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
47
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
48 48

  
49 49
		if (!DedupUtils.isRoot(rowkey)) {
50 50

  
51 51
			// TODO: remove this hack - here because we don't want to dedup datasets
52 52
			if (checkDataset(value)) return;
53 53

  
54
			Map<byte[], byte[]> dedupRels = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getEntityType())));
54
			// if (isDeletedByInference(value)) {
55
			// context.getCounter(dedupConf.getEntityType(), "deleted by inference").increment(1);
56
			// return;
57
			// }
55 58

  
59
			final Map<byte[], byte[]> dedupRels = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getEntityType())));
60

  
56 61
			if ((dedupRels != null) && !dedupRels.isEmpty()) {
57 62

  
58 63
				final Text rootId = findRoot(dedupRels);
59
				// byte[] rootIdBytes = rootId.copyBytes();
60
				// byte[] rowkeyBytes = rowkey.copyBytes();
61 64

  
62 65
				context.getCounter(dedupConf.getEntityType(), "merged").increment(1);
63
				for (String family : dedupConf.getRootBuilderFamilies()) {
66
				for (final String family : dedupConf.getRootBuilderFamilies()) {
64 67

  
65 68
					// if (checkHack(rowkeyBytes, rootIdBytes, family)) {
66 69
					// context.getCounter("hack", "personResult skipped").increment(1);
......
75 78

  
76 79
							emit(context, rootId, body.toByteArray());
77 80
						} else {
78
							for (byte[] o : map.values()) {
81
							for (final byte[] o : map.values()) {
79 82

  
80 83
								if (!isRelMarkedDeleted(context, o)) {
81 84
									emit(context, rootId, o);
......
84 87
								}
85 88
							}
86 89
						}
87
					} // else {
88
						// System.err.println("empty family: " + family + "\nkey: " + sKey);
89
						// context.getCounter("DedupBuildRootsMapper", "empty family '" + family + "'").increment(1);
90
					// }
91
					// }
90
					}
92 91
				}
93 92
			} else {
94 93
				context.getCounter(dedupConf.getEntityType(), "not in duplicate group").increment(1);
......
98 97
		}
99 98
	}
100 99

  
100
	// private boolean isDeletedByInference(final Result value) {
101
	// final byte[] body = value.getFamilyMap(Bytes.toBytes(dedupConf.getEntityType())).get(DedupUtils.BODY_B);
102
	//
103
	// if (body == null) return false;
104
	//
105
	// return OafDecoder.decode(body).getOaf().getDataInfo().getDeletedbyinference();
106
	// }
107

  
101 108
	private boolean checkDataset(final Result value) {
102 109
		final Map<byte[], byte[]> bodyMap = value.getFamilyMap(dedupConf.getEntityNameBytes());
103 110

  
......
127 134

  
128 135
	private boolean isRelMarkedDeleted(final Context context, final byte[] o) {
129 136
		try {
130
			Oaf oaf = Oaf.parseFrom(o);
137
			final Oaf oaf = Oaf.parseFrom(o);
131 138
			return oaf.getKind().equals(Kind.relation) && oaf.getDataInfo().getDeletedbyinference();
132
		} catch (InvalidProtocolBufferException e) {
139
		} catch (final InvalidProtocolBufferException e) {
133 140
			context.getCounter("error", e.getClass().getName()).increment(1);
134 141
			return true;
135 142
		}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/RootEntity.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.util.List;
4

  
5
import com.google.gson.Gson;
6

  
7
public class RootEntity {
8

  
9
	private String joaf;
10
	private List<String> mergedIds;
11

  
12
	public RootEntity() {}
13

  
14
	public RootEntity(final String joaf, final List<String> mergedIds) {
15
		super();
16
		this.setJoaf(joaf);
17
		this.setMergedIds(mergedIds);
18
	}
19

  
20
	public static RootEntity decode(final String s) {
21
		return new Gson().fromJson(s, RootEntity.class);
22
	}
23

  
24
	@Override
25
	public String toString() {
26
		return new Gson().toJson(this);
27
	}
28

  
29
	public String getJoaf() {
30
		return joaf;
31
	}
32

  
33
	public void setJoaf(final String joaf) {
34
		this.joaf = joaf;
35
	}
36

  
37
	public List<String> getMergedIds() {
38
		return mergedIds;
39
	}
40

  
41
	public void setMergedIds(final List<String> mergedIds) {
42
		this.mergedIds = mergedIds;
43
	}
44
}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupRootsToCsvReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.List;
6

  
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
13

  
14
import com.googlecode.protobuf.format.JsonFormat;
15
import com.googlecode.protobuf.format.JsonFormat.ParseException;
16

  
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
19
import eu.dnetlib.data.proto.OafProtos.Oaf;
20
import eu.dnetlib.pace.util.DedupConfig;
21
import eu.dnetlib.pace.util.DedupConfigLoader;
22

  
23
public class DedupRootsToCsvReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
24

  
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(DedupRootsToCsvReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
29

  
30
	private static final String COUNTER_GROUP = "csv";
31

  
32
	enum Tables {
33
		Groups, NativeGroups, NativeEntities
34
	}
35

  
36
	private DedupConfig dedupConf;
37

  
38
	private String DELIM;
39

  
40
	private String WRAP;
41

  
42
	private Text tKey;
43
	private Text tValue;
44
	private MultipleOutputs<Text, Text> mos;
45

  
46
	@Override
47
	protected void setup(final Context context) throws IOException, InterruptedException {
48
		super.setup(context);
49
		tKey = new Text();
50
		tValue = new Text();
51

  
52
		mos = new MultipleOutputs<Text, Text>(context);
53
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
54

  
55
		log.info("wf conf: " + dedupConf.toString());
56

  
57
		DELIM = context.getConfiguration().get("mapred.textoutputformat.separator", "!");
58
		WRAP = context.getConfiguration().get("mapred.textoutputformat.wrapper", "#");
59

  
60
		log.info("unsing field DELIMITER: '" + DELIM + "'");
61
	}
62

  
63
	@Override
64
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
65
			InterruptedException {
66

  
67
		for (final ImmutableBytesWritable ibw : values) {
68
			final RootEntity rootEntity = RootEntity.decode(new String(ibw.copyBytes(), Charset.forName("UTF-8")));
69

  
70
			emitRoot(context, rootEntity.getJoaf());
71
			emitRels(context, key, rootEntity.getMergedIds());
72

  
73
			return;
74
		}
75
	}
76

  
77
	private void emitRoot(final Context context, final String joaf) throws IOException, InterruptedException {
78

  
79
		final Oaf.Builder builder = Oaf.newBuilder();
80

  
81
		try {
82
			JsonFormat.merge(joaf, builder);
83
		} catch (final ParseException e) {
84
			context.getCounter("roots csv", e.getClass().getSimpleName()).increment(1);
85
			return;
86
		}
87

  
88
		final OafDecoder d = OafDecoder.decode(builder.build());
89

  
90
		tKey.set((WRAP + d.getEntityId() + WRAP).getBytes(Charset.forName("UTF-8")));
91
		// tValue.set(value.getBytes(Charset.forName("UTF-8")));
92

  
93
		mos.write(Tables.NativeEntities.toString(), tKey, tValue, Tables.NativeEntities.toString());
94
		context.getCounter(COUNTER_GROUP, "native_entities").increment(1);
95
	}
96

  
97
	private void emitRels(final Context context, final ImmutableBytesWritable key, final List<String> mergedIds) throws IOException, InterruptedException {
98
		final StringBuilder sb = new StringBuilder();
99
		final String groupId = new String(key.copyBytes());
100

  
101
		// native_groups groups native_entities
102
		tKey.set((WRAP + groupId + WRAP).getBytes(Charset.forName("UTF-8")));
103
		tValue.set((WRAP + dedupConf.getConfigurationId() + WRAP).getBytes(Charset.forName("UTF-8")));
104
		mos.write(Tables.Groups.toString(), tKey, tValue, Tables.Groups.toString());
105
		context.getCounter(COUNTER_GROUP, "groups").increment(mergedIds.size());
106

  
107
		for (final String id : mergedIds) {
108
			sb.append(WRAP).append(id).append(WRAP).append(DELIM);
109

  
110
			tValue.set(sb.toString().getBytes(Charset.forName("UTF-8")));
111
			mos.write(Tables.NativeGroups.toString(), tKey, tValue, Tables.NativeGroups.toString());
112
		}
113
		context.getCounter(COUNTER_GROUP, "native_groups").increment(mergedIds.size());
114
	}
115

  
116
	@Override
117
	public void cleanup(final Context context) throws IOException, InterruptedException {
118
		mos.close();
119
	}
120

  
121
}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java
33 33
		rootToEntity, entityToRoot
34 34
	}
35 35

  
36
	private static final boolean WRITE_TO_WAL = false;
37

  
38 36
	private DedupConfig dedupConf;
39 37

  
40 38
	private RelClasses relClasses;
......
42 40
	@Override
43 41
	protected void setup(final Context context) throws IOException, InterruptedException {
44 42
		super.setup(context);
45
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
43
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
46 44
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
47 45

  
48 46
		final String relClassJson = context.getConfiguration().get("relClasses");
......
56 54

  
57 55
		// ensures we're dealing with a root, otherwise returns
58 56
		if (!DedupUtils.isRoot(key.toString())) {
59
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
57
			// System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
60 58
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
61 59
			return;
62 60
		}
......
126 124
	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
127 125
			throws IOException, InterruptedException {
128 126
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
129
		put.setWriteToWAL(WRITE_TO_WAL);
127
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
130 128
		context.write(new ImmutableBytesWritable(rowkey), put);
131 129
		context.getCounter(family, label).increment(1);
132 130
	}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/CSVSerializer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.List;
6
import java.util.Set;
7

  
8
import org.apache.commons.csv.CSVFormat;
9
import org.apache.commons.csv.CSVPrinter;
10
import org.apache.commons.lang.CharUtils;
11

  
12
import com.google.common.base.Function;
13
import com.google.common.base.Joiner;
14
import com.google.common.collect.Iterables;
15
import com.google.common.collect.Lists;
16
import com.google.common.collect.Sets;
17
import com.googlecode.protobuf.format.JsonFormat;
18

  
19
import eu.dnetlib.data.mapreduce.util.OafDecoder;
20
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
21
import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
22
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
23
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
24
import eu.dnetlib.data.proto.OrganizationProtos.Organization;
25
import eu.dnetlib.data.proto.ResultProtos.Result;
26

  
27
public class CSVSerializer {
28

  
29
	private CSVFormat format;
30

  
31
	public CSVSerializer(final String delimiter, final String wrapper) {
32
		format = CSVFormat.newFormat(CharUtils.toChar(delimiter)).withQuote(CharUtils.toChar(wrapper));
33
	}
34

  
35
	public String getNativeEntity(final OafDecoder d) throws IOException {
36
		final StringBuilder sb = new StringBuilder();
37
		final CSVPrinter csv = new CSVPrinter(sb, format);
38
		csv.print(d.getEntityId());
39
		csv.print(j(u(kv(d.getEntity().getCollectedfromList()))));
40
		csv.print(type(d));
41
		csv.print(body(d));
42
		csv.print(ft(d));
43
		csv.flush();
44
		csv.close();
45
		return sb.toString();
46
	}
47

  
48
	private String type(final OafDecoder d) {
49
		final DataInfo dataInfo = d.getOaf().getDataInfo();
50
		return dataInfo.getDeletedbyinference() && dataInfo.getInferenceprovenance().contains("dedup") ? "r" : "n";
51
	}
52

  
53
	private String body(final OafDecoder d) {
54
		return JsonFormat.printToString(d.getMetadata());
55
	}
56

  
57
	private String ft(final OafDecoder d) {
58
		final StringBuilder sb = new StringBuilder();
59
		switch (d.getEntity().getType()) {
60
		case result:
61
			final Result.Metadata mr = d.getEntity().getResult().getMetadata();
62
			sb.append(sp(mr.getTitleList())).append(" ").append(sf(mr.getDescriptionList()));
63
			break;
64
		case organization:
65
			final Organization.Metadata om = d.getEntity().getOrganization().getMetadata();
66
			sb.append(om.getLegalname().getValue() + " " + om.getLegalshortname().getValue());
67
			break;
68
		case person:
69
		case project:
70
		case datasource:
71
		default:
72
			throw new IllegalArgumentException("Unhandled fulltext extraction for type: " + d.getEntity().getType());
73
		}
74
		return sb.toString();
75
	}
76

  
77
	private Set<String> u(final Collection<String> kv) {
78
		return Sets.newLinkedHashSet(kv);
79
	}
80

  
81
	private String j(final Collection<String> l) {
82
		return Joiner.on(" ").skipNulls().join(l);
83
	}
84

  
85
	private String sf(final List<StringField> list) {
86
		final StringBuilder sb = new StringBuilder();
87
		for (final StringField sp : list) {
88
			sb.append(sp.getValue()).append(" ");
89
		}
90
		return sb.toString().trim();
91
	}
92

  
93
	private String sp(final List<StructuredProperty> list) {
94
		final StringBuilder sb = new StringBuilder();
95
		for (final StructuredProperty sp : list) {
96
			sb.append(sp.getValue()).append(" ");
97
		}
98
		return sb.toString().trim();
99
	}
100

  
101
	private List<String> kv(final List<KeyValue> list) {
102
		return Lists.newLinkedList(Iterables.transform(list, new Function<KeyValue, String>() {
103

  
104
			@Override
105
			public String apply(final KeyValue kv) {
106
				return kv.getValue().trim();
107
			}
108
		}));
109
	}
110

  
111
}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/OfflineHbaseLoadMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import org.apache.commons.collections.MapUtils;
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.hadoop.hbase.client.Put;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13

  
14
import eu.dnetlib.data.mapreduce.JobParams;
15
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16

  
17
public class OfflineHbaseLoadMapper extends TableMapper<ImmutableBytesWritable, Put> {
18

  
19
	private String entityType;
20

  
21
	@Override
22
	protected void setup(final Context context) throws IOException, InterruptedException {
23
		super.setup(context);
24

  
25
		entityType = context.getConfiguration().get("entityType");
26
		if (StringUtils.isBlank(entityType)) throw new IllegalArgumentException("missing entityType parameter");
27
	}
28

  
29
	@Override
30
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context)
31
			throws IOException, InterruptedException {
32

  
33
		final Map<byte[], byte[]> entityMap = value.getFamilyMap(Bytes.toBytes(entityType));
34
		if (MapUtils.isEmpty(entityMap) || !entityMap.containsKey(DedupUtils.BODY_B)) {
35
			context.getCounter(entityType, "missing body").increment(1);
36
		}
37

  
38
		final byte[] body = entityMap.get(DedupUtils.BODY_B);
39
		final Put put = new Put(key.copyBytes());
40
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
41
		put.add(Bytes.toBytes(entityType), DedupUtils.BODY_B, body);
42

  
43
		context.write(key, put);
44
		context.getCounter(entityType, "loaded").increment(1);
45
	}
46

  
47
	@Override
48
	protected void cleanup(final Context context) throws IOException, InterruptedException {
49
		super.cleanup(context);
50
	}
51

  
52
}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFindRootsMapper.java
12 12

  
13 13
import com.google.protobuf.InvalidProtocolBufferException;
14 14

  
15
import eu.dnetlib.data.mapreduce.JobParams;
15 16
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16 17
import eu.dnetlib.data.proto.DedupProtos.Dedup;
17 18
import eu.dnetlib.data.proto.KindProtos.Kind;
......
24 25

  
25 26
public class DedupFindRootsMapper extends TableMapper<ImmutableBytesWritable, Put> {
26 27

  
27
	private static final boolean WRITE_TO_WAL = false;
28
	public static final String COUNTER_GROUP = "dedup.patch.roots";
28 29

  
29 30
	private DedupConfig dedupConf;
30 31

  
31 32
	@Override
32 33
	protected void setup(final Context context) throws IOException, InterruptedException {
33
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
34
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
34 35
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
35 36
	}
36 37

  
......
38 39
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
39 40
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
40 41

  
41
		Type type = Type.valueOf(dedupConf.getEntityType());
42
		Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
42
		final Type type = Type.valueOf(dedupConf.getEntityType());
43
		final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
43 44

  
44 45
		if ((similarRels != null) && !similarRels.isEmpty()) {
45
			ByteBuffer min = findMin(ByteBuffer.wrap(rowkey.get()), similarRels.keySet());
46 46

  
47
			byte[] row = rowkey.copyBytes();
48
			byte[] root = DedupUtils.newIdBytes(min, dedupConf.getDedupRun());
47
			if (DedupUtils.isRoot(rowkey)) {
48
				context.getCounter(COUNTER_GROUP, "roots").increment(1);
49
			}
49 50

  
51
			final ByteBuffer min = findMin(ByteBuffer.wrap(rowkey.get()), similarRels.keySet());
52

  
53
			final byte[] row = rowkey.copyBytes();
54
			final byte[] root = DedupUtils.newIdBytes(min, dedupConf.getDedupRun());
55

  
50 56
			// System.out.println("Found root: " + new String(root));
51 57

  
52 58
			emitDedupRel(context, DedupUtils.getDedupCF_mergedInBytes(type), row, root, buildRel(row, root, Dedup.RelName.isMergedIn));
......
63 69
	}
64 70

  
65 71
	private ByteBuffer findMin(ByteBuffer min, final Iterable<byte[]> keys) {
66
		for (byte[] q : keys) {
67
			ByteBuffer iq = ByteBuffer.wrap(q);
72
		for (final byte[] q : keys) {
73
			final ByteBuffer iq = ByteBuffer.wrap(q);
68 74
			if (min.compareTo(iq) > 0) {
69 75
				min = iq;
70 76
			}
......
75 81
	private void emitBody(final Context context, final byte[] row, final byte[] body) throws InvalidProtocolBufferException, IOException, InterruptedException {
76 82
		if (body == null) {
77 83
			context.getCounter(dedupConf.getEntityType(), "missing body").increment(1);
78
			System.err.println("missing body: " + new String(row));
84
			// System.err.println("missing body: " + new String(row));
79 85
			return;
80 86
		}
81 87
		final Oaf prototype = Oaf.parseFrom(body);
......
83 89
		if (prototype.getDataInfo().getDeletedbyinference()) {
84 90
			context.getCounter(dedupConf.getEntityType(), "bodies already deleted").increment(1);
85 91
		} else {
86
			Oaf.Builder oafRoot = Oaf.newBuilder(prototype);
92
			final Oaf.Builder oafRoot = Oaf.newBuilder(prototype);
87 93
			oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance("dedup");
88
			byte[] family = Bytes.toBytes(dedupConf.getEntityType());
89
			Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
90
			put.setWriteToWAL(WRITE_TO_WAL);
94
			final byte[] family = Bytes.toBytes(dedupConf.getEntityType());
95
			final Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
96
			put.setWriteToWAL(JobParams.WRITE_TO_WAL);
91 97
			context.write(new ImmutableBytesWritable(row), put);
92 98
			context.getCounter(dedupConf.getEntityType(), "bodies marked deleted").increment(1);
93 99
		}
94 100
	}
95 101

  
96 102
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
97
		Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
98
		Oaf oaf =
103
		final Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
104
		final Oaf oaf =
99 105
				Oaf.newBuilder().setKind(Kind.relation).setTimestamp(System.currentTimeMillis())
100
				.setDataInfo(AbstractDNetOafXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance("dedup")).setRel(oafRel)
101
						.build();
106
						.setDataInfo(AbstractDNetOafXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance("dedup")).setRel(oafRel)
107
				.build();
102 108
		return oaf.toByteArray();
103 109
	}
104 110

  
105 111
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
106
	InterruptedException {
107
		Put put = new Put(from).add(cf, to, value);
108
		put.setWriteToWAL(WRITE_TO_WAL);
112
			InterruptedException {
113
		final Put put = new Put(from).add(cf, to, value);
114
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
109 115
		context.write(new ImmutableBytesWritable(from), put);
110 116
	}
111 117

  
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/PrepareFeedMapper.java
4 4
import java.util.Map;
5 5
import java.util.Map.Entry;
6 6

  
7
import org.apache.commons.collections.MapUtils;
7 8
import org.apache.hadoop.hbase.client.Result;
8 9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9 10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
......
18 19
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
19 20
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
20 21
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
22
import eu.dnetlib.data.mapreduce.util.DedupUtils;
21 23
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
22 24
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
23 25
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
......
41 43

  
42 44
	@Override
43 45
	protected void setup(final Context context) throws IOException, InterruptedException {
44
		String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
46
		final String json = context.getConfiguration().get(JobParams.INDEX_ENTITY_LINKS);
45 47
		System.out.println(JobParams.INDEX_ENTITY_LINKS + ":\n" + json);
46 48
		entityConfigTable = IndexConfig.load(json).getConfigMap();
47 49

  
48
		String contextMap = context.getConfiguration().get("contextmap");
50
		final String contextMap = context.getConfiguration().get("contextmap");
49 51
		System.out.println("contextmap:\n" + contextMap);
50 52

  
51
		String relClassJson = context.getConfiguration().get("relClasses");
53
		final String relClassJson = context.getConfiguration().get("relClasses");
52 54
		System.out.println("relClassesJson:\n" + relClassJson);
53 55
		relClasses = RelClasses.fromJSon(relClassJson);
54 56
		System.out.println("relClasses:\n" + relClasses);
......
67 69

  
68 70
		if (isValid(oaf)) {
69 71

  
72
			if (deletedByInference(oaf) && DedupUtils.isRoot(keyIn)) {
73
				incrementCounter(context, "deleted by inference (root)", type.toString(), 1);
74
				return;
75
			}
76

  
70 77
			if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
78

  
71 79
				emit(new String(keyIn.copyBytes()), context, oaf);
72 80
				incrementCounter(context, Kind.entity.toString(), type.toString(), 1);
73 81

  
74
				for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
82
				for (final LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
75 83

  
76 84
					final Map<byte[], byte[]> columnMap = value.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
77 85

  
78
					if (hasData(columnMap)) {
86
					if (!MapUtils.isEmpty(columnMap)) {
79 87
						emitRelationship(oaf.getEntity(), context, columnMap, ld);
80 88
						incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt(), columnMap.size());
81
					} // else {
82
						// incrementCounter(context, type.toString(), ld.getRelDescriptor().getIt() + "_empty", 1);
83
						// }
89
					}
84 90
				}
85 91
			} else {
86 92
				incrementCounter(context, "deleted by inference", type.toString(), 1);
......
90 96
		}
91 97
	}
92 98

  
93
	private Oaf mergeUpdates(final Result value, final Context context, final Type type, OafRowKeyDecoder keyDecoder) throws InvalidProtocolBufferException {
99
	private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
100
			throws InvalidProtocolBufferException {
94 101
		try {
95 102
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
96
		} catch (InvalidProtocolBufferException e) {
103
		} catch (final InvalidProtocolBufferException e) {
97 104
			System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
98 105
			throw e;
99 106
		}
......
105 112
		final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(Kind.relation);
106 113

  
107 114
		// iterates the column map
108
		for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
115
		for (final Entry<byte[], byte[]> e : columnMap.entrySet()) {
109 116

  
110 117
			final Oaf oaf = decodeProto(context, e.getValue());
111 118
			if (!isValid(oaf)) {
112 119
				incrementCounter(context, "invalid oaf rel", ld.getRelDescriptor().getIt(), 1);
113 120
			} else if (!deletedByInference(oaf)) {
114 121

  
115
				OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel());
122
				final OafRel.Builder relBuilder = OafRel.newBuilder(oaf.getRel());
116 123

  
117 124
				if (ld.isSymmetric()) {
118
					RelDescriptor rd = ld.getRelDescriptor();
125
					final RelDescriptor rd = ld.getRelDescriptor();
119 126
					relBuilder.setCachedTarget(cachedTarget).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
120 127
				}
121 128

  
......
124 131
					continue;
125 132
				}
126 133

  
127
				OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
128

  
134
				final OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
129 135
				// String rowKey = patchTargetId(target, OafRelDecoder.decode(oafRel).getRelTargetId());
130 136

  
131 137
				emit(ld.isSymmetric() ? oafRel.getTarget() : oafRel.getSource(), context, merge(oafBuilder, oaf).setRel(oafRel).build());
......
135 141
		}
136 142
	}
137 143

  
138
	private String patchTargetId(final Type target, final String id) {
139
		return id.replaceFirst("^.*\\|", target.getNumber() + "|");
140
	}
144
	// private String patchTargetId(final Type target, final String id) {
145
	// return id.replaceFirst("^.*\\|", target.getNumber() + "|");
146
	// }
141 147

  
142 148
	private Oaf.Builder merge(final Oaf.Builder builder, final Oaf prototype) {
143 149
		return builder.setDataInfo(prototype.getDataInfo()).setTimestamp(prototype.getTimestamp());
......
147 153
		return rel.getSource().contains(rel.getTarget());
148 154
	}
149 155

  
150
	private boolean hasData(final Map<byte[], byte[]> columnMap) {
151
		return (columnMap != null) && !columnMap.isEmpty();
152
	}
153

  
154 156
	private boolean isValid(final Oaf oaf) {
155 157
		return (oaf != null) && oaf.isInitialized();
156 158
	}
......
162 164
	private Oaf decodeProto(final Context context, final byte[] body) {
163 165
		try {
164 166
			return Oaf.parseFrom(body);
165
		} catch (InvalidProtocolBufferException e) {
167
		} catch (final InvalidProtocolBufferException e) {
166 168
			e.printStackTrace(System.err);
167 169
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
168 170
		}
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/util/DedupUtils.java
63 63
		return getRelType(type) + CF_SEPARATOR + SubRelType.dedup + CF_SEPARATOR + Dedup.RelName.merges;
64 64
	}
65 65

  
66
	public static String getDedupCF_merges(final String type) {
67
		return getDedupCF_merges(Type.valueOf(type));
68
	}
69

  
66 70
	public static byte[] getDedupCF_mergesBytes(final Type type) {
67 71
		return Bytes.toBytes(getDedupCF_merges(type));
68 72
	}
69 73

  
74
	public static byte[] getDedupCF_mergesBytes(final String type) {
75
		return getDedupCF_mergesBytes(Type.valueOf(type));
76
	}
77

  
70 78
	public static String getDedupCF_mergedIn(final Type type) {
71 79
		return getRelType(type) + CF_SEPARATOR + SubRelType.dedup + CF_SEPARATOR + Dedup.RelName.isMergedIn;
72 80
	}
73 81

  
82
	public static String getDedupCF_mergedIn(final String type) {
83
		return getDedupCF_mergedIn(Type.valueOf(type));
84
	}
85

  
74 86
	public static byte[] getDedupCF_mergedInBytes(final Type type) {
75 87
		return Bytes.toBytes(getDedupCF_mergedIn(type));
76 88
	}
77 89

  
90
	public static byte[] getDedupCF_mergedInBytes(final String type) {
91
		return getDedupCF_mergedInBytes(Type.valueOf(type));
92
	}
93

  
78 94
	public static String getSimilarityCF(final Type type) {
79 95
		return getRelType(type) + CF_SEPARATOR + SubRelType.dedupSimilarity + CF_SEPARATOR + DedupSimilarity.RelName.isSimilarTo;
80 96
	}
81 97

  
98
	public static String getSimilarityCF(final String type) {
99
		return getSimilarityCF(Type.valueOf(type));
100
	}
101

  
82 102
	public static byte[] getSimilarityCFBytes(final Type type) {
83 103
		return Bytes.toBytes(getSimilarityCF(type));
84 104
	}
85 105

  
106
	public static byte[] getSimilarityCFBytes(final String type) {
107
		return getSimilarityCFBytes(Type.valueOf(type));
108
	}
109

  
86 110
	public static String getRelTypeString(final Type type) {
87 111
		return getRelType(type).toString();
88 112
	}
......
101 125
	}
102 126

  
103 127
	public static ColumnFamily decodeCF(final byte[] b) {
104
		String[] s = new String(b).split(CF_SEPARATOR);
128
		final String[] s = new String(b).split(CF_SEPARATOR);
105 129
		return new DedupUtils().getCF(RelType.valueOf(s[0]), SubRelType.valueOf(s[1]));
106 130
	}
107 131

  
......
110 134
	}
111 135

  
112 136
	public static OafRel.Builder getDedup(final DedupConfig dedupConf, final String from, final String to, final Dedup.RelName relClass) {
113
		Type type = Type.valueOf(dedupConf.getEntityType());
114
		RelType relType = DedupUtils.getRelType(type);
115
		Builder oafRel =
137
		final Type type = Type.valueOf(dedupConf.getEntityType());
138
		final RelType relType = DedupUtils.getRelType(type);
139
		final Builder oafRel =
116 140
				OafRel.newBuilder().setRelType(relType).setSubRelType(SubRelType.dedup).setRelClass(relClass.toString()).setChild(false)
117 141
						.setSource(new String(from)).setTarget(new String(to));
118 142
		switch (type) {
modules/dnet-mapreduce-jobs/branches/offlineDedup/src/main/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactory.java
97 97

  
98 98
	public XmlRecordFactory(final EntityConfigTable entityConfigTable, final ContextMapper contextMapper, final RelClasses relClasses,
99 99
			final String schemaLocation, final boolean entityDefaults, final boolean relDefaults, final boolean childDefeaults)
100
			throws TransformerConfigurationException, TransformerFactoryConfigurationError {
100
					throws TransformerConfigurationException, TransformerFactoryConfigurationError {
101 101
		this.entityConfigTable = entityConfigTable;
102 102
		this.contextMapper = contextMapper;
103 103
		this.relClasses = relClasses;
......
137 137

  
138 138
	public String build() {
139 139

  
140
		OafEntityDecoder entity = mainEntity.decodeEntity();
140
		final OafEntityDecoder entity = mainEntity.decodeEntity();
141 141
		// System.out.println("building");
142 142
		// System.out.println("main: " + mainEntity);
143 143
		// System.out.println("rel:  " + relations);
......
148 148
		final List<String> metadata = decodeType(entity, null, entityDefaults, false);
149 149

  
150 150
		// rels has to be processed before the contexts because they enrich the contextMap with the funding info.
151
		List<String> rels = listRelations();
151
		final List<String> rels = listRelations();
152 152
		metadata.addAll(buildContexts(type));
153 153
		metadata.add(parseDataInfo(mainEntity));
154 154

  
......
159 159
	}
160 160

  
161 161
	private String parseDataInfo(final OafDecoder decoder) {
162
		DataInfo dataInfo = decoder.getOaf().getDataInfo();
162
		final DataInfo dataInfo = decoder.getOaf().getDataInfo();
163 163

  
164
		StringBuilder sb = new StringBuilder();
164
		final StringBuilder sb = new StringBuilder();
165 165
		sb.append("<datainfo>");
166 166
		sb.append(asXmlElement("inferred", dataInfo.getInferred() + "", null, null));
167 167
		sb.append(asXmlElement("deletedbyinference", dataInfo.getDeletedbyinference() + "", null, null));
......
179 179
		metadata.addAll(listFields(decoder.getMetadata(), filter, defaults, expandingRel));
180 180
		metadata.addAll(listFields(decoder.getOafEntity(), filter, defaults, expandingRel));
181 181

  
182
		if (decoder.getEntity() instanceof Result && !expandingRel) {
182
		if ((decoder.getEntity() instanceof Result) && !expandingRel) {
183 183
			metadata.add(asXmlElement("bestlicense", "", getBestLicense(), null));
184 184

  
185 185
			metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel));
186 186
		}
187
		if (decoder.getEntity() instanceof Person && !expandingRel) {
187
		if ((decoder.getEntity() instanceof Person) && !expandingRel) {
188 188
			metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel));
189 189
		}
190
		if (decoder.getEntity() instanceof Project && !expandingRel) {
190
		if ((decoder.getEntity() instanceof Project) && !expandingRel) {
191 191
			metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel));
192 192
		}
193 193

  
......
196 196

  
197 197
	private Qualifier getBestLicense() {
198 198
		Qualifier bestLicense = getQualifier("UNKNOWN", "not available", "dnet:access_modes");
199
		LicenseComparator lc = new LicenseComparator();
200
		for (Instance instance : ((Result) mainEntity.decodeEntity().getEntity()).getInstanceList()) {
199
		final LicenseComparator lc = new LicenseComparator();
200
		for (final Instance instance : ((Result) mainEntity.decodeEntity().getEntity()).getInstanceList()) {
201 201
			if (lc.compare(bestLicense, instance.getLicence()) > 0) {
202 202
				bestLicense = instance.getLicence();
203 203
			}
......
213 213

  
214 214
		final List<String> rels = Lists.newArrayList();
215 215

  
216
		for (OafDecoder decoder : this.relations) {
216
		for (final OafDecoder decoder : this.relations) {
217 217

  
218 218
			final OafRel rel = decoder.getOafRel();
219 219
			final OafEntity cachedTarget = rel.getCachedTarget();
......
223 223
			if (relDecoder.getRelSourceId().equals(key) || relDecoder.getRelTargetId().equals(key)) {
224 224

  
225 225
				final List<String> metadata = Lists.newArrayList();
226
				Type targetType = relDecoder.getTargetType(mainEntity.getEntity().getType());
227
				Set<String> relFilter = entityConfigTable.getFilter(targetType, relDecoder.getRelDescriptor());
226
				final Type targetType = relDecoder.getTargetType(mainEntity.getEntity().getType());
227
				final Set<String> relFilter = entityConfigTable.getFilter(targetType, relDecoder.getRelDescriptor());
228 228
				metadata.addAll(listFields(relDecoder.getSubRel(), relFilter, false, true));
229 229

  
230 230
				String semanticclass = "";
231 231
				String semanticscheme = "";
232 232

  
233
				RelDescriptor relDescriptor = relDecoder.getRelDescriptor();
233
				final RelDescriptor relDescriptor = relDecoder.getRelDescriptor();
234 234

  
235
				if (cachedTarget != null && cachedTarget.isInitialized()) {
235
				if ((cachedTarget != null) && cachedTarget.isInitialized()) {
236 236

  
237 237
					final Set<String> filter = entityConfigTable.getFilter(targetType, relDescriptor);
238 238
					metadata.addAll(decodeType(OafEntityDecoder.decode(cachedTarget), filter, relDefaults, true));
239 239
				}
240 240

  
241
				RelMetadata relMetadata = relDecoder.getRelMetadata();
241
				final RelMetadata relMetadata = relDecoder.getRelMetadata();
242 242
				// debug
243 243
				if (relMetadata == null) {
244 244
					// System.err.println(this);
245
					semanticclass = semanticscheme = "UNKNOWN";
245
					semanticclass = semanticscheme = "";
246 246
				} else {
247 247
					semanticclass = relClasses.getInverse(relMetadata.getSemantics().getClassname());
248 248
					semanticscheme = relMetadata.getSemantics().getSchemename();
......
250 250

  
251 251
				incrementCounter(relDescriptor.getSubRelType().toString());
252 252

  
253
				LinkDescriptor ld = entityConfigTable.getDescriptor(relDecoder.getTargetType(mainEntity.getEntity().getType()), relDescriptor);
253
				final LinkDescriptor ld = entityConfigTable.getDescriptor(relDecoder.getTargetType(mainEntity.getEntity().getType()), relDescriptor);
254 254

  
255
				String relId = ld != null && !ld.isSymmetric() ? relDecoder.getRelTargetId() : relDecoder.getRelSourceId();
255
				final String relId = (ld != null) && !ld.isSymmetric() ? relDecoder.getRelTargetId() : relDecoder.getRelSourceId();
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff