Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.misc;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Set;
6
import java.util.stream.Collectors;
7
import java.util.stream.Stream;
8

    
9
import com.google.common.collect.Sets;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
12
import eu.dnetlib.data.proto.DatasourceProtos.Datasource;
13
import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
16
import org.apache.commons.lang.StringUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.hadoop.hbase.client.Result;
20
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
21
import org.apache.hadoop.hbase.mapreduce.TableMapper;
22
import org.apache.hadoop.hbase.util.Bytes;
23
import org.apache.hadoop.io.Text;
24

    
25
/**
26
 * Reads publications and datasources,
27
 *  for publications, if publication is related to an EC project emit(collectedFrom@id, 1);
28
 *  for datasources emit(id, ds)
29
 *
30
 * @author claudio
31
 */
32
public class PredatoryJournalsMapper extends TableMapper<Text, ImmutableBytesWritable> {
33

    
34
	/**
35
	 * logger.
36
	 */
37
	private static final Log log = LogFactory.getLog(PredatoryJournalsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
38

    
39
	private static final String PREDATORY_JOURNALS = "Predatory Journal Counters";
40

    
41
	private static final byte[] isProducedBy = "resultProject_outcome_isProducedBy".getBytes();
42

    
43
	private Text keyOut;
44

    
45
	private ImmutableBytesWritable valueOut;
46

    
47
	@Override
48
	protected void setup(final Context context) throws IOException, InterruptedException {
49
		super.setup(context);
50

    
51
		keyOut = new Text("");
52
		valueOut = new ImmutableBytesWritable();
53
	}
54

    
55
	@Override
56
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
57
		try {
58
			final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
59

    
60
			final byte[] body = value.getValue(keyDecoder.getType().toString().getBytes(), DedupUtils.BODY_B);
61

    
62
			if (body != null) {
63
				final Oaf oaf = Oaf.parseFrom(body);
64

    
65
				if (oaf.getDataInfo().getDeletedbyinference() != false) {
66

    
67
					switch (keyDecoder.getType()) {
68
					case result:
69

    
70
						final Map<byte[], byte[]> projectRels = value.getFamilyMap(isProducedBy);
71

    
72
						if (projectRels != null) {
73
							long ecProjects = projectRels.keySet().stream()
74
									.map(String::new)
75
									.filter(s -> StringUtils.contains(s, "corda"))
76
									.count();
77

    
78
							final Set<String> ids = Sets.newHashSet();
79
							if (ecProjects > 0) {
80
								ids.addAll(
81
										Stream.concat(
82
												oaf.getEntity().getCollectedfromList().stream()
83
													.map(KeyValue::getKey),
84
												oaf.getEntity().getResult().getInstanceList().stream()
85
													.map(Instance::getHostedby)
86
													.map(KeyValue::getKey)
87
										).collect(Collectors.toList()));
88

    
89
								ids.forEach(dsId -> {
90
									emit(context, dsId, Bytes.toBytes(1));
91
									context.getCounter(PREDATORY_JOURNALS, "publications").increment(1);
92
								});
93
							}
94
						}
95
						break;
96
					case datasource:
97

    
98
						final Datasource ds = oaf.getEntity().getDatasource();
99
						final String dsType = ds.getMetadata().getDatasourcetype().getClassid();
100

    
101
						if (StringUtils.contains(dsType, "journal")) {
102
							emit(context, oaf.getEntity().getId(), body);
103
							context.getCounter(PREDATORY_JOURNALS, "journals").increment(1);
104
						}
105
						break;
106

    
107
					default:
108
						break;
109
					}
110
				}
111
			}
112
		} catch (final Throwable e) {
113
			log.error("error exporting the following record from HBase: " + value.toString(), e);
114
			context.getCounter("error", e.getClass().getName()).increment(1);
115
			throw new RuntimeException(e);
116
		}
117
	}
118

    
119
	private void emit(final Context context, final String key, byte[] value) {
120
		keyOut.set(key);
121
		valueOut.set(value);
122
		try {
123
			context.write(keyOut, valueOut);
124
		} catch (IOException | InterruptedException e) {
125
			throw new RuntimeException(e);
126
		}
127
	}
128

    
129
}
(1-1/2)