Project

General

Profile

« Previous | Next » 

Revision 36670

updated to the new pace specs, cleanup

View differences:

DedupRootsToCsvReducer.java
17 17
import eu.dnetlib.data.mapreduce.JobParams;
18 18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
19 19
import eu.dnetlib.data.proto.OafProtos.Oaf;
20
import eu.dnetlib.pace.util.DedupConfig;
21
import eu.dnetlib.pace.util.DedupConfigLoader;
20
import eu.dnetlib.pace.config.DedupConfig;
22 21

  
23 22
public class DedupRootsToCsvReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
24 23

  
......
50 49
		tValue = new Text();
51 50

  
52 51
		mos = new MultipleOutputs<Text, Text>(context);
53
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
52
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
54 53

  
55 54
		log.info("wf conf: " + dedupConf.toString());
56 55

  
......
62 61

  
63 62
	@Override
64 63
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
65
	InterruptedException {
64
			InterruptedException {
66 65

  
67 66
		for (final ImmutableBytesWritable ibw : values) {
68 67
			final RootEntity rootEntity = RootEntity.decode(new String(ibw.copyBytes(), Charset.forName("UTF-8")));
......
100 99

  
101 100
		// native_groups groups native_entities
102 101
		tKey.set((WRAP + groupId + WRAP).getBytes(Charset.forName("UTF-8")));
103
		tValue.set((WRAP + dedupConf.getConfigurationId() + WRAP).getBytes(Charset.forName("UTF-8")));
102
		tValue.set((WRAP + dedupConf.getWf().getConfigurationId() + WRAP).getBytes(Charset.forName("UTF-8")));
104 103
		mos.write(Tables.Groups.toString(), tKey, tValue, Tables.Groups.toString());
105 104
		context.getCounter(COUNTER_GROUP, "groups").increment(mergedIds.size());
106 105

  

Also available in: Unified diff