Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
4
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
5
import eu.dnetlib.msro.workflows.procs.Env;
6
import eu.dnetlib.msro.workflows.procs.ProcessAware;
7
import eu.dnetlib.msro.workflows.procs.Token;
8
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
9
import eu.dnetlib.msro.workflows.util.ProgressProvider;
10
import eu.dnetlib.rmi.data.MDStoreService;
11
import eu.dnetlib.rmi.enabling.ISLookUpService;
12
import org.springframework.beans.factory.annotation.Autowired;
13

    
14
import java.util.List;
15

    
16
public class MergeDLIRecord extends BlackboardJobNode implements ProcessAware, ProgressProvider {
17

    
18
    private final static String queryTemplate = "for $x in collection(' /db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType') where $x//RESOURCE_IDENTIFIER/@value/string()='%s' return $x//FIELD[./key='NamespacePrefix']/value/text()";
19

    
20
    private String mdStoreId;
21

    
22
    private String dsId;
23

    
24
    private String mongoHost;
25

    
26
    private String mongoDBName;
27

    
28
    private String sparkJobPath;
29

    
30
    private String sparkPath;
31

    
32
    private String numExecutor;
33

    
34
    private boolean skipJob = false;
35

    
36
    private String sparkApplicationName;
37

    
38
    private WorkflowProcess process;
39

    
40
    @Autowired
41
    private UniqueServiceLocator serviceLocator;
42

    
43
    private String getDatasourcePrefix() throws Exception {
44
        final String query = String.format(queryTemplate, dsId);
45
        List<String> result  = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query);
46
        if (result!= null && result.size() ==1 )
47
            return result.get(0);
48
        throw new Exception("Unexpected result on query "+query);
49
    }
50

    
51

    
52
    @Override
53
    protected String obtainServiceId(Env env) {
54
        return getServiceLocator().getServiceId(MDStoreService.class, getMdStoreId());
55
    }
56

    
57
    @Override
58
    protected void prepareJob(BlackboardJob job, Token token) throws Exception {
59
        if (!skipJob) {
60
            job.setAction("RUN_PLUGIN");
61
            job.getParameters().put("plugin.name", "dliMergeRecord");
62
            job.getParameters().put("mdStoreId", getMdStoreId());
63
            job.getParameters().put("mongoHost", getMongoHost());
64
            job.getParameters().put("nsPrefix", getDatasourcePrefix());
65
            job.getParameters().put("sparkPath", getSparkPath());
66
            job.getParameters().put("sparkJobPath", getSparkJobPath());
67
            job.getParameters().put("mongoDBName", getMongoDBName());
68
            job.getParameters().put("numExecutor", getNumExecutor());
69
            job.getParameters().put("sparkApplicationName", process.getId());
70
        }
71

    
72
    }
73

    
74
    @Override
75
    public void setProcess(WorkflowProcess process) {
76
        this.process = process;
77
    }
78

    
79
    public String getSparkJobPath() {
80
        return sparkJobPath;
81
    }
82

    
83
    public void setSparkJobPath(String sparkJobPath) {
84
        this.sparkJobPath = sparkJobPath;
85
    }
86

    
87
    public String getSparkPath() {
88
        return sparkPath;
89
    }
90

    
91
    public void setSparkPath(String sparkPath) {
92
        this.sparkPath = sparkPath;
93
    }
94

    
95
    public String getSparkApplicationName() {
96
        return sparkApplicationName;
97
    }
98

    
99
    public void setSparkApplicationName(String sparkApplicationName) {
100
        this.sparkApplicationName = sparkApplicationName;
101
    }
102

    
103
    public String getMongoHost() {
104
        return mongoHost;
105
    }
106

    
107
    public void setMongoHost(String mongoHost) {
108
        this.mongoHost = mongoHost;
109
    }
110

    
111
    public String getMdStoreId() {
112
        return mdStoreId;
113
    }
114

    
115
    public void setMdStoreId(String mdStoreId) {
116
        this.mdStoreId = mdStoreId;
117
    }
118

    
119
    public String getMongoDBName() {
120
        return mongoDBName;
121
    }
122

    
123
    public void setMongoDBName(String mongoDBName) {
124
        this.mongoDBName = mongoDBName;
125
    }
126

    
127
    public boolean isSkipJob() {
128
        return skipJob;
129
    }
130

    
131
    public void setSkipJob(boolean skipJob) {
132
        this.skipJob = skipJob;
133
    }
134

    
135
    public String getDsId() {
136
        return dsId;
137
    }
138

    
139
    public void setDsId(String dsId) {
140
        this.dsId = dsId;
141
    }
142

    
143
    public String getNumExecutor() {
144
        return numExecutor;
145
    }
146

    
147
    public void setNumExecutor(String numExecutor) {
148
        this.numExecutor = numExecutor;
149
    }
150

    
151
    @Override
152
    public String getProgressDescription() {
153
        return null;
154
    }
155
}
(3-3/4)