Project

General

Profile

« Previous | Next » 

Revision 53779

avoid to import non necessary affiliations from DOIBoost

View differences:

modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/CrossRefImportMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

  
3
import com.google.gson.JsonObject;
4
import com.google.gson.JsonParser;
5
import eu.dnetlib.actionmanager.actions.ActionFactory;
6
import eu.dnetlib.actionmanager.actions.AtomicAction;
7
import eu.dnetlib.actionmanager.common.Agent;
8
import org.apache.hadoop.io.LongWritable;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.mapreduce.Mapper;
11

  
12
import java.io.IOException;
13
import java.util.List;
14

  
15
public class CrossRefImportMapper extends Mapper<LongWritable, Text, Text, Text> {
16

  
17
    private String setName;
18
    private Agent agent;
19
    private Text keyout;
20
    private Text valueOut;
21
    private JsonParser parser;
22
    private ActionFactory factory;
23
    private boolean invisible;
24
    private boolean onlyOrganization;
25

  
26
    @Override
27
    protected void setup(Context context) throws IOException, InterruptedException {
28
        setName = context.getConfiguration().get("setName");
29
        agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
30
        keyout = new Text("");
31
        valueOut = new Text("");
32
        factory = new ActionFactory();
33
        parser = new JsonParser();
34
        invisible = context.getConfiguration().getBoolean("invisible", false);
35
        onlyOrganization= context.getConfiguration().getBoolean("onlyOrganization", false);
36
    }
37

  
38
    @Override
39
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
40
        final String inputJson = value.toString();
41
        final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject();
42
        try {
43
            List<AtomicAction> atomicActions = DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization);
44
            if (atomicActions!= null) {
45
                for (AtomicAction action: atomicActions){
46

  
47
                    keyout.set(action.getRowKey());
48
                    valueOut.set(action.toJSON());
49
                    context.write(keyout, valueOut);
50
                    context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1);
51
                }
52
            }
53
        } catch (Throwable e) {
54
            System.err.println(inputJson);
55
            throw e;
56
        }
57
    }
58
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/DOIBoostImportMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

  
3
import com.google.common.base.Joiner;
4
import com.google.gson.JsonObject;
5
import com.google.gson.JsonParser;
6
import eu.dnetlib.actionmanager.actions.ActionFactory;
7
import eu.dnetlib.actionmanager.actions.AtomicAction;
8
import eu.dnetlib.actionmanager.common.Agent;
9
import org.apache.hadoop.io.LongWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Mapper;
12

  
13
import java.io.IOException;
14
import java.util.List;
15

  
16
public class DOIBoostImportMapper extends Mapper<LongWritable, Text, Text, Text> {
17

  
18
    private String setName;
19
    private Agent agent;
20
    private Text keyout;
21
    private Text valueOut;
22
    private JsonParser parser;
23
    private ActionFactory factory;
24
    private boolean invisible;
25
    private boolean onlyOrganization;
26

  
27
    private static final String SEPARATOR = "@";
28

  
29
    @Override
30
    protected void setup(Context context) throws IOException, InterruptedException {
31
        setName = context.getConfiguration().get("setName");
32
        agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
33
        keyout = new Text("");
34
        valueOut = new Text("");
35
        factory = new ActionFactory();
36
        parser = new JsonParser();
37
        invisible = context.getConfiguration().getBoolean("invisible", false);
38
        onlyOrganization= context.getConfiguration().getBoolean("onlyOrganization", false);
39
    }
40

  
41
    @Override
42
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
43
        final String inputJson = value.toString();
44
        final JsonObject rootElement = parser.parse(inputJson).getAsJsonObject();
45
        try {
46
            List<AtomicAction> atomicActions = DOIBoostToActions.generatePublicationActionsFromDump(rootElement, factory, setName, agent, invisible, onlyOrganization);
47
            if (atomicActions!= null) {
48
                for (AtomicAction action: atomicActions){
49
                    keyout.set(Joiner.on(SEPARATOR).join(action.getTargetRowKey(), action.getTargetColumnFamily(), action.getTargetColumn()));
50
                    valueOut.set(action.toJSON());
51
                    context.write(keyout, valueOut);
52
                    context.getCounter(this.getClass().getSimpleName(), action.getTargetColumnFamily()).increment(1);
53
                }
54
            }
55
        } catch (Throwable e) {
56
            System.err.println(inputJson);
57
            throw e;
58
        }
59
    }
60
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/DOIBoostImportReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

  
3
import eu.dnetlib.actionmanager.actions.AtomicAction;
4
import org.apache.hadoop.io.Text;
5
import org.apache.hadoop.mapreduce.Reducer;
6

  
7
import java.io.IOException;
8
import java.util.Iterator;
9

  
10
public class DOIBoostImportReducer extends Reducer<Text, Text, Text, Text> {
11

  
12
    private Text keyout;
13
    private Text valueOut;
14

  
15
    @Override
16
    protected void setup(Context context) throws IOException, InterruptedException {
17
        keyout = new Text("");
18
        valueOut = new Text("");
19
    }
20

  
21
    @Override
22
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
23
        final Iterator<Text> iterator = values.iterator();
24
        if (iterator.hasNext()) {
25

  
26
            final String aaString = iterator.next().toString();
27
            final AtomicAction aa = AtomicAction.fromJSON(aaString);
28

  
29
            keyout.set(aa.getRowKey());
30
            valueOut.set(aaString);
31
            context.write(keyout, valueOut);
32

  
33
            context.getCounter(this.getClass().getSimpleName(), aa.getTargetColumnFamily()).increment(1);
34
        }
35

  
36
        while (iterator.hasNext()) {
37
            final AtomicAction aa = AtomicAction.fromJSON(iterator.next().toString());
38
            context.getCounter(this.getClass().getSimpleName(), String.format("ignored %s", aa.getTargetColumnFamily())).increment(1);
39
        }
40

  
41
    }
42
}

Also available in: Unified diff