Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.List;
6

    
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
13

    
14
import com.googlecode.protobuf.format.JsonFormat;
15
import com.googlecode.protobuf.format.JsonFormat.ParseException;
16

    
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
19
import eu.dnetlib.data.proto.OafProtos.Oaf;
20
import eu.dnetlib.pace.util.DedupConfig;
21
import eu.dnetlib.pace.util.DedupConfigLoader;
22

    
23
public class DedupRootsToCsvReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
24

    
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(DedupRootsToCsvReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
29

    
30
	private static final String COUNTER_GROUP = "csv";
31

    
32
	enum Tables {
33
		Groups, NativeGroups, NativeEntities
34
	}
35

    
36
	private DedupConfig dedupConf;
37

    
38
	private String DELIM;
39

    
40
	private String WRAP;
41

    
42
	private Text tKey;
43
	private Text tValue;
44
	private MultipleOutputs<Text, Text> mos;
45

    
46
	@Override
47
	protected void setup(final Context context) throws IOException, InterruptedException {
48
		super.setup(context);
49
		tKey = new Text();
50
		tValue = new Text();
51

    
52
		mos = new MultipleOutputs<Text, Text>(context);
53
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get(JobParams.WF_CONF));
54

    
55
		log.info("wf conf: " + dedupConf.toString());
56

    
57
		DELIM = context.getConfiguration().get("mapred.textoutputformat.separator", "!");
58
		WRAP = context.getConfiguration().get("mapred.textoutputformat.wrapper", "#");
59

    
60
		log.info("unsing field DELIMITER: '" + DELIM + "'");
61
	}
62

    
63
	@Override
64
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
65
	InterruptedException {
66

    
67
		for (final ImmutableBytesWritable ibw : values) {
68
			final RootEntity rootEntity = RootEntity.decode(new String(ibw.copyBytes(), Charset.forName("UTF-8")));
69

    
70
			emitRoot(context, rootEntity.getJoaf());
71
			emitRels(context, key, rootEntity.getMergedIds());
72

    
73
			return;
74
		}
75
	}
76

    
77
	private void emitRoot(final Context context, final String joaf) throws IOException, InterruptedException {
78

    
79
		final Oaf.Builder builder = Oaf.newBuilder();
80

    
81
		try {
82
			JsonFormat.merge(joaf, builder);
83
		} catch (final ParseException e) {
84
			context.getCounter("roots csv", e.getClass().getSimpleName()).increment(1);
85
			return;
86
		}
87

    
88
		final OafDecoder d = OafDecoder.decode(builder.build());
89

    
90
		tKey.set((WRAP + d.getEntityId() + WRAP).getBytes(Charset.forName("UTF-8")));
91
		// tValue.set(value.getBytes(Charset.forName("UTF-8")));
92

    
93
		mos.write(Tables.NativeEntities.toString(), tKey, tValue, Tables.NativeEntities.toString());
94
		context.getCounter(COUNTER_GROUP, "native_entities").increment(1);
95
	}
96

    
97
	private void emitRels(final Context context, final ImmutableBytesWritable key, final List<String> mergedIds) throws IOException, InterruptedException {
98
		final StringBuilder sb = new StringBuilder();
99
		final String groupId = new String(key.copyBytes());
100

    
101
		// native_groups groups native_entities
102
		tKey.set((WRAP + groupId + WRAP).getBytes(Charset.forName("UTF-8")));
103
		tValue.set((WRAP + dedupConf.getConfigurationId() + WRAP).getBytes(Charset.forName("UTF-8")));
104
		mos.write(Tables.Groups.toString(), tKey, tValue, Tables.Groups.toString());
105
		context.getCounter(COUNTER_GROUP, "groups").increment(mergedIds.size());
106

    
107
		for (final String id : mergedIds) {
108
			sb.append(WRAP).append(id).append(WRAP).append(DELIM);
109

    
110
			tValue.set(sb.toString().getBytes(Charset.forName("UTF-8")));
111
			mos.write(Tables.NativeGroups.toString(), tKey, tValue, Tables.NativeGroups.toString());
112
		}
113
		context.getCounter(COUNTER_GROUP, "native_groups").increment(mergedIds.size());
114
	}
115

    
116
	@Override
117
	public void cleanup(final Context context) throws IOException, InterruptedException {
118
		mos.close();
119
	}
120

    
121
}
(16-16/23)