Project

General

Profile

1
package eu.dnetlib.resolver.mdstore.plugin;
2

    
3

    
4
import com.mongodb.BasicDBObject;
5
import com.mongodb.MongoNamespace;
6
import com.mongodb.client.ListIndexesIterable;
7
import com.mongodb.client.MongoCollection;
8
import eu.dnetlib.data.mdstore.modular.MDStoreFeeder;
9
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
10
import eu.dnetlib.data.mdstore.modular.action.FeedAction;
11
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
12
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
13
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
14
import eu.dnetlib.rmi.data.MDStoreServiceException;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.bson.Document;
18
import org.springframework.beans.factory.annotation.Autowired;
19

    
20
import java.io.BufferedReader;
21
import java.io.InputStreamReader;
22
import java.util.Map;
23

    
24
import static org.apache.commons.lang3.StringUtils.isNotBlank;
25

    
26
public class DLIMergeRecord implements MDStorePlugin {
27

    
28
    private static final Log log = LogFactory.getLog(DLIMergeRecord.class);
29

    
30
    @Autowired
31
    private MDStoreTransactionManagerImpl transactionManager;
32

    
33
    @Autowired
34
    private MDStoreFeeder mdStoreFeeder;
35

    
36
    @Override
37
    public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback) throws MDStoreServiceException {
38
        final String id = params.get("mdStoreId");
39
        final String host = params.get("mongoHost");
40
        final String nsPrefix= params.get("nsPrefix");
41
        final String sparkPath = params.get("sparkPath");
42
        final String sparkJobPath= params.get("sparkJobPath");
43
        final String sparkApplicationName= params.get("sparkApplicationName");
44
        final String mongoDBName= params.get("mongoDBName");
45
        final String number_of_core= params.get("numExecutor");
46

    
47
        if (isNotBlank(id) && isNotBlank(host) && isNotBlank(nsPrefix) && isNotBlank(sparkJobPath) && isNotBlank(sparkPath)) {
48
            log.debug("starting spark job");
49
            final String mdStoreCollection = transactionManager.getMDStoreCollection(id);
50
            final String [] command= {sparkPath+"bin/spark-submit",sparkJobPath ,host, transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName };
51
            try {
52
                final ProcessBuilder builder = new ProcessBuilder(command);
53
                final Process p = builder.start();
54
                final BufferedReader reader = new BufferedReader(new InputStreamReader(p.getErrorStream()));
55
                String line;
56
                while ((line = reader.readLine()) != null) {
57
                    log.info(line);
58
                }
59
                p.waitFor();
60
                if (p.exitValue() != 0) {
61
                    throw new MDStoreServiceException("The spark job exit with error");
62
                }
63
                log.info("Merging complete... creating index in the new collection");
64
                ListIndexesIterable<Document> documents = transactionManager.getDb().getCollection(mdStoreCollection).listIndexes();
65
                MongoCollection<Document> outMdStore = transactionManager.getDb().getCollection("out" + mdStoreCollection);
66
                for (Document d :documents) {
67
                    d.get("key", Map.class).keySet().forEach(it-> {
68
                        String s = it.toString();
69
                        outMdStore.createIndex(new BasicDBObject(s, 1));
70
                    });
71
                }
72
                log.info("index Created, dropping old collection and rename the new one");
73
                transactionManager.getDb().getCollection(mdStoreCollection).drop();
74
                int size = (int) outMdStore.count();
75

    
76
                outMdStore.renameCollection(new MongoNamespace(mongoDBName, mdStoreCollection));
77

    
78
                mdStoreFeeder.touch(id, size);
79
                doneCallback.call(params);
80
            } catch (Throwable e) {
81
                throw new  MDStoreServiceException(e);
82
            }
83
        }
84
        else {
85
            throw new  MDStoreServiceException("missing one of the following parameters {mdStoreId,mongoHost,nsPrefix,sparkPath,sparkJobPath}");
86
        }
87

    
88
    }
89

    
90
    @Override
91
    public String getStatus() {
92
        return "30/100";
93
    }
94
}
(1-1/3)