Revision 52042
Added by Claudio Atzori almost 6 years ago
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MDStoreDaoImpl.java | ||
---|---|---|
13 | 13 |
import com.mongodb.client.model.CreateCollectionOptions; |
14 | 14 |
import eu.dnetlib.data.mdstore.MDStoreServiceException; |
15 | 15 |
import eu.dnetlib.data.mdstore.modular.MDStoreDescription; |
16 |
import eu.dnetlib.data.mdstore.modular.RecordParser; |
|
16 | 17 |
import eu.dnetlib.data.mdstore.modular.RecordParserFactory; |
17 | 18 |
import eu.dnetlib.data.mdstore.modular.connector.MDStore; |
18 | 19 |
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDBStatus; |
... | ... | |
91 | 92 |
@Override |
92 | 93 |
public MDStore getMDStore(final String mdId) throws MDStoreServiceException { |
93 | 94 |
final String internalId = transactionManager.getMDStoreCollection(mdId); |
94 |
return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParserFactory().newInstance(), isDiscardRecords(), getDb());
|
|
95 |
return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParser(), isDiscardRecords(), getDb()); |
|
95 | 96 |
} |
96 | 97 |
|
97 | 98 |
/** |
... | ... | |
100 | 101 |
@Override |
101 | 102 |
public MDStore readMDStore(final String mdId) throws MDStoreServiceException { |
102 | 103 |
final String internalId = transactionManager.readMdStore(mdId); |
103 |
return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParserFactory().newInstance(), isDiscardRecords(), getDb());
|
|
104 |
return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParser(), isDiscardRecords(), getDb()); |
|
104 | 105 |
} |
105 | 106 |
|
106 | 107 |
/** |
... | ... | |
109 | 110 |
@Override |
110 | 111 |
public MDStore startTransaction(final String mdId, final boolean refresh) throws MDStoreServiceException { |
111 | 112 |
final String transactionId = transactionManager.startTransaction(mdId, refresh); |
112 |
return new MongoMDStore(transactionId, getDb().getCollection(transactionId, DBObject.class), getRecordParserFactory().newInstance(), isDiscardRecords(),
|
|
113 |
return new MongoMDStore(transactionId, getDb().getCollection(transactionId, DBObject.class), getRecordParser(), isDiscardRecords(), |
|
113 | 114 |
getDb()); |
114 | 115 |
} |
115 | 116 |
|
117 |
private RecordParser getRecordParser() { |
|
118 |
final RecordParser parser = getRecordParserFactory().newInstance(); |
|
119 |
parser.setTimestamp(DateUtils.now()); |
|
120 |
return parser; |
|
121 |
} |
|
122 |
|
|
116 | 123 |
/** |
117 | 124 |
* {@inheritDoc} |
118 | 125 |
*/ |
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoBulkWritesManager.java | ||
---|---|---|
122 | 122 |
obj.put("id", recordProperties.get("id")); |
123 | 123 |
obj.put("originalId", recordProperties.get("originalId")); |
124 | 124 |
obj.put("body", record); |
125 |
obj.put("timestamp", System.currentTimeMillis());
|
|
125 |
obj.put("timestamp", recordProperties.get("timestamp"));
|
|
126 | 126 |
if (indexFieldProperties != null) |
127 | 127 |
obj.putAll(indexFieldProperties); |
128 | 128 |
return obj; |
modules/cnr-modular-mdstore-service/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/SimpleRecordParser.java | ||
---|---|---|
20 | 20 |
public class SimpleRecordParser implements RecordParser { |
21 | 21 |
static final Log log = LogFactory.getLog(SimpleRecordParser.class); // NOPMD by marko on 11/24/08 5:02 PM |
22 | 22 |
|
23 |
private long ts; |
|
24 |
|
|
23 | 25 |
@Override |
24 | 26 |
public Map<String, String> parseRecord(String record) { |
25 | 27 |
Map<String, String> props = new HashMap<String, String>(); |
28 |
props.put("timestamp", String.valueOf(getTimestamp())); |
|
26 | 29 |
|
27 | 30 |
try { |
28 | 31 |
// DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); |
... | ... | |
47 | 50 |
|
48 | 51 |
} |
49 | 52 |
|
53 |
@Override |
|
54 |
public void setTimestamp(final long ts) { |
|
55 |
this.ts = ts; |
|
56 |
} |
|
57 |
|
|
58 |
@Override |
|
59 |
public long getTimestamp() { |
|
60 |
return ts; |
|
61 |
} |
|
62 |
|
|
50 | 63 |
} |
modules/cnr-modular-mdstore-service/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/StreamingRecordParser.java | ||
---|---|---|
19 | 19 |
*/ |
20 | 20 |
public class StreamingRecordParser implements RecordParser { |
21 | 21 |
|
22 |
private long ts; |
|
23 |
|
|
22 | 24 |
@Override |
23 | 25 |
public Map<String, String> parseRecord(String record) { |
24 | 26 |
|
... | ... | |
27 | 29 |
XMLStreamReader parser = factory.createXMLStreamReader(new ByteArrayInputStream(record.getBytes())); |
28 | 30 |
|
29 | 31 |
HashMap<String, String> res = new HashMap<String, String>(); |
32 |
res.put("timestamp", String.valueOf(getTimestamp())); |
|
30 | 33 |
|
31 | 34 |
Stack<String> elementStack = new Stack<String>(); |
32 | 35 |
elementStack.push("/"); |
... | ... | |
75 | 78 |
return elementStack.get(elementStack.size() - 3); |
76 | 79 |
} |
77 | 80 |
|
81 |
@Override |
|
82 |
public void setTimestamp(final long ts) { |
|
83 |
this.ts = ts; |
|
84 |
} |
|
85 |
|
|
86 |
@Override |
|
87 |
public long getTimestamp() { |
|
88 |
return ts; |
|
89 |
} |
|
90 |
|
|
78 | 91 |
} |
modules/cnr-modular-mdstore-service/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/RecordParser.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.util.Map; |
4 | 4 |
|
5 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
6 |
|
|
5 | 7 |
/** |
6 | 8 |
* Parses a mdrecord and extracts the minimum information (like id, date etc) which is necessary for the mdstoring |
7 | 9 |
* process. |
... | ... | |
11 | 13 |
*/ |
12 | 14 |
public interface RecordParser { |
13 | 15 |
|
14 |
public Map<String, String> parseRecord(String record);
|
|
16 |
Map<String, String> parseRecord(String record); |
|
15 | 17 |
|
18 |
void setTimestamp(long ts); |
|
19 |
|
|
20 |
default long getTimestamp() { |
|
21 |
return DateUtils.now(); |
|
22 |
} |
|
23 |
|
|
16 | 24 |
} |
modules/cnr-modular-mdstore-service/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/action/FeedAction.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mdstore.modular.action; |
2 | 2 |
|
3 |
import java.util.List; |
|
4 |
|
|
3 | 5 |
import eu.dnetlib.data.mdstore.MDStoreServiceException; |
4 | 6 |
import eu.dnetlib.data.mdstore.modular.MDFormatDescription; |
5 | 7 |
import eu.dnetlib.data.mdstore.modular.MDStoreFeeder; |
... | ... | |
11 | 13 |
import org.springframework.beans.factory.annotation.Autowired; |
12 | 14 |
import org.springframework.beans.factory.annotation.Required; |
13 | 15 |
|
14 |
import java.util.List; |
|
15 |
import java.util.Map; |
|
16 |
|
|
17 | 16 |
public class FeedAction extends AbstractMDStoreAction { |
18 | 17 |
|
19 | 18 |
private static final Log log = LogFactory.getLog(FeedAction.class); |
... | ... | |
49 | 48 |
} |
50 | 49 |
|
51 | 50 |
|
52 |
feeder.feed(mdId, epr, storingType, true, mdformats, new DoneCallback() { |
|
53 |
|
|
54 |
@Override |
|
55 |
public void call(final Map<String, String> params) { |
|
56 |
job.getParameters().put("mdstoreSize", "" + params.get("mdstoreSize")); |
|
57 |
job.getParameters().put("writeOps", "" + params.get("writeOps")); |
|
58 |
completeWithSuccess(handler, job); |
|
59 |
} |
|
60 |
}, new FailedCallback() { |
|
61 |
|
|
62 |
@Override |
|
63 |
public void call(final Throwable e) { |
|
64 |
log.error("Error feeding mdstore: " + mdId, e); |
|
65 |
completeWithFail(handler, job, e); |
|
66 |
} |
|
67 |
}); |
|
51 |
feeder.feed(mdId, epr, storingType, true, mdformats, params -> { |
|
52 |
job.getParameters().put("mdstoreSize", "" + params.get("mdstoreSize")); |
|
53 |
job.getParameters().put("writeOps", "" + params.get("writeOps")); |
|
54 |
completeWithSuccess(handler, job); |
|
55 |
}, e -> { |
|
56 |
log.error("Error feeding mdstore: " + mdId, e); |
|
57 |
completeWithFail(handler, job, e); |
|
58 |
}); |
|
68 | 59 |
} |
69 | 60 |
|
70 | 61 |
public MDStoreFeeder getFeeder() { |
Also available in: Unified diff
use the same (hopefully) format for field timestamp, and the same value for each record involved in a transaction