Project

General

Profile

« Previous | Next » 

Revision 52960

added jobs for predatory journal analysis

View differences:

modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/misc/PredatoryJournalsReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.misc;
2

  
3
import java.io.IOException;
4

  
5
import com.google.protobuf.InvalidProtocolBufferException;
6
import eu.dnetlib.data.proto.OafProtos.Oaf;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
13
import org.apache.hadoop.mapreduce.Reducer;
14

  
15
public class PredatoryJournalsReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> {
16

  
17
	private static final Log log = LogFactory.getLog(PredatoryJournalsReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
18

  
19
	private static final String PREDATORY_JOURNALS = "Predatory Journal Counters";
20
	private static final String PUB_CHECKS = "Publication Counters";
21

  
22

  
23
	private Text keyOut = new Text();
24
	private Text valueOut = new Text();
25

  
26
	@Override
27
	protected void setup(final Context context) throws IOException, InterruptedException {
28
		super.setup(context);
29
	}
30

  
31
	@Override
32
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
33

  
34
		try {
35
			int count = 0;
36
			String dsName = "";
37
			Oaf oaf = null;
38
			for(ImmutableBytesWritable ibw : values) {
39
				final byte[] data = ibw.copyBytes();
40
				oaf = parseOaf(data);
41

  
42
				if (oaf != null) {
43
					dsName = oaf.getEntity().getDatasource().getMetadata().getOfficialname().getValue();
44
				} else {
45
					final int one = Bytes.toInt(data);
46
					if (one != 1) {
47
						throw new IllegalStateException("woops unexpected number, got " + one);
48
					}
49
					count += one;
50
				}
51
			}
52

  
53
			if (count > 0 && oaf != null) {
54
				valueOut.set(String.format("\"%s\", %s", dsName.replaceAll(",", " "), count));
55
				context.write(keyOut, valueOut);
56
				context.getCounter(PREDATORY_JOURNALS, "journals").increment(count);
57
			}
58

  
59
			if (oaf != null && count == 0) {
60
				context.getCounter(PUB_CHECKS, "pubs from journals not linked to projects").increment(1);
61
			}
62

  
63
			if (oaf == null && count > 0) {
64
				context.getCounter(PUB_CHECKS, "pubs from non-journal and linked to projects").increment(count);
65
			}
66

  
67
		} catch (final Exception e) {
68
			context.getCounter("error", e.getClass().getName()).increment(1);
69
			throw new RuntimeException(e);
70
		}
71
	}
72

  
73
	private Oaf parseOaf(final byte[] data) {
74
		try {
75
			return Oaf.parseFrom(data);
76
		} catch (InvalidProtocolBufferException e) {
77
			return null;
78
		}
79
	}
80

  
81
}
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/misc/PredatoryJournalsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.misc;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Set;
6
import java.util.stream.Collectors;
7
import java.util.stream.Stream;
8

  
9
import com.google.common.collect.Sets;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
12
import eu.dnetlib.data.proto.DatasourceProtos.Datasource;
13
import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
16
import org.apache.commons.lang.StringUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.hadoop.hbase.client.Result;
20
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
21
import org.apache.hadoop.hbase.mapreduce.TableMapper;
22
import org.apache.hadoop.hbase.util.Bytes;
23
import org.apache.hadoop.io.Text;
24

  
25
/**
26
 * Reads publications and datasources,
27
 *  for publications, if publication is related to an EC project emit(collectedFrom@id, 1);
28
 *  for datasources emit(id, ds)
29
 *
30
 * @author claudio
31
 */
32
public class PredatoryJournalsMapper extends TableMapper<Text, ImmutableBytesWritable> {
33

  
34
	/**
35
	 * logger.
36
	 */
37
	private static final Log log = LogFactory.getLog(PredatoryJournalsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
38

  
39
	private static final String PREDATORY_JOURNALS = "Predatory Journal Counters";
40

  
41
	private static final byte[] isProducedBy = "resultProject_outcome_isProducedBy".getBytes();
42

  
43
	private Text keyOut;
44

  
45
	private ImmutableBytesWritable valueOut;
46

  
47
	@Override
48
	protected void setup(final Context context) throws IOException, InterruptedException {
49
		super.setup(context);
50

  
51
		keyOut = new Text("");
52
		valueOut = new ImmutableBytesWritable();
53
	}
54

  
55
	@Override
56
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
57
		try {
58
			final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
59

  
60
			final byte[] body = value.getValue(keyDecoder.getType().toString().getBytes(), DedupUtils.BODY_B);
61

  
62
			if (body != null) {
63
				final Oaf oaf = Oaf.parseFrom(body);
64

  
65
				if (oaf.getDataInfo().getDeletedbyinference() != false) {
66

  
67
					switch (keyDecoder.getType()) {
68
					case result:
69

  
70
						final Map<byte[], byte[]> projectRels = value.getFamilyMap(isProducedBy);
71

  
72
						if (projectRels != null) {
73
							long ecProjects = projectRels.keySet().stream()
74
									.map(String::new)
75
									.filter(s -> StringUtils.contains(s, "corda"))
76
									.count();
77

  
78
							final Set<String> ids = Sets.newHashSet();
79
							if (ecProjects > 0) {
80
								ids.addAll(
81
										Stream.concat(
82
												oaf.getEntity().getCollectedfromList().stream()
83
													.map(KeyValue::getKey),
84
												oaf.getEntity().getResult().getInstanceList().stream()
85
													.map(Instance::getHostedby)
86
													.map(KeyValue::getKey)
87
										).collect(Collectors.toList()));
88

  
89
								ids.forEach(dsId -> {
90
									emit(context, dsId, Bytes.toBytes(1));
91
									context.getCounter(PREDATORY_JOURNALS, "publications").increment(1);
92
								});
93
							}
94
						}
95
						break;
96
					case datasource:
97

  
98
						final Datasource ds = oaf.getEntity().getDatasource();
99
						final String dsType = ds.getMetadata().getDatasourcetype().getClassid();
100

  
101
						if (StringUtils.contains(dsType, "journal")) {
102
							emit(context, oaf.getEntity().getId(), body);
103
							context.getCounter(PREDATORY_JOURNALS, "journals").increment(1);
104
						}
105
						break;
106

  
107
					default:
108
						break;
109
					}
110
				}
111
			}
112
		} catch (final Throwable e) {
113
			log.error("error exporting the following record from HBase: " + value.toString(), e);
114
			context.getCounter("error", e.getClass().getName()).increment(1);
115
			throw new RuntimeException(e);
116
		}
117
	}
118

  
119
	private void emit(final Context context, final String key, byte[] value) {
120
		keyOut.set(key);
121
		valueOut.set(value);
122
		try {
123
			context.write(keyOut, valueOut);
124
		} catch (IOException | InterruptedException e) {
125
			throw new RuntimeException(e);
126
		}
127
	}
128

  
129
}

Also available in: Unified diff