Project

General

Profile

« Previous | Next » 

Revision 36641

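Use MongoClient instead of the deprecated Mongo class, and close the client in cleanup(). Remove record status management (the RecordStatus enum and the getRecordStatus, isNewRecord and isChanged methods), since records are always treated as new anyway; handleRecord now always inserts the record as a new one.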

View differences:

modules/dnet-mapreduce-jobs/branches/0.0.6.3.x/src/main/java/eu/dnetlib/data/mapreduce/hbase/oai/OaiFeedMapper.java
22 22
import com.mongodb.DB;
23 23
import com.mongodb.DBCollection;
24 24
import com.mongodb.DBObject;
25
import com.mongodb.Mongo;
25
import com.mongodb.MongoClient;
26 26

  
27 27
import eu.dnetlib.data.mapreduce.JobParams;
28 28
import eu.dnetlib.data.mapreduce.hbase.oai.config.OAIConfiguration;
......
37 37

  
38 38
public class OaiFeedMapper extends Mapper<Text, Text, NullWritable, NullWritable> {
39 39

  
40
	enum RecordStatus {
41
		NEW, UPDATED, UNCHANGED;
42
	}
43

  
40
	private MongoClient mongo;
44 41
	private DBCollection collection;
45 42
	private DBCollection discardedCollection;
46 43
	private OAIConfigurationStringReader oaiConfigurationReader;
......
86 83
		oaiConfiguration = oaiConfigurationReader.getOaiConfiguration();
87 84

  
88 85
		System.out.println("parsed configuration:" + oaiConfiguration.toString());
89

  
90
		Mongo mongo = new Mongo(host, Integer.parseInt(port));
86
		mongo = new MongoClient(host, Integer.parseInt(port));
91 87
		DB mongoDB = mongo.getDB(db);
92 88
		collection = mongoDB.getCollection(collectionName);
93 89
		discardedCollection = mongoDB.getCollection("discarded-" + collectionName);
......
112 108
			e.printStackTrace(System.err);
113 109
			throw new RuntimeException(e);
114 110
		}
111
	}
115 112

  
113
	@Override
114
	protected void cleanup(final Context context) throws IOException, InterruptedException {
115
		super.cleanup(context);
116
		mongo.close();
116 117
	}
117 118

  
118 119
	@Override
......
135 136
				if (checkRecordFields(recordFields, context, recordKey, recordBody)) {
136 137
					id = recordFields.get(OAIConfigurationReader.ID_FIELD).iterator().next();
137 138
					oaiID = getOAIIdentifier(id);
138
					handleRecord(getRecordStatus(oaiID, recordBody), context, oaiID, recordBody, recordFields);
139
					handleRecord(context, oaiID, recordBody, recordFields);
139 140
				}
140 141
			}
141 142
		}
......
146 147
			context.getCounter("oai", "invalid").increment(1);
147 148
			return false;
148 149
		}
149
		if (recordFields.containsEntry("duplicate", "true")) {
150
			if (skipDuplicates) {
151
				context.getCounter("oai", "discardedDuplicate").increment(1);
152
				return false;
153
			} else return true;
150
		if (recordFields.containsEntry("duplicate", "true") && skipDuplicates) {
151
			context.getCounter("oai", "discardedDuplicate").increment(1);
152
			return false;
154 153
		}
155 154
		if (!recordFields.containsKey(OAIConfigurationReader.ID_FIELD)) {
156 155
			discard(context, recordKey, recordBody, "missing " + OAIConfigurationReader.ID_FIELD);
157 156
			return false;
158 157
		}
159

  
160 158
		return true;
161 159

  
162 160
	}
163 161

  
164
	private void handleRecord(final RecordStatus status,
165
			final Context context,
166
			final String oaiID,
167
			final String record,
168
			final Multimap<String, String> recordProperties) {
169
		DBObject obj = null;
170
		String counterReason = "";
171
		switch (status) {
172
		case NEW:
173
			obj = this.createBasicObject(oaiID, record, recordProperties, context);
174
			obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
175
			obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
176
			obj.put(OAIConfigurationReader.UPDATED_FIELD, false);
177
			counterReason = "new record";
178
			break;
179
		case UPDATED:
180
			obj = this.createBasicObject(oaiID, record, recordProperties, context);
181
			obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
182
			obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
183
			obj.put(OAIConfigurationReader.UPDATED_FIELD, true);
184
			counterReason = "updated record";
185
			break;
186
		case UNCHANGED:
187
			// I just want to set the last collection date, without changing/resetting any of the other dates
188
			obj = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate));
189
			counterReason = "unchanged record";
190
			break;
191
		}
162
	private void handleRecord(final Context context, final String oaiID, final String record, final Multimap<String, String> recordProperties) {
163
		DBObject obj = this.createBasicObject(oaiID, record, recordProperties, context);
164
		obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
165
		obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
166
		obj.put(OAIConfigurationReader.UPDATED_FIELD, false);
192 167
		collection.insert(obj);
193
		context.getCounter("oai", counterReason).increment(1);
194 168
		context.getCounter("oai", "total").increment(1);
195 169
	}
196 170

  
......
203 177
		return oaiConfiguration.getIdScheme() + ":" + oaiConfiguration.getIdNamespace() + ":" + id;
204 178
	}
205 179

  
206
	private RecordStatus getRecordStatus(final String oaiId, final String recordBody) {
207
		if (isNewRecord(oaiId)) return RecordStatus.NEW;
208
		if (isChanged(oaiId, recordBody)) return RecordStatus.UPDATED;
209
		else return RecordStatus.UNCHANGED;
210
	}
211

  
212
	private boolean isNewRecord(final String oaiIdentifier) {
213
		// return this.collection.findOne(new BasicDBObject(OAIConfigurationReader.ID_FIELD, oaiIdentifier)) == null;
214
		return true;
215
	}
216

  
217
	/**
218
	 * Tells if record has changed w.r.t. the version we currently have in the store.
219
	 * 
220
	 * <p>
221
	 * FIXME: see MongoPublisherStore and its implementation of the method. For now we just consider that a record has changed and return
222
	 * true.
223
	 * </p>
224
	 * 
225
	 * @param oaiID
226
	 *            oai identifier of the record
227
	 * @param recordBody
228
	 *            the record body
229
	 * @return true
230
	 */
231
	private boolean isChanged(final String oaiID, final String recordBody) {
232
		return true;
233
	}
234

  
235 180
	protected DBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties, final Context context) {
236 181
		DBObject obj = new BasicDBObject();
237 182
		for (final String key : recordProperties.keySet()) {
......
271 216
		return obj;
272 217
	}
273 218

  
274
	@Override
275
	protected void cleanup(final Context context) throws IOException, InterruptedException {
276

  
277
		super.cleanup(context);
278
	}
279

  
280 219
	public DBCollection getCollection() {
281 220
		return collection;
282 221
	}

Also available in: Unified diff