Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodImport;
2

    
3

    
4
import java.io.IOException;
5
import java.sql.Connection;
6
import java.sql.Statement;
7

    
8
import org.apache.hadoop.conf.Configuration;
9
import org.apache.hadoop.io.LongWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Mapper;
12
import org.apache.log4j.Logger;
13
import org.joda.time.DateTime;
14
import org.joda.time.format.DateTimeFormat;
15
import org.joda.time.format.DateTimeFormatter;
16

    
17
import com.jolbox.bonecp.BoneCPDataSource;
18

    
19
import eu.dnetlib.data.mapreduce.hbase.lodImport.utils.DB;
20

    
21

    
22
public class LodImportMapper extends Mapper<LongWritable, Text, Text, Text> {
23
    private Logger log = Logger.getLogger(this.getClass());
24

    
25
    private String fileName = "";
26
    private String lastExecutionDate = "";
27

    
28

    
29
    private Configuration hadoopConf;
30
    private BoneCPDataSource ds;
31
    private Connection conn;
32
    String baseURI;
33
    private String defaultGraph = "";
34
    private String deleteQuery = "";
35
    private int counter;
36
    private int entitiesPerQuery;
37
    
38
    public static enum ENTITIES_COUNTER {
39
        STORED, DISCARDED,
40
        ENTITIES,RELATIONS,NULL_T_DATES
41
    }
42

    
43
    @Override
44
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
45

    
46
        this.fileName = context.getConfiguration().get("lod.inputFile");
47
        this.lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
48
       
49
        this.hadoopConf = context.getConfiguration();
50
        
51
        if (fileName.contains("entities")){
52
        	defaultGraph = hadoopConf.get("lod.defaultGraph");
53
        	entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.entitiesPerQuery"));
54
        }
55
        else{
56
        	defaultGraph = hadoopConf.get("lod.relationsGraph");
57
        	entitiesPerQuery = Integer.parseInt(hadoopConf.get("lod.relationsPerQuery"));
58
        }
59
        
60
        baseURI = hadoopConf.get("lod.baseURI");
61
        
62
        try {
63
            DB db = new DB();
64
            ds = db.getDatasource(hadoopConf.get("lod.conLine"), hadoopConf.get("lod.username"), hadoopConf.get("lod.password"), "1", "1", "1");
65
            
66
            conn = ds.getConnection();
67
            conn.setAutoCommit(false);
68
            
69
            Statement stmt1;
70
    		stmt1 = conn.createStatement();
71
    		stmt1.execute("__dbf_set ('enable_qp', 1)");
72
    		stmt1.close();
73
            
74
           
75
        } catch (Exception e) {
76
            log.error(e.toString(), e);
77

    
78
        }
79

    
80
    }
81

    
82
    @Override
83
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
84

    
85

    
86
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd");
87
        //TODO remove enclosing  delim from csv -not needed
88

    
89
        String split[] = value.toString().split(context.getConfiguration().get("lod.delim"));
90
        if (split.length > 1) {
91

    
92
//            Text outputKey = new Text(split[1]);
93
//            Text outputValue = new Text();
94

    
95
            Text outputKey = null;
96
            Text outputValue = null;
97
            String resourceURI="";
98
            try {
99
                outputKey = new Text(split[1]);
100
                outputValue = new Text(value.toString().replace("\\"," "));
101
                resourceURI = baseURI+split[0]+"/"+split[1];
102
            } catch (Exception e) {
103
                log.error(split.toString(), e);
104
            }
105

    
106

    
107
//		String insertQuery = "INSERT INTO <debug> { <http://test.org/"+outputKey.toString().trim()+"> a foaf:Person; <http://test.prop/1> \""+cleanValue+"\"}";
108
//		VirtuosoUpdateRequest q2ur = VirtuosoUpdateFactory.create(insertQuery, graph);
109
//		q2ur.exec();
110
            if (fileName.contains("relations")) {
111
                outputKey = new Text(key.toString());
112
                writeOut(context, outputKey, outputValue);
113
                context.getCounter(ENTITIES_COUNTER.RELATIONS).increment(1);
114
            } else {
115

    
116
                context.getCounter(ENTITIES_COUNTER.ENTITIES).increment(1);
117
                String dateOfTransformationString = split[2].trim();
118
                
119
//                if (dateOfTransformationString == null || dateOfTransformationString.isEmpty() || dateOfTransformationString.equals("")){
120
//                	writeOut(context, outputKey, outputValue); 
121
//                	context.getCounter(ENTITIES_COUNTER.STORED).increment(1);
122
                	 context.getCounter(ENTITIES_COUNTER.NULL_T_DATES).increment(1);
123
//                	
124
//                	String deleteQuery ="<"+resourceURI+"> ?p ?o. ";
125
//                	delete(deleteQuery);
126
                	
127
                	
128
//                	}
129

    
130
                if (dateOfTransformationString.contains("T")) {
131
                    int index = dateOfTransformationString.indexOf("T");
132
                    dateOfTransformationString = dateOfTransformationString.substring(0, index);
133
                }
134

    
135
                if (lastExecutionDate == "" || lastExecutionDate == null || dateOfTransformationString == null || dateOfTransformationString == "" || dateOfTransformationString.equals("null") || dateOfTransformationString.isEmpty() || dateOfTransformationString.equals("")) {
136
//                    writeOut(context, outputKey, outputValue);
137
                    context.getCounter(ENTITIES_COUNTER.NULL_T_DATES).increment(1);
138
//                    context.getCounter(ENTITIES_COUNTER.STORED).increment(1);
139
                    
140
//                    String deleteString ="<"+resourceURI+"> ?p ?o. ";
141
//                	deleteQuery+=deleteString;
142
                } else {
143
//               System.err.println("Transformation:\t"+dateOfTransformationString+" \t"+outputKey+" \t"+dateOfTransformationString.length());
144
                    DateTime dt = formatter.parseDateTime(dateOfTransformationString);
145
                    DateTime de = formatter.parseDateTime(lastExecutionDate);
146
                    if (de.isBefore(dt)) {
147
                    	writeOut(context, outputKey, outputValue); 
148
                    	context.getCounter(ENTITIES_COUNTER.STORED).increment(1);
149
                    	String deleteString ="<"+resourceURI+"> ?p ?o. ";
150
                    	deleteQuery+=deleteString;
151
                    	}
152
                    else { context.getCounter(ENTITIES_COUNTER.DISCARDED).increment(1);}
153

    
154
                }
155
                counter++;;
156
                if (counter > entitiesPerQuery) {
157
                	delete(deleteQuery);
158
                	deleteQuery="";
159
                	counter = 0;
160
                }
161
            }
162
        }
163
    }
164

    
165
    @Override
166
    protected void cleanup(Context context) throws IOException, InterruptedException {
167
        super.cleanup(context);
168
    }
169

    
170
    private void writeOut(Context context, Text outputKey, Text outputValue) throws IOException, InterruptedException {
171
//        log.info("Written out "+  outputKey);
172
        context.write(outputKey, outputValue);
173

    
174

    
175
    }
176
    
177
    private void delete(String deleteQuery){
178
    	
179
    	String deleteQueryString ="SPARQL DEFINE sql:log-enable 0 WITH <"+defaultGraph+"> DELETE {"+deleteQuery+"} WHERE {"+deleteQuery+"}" ;
180
		try{
181
	    	Statement stmt;
182
			stmt = conn.createStatement();
183
			stmt.execute(deleteQueryString);
184
			conn.commit();
185
			stmt.close();
186
		}catch (Exception e){
187
			 log.error(e.toString(), e);
188
		}
189
    }
190

    
191

    
192
}
(1-1/2)