Project

General

Profile

« Previous | Next » 

Revision 57636

Added failedCallback to mdstore plugin invocation

View differences:

DLIMergeRecord.java
7 7
import com.mongodb.client.MongoCollection;
8 8
import eu.dnetlib.data.mdstore.modular.MDStoreFeeder;
9 9
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
10
import eu.dnetlib.data.mdstore.modular.action.FailedCallback;
10 11
import eu.dnetlib.data.mdstore.modular.action.FeedAction;
11 12
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
12 13
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
......
34 35
    private MDStoreFeeder mdStoreFeeder;
35 36

  
36 37
    @Override
37
    public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback) throws MDStoreServiceException {
38
    public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback, FailedCallback failedCallback) {
38 39
        final String id = params.get("mdStoreId");
39 40
        final String host = params.get("mongoHost");
40
        final String nsPrefix= params.get("nsPrefix");
41
        final String nsPrefix = params.get("nsPrefix");
41 42
        final String sparkPath = params.get("sparkPath");
42
        final String sparkJobPath= params.get("sparkJobPath");
43
        final String sparkApplicationName= params.get("sparkApplicationName");
44
        final String mongoDBName= params.get("mongoDBName");
45
        final String number_of_core= params.get("numExecutor");
43
        final String sparkJobPath = params.get("sparkJobPath");
44
        final String sparkApplicationName = params.get("sparkApplicationName");
45
        final String mongoDBName = params.get("mongoDBName");
46
        final String number_of_core = params.get("numExecutor");
46 47

  
47 48
        if (isNotBlank(id) && isNotBlank(host) && isNotBlank(nsPrefix) && isNotBlank(sparkJobPath) && isNotBlank(sparkPath)) {
48
            log.debug("starting spark job");
49
            final String mdStoreCollection = transactionManager.getMDStoreCollection(id);
50
            final String [] command= {sparkPath+"bin/spark-submit",sparkJobPath ,host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName };
51 49
            try {
50
                log.debug("starting spark job");
51
                final String mdStoreCollection = transactionManager.getMDStoreCollection(id);
52
                final String[] command = {sparkPath + "bin/spark-submit", sparkJobPath, host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName};
53

  
52 54
                final ProcessBuilder builder = new ProcessBuilder(command);
53 55
                final Process p = builder.start();
54 56
                final BufferedReader reader = new BufferedReader(new InputStreamReader(p.getErrorStream()));
......
63 65
                log.info("Merging complete... creating index in the new collection");
64 66
                ListIndexesIterable<Document> documents = transactionManager.getDb().getCollection(mdStoreCollection).listIndexes();
65 67
                MongoCollection<Document> outMdStore = transactionManager.getDb().getCollection("out" + mdStoreCollection);
66
                for (Document d :documents) {
67
                    d.get("key", Map.class).keySet().forEach(it-> {
68
                for (Document d : documents) {
69
                    d.get("key", Map.class).keySet().forEach(it -> {
68 70
                        String s = it.toString();
69 71
                        outMdStore.createIndex(new BasicDBObject(s, 1));
70 72
                    });
......
78 80
                mdStoreFeeder.touch(id, size);
79 81
                doneCallback.call(params);
80 82
            } catch (Throwable e) {
81
                throw new  MDStoreServiceException(e);
83
                throw new RuntimeException(e);
82 84
            }
85
        } else {
86
            throw new RuntimeException("missing one of the following parameters {mdStoreId,mongoHost,nsPrefix,sparkPath,sparkJobPath}");
83 87
        }
84
        else {
85
            throw new  MDStoreServiceException("missing one of the following parameters {mdStoreId,mongoHost,nsPrefix,sparkPath,sparkJobPath}");
86
        }
87 88

  
88 89
    }
89 90

  

Also available in: Unified diff