Revision 54125
Added by Miriam Baglioni over 5 years ago
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ProjectToResultFileReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
|
|
4 |
import com.googlecode.protobuf.format.JsonFormat; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
7 |
import eu.dnetlib.data.proto.*; |
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
import org.apache.hadoop.hbase.client.Put; |
|
11 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
12 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
13 |
import org.apache.hadoop.hbase.util.Bytes; |
|
14 |
import org.apache.hadoop.io.Text; |
|
15 |
import org.apache.hadoop.mapreduce.Reducer; |
|
16 |
|
|
17 |
import java.io.IOException; |
|
18 |
|
|
19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
20 |
|
|
21 |
public class ProjectToResultFileReducer extends Reducer<ImmutableBytesWritable, Text, Text,Text> { |
|
22 |
private static final Log log = LogFactory.getLog(ProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
23 |
|
|
24 |
private Text keyOut; |
|
25 |
private Text outValue; |
|
26 |
|
|
27 |
|
|
28 |
@Override |
|
29 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
30 |
super.setup(context); |
|
31 |
keyOut = new Text(""); |
|
32 |
outValue = new Text(); |
|
33 |
} |
|
34 |
|
|
35 |
|
|
36 |
@Override |
|
37 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
38 |
ResultIterator rh = null; |
|
39 |
try { |
|
40 |
rh = new ResultProjectIterator(values, Bytes.toString(key.copyBytes())); |
|
41 |
} catch (NotValidResultSequenceException e) { |
|
42 |
context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1); |
|
43 |
return; |
|
44 |
} |
|
45 |
while (rh.hasNext()) { |
|
46 |
for (OafProtos.Oaf oaf : rh.next()) { |
|
47 |
keyOut.set(oaf.getRel().getTarget()); |
|
48 |
outValue.set(JsonFormat.printToString(oaf).getBytes()); |
|
49 |
context.write(keyOut, outValue); |
|
50 |
} |
|
51 |
context.getCounter(COUNTER_PROPAGATION, "Added relation to project").increment(1); |
|
52 |
} |
|
53 |
|
|
54 |
} |
|
55 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ProjectToResultReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
5 |
import eu.dnetlib.data.proto.*; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
import org.apache.hadoop.hbase.client.Put; |
|
9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
|
|
14 |
import java.io.IOException; |
|
15 |
|
|
16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
17 |
|
|
18 |
public class ProjectToResultReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> { |
|
19 |
private static final Log log = LogFactory.getLog(ProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
20 |
private ImmutableBytesWritable keyOut; |
|
21 |
|
|
22 |
|
|
23 |
|
|
24 |
@Override |
|
25 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
26 |
super.setup(context); |
|
27 |
keyOut = new ImmutableBytesWritable(); |
|
28 |
} |
|
29 |
|
|
30 |
|
|
31 |
@Override |
|
32 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
33 |
ResultIterator rh = null; |
|
34 |
try { |
|
35 |
rh = new ResultProjectIterator(values,Bytes.toString(key.copyBytes())); |
|
36 |
} catch (NotValidResultSequenceException e) { |
|
37 |
context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1); |
|
38 |
return; |
|
39 |
} |
|
40 |
while(rh.hasNext()){ |
|
41 |
for(OafProtos.Oaf oaf:rh.next()){ |
|
42 |
final String source = oaf.getRel().getSource(); |
|
43 |
final Put put = new Put(Bytes.toBytes(source)).add(Bytes.toBytes(RELATION + oaf.getRel().getRelClass()),Bytes.toBytes(oaf.getRel().getTarget()),oaf.toByteArray()); |
|
44 |
keyOut.set(Bytes.toBytes(source)); |
|
45 |
context.write(keyOut, put); |
|
46 |
|
|
47 |
} |
|
48 |
context.getCounter(COUNTER_PROPAGATION,"Added relation to project").increment(1); |
|
49 |
} |
|
50 |
|
|
51 |
} |
|
52 |
|
|
53 |
|
|
54 |
|
|
55 |
|
|
56 |
|
|
57 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ResultProjectIterator.java | ||
---|---|---|
56 | 56 |
Value v = Value.fromJson(it.next().toString()); |
57 | 57 |
if(trust == null) trust = v.getTrust(); |
58 | 58 |
if (v.getType() == PropagationConstants.Type.fromresult) { |
59 |
fromResult.addAll(Arrays.asList(StringUtils.split(v.getValue(), ","))); |
|
59 |
if(StringUtils.isNotBlank(v.getValue())) |
|
60 |
fromResult.addAll(Arrays.asList(StringUtils.split(v.getValue(), ","))); |
|
60 | 61 |
} else { |
61 | 62 |
fromSemRel.addAll(Arrays.asList(StringUtils.split(v.getValue(), ","))); |
62 | 63 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/PropagationProjectToResultFileReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
|
|
4 |
import com.googlecode.protobuf.format.JsonFormat; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
7 |
import eu.dnetlib.data.proto.*; |
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
import org.apache.hadoop.mapreduce.Reducer; |
|
14 |
|
|
15 |
import java.io.IOException; |
|
16 |
|
|
17 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
18 |
|
|
19 |
public class PropagationProjectToResultFileReducer extends Reducer<ImmutableBytesWritable, Text, Text,Text> { |
|
20 |
private static final Log log = LogFactory.getLog(PropagationProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
21 |
|
|
22 |
private Text keyOut; |
|
23 |
private Text outValue; |
|
24 |
|
|
25 |
|
|
26 |
@Override |
|
27 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
28 |
super.setup(context); |
|
29 |
keyOut = new Text(""); |
|
30 |
outValue = new Text(); |
|
31 |
} |
|
32 |
|
|
33 |
|
|
34 |
@Override |
|
35 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
36 |
ResultIterator rh = null; |
|
37 |
try { |
|
38 |
rh = new ResultProjectIterator(values, Bytes.toString(key.copyBytes())); |
|
39 |
} catch (NotValidResultSequenceException e) { |
|
40 |
context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1); |
|
41 |
return; |
|
42 |
} |
|
43 |
while (rh.hasNext()) { |
|
44 |
for (OafProtos.Oaf oaf : rh.next()) { |
|
45 |
keyOut.set(oaf.getRel().getTarget()); |
|
46 |
outValue.set(JsonFormat.printToString(oaf).getBytes()); |
|
47 |
context.write(keyOut, outValue); |
|
48 |
} |
|
49 |
context.getCounter(COUNTER_PROPAGATION, "Added relation to project").increment(1); |
|
50 |
} |
|
51 |
|
|
52 |
} |
|
53 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/PropagationProjectToResultReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
5 |
import eu.dnetlib.data.proto.*; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
import org.apache.hadoop.hbase.client.Put; |
|
9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
|
|
14 |
import java.io.IOException; |
|
15 |
|
|
16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
17 |
|
|
18 |
public class PropagationProjectToResultReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> { |
|
19 |
private static final Log log = LogFactory.getLog(PropagationProjectToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
20 |
private ImmutableBytesWritable keyOut; |
|
21 |
|
|
22 |
|
|
23 |
|
|
24 |
@Override |
|
25 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
26 |
super.setup(context); |
|
27 |
keyOut = new ImmutableBytesWritable(); |
|
28 |
} |
|
29 |
|
|
30 |
|
|
31 |
@Override |
|
32 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
33 |
ResultIterator rh = null; |
|
34 |
try { |
|
35 |
rh = new ResultProjectIterator(values,Bytes.toString(key.copyBytes())); |
|
36 |
} catch (NotValidResultSequenceException e) { |
|
37 |
context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1); |
|
38 |
return; |
|
39 |
} |
|
40 |
while(rh.hasNext()){ |
|
41 |
for(OafProtos.Oaf oaf:rh.next()){ |
|
42 |
final String source = oaf.getRel().getSource(); |
|
43 |
final Put put = new Put(Bytes.toBytes(source)).add(Bytes.toBytes(RELATION + oaf.getRel().getRelClass()),Bytes.toBytes(oaf.getRel().getTarget()),oaf.toByteArray()); |
|
44 |
keyOut.set(Bytes.toBytes(source)); |
|
45 |
context.write(keyOut, put); |
|
46 |
|
|
47 |
} |
|
48 |
context.getCounter(COUNTER_PROPAGATION,"Added relation to project").increment(1); |
|
49 |
} |
|
50 |
|
|
51 |
} |
|
52 |
|
|
53 |
|
|
54 |
|
|
55 |
|
|
56 |
|
|
57 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/projecttoresult/ProjectToResultMapper.java | ||
---|---|---|
39 | 39 |
sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET); |
40 | 40 |
trust = context.getConfiguration().get("propagatetoproject.trust","0.85"); |
41 | 41 |
|
42 |
|
|
43 | 42 |
} |
44 | 43 |
|
45 | 44 |
@Override |
Also available in: Unified diff
refactoring and fixed issue for empty project list