Revision 48805
Added by Eri Katsari almost 7 years ago
SourceBuildNoCacheMapper.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.blocking.Blocking; |
4 | 4 |
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.LodConfiguration; |
5 |
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties; |
|
5 | 6 |
import org.apache.hadoop.io.LongWritable; |
6 | 7 |
import org.apache.hadoop.io.Text; |
7 | 8 |
import org.apache.hadoop.mapreduce.Mapper; |
8 | 9 |
import org.apache.log4j.Logger; |
9 | 10 |
|
10 | 11 |
import java.io.IOException; |
11 |
import java.util.*; |
|
12 |
import java.util.ArrayList; |
|
13 |
import java.util.Arrays; |
|
14 |
import java.util.Collections; |
|
15 |
import java.util.HashSet; |
|
16 |
import java.util.List; |
|
17 |
import java.util.Set; |
|
12 | 18 |
|
13 | 19 |
/** |
14 | 20 |
* Mapper Class that reads HBASE contents and prepares them for the StatsDB |
... | ... | |
28 | 34 |
private Logger log = Logger.getLogger(this.getClass()); |
29 | 35 |
private LodConfiguration lodConfiguration = new LodConfiguration(); |
30 | 36 |
private String lastExecutionDate; |
31 |
private static final String PREFIX = "source_"; |
|
32 | 37 |
|
33 | 38 |
private HashSet<String> stopWordsMap = new HashSet<>(); |
34 |
private static final String FIELD_DELIM = "\t"; |
|
35 |
private static final String LINE_DELIM = "\t.\t"; |
|
36 | 39 |
|
37 | 40 |
private boolean createCompositekey; |
38 | 41 |
private Set<String> usedProperties = new HashSet<>(); |
... | ... | |
47 | 50 |
@Override |
48 | 51 |
protected void setup(Context context) throws IOException, InterruptedException { |
49 | 52 |
try { |
50 |
lodConfiguration.load(context.getConfiguration().get("lod.sourceMappings"));
|
|
51 |
stopWordsMap.addAll(Arrays.asList(context.getConfiguration().get("lod.stopwords").split(",")));
|
|
52 |
createCompositekey = context.getConfiguration().getBoolean("createCompositekey", true);
|
|
53 |
lodConfiguration.load(context.getConfiguration().get(Properties.LOD_SOURCE_MAPPINGS));
|
|
54 |
stopWordsMap.addAll(Arrays.asList(context.getConfiguration().get(Properties.LOD_STOPWORDS).split(",")));
|
|
55 |
createCompositekey = context.getConfiguration().getBoolean(eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties.LOD_CREATE_COMPOSITEKEY, true);
|
|
53 | 56 |
usedProperties.add("<http://www.eurocris.org/ontologies/cerif/1.3#name>"); |
54 | 57 |
usedProperties.add("<http://lod.openaire.eu/vocab/year>"); |
55 |
|
|
56 |
|
|
57 | 58 |
} catch (Exception ex) { |
58 | 59 |
log.error("An error occured during Mapper Setup " + ex.toString(), ex); |
59 | 60 |
System.out.println(ex.getCause().toString()); |
... | ... | |
67 | 68 |
|
68 | 69 |
StringBuilder id = new StringBuilder(); |
69 | 70 |
context.getCounter(SOURCE_BUILD_COUNTERS.INPUT_SOURCE_RECORDS).increment(1); |
70 |
String[] triples = result.toString().split(LINE_DELIM); |
|
71 |
String[] triples = result.toString().split(Properties.LINE_DELIM);
|
|
71 | 72 |
|
72 | 73 |
|
73 | 74 |
List<String> tokenList = parseFieldsAndGenerateTokens(id, triples); |
... | ... | |
89 | 90 |
List<String> tokenList = new ArrayList<>(); |
90 | 91 |
|
91 | 92 |
for (String triple : triples) { |
92 |
String[] fields = triple.split(FIELD_DELIM); |
|
93 |
String[] fields = triple.split(Properties.FIELD_DELIM);
|
|
93 | 94 |
if (fields.length == 3) { |
94 | 95 |
if (id.length() < 1) { |
95 |
id.append(PREFIX).append(fields[0]); |
|
96 |
id.append(Properties.SOURCE_PREFIX).append(fields[0]);
|
|
96 | 97 |
} |
97 | 98 |
String property = fields[1]; |
98 | 99 |
String value = fields[2]; |
... | ... | |
109 | 110 |
private void writeSingleKeysAndValues(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException { |
110 | 111 |
for (String token : tokenList) { |
111 | 112 |
//Write BlockingKey, RecordID to output |
112 |
context.write(new Text(token), new Text(id + FIELD_DELIM + result.toString())); |
|
113 |
context.write(new Text(token), new Text(id + Properties.FIELD_DELIM + result.toString()));
|
|
113 | 114 |
context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1); |
114 | 115 |
} |
115 | 116 |
} |
116 | 117 |
|
117 | 118 |
private void writeCompositeKeyAndValue(Text result, Context context, List<String> tokenList, StringBuilder id) throws IOException, InterruptedException { |
118 | 119 |
String compositeKey = createCompositeKey(tokenList); |
119 |
|
|
120 |
context.write(new Text(compositeKey), new Text(id + FIELD_DELIM + result.toString())); |
|
120 |
context.write(new Text(compositeKey), new Text(id + Properties.FIELD_DELIM + result.toString())); |
|
121 | 121 |
context.getCounter(SOURCE_BUILD_COUNTERS.BLOCKING_KEYS).increment(1); |
122 | 122 |
} |
123 | 123 |
|
Also available in: Unified diff
Final update- fixed distance algs, added props un wf