Revision 36641
Added by Alessia Bardi over 9 years ago
modules/dnet-mapreduce-jobs/branches/0.0.6.3.x/src/main/java/eu/dnetlib/data/mapreduce/hbase/oai/OaiFeedMapper.java | ||
---|---|---|
22 | 22 |
import com.mongodb.DB; |
23 | 23 |
import com.mongodb.DBCollection; |
24 | 24 |
import com.mongodb.DBObject; |
25 |
import com.mongodb.Mongo; |
|
25 |
import com.mongodb.MongoClient;
|
|
26 | 26 |
|
27 | 27 |
import eu.dnetlib.data.mapreduce.JobParams; |
28 | 28 |
import eu.dnetlib.data.mapreduce.hbase.oai.config.OAIConfiguration; |
... | ... | |
37 | 37 |
|
38 | 38 |
public class OaiFeedMapper extends Mapper<Text, Text, NullWritable, NullWritable> { |
39 | 39 |
|
40 |
enum RecordStatus { |
|
41 |
NEW, UPDATED, UNCHANGED; |
|
42 |
} |
|
43 |
|
|
40 |
private MongoClient mongo; |
|
44 | 41 |
private DBCollection collection; |
45 | 42 |
private DBCollection discardedCollection; |
46 | 43 |
private OAIConfigurationStringReader oaiConfigurationReader; |
... | ... | |
86 | 83 |
oaiConfiguration = oaiConfigurationReader.getOaiConfiguration(); |
87 | 84 |
|
88 | 85 |
System.out.println("parsed configuration:" + oaiConfiguration.toString()); |
89 |
|
|
90 |
Mongo mongo = new Mongo(host, Integer.parseInt(port)); |
|
86 |
mongo = new MongoClient(host, Integer.parseInt(port)); |
|
91 | 87 |
DB mongoDB = mongo.getDB(db); |
92 | 88 |
collection = mongoDB.getCollection(collectionName); |
93 | 89 |
discardedCollection = mongoDB.getCollection("discarded-" + collectionName); |
... | ... | |
112 | 108 |
e.printStackTrace(System.err); |
113 | 109 |
throw new RuntimeException(e); |
114 | 110 |
} |
111 |
} |
|
115 | 112 |
|
113 |
@Override |
|
114 |
protected void cleanup(final Context context) throws IOException, InterruptedException { |
|
115 |
super.cleanup(context); |
|
116 |
mongo.close(); |
|
116 | 117 |
} |
117 | 118 |
|
118 | 119 |
@Override |
... | ... | |
135 | 136 |
if (checkRecordFields(recordFields, context, recordKey, recordBody)) { |
136 | 137 |
id = recordFields.get(OAIConfigurationReader.ID_FIELD).iterator().next(); |
137 | 138 |
oaiID = getOAIIdentifier(id); |
138 |
handleRecord(getRecordStatus(oaiID, recordBody), context, oaiID, recordBody, recordFields);
|
|
139 |
handleRecord(context, oaiID, recordBody, recordFields); |
|
139 | 140 |
} |
140 | 141 |
} |
141 | 142 |
} |
... | ... | |
146 | 147 |
context.getCounter("oai", "invalid").increment(1); |
147 | 148 |
return false; |
148 | 149 |
} |
149 |
if (recordFields.containsEntry("duplicate", "true")) { |
|
150 |
if (skipDuplicates) { |
|
151 |
context.getCounter("oai", "discardedDuplicate").increment(1); |
|
152 |
return false; |
|
153 |
} else return true; |
|
150 |
if (recordFields.containsEntry("duplicate", "true") && skipDuplicates) { |
|
151 |
context.getCounter("oai", "discardedDuplicate").increment(1); |
|
152 |
return false; |
|
154 | 153 |
} |
155 | 154 |
if (!recordFields.containsKey(OAIConfigurationReader.ID_FIELD)) { |
156 | 155 |
discard(context, recordKey, recordBody, "missing " + OAIConfigurationReader.ID_FIELD); |
157 | 156 |
return false; |
158 | 157 |
} |
159 |
|
|
160 | 158 |
return true; |
161 | 159 |
|
162 | 160 |
} |
163 | 161 |
|
164 |
private void handleRecord(final RecordStatus status, |
|
165 |
final Context context, |
|
166 |
final String oaiID, |
|
167 |
final String record, |
|
168 |
final Multimap<String, String> recordProperties) { |
|
169 |
DBObject obj = null; |
|
170 |
String counterReason = ""; |
|
171 |
switch (status) { |
|
172 |
case NEW: |
|
173 |
obj = this.createBasicObject(oaiID, record, recordProperties, context); |
|
174 |
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate); |
|
175 |
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate); |
|
176 |
obj.put(OAIConfigurationReader.UPDATED_FIELD, false); |
|
177 |
counterReason = "new record"; |
|
178 |
break; |
|
179 |
case UPDATED: |
|
180 |
obj = this.createBasicObject(oaiID, record, recordProperties, context); |
|
181 |
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate); |
|
182 |
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate); |
|
183 |
obj.put(OAIConfigurationReader.UPDATED_FIELD, true); |
|
184 |
counterReason = "updated record"; |
|
185 |
break; |
|
186 |
case UNCHANGED: |
|
187 |
// I just want to set the last collection date, without changing/resetting any of the other dates |
|
188 |
obj = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate)); |
|
189 |
counterReason = "unchanged record"; |
|
190 |
break; |
|
191 |
} |
|
162 |
private void handleRecord(final Context context, final String oaiID, final String record, final Multimap<String, String> recordProperties) { |
|
163 |
DBObject obj = this.createBasicObject(oaiID, record, recordProperties, context); |
|
164 |
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate); |
|
165 |
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate); |
|
166 |
obj.put(OAIConfigurationReader.UPDATED_FIELD, false); |
|
192 | 167 |
collection.insert(obj); |
193 |
context.getCounter("oai", counterReason).increment(1); |
|
194 | 168 |
context.getCounter("oai", "total").increment(1); |
195 | 169 |
} |
196 | 170 |
|
... | ... | |
203 | 177 |
return oaiConfiguration.getIdScheme() + ":" + oaiConfiguration.getIdNamespace() + ":" + id; |
204 | 178 |
} |
205 | 179 |
|
206 |
private RecordStatus getRecordStatus(final String oaiId, final String recordBody) { |
|
207 |
if (isNewRecord(oaiId)) return RecordStatus.NEW; |
|
208 |
if (isChanged(oaiId, recordBody)) return RecordStatus.UPDATED; |
|
209 |
else return RecordStatus.UNCHANGED; |
|
210 |
} |
|
211 |
|
|
212 |
private boolean isNewRecord(final String oaiIdentifier) { |
|
213 |
// return this.collection.findOne(new BasicDBObject(OAIConfigurationReader.ID_FIELD, oaiIdentifier)) == null; |
|
214 |
return true; |
|
215 |
} |
|
216 |
|
|
217 |
/** |
|
218 |
* Tells if record has changed w.r.t. the version we currently have in the store. |
|
219 |
* |
|
220 |
* <p> |
|
221 |
* FIXME: see MongoPublisherStore and its implementation of the method. For now we just consider that a record has changed and return |
|
222 |
* true. |
|
223 |
* </p> |
|
224 |
* |
|
225 |
* @param oaiID |
|
226 |
* oai identifier of the record |
|
227 |
* @param recordBody |
|
228 |
* the record body |
|
229 |
* @return true |
|
230 |
*/ |
|
231 |
private boolean isChanged(final String oaiID, final String recordBody) { |
|
232 |
return true; |
|
233 |
} |
|
234 |
|
|
235 | 180 |
protected DBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties, final Context context) { |
236 | 181 |
DBObject obj = new BasicDBObject(); |
237 | 182 |
for (final String key : recordProperties.keySet()) { |
... | ... | |
271 | 216 |
return obj; |
272 | 217 |
} |
273 | 218 |
|
274 |
@Override |
|
275 |
protected void cleanup(final Context context) throws IOException, InterruptedException { |
|
276 |
|
|
277 |
super.cleanup(context); |
|
278 |
} |
|
279 |
|
|
280 | 219 |
public DBCollection getCollection() { |
281 | 220 |
return collection; |
282 | 221 |
} |
Also available in: Unified diff
Using MongoClient instead of deprecated Mongo. Removed record status management, since records are always new anyway.