package eu.dnetlib.data.mapreduce.hbase.lodImport;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.jolbox.bonecp.BoneCPDataSource;

import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapCountries;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.MapLanguages;
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.RDFizer;

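/**
 * Reducer that turns OpenAIRE entity and relation rows into RDF and bulk-loads
 * them into a Virtuoso triple store over a pooled JDBC connection. The target
 * graph, the JSON mappings, and the batch size are all read from the job
 * configuration in {@link #setup(Context)}.
 */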
public class LodImportReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

    private Logger log = Logger.getLogger(LodImportReducer.class);
    private BoneCPDataSource ds;
    private String fileName = "";
    private Configuration hadoopConf;
    //    private VirtGraph graph;
    private int counter;
    private int commitCounter;
    private String[] buildQuery = new String[2];
    private String insertQuery = "";
    private String deleteQuery = "";
    private String defaultGraph = "";
    private int entitiesPerQuery;
    private MapCountries mapCountries;
    private MapLanguages mapLanguages;
    private Connection conn;
    private Connection dConn;
    private JSONObject entitiesMappings;
    private JSONObject relationsMappings;

    /** Hadoop counters reported per entity type and for overall totals. */
    public static enum REDUCES_COUNTER {
        RESULT,
        PROJECT,
        DATASOURCE,
        PERSON,
        ORGANIZATION,
        TOTAL_ENTITIES,
        TOTAL_RELATIONS
    }

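    /**
     * Reads the job-wide settings (input file, target graph, JSON mappings,
     * batch size) and opens a pooled Virtuoso connection with auto-commit
     * disabled, so that batches can be committed explicitly.
     */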
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.hadoopConf = context.getConfiguration();
        this.fileName = hadoopConf.get("lod.inputFile");
        // Entities and relations are loaded into separate named graphs.
        if (!fileName.contains("relations")) defaultGraph = hadoopConf.get("lod.defaultGraph");
        else defaultGraph = hadoopConf.get("lod.relationsGraph");
        this.counter = 0;
        mapCountries = new MapCountries();
        mapLanguages = new MapLanguages();

        buildQuery[0] = "";
        buildQuery[1] = "";

        entitiesMappings = new JSONObject(hadoopConf.get("lod.jsonEntities"));
        relationsMappings = new JSONObject(hadoopConf.get("lod.jsonRels"));

        if (!fileName.contains("relations")) entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
        else entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));

        try {
            DB db = new DB();
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), hadoopConf.get("lod.minCpart"), hadoopConf.get("lod.maxCpart"), hadoopConf.get("lod.part"));
            conn = ds.getConnection();
            conn.setAutoCommit(false);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }

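    /**
     * Accumulates the RDF produced for each input row and flushes it to
     * Virtuoso once the configured batch size is reached. On failure the
     * batch is rolled back and a fresh pooled connection is obtained.
     */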
    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {

        if (entitiesMappings == null || relationsMappings == null) {
            throw new InterruptedException("Could not read JSON mappings!");
        }
        Iterator<Text> it = values.iterator();

        while (it.hasNext()) {
            String data = it.next().toString();
            String[] split = data.split(hadoopConf.get("lod.delim"));
            List<String> row = new ArrayList<String>(Arrays.asList(split));

            if (!fileName.contains("relations")) {
                buildQuery = RDFizer.RDFizeEntityRow(row, ds, entitiesMappings, hadoopConf, mapCountries, mapLanguages, defaultGraph);
                insertQuery += buildQuery[0];
                deleteQuery += buildQuery[1];
            } else {
                buildQuery = RDFizer.RDFizeRelationRow(row, relationsMappings, hadoopConf);
                insertQuery += buildQuery[0];
            }
        }

        counter++;
        if (counter > entitiesPerQuery) {
            try {
                // Flush the accumulated batch into the target graph through
                // Virtuoso's multi-threaded RDF loader, DB.DBA.TTLP_MT.
                PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?, ?)");
                ps.setString(1, insertQuery);
                ps.setString(2, "");
                ps.setString(3, defaultGraph);
                final int NQUAD_LEVEL = 64;
                ps.setInt(4, NQUAD_LEVEL);
                ps.setInt(5, 0);
                ps.setInt(6, 15);
                ps.setInt(7, 0);
                ps.execute();

                conn.commit();

                ps.close();

                buildQuery = new String[2];
                buildQuery[0] = "";
                buildQuery[1] = "";
                insertQuery = "";

                counter = 0;
                if (!fileName.contains("relations")) {
                    // TODO: keep counters for all entity types
                    context.getCounter(REDUCES_COUNTER.TOTAL_ENTITIES).increment(entitiesPerQuery);
                } else {
                    context.getCounter(REDUCES_COUNTER.TOTAL_RELATIONS).increment(1);
                }
            } catch (Exception e) {
                log.error("Virtuoso write failed at query: " + insertQuery + "\nwith error " + e.toString(), e);
                try {
                    // Discard the failed batch and obtain a fresh connection.
                    if (conn != null) {
                        conn.rollback();
                        conn.close();
                    }
                    conn = ds.getConnection();
                    conn.setAutoCommit(false);
                } catch (Exception e1) {
                    log.error("Failed to obtain a new connection: " + e1.toString(), e1);
                }
            }
        }
    }

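    /**
     * Flushes the final, partially filled batch, commits, and releases the
     * pooled connection.
     */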
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            // Load whatever is left over from the last, partially filled batch.
            PreparedStatement ps = conn.prepareStatement("DB.DBA.TTLP_MT (?, ?, ?, ?, ?, ?, ?)");
            ps.setString(1, insertQuery);
            ps.setString(2, "");
            ps.setString(3, defaultGraph);
            final int NQUAD_LEVEL = 64;
            ps.setInt(4, NQUAD_LEVEL);
            ps.setInt(5, 0);
            ps.setInt(6, 15);
            ps.setInt(7, 0);
            ps.execute();
            conn.commit();
            ps.close();

            // Ask Virtuoso to commit any remaining open work.
            Statement st = conn.createStatement();
            st.execute("commit work");
            st.close();

        } catch (Exception e) {
            log.error("Virtuoso write failed at query: " + insertQuery + "\nwith error " + e.toString(), e);
//            throw new InterruptedException("Virtuoso write failed at query: " + insertQuery + "\nwith error " + e.toString());
        } finally {
            log.info("Cleaning up reducer...\nClosing Graph and Datasource");
            try {
                conn.commit();
                conn.close();
//            	graph.close();
//            	ds.close();
            } catch (Exception e) {
                log.error("Could not close connection\n" + e.toString(), e);
            }

            log.info("Graph and Datasource closed...");
        }
    }

}