Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.oai;
2

    
3
import java.io.IOException;
4
import java.net.UnknownHostException;
5
import java.text.ParseException;
6
import java.util.Collection;
7
import java.util.Date;
8
import java.util.Map;
9
import java.util.zip.ZipEntry;
10
import java.util.zip.ZipOutputStream;
11

    
12
import com.google.common.base.Function;
13
import com.google.common.collect.Iterables;
14
import com.google.common.collect.Lists;
15
import com.google.common.collect.Maps;
16
import com.google.common.collect.Multimap;
17
import com.mongodb.*;
18
import com.mongodb.client.MongoCollection;
19
import com.mongodb.client.MongoDatabase;
20
import eu.dnetlib.data.mapreduce.JobParams;
21
import eu.dnetlib.data.mapreduce.hbase.oai.config.OAIConfiguration;
22
import eu.dnetlib.data.mapreduce.hbase.oai.config.OAIConfigurationReader;
23
import eu.dnetlib.data.mapreduce.hbase.oai.config.OAIConfigurationStringReader;
24
import eu.dnetlib.data.mapreduce.hbase.oai.utils.MongoSetCollection;
25
import eu.dnetlib.data.mapreduce.hbase.oai.utils.PublisherField;
26
import eu.dnetlib.data.mapreduce.hbase.oai.utils.RecordFieldsExtractor;
27
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
28
import eu.dnetlib.data.proto.TypeProtos.Type;
29
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
30
import org.apache.commons.io.output.ByteArrayOutputStream;
31
import org.apache.commons.lang.StringUtils;
32
import org.apache.hadoop.io.NullWritable;
33
import org.apache.hadoop.io.Text;
34
import org.apache.hadoop.mapreduce.Mapper;
35
import org.apache.solr.common.util.DateUtil;
36
import org.bson.types.Binary;
37

    
38
public class OaiFeedMapper extends Mapper<Text, Text, NullWritable, NullWritable> {
39

    
40
	/**
	 * Status labels for a record.
	 * NOTE(review): no code visible in this class references this enum - confirm whether it is still needed.
	 */
	enum RecordStatus {
		NEW,
		UPDATED,
		UNCHANGED
	}
43

    
44
	private MongoCollection<DBObject> collection;
45
	private MongoCollection<DBObject> discardedCollection;
46
	private OAIConfigurationStringReader oaiConfigurationReader;
47
	private OAIConfiguration oaiConfiguration;
48

    
49
	private Date feedDate;
50

    
51
	private MongoSetCollection mongoSetCollection;
52

    
53
	private RecordFieldsExtractor extractor;
54

    
55
	// these are set in the setup
56
	private String format;
57
	private String interpretation;
58
	private String layout;
59
	private Map<String, PublisherField> fieldsToIndex = Maps.newHashMap();
60

    
61
	private String duplicateXPath;
62
	private boolean skipDuplicates;
63

    
64
	private MongoClient mongo;
65

    
66
	private Collection<String> enrichmentXPaths;
67

    
68
	@Override
69
	protected void setup(final Context context) throws UnknownHostException {
70

    
71
		String host = context.getConfiguration().get("services.publisher.oai.host");
72
		String port = context.getConfiguration().get("services.publisher.oai.port");
73
		String db = context.getConfiguration().get("services.publisher.oai.db");
74
		String collectionName = context.getConfiguration().get("services.publisher.oai.collection");
75

    
76
		System.out.println("Mongodb client params");
77
		System.out.println("host: " + host);
78
		System.out.println("port: " + port);
79
		System.out.println("db: " + db);
80
		System.out.println("collection: " + collectionName);
81

    
82
		String[] formatLayoutInterp = collectionName.split("-");
83
		format = formatLayoutInterp[0];
84
		layout = formatLayoutInterp[1];
85
		interpretation = formatLayoutInterp[2];
86

    
87
		String oaiConfigurationProfile = context.getConfiguration().get("oaiConfiguration");
88
		System.out.println("oaiConfiguration:\n" + IndentXmlString.apply(oaiConfigurationProfile));
89
		oaiConfigurationReader = new OAIConfigurationStringReader(oaiConfigurationProfile);
90
		oaiConfiguration = oaiConfigurationReader.getOaiConfiguration();
91

    
92
		System.out.println("parsed configuration:" + oaiConfiguration.toString());
93

    
94
		mongo = new MongoClient(host, Integer.parseInt(port));
95
		MongoDatabase mongoDB = mongo.getDatabase(db);
96
		//DB mongoDB = mongo.getDB(db);
97
		collection = mongoDB.getCollection(collectionName, DBObject.class).withWriteConcern(WriteConcern.UNACKNOWLEDGED);
98
		discardedCollection = mongoDB.getCollection("discarded-" + collectionName, DBObject.class).withWriteConcern(WriteConcern.UNACKNOWLEDGED);
99
		mongoSetCollection = new MongoSetCollection(mongo);
100

    
101
		duplicateXPath = context.getConfiguration().get("services.publisher.oai.duplicateXPath");
102
		skipDuplicates = Boolean.parseBoolean(context.getConfiguration().get("services.publisher.oai.skipDuplicates"));
103

    
104
		enrichmentXPaths = oaiConfiguration.getEnrichmentXPathsFor(format, layout, interpretation);
105
		Collection<PublisherField> indexFields = oaiConfiguration.getFieldsFor(format, layout, interpretation);
106
		extractor = new RecordFieldsExtractor(Lists.newArrayList(indexFields));
107
		extractor.setDuplicateXPath(duplicateXPath);
108
		extractor.setSkipDuplicates(skipDuplicates);
109

    
110
		for (PublisherField field : indexFields) {
111
			fieldsToIndex.put(field.getFieldName(), field);
112
		}
113

    
114
		String feedDateString = context.getConfiguration().get(JobParams.OAI_FEED_DATE);
115
		try {
116
			feedDate = DateUtil.parseDate(feedDateString);
117
		} catch (ParseException e) {
118
			e.printStackTrace(System.err);
119
			throw new RuntimeException(e);
120
		}
121

    
122
	}
123

    
124
	@Override
125
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
126

    
127
		String recordKey = key.toString();
128
		Type entityType = OafRowKeyDecoder.decode(recordKey).getType();
129
		switch (entityType) {
130
		case person:
131
			context.getCounter("oai", "discardedPerson").increment(1);
132
			break;
133
		default:
134
			String recordBody = value.toString();
135
			if (StringUtils.isBlank(recordBody)) {
136
				discard(context, recordKey, recordBody, "blank body");
137
			} else {
138
				Multimap<String, String> recordFields = extractor.extractFields(recordBody, enrichmentXPaths);
139
				String id;
140
				String oaiID;
141
				if (checkRecordFields(recordFields, context, recordKey, recordBody)) {
142
					id = recordFields.get(OAIConfigurationReader.ID_FIELD).iterator().next();
143
					oaiID = getOAIIdentifier(id);
144
					handleRecord(context, oaiID, recordBody, recordFields);
145
				}
146
			}
147
		}
148
	}
149

    
150
	public boolean checkRecordFields(final Multimap<String, String> recordFields, final Context context, final String recordKey, final String recordBody) {
151
		if ((recordFields == null)) {
152
			context.getCounter("oai", "invalid").increment(1);
153
			return false;
154
		}
155
		if (recordFields.containsEntry("duplicate", "true")) {
156
			if (skipDuplicates) {
157
				context.getCounter("oai", "discardedDuplicate").increment(1);
158
				return false;
159
			} else return true;
160
		}
161
		if (!recordFields.containsKey(OAIConfigurationReader.ID_FIELD)) {
162
			discard(context, recordKey, recordBody, "missing " + OAIConfigurationReader.ID_FIELD);
163
			return false;
164
		}
165
		return true;
166
	}
167

    
168
	private void handleRecord(final Context context, final String oaiID, final String record, final Multimap<String, String> recordProperties) {
169
		DBObject obj = this.createBasicObject(oaiID, record, recordProperties,context);
170
		if (obj != null) { // it can be null if the compression did not succeeded: counter is updated in the compress method in that case
171
			obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
172
			obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
173
			obj.put(OAIConfigurationReader.UPDATED_FIELD, false);
174
			collection.insertOne(obj);
175
			context.getCounter("oai", "total").increment(1);
176
		}
177
	}
178

    
179

    
180
	private void discard(final Context context, final String recordKey, final String recordBody, final String reason) {
181
		context.getCounter("oai", reason).increment(1);
182
		discardedCollection.insertOne(new BasicDBObject("id", recordKey).append(OAIConfigurationReader.BODY_FIELD, recordBody));
183
	}
184

    
185
	private String getOAIIdentifier(final String id) {
186
		return oaiConfiguration.getIdScheme() + ":" + oaiConfiguration.getIdNamespace() + ":" + id;
187
	}
188

    
189
	protected DBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties, final Context context) {
190
		DBObject obj = new BasicDBObject();
191
		for (final String key : recordProperties.keySet()) {
192
			if (key.equals(OAIConfigurationReader.ID_FIELD)) {
193
				obj.put(key, oaiID);
194
			} else {
195
				Collection<String> values = recordProperties.get(key);
196
				if (key.equals(OAIConfigurationReader.SET_FIELD)) {
197

    
198
					Iterable<String> setSpecs = Iterables.transform(values, new Function<String, String>() {
199

    
200
						@Override
201
						public String apply(final String s) {
202
							return mongoSetCollection.normalizeSetSpec(s);
203
						}
204

    
205
					});
206
					obj.put(key, setSpecs);
207
				} else {
208
					PublisherField keyField = fieldsToIndex.get(key);
209
					if (keyField == null) {
210
						context.getCounter("oai", key + " found for record but not in configuration. Assuming it is repeatable.").increment(1);
211
					}
212
					// let's check if the key is the name of a repeatable field or not
213
					if ((keyField != null) && !keyField.isRepeatable()) {
214
						if ((values != null) && !values.isEmpty()) {
215
							obj.put(key, values.iterator().next());
216
						}
217
					} else {
218
						obj.put(key, values);
219
					}
220
				}
221
			}
222
		}
223

    
224
		Binary compressedRecordBody = createCompressRecord(context, oaiID, record);
225
		if (compressedRecordBody != null) {
226
			obj.put(OAIConfigurationReader.BODY_FIELD, compressedRecordBody);
227
			obj.put(OAIConfigurationReader.DELETED_FIELD, false);
228
			return obj;
229
		} else return null;
230
	}
231

    
232
	public Binary createCompressRecord(final Context context, final String recordKey, final String recordBody) {
233
		try {
234
			ByteArrayOutputStream os = new ByteArrayOutputStream();
235
			ZipOutputStream zos = new ZipOutputStream(os);
236
			ZipEntry entry = new ZipEntry(OAIConfigurationReader.BODY_FIELD);
237
			zos.putNextEntry(entry);
238
			zos.write(recordBody.getBytes());
239
			zos.closeEntry();
240
			zos.flush();
241
			zos.close();
242
			return new Binary(os.toByteArray());
243
		} catch (IOException e) {
244
			discard(context, recordKey, recordBody, "cannot compress");
245
			return null;
246
		}
247
	}
248

    
249
	@Override
250
	protected void cleanup(final Context context) throws IOException, InterruptedException {
251

    
252
		super.cleanup(context);
253
	}
254

    
255
	public MongoCollection<DBObject> getCollection() {
256
		return collection;
257
	}
258

    
259
	public void setCollection(final MongoCollection<DBObject> collection) {
260
		this.collection = collection;
261
	}
262

    
263
	public MongoCollection<DBObject> getDiscardedCollection() {
264
		return discardedCollection;
265
	}
266

    
267
	public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
268
		this.discardedCollection = discardedCollection;
269
	}
270

    
271
	public OAIConfigurationStringReader getOaiConfigurationReader() {
272
		return oaiConfigurationReader;
273
	}
274

    
275
	public void setOaiConfigurationReader(final OAIConfigurationStringReader oaiConfigurationReader) {
276
		this.oaiConfigurationReader = oaiConfigurationReader;
277
	}
278

    
279
	public OAIConfiguration getOaiConfiguration() {
280
		return oaiConfiguration;
281
	}
282

    
283
	public void setOaiConfiguration(final OAIConfiguration oaiConfiguration) {
284
		this.oaiConfiguration = oaiConfiguration;
285
	}
286

    
287
	public Date getFeedDate() {
288
		return feedDate;
289
	}
290

    
291
	public void setFeedDate(final Date feedDate) {
292
		this.feedDate = feedDate;
293
	}
294

    
295
	public MongoSetCollection getMongoSetCollection() {
296
		return mongoSetCollection;
297
	}
298

    
299
	public void setMongoSetCollection(final MongoSetCollection mongoSetCollection) {
300
		this.mongoSetCollection = mongoSetCollection;
301
	}
302

    
303
	public String getDuplicateXPath() {
304
		return duplicateXPath;
305
	}
306

    
307
	public void setDuplicateXPath(final String duplicateXPath) {
308
		this.duplicateXPath = duplicateXPath;
309
	}
310

    
311
	public boolean isSkipDuplicates() {
312
		return skipDuplicates;
313
	}
314

    
315
	public void setSkipDuplicates(final boolean skipDuplicates) {
316
		this.skipDuplicates = skipDuplicates;
317
	}
318

    
319
}
    (1-1/1)