Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodImport;
2

    
3

    
4
import java.io.IOException;
5
import java.sql.Connection;
6
import java.sql.SQLException;
7
import java.sql.Statement;
8

    
9
import org.apache.hadoop.conf.Configuration;
10
import org.apache.hadoop.io.LongWritable;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Mapper;
13
import org.apache.log4j.Logger;
14
import org.joda.time.DateTime;
15
import org.joda.time.format.DateTimeFormat;
16
import org.joda.time.format.DateTimeFormatter;
17

    
18
import com.jolbox.bonecp.BoneCPDataSource;
19

    
20
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
21

    
22

    
23
public class LodImportMapper extends Mapper<LongWritable, Text, Text, Text> {
24
    private Logger log = Logger.getLogger(this.getClass());
25

    
26
    private String fileName = "";
27
    private String lastExecutionDate = "";
28

    
29

    
30
    private Configuration hadoopConf;
31
    private BoneCPDataSource ds;
32
    private Connection conn;
33
    String baseURI;
34
    private String defaultGraph = "";
35
    private String deleteQuery = "";
36
    private int counter;
37
    private int entitiesPerQuery;
38
    
39
    public static enum ENTITIES_COUNTER {
40
        STORED, DISCARDED,
41
        ENTITIES,RELATIONS,NULL_T_DATES,DELETED
42
    }
43

    
44
    @Override
45
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
46

    
47
        this.fileName = context.getConfiguration().get("lod.inputFile");
48
        this.lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
49
       
50
        this.hadoopConf = context.getConfiguration();
51
        
52
        if (fileName.contains("entities")){
53
        	defaultGraph = hadoopConf.get("lod.defaultGraph");
54
        	entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
55
        }
56
        else{
57
        	defaultGraph = hadoopConf.get("lod.relationsGraph");
58
        	entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));
59
        }
60
        
61
        baseURI = hadoopConf.get("lod.baseURI");
62
        
63
        try {
64
            DB db = new DB();
65
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), "1", "1", "1");
66
            
67
            conn = ds.getConnection();
68
            conn.setAutoCommit(false);
69
            
70
            Statement stmt1;
71
    		stmt1 = conn.createStatement();
72
    		stmt1.execute("__dbf_set ('enable_qp', 1)");
73
    		stmt1.close();
74
            
75
           
76
        } catch (Exception e) {
77
            log.error(e.toString(), e);
78

    
79
        }
80

    
81
    }
82

    
83
    @Override
84
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
85

    
86

    
87
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd");
88
        //TODO remove enclosing  delim from csv -not needed
89

    
90
        String split[] = value.toString().split(context.getConfiguration().get("lod.delim"));
91
        if (split.length > 1) {
92

    
93
//            Text outputKey = new Text(split[1]);
94
//            Text outputValue = new Text();
95

    
96
            Text outputKey = null;
97
            Text outputValue = null;
98
            String resourceURI="";
99
            try {
100
                outputKey = new Text(split[1]);
101
                outputValue = new Text(value.toString().replace("\\"," "));
102
                resourceURI = baseURI+split[0]+"/"+split[1];
103
            } catch (Exception e) {
104
                log.error(split.toString(), e);
105
            }
106

    
107

    
108
//		String insertQuery = "INSERT INTO <debug> { <http://test.org/"+outputKey.toString().trim()+"> a foaf:Person; <http://test.prop/1> \""+cleanValue+"\"}";
109
//		VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(insertQuery, graph);
110
//		q2ur.exec();
111
            if (fileName.contains("relations")) {
112
                outputKey = new Text(key.toString());
113
                writeOut(context, outputKey, outputValue);
114
                context.getCounter(ENTITIES_COUNTER.RELATIONS).increment(1);
115
            } else {
116

    
117
                context.getCounter(ENTITIES_COUNTER.ENTITIES).increment(1);
118
                String dateOfTransformationString = split[2].trim();
119
                
120
//                if (dateOfTransformationString == null || dateOfTransformationString.isEmpty() || dateOfTransformationString.equals("")){
121
//                	writeOut(context, outputKey, outputValue); 
122
//                	context.getCounter(ENTITIES_COUNTER.STORED).increment(1);
123
                	 context.getCounter(ENTITIES_COUNTER.NULL_T_DATES).increment(1);
124
//                	
125
//                	String deleteQuery ="<"+resourceURI+"> ?p ?o. ";
126
//                	delete(deleteQuery);
127
                	
128
                	
129
//                	}
130

    
131
                if (dateOfTransformationString.contains("T")) {
132
                    int index = dateOfTransformationString.indexOf("T");
133
                    dateOfTransformationString = dateOfTransformationString.substring(0, index);
134
                }
135

    
136
                if (lastExecutionDate == "" || lastExecutionDate == null || dateOfTransformationString == null || dateOfTransformationString == "" || dateOfTransformationString.equals("null") || dateOfTransformationString.isEmpty() || dateOfTransformationString.equals("")) {
137
//                    writeOut(context, outputKey, outputValue);
138
                    context.getCounter(ENTITIES_COUNTER.NULL_T_DATES).increment(1);
139
//                    context.getCounter(ENTITIES_COUNTER.STORED).increment(1);
140
                    
141
//                    String deleteString ="<"+resourceURI+"> ?p ?o. ";
142
//                	deleteQuery+=deleteString;
143
                } else {
144
//               System.err.println("Transformation:\t"+dateOfTransformationString+" \t"+outputKey+" \t"+dateOfTransformationString.length());
145
                    DateTime dt = formatter.parseDateTime(dateOfTransformationString);
146
                    DateTime de = formatter.parseDateTime(lastExecutionDate);
147
                    if (de.isBefore(dt)) {
148
                    	writeOut(context, outputKey, outputValue); 
149
                    	context.getCounter(ENTITIES_COUNTER.STORED).increment(1);
150
                    	String deleteString ="<"+resourceURI+"> ?p ?o. ";
151
                    	deleteQuery+=deleteString;
152
                    	}
153
                    else { context.getCounter(ENTITIES_COUNTER.DISCARDED).increment(1);}
154

    
155
                }
156
                counter++;;
157
                if (counter > entitiesPerQuery) {
158
                	delete(deleteQuery);
159
                	deleteQuery="";
160
                	counter = 0;
161
                	context.getCounter(ENTITIES_COUNTER.DELETED).increment(entitiesPerQuery);
162
                }
163
            }
164
        }
165
    }
166

    
167
    @Override
168
    protected void cleanup(Context context) throws IOException, InterruptedException {
169
        super.cleanup(context);
170
        delete(deleteQuery);
171
        try {
172
			conn.close();
173
		} catch (SQLException e) {
174
			log.error(e.toString(), e);
175
		}
176
        
177
    }
178

    
179
    private void writeOut(Context context, Text outputKey, Text outputValue) throws IOException, InterruptedException {
180
//        log.info("Written out "+  outputKey);
181
        context.write(outputKey, outputValue);
182

    
183

    
184
    }
185
    
186
    private void delete(String deleteQuery){
187
    	
188
    	String deleteQueryString ="SPARQL DEFINE sql:log-enable 0 WITH <"+defaultGraph+"> DELETE {"+deleteQuery+"} WHERE {"+deleteQuery+"}" ;
189
		try{
190
	    	Statement stmt;
191
			stmt = conn.createStatement();
192
			stmt.execute(deleteQueryString);
193
			conn.commit();
194
			stmt.close();
195
		}catch (Exception e){
196
			 log.error(e.toString(), e);
197
		}
198
    }
199

    
200

    
201
}
(1-1/2)