Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.NavigableMap;
6

    
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.io.Text;
12
import org.apache.http.HttpResponse;
13
import org.apache.http.client.HttpClient;
14
import org.apache.http.client.methods.HttpPost;
15
import org.apache.http.client.methods.HttpUriRequest;
16
import org.apache.http.impl.client.DefaultHttpClient;
17
import org.apache.http.params.BasicHttpParams;
18
import org.apache.http.params.HttpParams;
19

    
20
import com.google.common.collect.Lists;
21
import com.google.gson.Gson;
22

    
23
import eu.dnetlib.data.mapreduce.hbase.VolatileColumnFamily;
24
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
25

    
26
public class FindPersonCoauthorsMapper extends TableMapper<Text, Text> {
27

    
28
	private final HttpClient client = new DefaultHttpClient();
29

    
30
	private final String url = "http://146.48.87.97:8888/addData";
31

    
32
	@Override
33
	protected void setup(final Context context) {
34
		// url = context.getConfiguration().get("dedup.person.coauthors.service.url");
35
	}
36

    
37
	@Override
38
	protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
39
		final NavigableMap<byte[], byte[]> candidates = row.getFamilyMap(Bytes.toBytes(VolatileColumnFamily.dedupPerson.toString()));
40
		if ((candidates == null) || candidates.isEmpty()) return;
41

    
42
		final List<String> coauthors = Lists.newArrayList();
43
		for (final byte[] b : row.getFamilyMap(Bytes.toBytes(RelType.personResult.toString())).keySet()) {
44
			coauthors.add(Bytes.toString(b));
45
		}
46

    
47
		for (final byte[] candidate : candidates.keySet()) {
48
			emit(context, Bytes.toString(candidate), coauthors);
49
		}
50
	}
51

    
52
	private void emit(final Context context, final String candidate, final List<String> coauthors) {
53
		try {
54
			final HttpUriRequest request = new HttpPost(url);
55
			final HttpParams params = new BasicHttpParams();
56
			params.setParameter("id", candidate);
57
			params.setParameter("data", (new Gson().toJson(coauthors)));
58
			request.setParams(params);
59
			final HttpResponse response = client.execute(request);
60
			context.getCounter("HTTP call", "code " + response.getStatusLine().getStatusCode()).increment(1);
61
		} catch (final Exception e) {
62
			context.getCounter("HTTP call", "Exception " + e.getClass()).increment(1);
63
		}
64
	}
65
}
(18-18/22)