Revision 57636
Added by Sandro La Bruzzo over 4 years ago
DLIMergeRecord.java | ||
---|---|---|
7 | 7 |
import com.mongodb.client.MongoCollection; |
8 | 8 |
import eu.dnetlib.data.mdstore.modular.MDStoreFeeder; |
9 | 9 |
import eu.dnetlib.data.mdstore.modular.action.DoneCallback; |
10 |
import eu.dnetlib.data.mdstore.modular.action.FailedCallback; |
|
10 | 11 |
import eu.dnetlib.data.mdstore.modular.action.FeedAction; |
11 | 12 |
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin; |
12 | 13 |
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao; |
... | ... | |
34 | 35 |
private MDStoreFeeder mdStoreFeeder; |
35 | 36 |
|
36 | 37 |
@Override |
37 |
public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback) throws MDStoreServiceException {
|
|
38 |
public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback, FailedCallback failedCallback) {
|
|
38 | 39 |
final String id = params.get("mdStoreId"); |
39 | 40 |
final String host = params.get("mongoHost"); |
40 |
final String nsPrefix= params.get("nsPrefix"); |
|
41 |
final String nsPrefix = params.get("nsPrefix");
|
|
41 | 42 |
final String sparkPath = params.get("sparkPath"); |
42 |
final String sparkJobPath= params.get("sparkJobPath"); |
|
43 |
final String sparkApplicationName= params.get("sparkApplicationName"); |
|
44 |
final String mongoDBName= params.get("mongoDBName"); |
|
45 |
final String number_of_core= params.get("numExecutor"); |
|
43 |
final String sparkJobPath = params.get("sparkJobPath");
|
|
44 |
final String sparkApplicationName = params.get("sparkApplicationName");
|
|
45 |
final String mongoDBName = params.get("mongoDBName");
|
|
46 |
final String number_of_core = params.get("numExecutor");
|
|
46 | 47 |
|
47 | 48 |
if (isNotBlank(id) && isNotBlank(host) && isNotBlank(nsPrefix) && isNotBlank(sparkJobPath) && isNotBlank(sparkPath)) { |
48 |
log.debug("starting spark job"); |
|
49 |
final String mdStoreCollection = transactionManager.getMDStoreCollection(id); |
|
50 |
final String [] command= {sparkPath+"bin/spark-submit",sparkJobPath ,host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName }; |
|
51 | 49 |
try { |
50 |
log.debug("starting spark job"); |
|
51 |
final String mdStoreCollection = transactionManager.getMDStoreCollection(id); |
|
52 |
final String[] command = {sparkPath + "bin/spark-submit", sparkJobPath, host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName}; |
|
53 |
|
|
52 | 54 |
final ProcessBuilder builder = new ProcessBuilder(command); |
53 | 55 |
final Process p = builder.start(); |
54 | 56 |
final BufferedReader reader = new BufferedReader(new InputStreamReader(p.getErrorStream())); |
... | ... | |
63 | 65 |
log.info("Merging complete... creating index in the new collection"); |
64 | 66 |
ListIndexesIterable<Document> documents = transactionManager.getDb().getCollection(mdStoreCollection).listIndexes(); |
65 | 67 |
MongoCollection<Document> outMdStore = transactionManager.getDb().getCollection("out" + mdStoreCollection); |
66 |
for (Document d :documents) { |
|
67 |
d.get("key", Map.class).keySet().forEach(it-> { |
|
68 |
for (Document d : documents) {
|
|
69 |
d.get("key", Map.class).keySet().forEach(it -> {
|
|
68 | 70 |
String s = it.toString(); |
69 | 71 |
outMdStore.createIndex(new BasicDBObject(s, 1)); |
70 | 72 |
}); |
... | ... | |
78 | 80 |
mdStoreFeeder.touch(id, size); |
79 | 81 |
doneCallback.call(params); |
80 | 82 |
} catch (Throwable e) { |
81 |
throw new MDStoreServiceException(e);
|
|
83 |
throw new RuntimeException(e);
|
|
82 | 84 |
} |
85 |
} else { |
|
86 |
throw new RuntimeException("missing one of the following parameters {mdStoreId,mongoHost,nsPrefix,sparkPath,sparkJobPath}"); |
|
83 | 87 |
} |
84 |
else { |
|
85 |
throw new MDStoreServiceException("missing one of the following parameters {mdStoreId,mongoHost,nsPrefix,sparkPath,sparkJobPath}"); |
|
86 |
} |
|
87 | 88 |
|
88 | 89 |
} |
89 | 90 |
|
Also available in: Unified diff
Added failedCallback to mdstore plugin invocation