Project

General

Profile

« Previous | Next » 

Revision 42382

dedup experiments

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/SubjectsMap.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.HashMap;
4
import java.util.Map.Entry;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.bson.BsonDocument;
9
import org.bson.BsonDocumentWrapper;
10
import org.bson.codecs.configuration.CodecRegistry;
11
import org.bson.conversions.Bson;
12

  
13
/**
14
 * Created by claudio on 07/03/16.
15
 */
16
public class SubjectsMap extends HashMap<String, Subjects> {
17

  
18
	private static final Log log = LogFactory.getLog(SubjectsMap.class);
19

  
20
	public SubjectsMap mergeFrom(SubjectsMap sm) {
21

  
22
		if (sm != null) {
23
			for (Entry<String, Subjects> e : sm.entrySet()) {
24
				if (!this.containsKey(e.getKey())) {
25
					Subjects sub = new Subjects();
26

  
27
					sub.addAll(e.getValue());
28

  
29
					this.put(e.getKey(), sub);
30
				} else {
31
					for (String s : e.getValue()) {
32
						final Subjects subjects = this.get(e.getKey());
33
						subjects.add(s);
34
					}
35
				}
36
			}
37
		}
38

  
39
		return this;
40
	}
41

  
42
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/PublicationAnalysisMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.IOException;
4
import java.util.List;
5

  
6
import eu.dnetlib.data.mapreduce.util.DedupUtils;
7
import eu.dnetlib.data.mapreduce.util.OafDecoder;
8
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
9
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.hadoop.hbase.client.Result;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableMapper;
14
import org.apache.hadoop.io.NullWritable;
15

  
16
/**
17
 * Created by claudio on 22/04/16.
18
 */
19
public class PublicationAnalysisMapper extends TableMapper<NullWritable, NullWritable> {
20

  
21
	public static final String RESULT = "result";
22
	private static final int MAX_DESCRIPTIONS = 50;
23

  
24
	@Override
25
	protected void setup(final Context context) throws IOException, InterruptedException {
26
		super.setup(context);
27
	}
28

  
29
	@Override
30
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
31

  
32
		if (new String(key.copyBytes()).contains("dedup_wf")) {
33
			context.getCounter(RESULT, "roots").increment(1);
34
			return;
35
		}
36

  
37
		final byte[] body = value.getValue(RESULT.getBytes(), DedupUtils.BODY_B);
38
		if (body == null) {
39
			context.getCounter(RESULT, "missing body").increment(1);
40
			return;
41
		}
42
		final OafDecoder decoder = OafDecoder.decode(body);
43
		final ResultProtos.Result result = decoder.getEntity().getResult();
44
		if (result.getMetadata().getResulttype().getClassid().equals("dataset")) {
45
			context.getCounter(RESULT, "dataset").increment(1);
46
			return;
47
		} else {
48
			context.getCounter(RESULT, "publication").increment(1);
49
		}
50

  
51
		if (result.getMetadata().getDescriptionCount() > MAX_DESCRIPTIONS) {
52
			context.getCounter(RESULT, "abstracts > " + MAX_DESCRIPTIONS).increment(1);
53
		} else {
54
			context.getCounter(RESULT, "abstracts: " + result.getMetadata().getDescriptionCount()).increment(1);
55
		}
56

  
57
		final List<StringField> descList = result.getMetadata().getDescriptionList();
58

  
59
		boolean empty = true;
60
		for(StringField desc : descList) {
61
			empty = empty && StringUtils.isBlank(desc.getValue());
62
		}
63

  
64
		context.getCounter(RESULT, "empty abstract: " + empty).increment(1);
65
	}
66

  
67
	@Override
68
	protected void cleanup(final Context context) throws IOException, InterruptedException {
69
		super.cleanup(context);
70
	}
71
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/Subjects.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.HashSet;
4

  
5
import org.bson.BsonDocument;
6
import org.bson.BsonDocumentWrapper;
7
import org.bson.codecs.configuration.CodecRegistry;
8
import org.bson.conversions.Bson;
9

  
10
/**
11
 * Created by claudio on 07/03/16.
12
 */
13
/**
 * A set of subject terms of a single type; see {@code SubjectsMap}.
 *
 * Created by claudio on 07/03/16.
 */
public class Subjects extends HashSet<String> {

	// serializable subclass of HashSet: pin the serial form explicitly
	private static final long serialVersionUID = 1L;

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/JoinPersonGroupMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.List;
6
import java.util.Set;
7

  
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.data.mapreduce.JobParams;
13
import eu.dnetlib.data.mapreduce.util.DedupUtils;
14
import eu.dnetlib.data.mapreduce.util.OafDecoder;
15
import eu.dnetlib.data.proto.PersonProtos;
16
import eu.dnetlib.pace.config.DedupConfig;
17
import eu.dnetlib.pace.model.Person;
18
import org.apache.commons.lang.StringUtils;
19
import org.apache.commons.lang.math.RandomUtils;
20
import org.apache.hadoop.hbase.client.Put;
21
import org.apache.hadoop.hbase.client.Result;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
import org.apache.hadoop.hbase.mapreduce.TableMapper;
24
import org.apache.hadoop.io.Text;
25
import org.apache.hadoop.mapreduce.Mapper;
26
import org.dom4j.Document;
27
import org.dom4j.Element;
28
import org.dom4j.io.SAXReader;
29

  
30
public class JoinPersonGroupMapper extends Mapper<Text, Text, Text, Text> {
31

  
32
	public static final String PERSON = "person";
33

  
34
	private static final int MAX_TOKENS = 5;
35
	private static final int MIN_FEATURES = 10;
36

  
37
	private Text outKey;
38
	private Text outValue;
39

  
40
	private SubjectParser sp;
41

  
42
	@Override
43
	protected void setup(final Context context) throws IOException, InterruptedException {
44
		outKey = new Text();
45
		outValue = new Text();
46

  
47
		sp = new SubjectParser();
48
	}
49

  
50
	@Override
51
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
52
		// System.out.println("got key: " + new String(keyIn.copyBytes()));
53

  
54
		final SAXReader r = new SAXReader();
55
		try {
56
			final Document doc = r.read(new StringReader(value.toString()));
57
			final SubjectsMap sm = sp.parse(doc);
58

  
59
			final CsvEntry entry = new CsvEntry();
60
			for(Subjects s : sm.values()) {
61
				for(String subject : s) {
62
					entry.addFeature(subject);
63
				}
64
			}
65

  
66
			final List creatorNodes = doc.selectNodes("//*[local-name() = 'creator']");
67
			final List<Person> authors = Lists.newArrayList();
68

  
69
			for(int i = 0; i<creatorNodes.size(); i++) {
70
				final Element e = (Element) creatorNodes.get(i);
71
				authors.add(new Person(e.getText(), false));
72
			}
73

  
74
			for(Person p1 : authors) {
75

  
76
				context.getCounter(PERSON, "accurate " + p1.isAccurate()).increment(1);
77
				final Set<String> hashes = getOutKeys(p1);
78
				context.getCounter(PERSON, String.format("accurate %s keys", p1.isAccurate())).increment(hashes.size());
79
				for(String s1 : hashes) {
80
					//final String s1 = normalize(p1);
81
					final CsvEntry c = new CsvEntry(s1, entry.getFeatures());
82
					for (Person p2 : authors) {
83
						final String s2 = normalize(p2.getSurnameString());
84
						if (p1.isAccurate() && p2.isAccurate()) {
85
							if (!p1.getSurnameString().equalsIgnoreCase(p2.getSurnameString())) {
86
								c.addFeature(s2);
87
							}
88
						}
89
					}
90

  
91
					c.getFeatures().remove(s1);
92

  
93
					if (s1.length() <= 3) {
94
						context.getCounter(PERSON, "key size <= 3").increment(1);
95
						return;
96
					}
97

  
98
					if(c.getFeatures().size() < MIN_FEATURES) {
99
						context.getCounter(PERSON, "features < " + MIN_FEATURES).increment(1);
100
						return;
101
					}
102

  
103
					outKey.set(s1);
104
					outValue.set(c.toString());
105

  
106
					context.write(outKey, outValue);
107
				}
108
			}
109

  
110
		} catch (final Throwable e) {
111
			System.out.println("GOT EX " + e);
112
			e.printStackTrace(System.err);
113
			context.getCounter(PERSON, e.getClass().toString()).increment(1);
114
		}
115

  
116
	}
117

  
118
	private Set<String> getOutKeys(final Person p1) {
119
		final Set<String> hashes = Sets.newHashSet();
120
		if (p1.isAccurate()) {
121
			hashes.add(normalize(p1));
122
		} else {
123
			final String s = normalize(p1.getOriginal());
124
			for (final String token1 : tokens(s)) {
125
				for (final String token2 : tokens(s)) {
126
					if (!token1.equals(token2)) {
127
						hashes.add(firstLC(token1) + token2);
128
					}
129
				}
130
			}
131
		}
132
		return hashes;
133
	}
134

  
135
	private String normalize(final Person p) {
136

  
137
		final String s = p.getSurnameString() + firstLC(p.getNameString());
138
		return normalize(s);
139
	}
140

  
141
	private String normalize(final String s) {
142
		return s.replaceAll("[^a-zA-Z ]", "").toLowerCase().trim();
143
	}
144

  
145
	private Iterable<String> tokens(final String s) {
146
		return Iterables.limit(Splitter.on(" ").omitEmptyStrings().trimResults().split(s), MAX_TOKENS);
147
	}
148

  
149
	private String firstLC(final String s) {
150
		return StringUtils.substring(s, 0, 1).toLowerCase();
151
	}
152

  
153
}
0 154

  
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/SubjectParser.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.List;
4

  
5
import com.google.common.base.Splitter;
6
import org.apache.commons.lang.StringUtils;
7
import org.dom4j.Element;
8

  
9
/**
10
 * Created by claudio on 25/03/16.
11
 */
12
public class SubjectParser {
13

  
14
	public static final String REGEX_SUBJECT = "^(info:eu-repo)\\/(classification)\\/([a-zA-Z]*)\\/(.*)$";
15
	private static final int MIN_LENGTH = 5;
16

  
17
	public SubjectsMap parse(final org.dom4j.Document doc) {
18

  
19
		final List subjectNodes = doc.selectNodes("//*[local-name() = 'subject']");
20
		final SubjectsMap subjectMap = new SubjectsMap();
21

  
22
		for(int i = 0; i<subjectNodes.size(); i++) {
23
			final Element e = (Element) subjectNodes.get(i);
24
			final String subject = e.getText();
25

  
26
			final String type = guessType(subject);
27
			if (!subjectMap.containsKey(type)) {
28
				subjectMap.put(type, new Subjects());
29
			}
30

  
31
			if (StringUtils.isNotBlank(type)) {
32
				if ("keyword".equals(type)) {
33
					final Splitter splitter = Splitter.on(",").trimResults().omitEmptyStrings();
34
					for (String token : splitter.split(subject)) {
35
						final String value = token.replaceAll("[^a-zA-Z ]", "").toLowerCase();
36
						if (value.length() >= MIN_LENGTH) {
37
							subjectMap.get(type).add(value);
38
						}
39
					}
40
				} else {
41
					String token = subject.replaceFirst(REGEX_SUBJECT, "$4");
42

  
43
					if (StringUtils.isNotBlank(token)) {
44
						final String value = token.replaceAll("[^a-zA-Z ]", "").toLowerCase();
45
						if (value.length() >= MIN_LENGTH) {
46
							subjectMap.get(type).add(value);
47
						}
48
					}
49
				}
50
			}
51
		}
52

  
53
		return subjectMap;
54
	}
55

  
56
	private String guessType(final String subject) {
57
		if (subject.startsWith("info:eu-repo")) {
58
			final String s = subject.replaceAll(REGEX_SUBJECT, "$3");
59
			return s;
60
		} else {
61
			return "keyword";
62
		}
63
	}
64
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/JoinPersonGroupReducer.java
2 2

  
3 3
import java.io.IOException;
4 4

  
5
import java.util.List;
6

  
5 7
import com.google.common.base.Function;
6 8
import com.google.common.collect.Iterables;
7
import eu.dnetlib.data.mapreduce.util.OafDecoder;
8
import eu.dnetlib.data.proto.PersonProtos.Person;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.NullWritable;
9
import com.google.common.collect.Lists;
10

  
11
import org.apache.commons.lang.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14

  
11 15
import org.apache.hadoop.io.Text;
12 16
import org.apache.hadoop.mapreduce.Reducer;
13 17

  
14
public class JoinPersonGroupReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, NullWritable> {
18
public class JoinPersonGroupReducer extends Reducer<Text, Text, Text, Text> {
15 19

  
20
	/**
21
	 * logger.
22
	 */
23
	private static final Log log = LogFactory.getLog(JoinPersonGroupReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
16 24

  
25
	private Text tKey;
26
	private Text tValue;
27

  
28
	private final static int MIN_ENTRIES_THRESHOLD = 100;
29

  
30
	private int minEntriesThreshold;
31

  
17 32
	@Override
18
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
33
	protected void setup(final Context context) throws IOException, InterruptedException {
34
		super.setup(context);
35
		tKey = new Text("");
36
		tValue = new Text();
19 37

  
20
		final Iterable<OafDecoder> decoders = Iterables.transform(values, new Function<ImmutableBytesWritable, OafDecoder>() {
21
			@Override
22
			public OafDecoder apply(final ImmutableBytesWritable ibw) {
23
				return OafDecoder.decode(ibw.copyBytes());
24
			}
25
		});
38
		minEntriesThreshold = context.getConfiguration().getInt("min.entries.threshold", MIN_ENTRIES_THRESHOLD);
39
	}
26 40

  
27
		final Iterable<Person> persons = Iterables.transform(decoders, new Function<OafDecoder, Person>() {
41
	@Override
42
	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
43

  
44
		final CsvSerialiser csvSerialiser = new CsvSerialiser();
45
		String outKey = key.toString().replaceAll("[^a-zA-Z ]", "").toLowerCase();
46

  
47
		if (StringUtils.isBlank(outKey)) {
48
			context.getCounter("person", "blank key").increment(1);
49
			return;
50
		}
51

  
52
		final List<CsvEntry> entries = Lists.newArrayList(Iterables.transform(values, new Function<Text, CsvEntry>() {
53

  
28 54
			@Override
29
			public Person apply(final OafDecoder d) {
30
				return d.getEntity().getPerson();
55
			public CsvEntry apply(final Text t) {
56
				return CsvEntry.fromJson(t.toString());
31 57
			}
32
		});
58
		}));
33 59

  
34
		int i = 0;
35
		for (final Person p : persons) {
36
			i++;
60
		trackPersonInfo(entries.size(), context, "person");
37 61

  
62
		if (entries.size() < minEntriesThreshold) {
63
			return;
64
		}
38 65

  
66
		//tKey.set(outKey);
67
		tValue.set(csvSerialiser.asCSV(entries));
68
		context.write(tKey, tValue);
69

  
70
		context.getCounter("person", "csv").increment(1);
71
	}
72

  
73
	private void trackPersonInfo(final int count, final Context context, final String counterName) {
74

  
75
		if (count > 0 && count <= 10) {
76
			context.getCounter(counterName, count + "").increment(1);
77
			return;
39 78
		}
79

  
80
		if (count > 10 && count <= 20) {
81
			context.getCounter(counterName, "[10, 20)").increment(1);
82
			return;
83
		}
84

  
85
		if (count > 20 && count <= 30) {
86
			context.getCounter(counterName, "[20, 30)").increment(1);
87
			return;
88
		}
89

  
90
		if (count > 30 && count <= 40) {
91
			context.getCounter(counterName, "[30, 40)").increment(1);
92
			return;
93
		}
94

  
95
		if (count > 40 && count <= 50) {
96
			context.getCounter(counterName, "[40, 50)").increment(1);
97
			return;
98
		}
99

  
100
		if (count > 50 && count <= 70) {
101
			context.getCounter(counterName, "[50, 70)").increment(1);
102
			return;
103
		}
104

  
105
		if (count > 70 && count <= 100) {
106
			context.getCounter(counterName, "[70, 100)").increment(1);
107
			return;
108
		}
109

  
110
		if (count > 100) {
111
			context.getCounter(counterName, "[100, *)").increment(1);
112
			return;
113
		}
114

  
40 115
	}
41 116

  
117
	@Override
118
	public void cleanup(final Context context) throws IOException, InterruptedException {
119
		super.cleanup(context);
120
	}
121

  
42 122
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/CsvEntry.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.util.Set;
4

  
5
import com.google.common.collect.Sets;
6
import com.google.gson.Gson;
7

  
8
/**
9
 * Created by claudio on 20/04/16.
10
 */
11
public class CsvEntry {
12

  
13
	private String key;
14

  
15
	private Set<String> features = Sets.newLinkedHashSet();
16

  
17
	public CsvEntry() {
18
	}
19

  
20
	public CsvEntry(final String key, final Set<String> features) {
21
		this.key = key;
22
		this.features = features;
23
	}
24

  
25
	public CsvEntry(final Set<String> features) {
26
		this.features = features;
27
	}
28

  
29
	public void addFeature(final String f) {
30
		getFeatures().add(f);
31
	}
32

  
33
	public Set<String> getFeatures() {
34
		return features;
35
	}
36

  
37
	public void setFeatures(final Set<String> features) {
38
		this.features = features;
39
	}
40

  
41
	public static CsvEntry fromJson(final String json) {
42
		return new Gson().fromJson(json, CsvEntry.class);
43
	}
44

  
45
	public String getKey() {
46
		return key;
47
	}
48

  
49
	public void setKey(final String key) {
50
		this.key = key;
51
	}
52

  
53
	@Override
54
	public String toString() {
55
		return new Gson().toJson(this);
56
	}
57

  
58
	@Override
59
	public boolean equals(final Object o) {
60
		return (o instanceof CsvEntry) && ((CsvEntry) o).getFeatures().equals(getFeatures());
61
	}
62

  
63
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/CsvSerialiser.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

  
3
import java.io.StringWriter;
4
import java.util.List;
5
import java.util.Set;
6

  
7
import com.google.common.base.Joiner;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Sets;
10
import org.apache.commons.lang.StringUtils;
11

  
12
/**
13
 * Created by claudio on 26/04/16.
14
 */
15
public class CsvSerialiser {
16

  
17
	private final static int MAX_FEATURES = 100;
18
	private final static int MAX_ROWS = 1000;
19

  
20
	private int maxRows = MAX_ROWS;
21
	private int maxFeatures = MAX_FEATURES;
22

  
23
	public CsvSerialiser() {
24
	}
25

  
26
	public CsvSerialiser(int maxRows, int maxFeatures) {
27
		this.maxRows = maxRows;
28
		this.maxFeatures = maxFeatures;
29
	}
30

  
31
	public String asCSV(final List<CsvEntry> list) {
32
		final Set<String> features = Sets.newLinkedHashSet();
33

  
34
		for(CsvEntry e : Iterables.limit(list, maxRows)) {
35
			features.addAll(e.getFeatures());
36
		}
37

  
38
		final Iterable<String> cappedFeatures = Iterables.limit(features, maxFeatures);
39
		//context.getCounter("person", "features " + Iterables.size(cappedFeatures)).increment(1);
40

  
41
		final StringWriter csv = new StringWriter();
42
		csv.append("k,");
43
		csv.append(Joiner.on(",").join(cappedFeatures) + "\n");
44
		for(CsvEntry e : Iterables.limit(list, maxRows)) {
45
			final StringWriter line = new StringWriter();
46
			line.append(e.getKey()+",");
47
			for(String f : cappedFeatures) {
48
				if(e.getFeatures().contains(f)) {
49
					line.append("1,");
50
				} else {
51
					line.append("0,");
52
				}
53
			}
54
			csv.append(StringUtils.substringBeforeLast(line.toString(), ",")  + "\n");
55
		}
56

  
57
		return csv.toString();
58
	}
59

  
60
}

Also available in: Unified diff