Project

General

Profile

1
package eu.dnetlib.resolver.mdstore.plugin;
2

    
3

    
4
import com.mongodb.BasicDBObject;
5
import com.mongodb.MongoNamespace;
6
import com.mongodb.client.ListIndexesIterable;
7
import com.mongodb.client.MongoCollection;
8
import eu.dnetlib.data.mdstore.modular.MDStoreFeeder;
9
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
10
import eu.dnetlib.data.mdstore.modular.action.FailedCallback;
11
import eu.dnetlib.data.mdstore.modular.action.FeedAction;
12
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
13
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
14
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
15
import eu.dnetlib.rmi.data.MDStoreServiceException;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.bson.Document;
19
import org.springframework.beans.factory.annotation.Autowired;
20

    
21
import java.io.BufferedReader;
22
import java.io.InputStreamReader;
23
import java.util.Map;
24

    
25
import static org.apache.commons.lang3.StringUtils.isNotBlank;
26

    
27
public class DLIMergeRecord implements MDStorePlugin {
28

    
29
    private static final Log log = LogFactory.getLog(DLIMergeRecord.class);
30

    
31
    @Autowired
32
    private MDStoreTransactionManagerImpl transactionManager;
33

    
34
    @Autowired
35
    private MDStoreFeeder mdStoreFeeder;
36

    
37
    @Override
38
    public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback, FailedCallback failedCallback) {
39
        final String id = params.get("mdStoreId");
40
        final String host = params.get("mongoHost");
41
        final String nsPrefix = params.get("nsPrefix");
42
        final String sparkPath = params.get("sparkPath");
43
        final String sparkJobPath = params.get("sparkJobPath");
44
        final String sparkApplicationName = params.get("sparkApplicationName");
45
        final String mongoDBName = params.get("mongoDBName");
46
        final String number_of_core = params.get("numExecutor");
47

    
48
        if (isNotBlank(id) && isNotBlank(host) && isNotBlank(nsPrefix) && isNotBlank(sparkJobPath) && isNotBlank(sparkPath)) {
49
            try {
50
                log.debug("starting spark job");
51
                final String mdStoreCollection = transactionManager.getMDStoreCollection(id);
52
                final String[] command = {sparkPath + "bin/spark-submit", sparkJobPath, host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName};
53

    
54
                final ProcessBuilder builder = new ProcessBuilder(command);
55
                final Process p = builder.start();
56
                final BufferedReader reader = new BufferedReader(new InputStreamReader(p.getErrorStream()));
57
                String line;
58
                while ((line = reader.readLine()) != null) {
59
                    log.info(line);
60
                }
61
                p.waitFor();
62
                if (p.exitValue() != 0) {
63
                    throw new MDStoreServiceException("The spark job exit with error");
64
                }
65
                log.info("Merging complete... creating index in the new collection");
66
                ListIndexesIterable<Document> documents = transactionManager.getDb().getCollection(mdStoreCollection).listIndexes();
67
                MongoCollection<Document> outMdStore = transactionManager.getDb().getCollection("out" + mdStoreCollection);
68
                for (Document d : documents) {
69
                    d.get("key", Map.class).keySet().forEach(it -> {
70
                        String s = it.toString();
71
                        outMdStore.createIndex(new BasicDBObject(s, 1));
72
                    });
73
                }
74
                log.info("index Created, dropping old collection and rename the new one");
75
                transactionManager.getDb().getCollection(mdStoreCollection).drop();
76
                int size = (int) outMdStore.count();
77

    
78
                outMdStore.renameCollection(new MongoNamespace(mongoDBName, mdStoreCollection));
79

    
80
                mdStoreFeeder.touch(id, size);
81
                doneCallback.call(params);
82
            } catch (Throwable e) {
83
                throw new RuntimeException(e);
84
            }
85
        } else {
86
            throw new RuntimeException("missing one of the following parameters {mdStoreId,mongoHost,nsPrefix,sparkPath,sparkJobPath}");
87
        }
88

    
89
    }
90

    
91
    @Override
92
    public String getStatus() {
93
        return "30/100";
94
    }
95
}
(1-1/3)