Revision 57299

merged with branch dnet-hadoop


modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/EndReadingMDStoreJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class EndReadingMDStoreJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(EndReadingMDStoreJobNode.class);

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String url = getMdStoreManagerUrl() + "/version/{versionId}/endReading";

        final Map<String, Object> params = new HashMap<>();
        params.put("versionId", token.getEnv().getAttribute("reading_versionId"));

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject(uri, MDStoreVersion.class);

        log.info("mdstore version read complete");
        return Arc.DEFAULT_ARC;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    @Required
    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }
}
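
All the MDStore-manager job nodes in this changeset follow the same pattern: append a URI template to mdStoreManagerUrl, expand the placeholders with UriComponentsBuilder, and issue the request with a short-lived RestTemplate. As a sketch, assuming a hypothetical manager base URL and version id (neither appears in this commit), the call above expands to:

    GET http://localhost:8280/mdstoremanager/version/md-0001-v2/endReading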
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareMDStoreVersionJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class PrepareMDStoreVersionJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(PrepareMDStoreVersionJobNode.class);

    private String mdId;

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String url = getMdStoreManagerUrl() + "/mdstore/{mdId}/newVersion";

        final Map<String, Object> params = new HashMap<>();
        params.put("mdId", getMdId());

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        final MDStoreVersion version = restTemplate.getForObject(uri, MDStoreVersion.class);

        log.info("mdstore version prepared " + version.getId());

        token.getEnv().setAttribute("mdId", mdId);
        token.getEnv().setAttribute("versionId", version.getId());

        return Arc.DEFAULT_ARC;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    @Required
    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }

    public String getMdId() {
        return mdId;
    }

    public void setMdId(final String mdId) {
        this.mdId = mdId;
    }
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/DnetMessageManager.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message;

import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class DnetMessageManager {

    private static final Log log = LogFactory.getLog(DnetMessageManager.class);

    private MessageManager manager;

    private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    private final Map<String, Message> onGoingMessages = new HashMap<>();

    private final Map<String, List<Message>> reportMessages = new HashMap<>();

    private String messageQueueServer;

    private String username;

    private String password;

    @Value("${dnet.openaire.messageManager.ongoingMessageQueueName}")
    private String onGoingMessageQueue;

    @Value("${dnet.openaire.messageManager.reportMessageQueueName}")
    private String reportMessageQueue;

    public DnetMessageManager() {
    }

    private String createReportId(final String wfId, final String jobName) {
        return String.format("%s::%s", wfId, jobName);
    }

    public void startListeningMessage() throws Exception {
        if (manager == null && StringUtils.isNotBlank(messageQueueServer) && StringUtils.isNotBlank(reportMessageQueue)) {
            manager = new MessageManager(messageQueueServer, username, password, messages);
            manager.startConsumingMessage(onGoingMessageQueue, true, false);
            manager.startConsumingMessage(reportMessageQueue, true, false);

            final Runnable r = () -> {
                while (true) {
                    try {
                        final Message currentMessage = messages.take();

                        if (currentMessage.getType() == MessageType.ONGOING) {
                            synchronized (onGoingMessages) {
                                onGoingMessages.put(currentMessage.getWorkflowId(), currentMessage);
                            }
                        } else {
                            synchronized (reportMessages) {
                                if (!reportMessages.containsKey(currentMessage.getWorkflowId())) {
                                    reportMessages.put(currentMessage.getWorkflowId(), new ArrayList<>());
                                }
                                reportMessages.get(currentMessage.getWorkflowId()).add(currentMessage);
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("An error occurred on retrieving messages from the blocking queue", e);
                        throw new RuntimeException("An error occurred on retrieving messages from the blocking queue", e);
                    }
                }
            };
            new Thread(r).start();
        }
    }

    public List<Message> getReport(final String workflowId) {
        return getMessages(reportMessages, workflowId);
    }

    private List<Message> getMessages(final Map<String, List<Message>> messageMap, final String reportId) {
        if (messageMap.containsKey(reportId)) {
            final List<Message> m = messageMap.get(reportId);
            messageMap.remove(reportId);
            return m;
        }
        return null;
    }

    private Message getMessage(final Map<String, Message> messageMap, final String reportId) {
        if (messageMap.containsKey(reportId)) {
            final Message m = messageMap.get(reportId);
            messageMap.remove(reportId);
            return m;
        }
        return null;
    }

    public Message getOnGoingMessages(final String workflowId) {
        return getMessage(onGoingMessages, workflowId);
    }

    public String getMessageQueueServer() {
        return messageQueueServer;
    }

    @Required
    public void setMessageQueueServer(final String messageQueueServer) {
        this.messageQueueServer = messageQueueServer;
    }

    public String getUsername() {
        return username;
    }

    @Required
    public void setUsername(final String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    @Required
    public void setPassword(final String password) {
        this.password = password;
    }
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/StartReadingMDStoreJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class StartReadingMDStoreJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(StartReadingMDStoreJobNode.class);

    private String mdId;

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String url = getMdStoreManagerUrl() + "/mdstore/{mdId}/startReading";

        final Map<String, Object> params = new HashMap<>();
        params.put("mdId", getMdId());

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        final MDStoreVersion version = restTemplate.getForObject(uri, MDStoreVersion.class);

        log.info("mdstore version ready to read " + version.getId());

        token.getEnv().setAttribute("reading_mdId", mdId);
        token.getEnv().setAttribute("reading_versionId", version.getId());

        return Arc.DEFAULT_ARC;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    @Required
    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }

    public String getMdId() {
        return mdId;
    }

    public void setMdId(final String mdId) {
        this.mdId = mdId;
    }
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvCollectHadoopJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.enabling.datasources.common.Datasource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.google.gson.Gson;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.enabling.datasources.common.ApiParam;
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;

public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);

    @Autowired
    private LocalDatasourceManager<?, ?> dsManager;

    private String hdfsBasePath;

    @Override
    protected String execute(final NodeToken token) throws Exception {

        // param 1 : hdfs path
        // param 2 : api descriptor (json)
        // param 3 : nameNode

        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
        final String mdId = token.getEnv().getAttribute("mdId");
        final String versionId = token.getEnv().getAttribute("versionId");

        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
                .stream()
                .filter(a -> a.getId().equals(apiId))
                .map(a -> {
                    final ApiDescriptor res = new ApiDescriptor();
                    res.setBaseUrl(a.getBaseurl());
                    res.setId(a.getId());
                    res.setProtocol(a.getProtocol());
                    res.getParams().put("metadata_identifier_path", a.getMetadataIdentifierPath());
                    res.getParams().putAll(a.getApiParams()
                            .stream()
                            .map(o -> (ApiParam) o)
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
                    return res;
                })
                .findFirst();

        if (opt.isPresent()) {
            final ApiDescriptor api = opt.get();
            final String hdfsPath = String.format("%s/%s/%s/store", hdfsBasePath, mdId, versionId);
            final String seqFilePath = String.format("%s/%s/%s/seqFile", hdfsBasePath, mdId, versionId);
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
            token.getEnv().setAttribute("mdStorePath", hdfsPath);
            token.getEnv().setAttribute("sequenceFilePath", seqFilePath);
            final Provenance provenance = new Provenance();
            provenance.setDatasourceId(dsId);
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
            provenance.setDatasourceName(ds.getOfficialname());
            provenance.setNsPrefix(ds.getNamespaceprefix());
            token.getEnv().setAttribute("dataSourceInfo", new Gson().toJson(provenance));
            token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
            token.getEnv().setAttribute("identifierPath", api.getParams().get("metadata_identifier_path"));
            token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));

            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
            final Map<String, String> params = api.getParams();
            if (params != null) {
                for (final Map.Entry<String, String> e : params.entrySet()) {
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
                }
            }

            return Arc.DEFAULT_ARC;
        } else {
            return "abort";
        }
    }

    public String getHdfsBasePath() {
        return hdfsBasePath;
    }

    public void setHdfsBasePath(final String hdfsBasePath) {
        this.hdfsBasePath = hdfsBasePath;
    }
}
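
For orientation, the apiDescription attribute ends up holding the Gson serialization of the ApiDescriptor built above; the field names follow the setters in the code, while the endpoint and parameter values below are purely illustrative (a hypothetical OAI-PMH API, not data from this commit):

    {
      "id": "api_________::example::oai",
      "baseUrl": "https://repository.example.org/oai",
      "protocol": "oai",
      "params": {
        "metadata_identifier_path": "//*[local-name()='identifier']",
        "format": "oai_dc"
      }
    }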
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/SubmitDnetHadoopJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import com.googlecode.sarasvati.env.Env;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.message.Message;
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
import java.util.Map;

public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {

    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);

    @Autowired
    private DnetMessageManager dnetMessageManager;

    private boolean ongoing = true;

    private int currentValue;

    private String wfId;

    @Override
    protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
        this.wfId = token.getProcess().getEnv().getAttribute("system:processId");

        final Runnable r = () -> {
            while (ongoing) {
                final Message mess = dnetMessageManager.getOnGoingMessages(wfId);
                if (mess != null && mess.getBody() != null && mess.getBody().containsKey("ongoing")) {
                    try {
                        this.currentValue = Integer.parseInt(mess.getBody().get("ongoing"));
                        Thread.sleep(1000);
                    } catch (Throwable e) {
                        log.error("Error on receiving messages ", e);
                    }
                }
            }
        };
        new Thread(r).start();
        super.prepareJob(job, token);
    }

    @Override
    protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
        return new BlackboardWorkflowJobListener(engine, token) {

            @Override
            protected void onFailed(final BlackboardJob job) {
                ongoing = false;
                log.warn("Blackboard workflow node FAILED: " + job.getError());
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
                complete(job, "abort");
            }

            @Override
            protected void populateEnv(final Env env, final Map<String, String> responseParams) {
                ongoing = false;

                List<Message> reports = dnetMessageManager.getReport(wfId);
                if (reports == null) {
                    int numberOfTries = 0;
                    try {
                        while (reports == null && numberOfTries < 3) {
                            reports = dnetMessageManager.getReport(wfId);
                            Thread.sleep(3000 * numberOfTries++);
                        }
                    } catch (InterruptedException e) {
                        log.error("Error on waiting report", e);
                    }
                }

                if (reports == null) {
                    throw new RuntimeException("Unable to get report for workflowId " + wfId);
                }

                reports.forEach(it -> it.getBody().forEach(env::setAttribute));
            }
        };
    }

    @Override
    public int getTotalValue() {
        return 0;
    }

    @Override
    public int getCurrentValue() {
        return currentValue;
    }

    @Override
    public boolean isInaccurate() {
        return false;
    }

    @Override
    public ProgressProvider getProgressProvider() {
        return this;
    }
}
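
The polling thread in prepareJob defines an implicit contract with the Hadoop side: ONGOING messages for this workflowId must carry a body whose "ongoing" entry is a numeric string, e.g. a hypothetical

    {"ongoing": "15000"}

which is parsed into currentValue and surfaced through the ProgressProvider methods below. REPORT messages, collected in populateEnv, instead have each body entry copied verbatim into the workflow env.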
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/AbortMDStoreVersionJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class AbortMDStoreVersionJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(AbortMDStoreVersionJobNode.class);

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String url = getMdStoreManagerUrl() + "/version/{versionId}?force=true";

        final Map<String, Object> params = new HashMap<>();
        params.put("versionId", token.getEnv().getAttribute("versionId"));

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        restTemplate.delete(uri);

        log.info("mdstore version deleted");
        return Arc.DEFAULT_ARC;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    @Required
    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }
}
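
Expanded, the abort boils down to a single REST call of the form (host and identifier hypothetical):

    DELETE http://localhost:8280/mdstoremanager/version/md-0001-v2?force=true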
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/DeleteMDStoreHadoopJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class DeleteMDStoreHadoopJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(DeleteMDStoreHadoopJobNode.class);

    private String mdId;

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String url = getMdStoreManagerUrl() + "/mdstore/{mdId}";

        final Map<String, Object> params = new HashMap<>();
        params.put("mdId", getMdId());

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        restTemplate.delete(uri);

        log.info("mdstore deleted: " + mdId);

        return Arc.DEFAULT_ARC;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    @Required
    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }

    public String getMdId() {
        return mdId;
    }

    public void setMdId(final String mdId) {
        this.mdId = mdId;
    }
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/CommitMDStoreVersionJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class CommitMDStoreVersionJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(CommitMDStoreVersionJobNode.class);

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String url = getMdStoreManagerUrl() + "/version/{versionId}/commit/{size}";

        final Map<String, Object> params = new HashMap<>();
        params.put("versionId", token.getEnv().getAttribute("versionId"));
        params.put("size", token.getEnv().getAttribute("mdStoreSize"));

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject(uri, Void.class);

        log.info("mdstore version committed");
        return Arc.DEFAULT_ARC;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    @Required
    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/CreateMDStoreHadoopJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreWithInfo;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;

public class CreateMDStoreHadoopJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(CreateMDStoreHadoopJobNode.class);

    /* Workflow params */
    private String format;
    private String layout;
    private String interpretation;
    private String outputPrefix = "mdstore";

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String url = getMdStoreManagerUrl() + "/new/{format}/{layout}/{interpretation}";

        final Map<String, Object> params = new HashMap<>();
        params.put("format", getFormat());
        params.put("layout", getLayout());
        params.put("interpretation", getInterpretation());

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .queryParam("dsName", token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_NAME))
                .queryParam("dsId", token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID))
                .queryParam("apiId", token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE))
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        final MDStoreWithInfo result = restTemplate.getForObject(uri, MDStoreWithInfo.class);

        log.info("mdstore created " + result.toString());

        token.getEnv().setAttribute(getOutputPrefix() + "format", format);
        token.getEnv().setAttribute(getOutputPrefix() + "layout", layout);
        token.getEnv().setAttribute(getOutputPrefix() + "interpretation", interpretation);
        token.getEnv().setAttribute(getOutputPrefix() + "id", result.getId());

        return Arc.DEFAULT_ARC;
    }

    public String getFormat() {
        return format;
    }

    public void setFormat(final String format) {
        this.format = format;
    }

    public String getLayout() {
        return layout;
    }

    public void setLayout(final String layout) {
        this.layout = layout;
    }

    public String getInterpretation() {
        return interpretation;
    }

    public void setInterpretation(final String interpretation) {
        this.interpretation = interpretation;
    }

    public String getOutputPrefix() {
        return outputPrefix;
    }

    public void setOutputPrefix(final String outputPrefix) {
        this.outputPrefix = outputPrefix;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    @Required
    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }
}
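
Here the query parameters ride along with the path template, so the expanded request takes the form (all values hypothetical):

    GET http://localhost:8280/mdstoremanager/new/oai_dc/store/native?dsName=Example+Repository&dsId=ds_0001&apiId=api_0001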
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/MDStoreToApiExtraFieldHadoopJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;

import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreWithInfo;
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class MDStoreToApiExtraFieldHadoopJobNode extends SimpleJobNode {

    private String mdId;
    private String datasourceId;
    private String datasourceInterface;
    private String extraFieldForTotal;
    private String extraFieldForDate;
    private String extraFieldForMdId;

    /* Spring managed params */
    private String mdStoreManagerUrl;

    @Autowired
    private LocalDatasourceManager<?, ?> dsManager;

    private static final Log log = LogFactory.getLog(MDStoreToApiExtraFieldHadoopJobNode.class);

    @Override
    protected String execute(final NodeToken token) throws Exception {

        final String url = getMdStoreManagerUrl() + "/mdstore/{mdId}";

        final Map<String, Object> params = new HashMap<>();
        params.put("mdId", getMdId());

        final URI uri = UriComponentsBuilder.fromUriString(url)
                .buildAndExpand(params)
                .toUri();

        final RestTemplate restTemplate = new RestTemplate();
        final MDStoreWithInfo mdstore = restTemplate.getForObject(uri, MDStoreWithInfo.class);

        final int size = Math.toIntExact(mdstore.getSize());
        final Date date = mdstore.getLastUpdate();

        if (extraFieldForTotal.equals("last_collection_total")) {
            dsManager.setLastCollectionInfo(datasourceId, datasourceInterface, mdId, size, date);
        } else if (extraFieldForTotal.equals("last_aggregation_total")) {
            dsManager.setLastAggregationInfo(datasourceId, datasourceInterface, mdId, size, date);
        } else {
            log.warn("Invalid field for total: " + extraFieldForTotal);
        }

        return Arc.DEFAULT_ARC;
    }

    public String getMdId() {
        return mdId;
    }

    public void setMdId(final String mdId) {
        this.mdId = mdId;
    }

    public String getDatasourceId() {
        return datasourceId;
    }

    public void setDatasourceId(final String datasourceId) {
        this.datasourceId = datasourceId;
    }

    public String getDatasourceInterface() {
        return datasourceInterface;
    }

    public void setDatasourceInterface(final String datasourceInterface) {
        this.datasourceInterface = datasourceInterface;
    }

    public String getExtraFieldForTotal() {
        return extraFieldForTotal;
    }

    public void setExtraFieldForTotal(final String extraFieldForTotal) {
        this.extraFieldForTotal = extraFieldForTotal;
    }

    public String getExtraFieldForDate() {
        return extraFieldForDate;
    }

    public void setExtraFieldForDate(final String extraFieldForDate) {
        this.extraFieldForDate = extraFieldForDate;
    }

    public String getExtraFieldForMdId() {
        return extraFieldForMdId;
    }

    public void setExtraFieldForMdId(final String extraFieldForMdId) {
        this.extraFieldForMdId = extraFieldForMdId;
    }

    public String getMdStoreManagerUrl() {
        return mdStoreManagerUrl;
    }

    public void setMdStoreManagerUrl(final String mdStoreManagerUrl) {
        this.mdStoreManagerUrl = mdStoreManagerUrl;
    }
}
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/repo-hi/Aggregate_Metadata_from_PubsRepository_Hadoop.xml
<RESOURCE_PROFILE>
    <HEADER>
        <RESOURCE_IDENTIFIER value="513da605-e9b4-4150-9116-f82a34a8585f_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
        <RESOURCE_KIND value="WorkflowDSResources"/>
        <RESOURCE_URI value="value3"/>
        <DATE_OF_CREATION value="2019-04-03T16:44:41+02:00"/>
    </HEADER>
    <BODY>
        <WORKFLOW_NAME>Aggregate Metadata (publications) from PubsRepository [Hadoop]</WORKFLOW_NAME>
        <WORKFLOW_INFO>
            <FIELD name="Action">Aggregate Metadata</FIELD>
            <FIELD name="Consequence IS">Support</FIELD>
            <FIELD name="Datasource class">PubsRepository</FIELD>
            <FIELD name="Content">publications</FIELD>
        </WORKFLOW_INFO>
        <WORKFLOW_TYPE>REPO_HI</WORKFLOW_TYPE>
        <WORKFLOW_PRIORITY>20</WORKFLOW_PRIORITY>
        <CONFIGURATION start="manual">
            <NODE isStart="true" name="VerifyDatasource" type="VerifyDatasource">
                <DESCRIPTION>Verify if DS is pending</DESCRIPTION>
                <PARAMETERS>
                    <PARAM managedBy="system" name="expectedInterfaceTypologyPrefixes" required="false" type="string"/>
                    <PARAM managedBy="system" name="expectedCompliancePrefixes" required="false" type="string"/>
                </PARAMETERS>
                <ARCS>
                    <ARC to="createMetaWf"/>
                    <ARC name="validateDs" to="validateDs"/>
                </ARCS>
            </NODE>
            <NODE name="validateDs" type="ValidateDatasource">
                <DESCRIPTION>Validate DS</DESCRIPTION>
                <PARAMETERS/>
                <ARCS>
                    <ARC to="createMetaWf"/>
                </ARCS>
            </NODE>
            <NODE name="createMetaWf" type="RegisterMetaWf">
                <DESCRIPTION>Create MetaWorkflow</DESCRIPTION>
                <PARAMETERS>
                    <PARAM managedBy="system" name="wfName" required="true" type="string">Aggregate Metadata (publications) from PubsRepository [Hadoop]</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="createDc"/>
                    <ARC to="createOaf"/>
                </ARCS>
            </NODE>
            <NODE name="createDc" type="CreateMDStoreHadoop">
                <DESCRIPTION>Create oai_dc store</DESCRIPTION>
                <PARAMETERS>
                    <PARAM managedBy="system" name="format" required="true" type="string">oai_dc</PARAM>
                    <PARAM managedBy="system" name="interpretation" required="true" type="string">native</PARAM>
                    <PARAM managedBy="system" name="layout" required="true" type="string">store</PARAM>
                    <PARAM managedBy="system" name="outputPrefix" required="true" type="string">harv_</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="updateMetaWf"/>
                </ARCS>
            </NODE>
            <NODE name="createOaf" type="CreateMDStoreHadoop">
                <DESCRIPTION>Create OAF store</DESCRIPTION>
                <PARAMETERS>
                    <PARAM managedBy="system" name="format" required="true" type="string">OAF</PARAM>
                    <PARAM managedBy="system" name="interpretation" required="true" type="string">cleaned</PARAM>
                    <PARAM managedBy="system" name="layout" required="true" type="string">store</PARAM>
                    <PARAM managedBy="system" name="outputPrefix" required="true" type="string">tran_</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="updateMetaWf"/>
                </ARCS>
            </NODE>
            <NODE isJoin="true" name="updateMetaWf" type="UpdateMetaWf">
                <DESCRIPTION>Update MetaWorkflow</DESCRIPTION>
                <PARAMETERS>
                    <PARAM managedBy="system" name="beanName" required="true" type="string">metaWfPubsRepositoryHadoop</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="updateMetaWfStatus"/>
                </ARCS>
            </NODE>
            <NODE name="updateMetaWfStatus" type="UpdateOpenaireMetaWfStatus">
                <DESCRIPTION>Update MetaWorkflow Status</DESCRIPTION>
                <PARAMETERS/>
                <ARCS>
                    <ARC to="success"/>
                </ARCS>
            </NODE>
        </CONFIGURATION>

        <STATUS/>
    </BODY>
</RESOURCE_PROFILE>
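
Note how this profile links up with CreateMDStoreHadoopJobNode above: the node stores the new mdstore id under getOutputPrefix() + "id", so createDc (outputPrefix "harv_") publishes it as "harv_id" and createOaf (outputPrefix "tran_") as "tran_id"; these are exactly the $params.("harv_id")$ and $params.("tran_id")$ placeholders consumed by the collection and transformation templates below.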
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/applicationContext-repohi.xml
Addition at the end of the file (`+` marks the new bean; the surrounding context lines are unchanged):

             </bean>
         </property>
     </bean>

+    <!-- Aggregate Metadata (publications) from PubsRepository [Hadoop] -->
+    <bean id="metaWfPubsRepositoryHadoop"
+          class="eu.dnetlib.msro.workflows.metawf.DatasourceMetaWorkflow"
+          p:destroyWorkflowTemplate="classpath:/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/repoBye.wf.st"
+          scope="prototype">
+        <property name="tree">
+            <bean class="eu.dnetlib.msro.workflows.metawf.WorkflowTree"
+                  p:name="collection"
+                  p:template="classpath:/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st">
+                <property name="children">
+                    <list>
+                        <bean class="eu.dnetlib.msro.workflows.metawf.WorkflowTree"
+                              p:name="transform"
+                              p:template="classpath:/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/transform.wf.st"/>
+                    </list>
+                </property>
+            </bean>
+        </property>
+    </bean>

 </beans>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/repoBye.wf.st
<NODE name="SET_INFO" isStart="true" type="SetProviderInfo">
	<DESCRIPTION>Set information about current provider</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
		<PARAM required="true" type="string" name="providerName" managedBy="system">$params.("dataprovider:name")$</PARAM>
		<PARAM required="true" type="string" name="api" managedBy="system">$params.("dataprovider:interface")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="DeleteMetaWorkflow"/>
	</ARCS>
</NODE>

<NODE name="DeleteMetaWorkflow" type="DeleteOpenaireMetaWf">
	<DESCRIPTION>Delete the MetaWorkflow</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="metaWfId" managedBy="system">$params.("META_WORKFLOW_ID")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="DeleteMDStoreDC"/>
	</ARCS>
</NODE>

<NODE name="DeleteMDStoreDC" type="DeleteMDStoreHadoop">
	<DESCRIPTION>Delete the oai_dc mdstore</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="DeleteMDStoreOAF"/>
	</ARCS>
</NODE>

<NODE name="DeleteMDStoreOAF" type="DeleteMDStoreHadoop">
	<DESCRIPTION>Delete the OAF mdstore</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("tran_id")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="RemoveApiExtraFields"/>
	</ARCS>
</NODE>

<NODE name="RemoveApiExtraFields" type="RemoveApiExtraFields">
	<DESCRIPTION>Reset the extra fields of the api</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="datasourceId" managedBy="system">$params.("dataprovider:id")$</PARAM>
		<PARAM required="true" type="string" name="datasourceInterface" managedBy="system">$params.("dataprovider:interface")$</PARAM>
		<PARAM required="true" type="string" name="fields" managedBy="system">last_collection_total, last_collection_date, last_collection_mdId, last_aggregation_total, last_aggregation_date, last_aggregation_mdId</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="success"/>
	</ARCS>
</NODE>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st
<NODE name="SET_INFO" isStart="true" type="SetProviderInfo">
	<DESCRIPTION>Set information about current provider</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
		<PARAM required="true" type="string" name="providerName" managedBy="system">$params.("dataprovider:name")$</PARAM>
		<PARAM required="true" type="string" name="api" managedBy="system">$params.("dataprovider:interface")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="obtainParams"/>
	</ARCS>
</NODE>

<NODE name="obtainParams" type="ObtainOpenaireDataSourceParams">
	<DESCRIPTION>Obtain data source params</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="PREPARE_STORE_VERSION"/>
	</ARCS>
</NODE>

<NODE name="PREPARE_STORE_VERSION" type="PrepareMDStoreVersion">
	<DESCRIPTION>Prepare a new MdStore Version</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="PREPARE_ENV_COLLECTION"/>
	</ARCS>
</NODE>

<NODE name="PREPARE_ENV_COLLECTION" type="PrepareEnvCollectHadoopJobNode">
	<DESCRIPTION>Put in the environment all the variables needed by the collection Oozie job</DESCRIPTION>
	<PARAMETERS>
		<PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"/>
	</PARAMETERS>
	<ARCS>
		<ARC to="COLLECT_HADOOP"/>
	</ARCS>
</NODE>

<NODE name="COLLECT_HADOOP" type="SubmitDnetHadoopJobNode">
	<DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
	<PARAMETERS>
		<PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM>
		<PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
		<PARAM managedBy="system" name="envParams" required="true" type="string">
			{
				"apiDescription":"apiDescription",
				"mdStorePath":"mdStorePath",
				"sequenceFilePath":"sequenceFilePath",
				"dataSourceInfo":"dataSourceInfo",
				"timestamp":"timestamp",
				"identifierPath":"identifierPath",
				"workflowId":"workflowId"
			}
		</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="COMMIT_VERSION"/>
		<ARC name="abort" to="ABORT_VERSION"/>
	</ARCS>
</NODE>

<NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
	<DESCRIPTION>Commit the mdstore version</DESCRIPTION>
	<PARAMETERS/>
	<ARCS>
		<ARC to="UPDATE_INFO"/>
	</ARCS>
</NODE>

<NODE name="ABORT_VERSION" type="AbortMDStoreVersion">
	<DESCRIPTION>Abort the mdstore version</DESCRIPTION>
	<PARAMETERS/>
	<ARCS>
		<ARC to="failure"/>
	</ARCS>
</NODE>

<NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop">
	<DESCRIPTION>Update datasource API extra fields</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
		<PARAM required="true" type="string" name="datasourceId" managedBy="system">$params.("dataprovider:id")$</PARAM>
		<PARAM required="true" type="string" name="datasourceInterface" managedBy="system">$params.("dataprovider:interface")$</PARAM>
		<PARAM required="true" type="string" name="extraFieldForTotal" managedBy="system">last_collection_total</PARAM>
		<PARAM required="true" type="string" name="extraFieldForDate" managedBy="system">last_collection_date</PARAM>
		<PARAM required="true" type="string" name="extraFieldForMdId" managedBy="system">last_collection_mdId</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="success"/>
	</ARCS>
</NODE>
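
The envParams block in COLLECT_HADOOP mirrors, one for one, the seven attributes that PrepareEnvCollectHadoopJobNode writes into the env (apiDescription, mdStorePath, sequenceFilePath, dataSourceInfo, timestamp, identifierPath, workflowId); each entry forwards an env attribute to the dnetHadoopCollection job under the same name.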
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/transform.wf.st
<NODE name="SET_INFO" isStart="true" type="SetProviderInfo">
	<DESCRIPTION>Set information about current provider</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
		<PARAM required="true" type="string" name="providerName" managedBy="system">$params.("dataprovider:name")$</PARAM>
		<PARAM required="true" type="string" name="api" managedBy="system">$params.("dataprovider:interface")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="obtainParams"/>
	</ARCS>
</NODE>

<NODE name="obtainParams" type="ObtainOpenaireDataSourceParams">
	<DESCRIPTION>Obtain data source params</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="START_READING"/>
	</ARCS>
</NODE>

<NODE name="START_READING" type="StartReadingMDStore">
	<DESCRIPTION>Start reading Hadoop MD Store</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="PREPARE_STORE_VERSION"/>
	</ARCS>
</NODE>

<NODE name="PREPARE_STORE_VERSION" type="PrepareMDStoreVersion">
	<DESCRIPTION>Prepare a new MdStore Version</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("tran_id")$</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="PREPARE_ENV_TRANSFORMATION"/>
	</ARCS>
</NODE>

<NODE name="PREPARE_ENV_TRANSFORMATION" type="PrepareEnvTransformHadoopJobNode">
	<DESCRIPTION>Retrieve all the parameters needed to run the transformation workflow</DESCRIPTION>
	<PARAMETERS>
		<PARAM category="TRANSFORMATION_RULE_ID" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')" managedBy="user" name="ruleId" required="true" type="string"/>
		<PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"/>
	</PARAMETERS>
	<ARCS>
		<ARC to="TRANSFORM_HADOOP"/>
	</ARCS>
</NODE>

<NODE name="TRANSFORM_HADOOP" type="SubmitDnetHadoopJobNode">
	<DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
	<PARAMETERS>
		<PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopTrasnformation</PARAM>
		<PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
		<PARAM managedBy="system" name="envParams" required="true" type="string">
			{
				"mdstoreInputPath":"mdstoreInputPath",
				"mdstoreOutputPath":"mdstoreOutputPath",
				"transformationRule":"transformationRule",
				"timestamp":"timestamp",
				"workflowId":"workflowId"
			}
		</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="COMMIT_VERSION"/>
		<ARC name="abort" to="ABORT_VERSION"/>
	</ARCS>
</NODE>

<NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
	<DESCRIPTION>Commit the mdstore version</DESCRIPTION>
	<PARAMETERS/>
	<ARCS>
		<ARC to="END_READING"/>
	</ARCS>
</NODE>

<NODE name="END_READING" type="EndReadingMDStore">
	<DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
	<PARAMETERS/>
	<ARCS>
		<ARC to="UPDATE_INFO"/>
	</ARCS>
</NODE>

<NODE name="ABORT_VERSION" type="AbortMDStoreVersion">
	<DESCRIPTION>Abort the mdstore version</DESCRIPTION>
	<PARAMETERS/>
	<ARCS>
		<ARC to="END_READING_ABORT"/>
	</ARCS>
</NODE>

<NODE name="END_READING_ABORT" type="EndReadingMDStore">
	<DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
	<PARAMETERS/>
	<ARCS>
		<ARC to="failure"/>
	</ARCS>
</NODE>

<NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop">
	<DESCRIPTION>Update datasource API extra fields</DESCRIPTION>
	<PARAMETERS>
		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("tran_id")$</PARAM>
		<PARAM required="true" type="string" name="datasourceId" managedBy="system">$params.("dataprovider:id")$</PARAM>
		<PARAM required="true" type="string" name="datasourceInterface" managedBy="system">$params.("dataprovider:interface")$</PARAM>
		<PARAM required="true" type="string" name="extraFieldForTotal" managedBy="system">last_aggregation_total</PARAM>
		<PARAM required="true" type="string" name="extraFieldForDate" managedBy="system">last_aggregation_date</PARAM>
		<PARAM required="true" type="string" name="extraFieldForMdId" managedBy="system">last_aggregation_mdId</PARAM>
	</PARAMETERS>
	<ARCS>
		<ARC to="success"/>
	</ARCS>
</NODE>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/applicationContext-msro-openaire-message.properties
dnet.openaire.messageManager.host =
dnet.openaire.messageManager.username =
dnet.openaire.messageManager.password =
dnet.openaire.messageManager.ongoingMessageQueueName =
dnet.openaire.messageManager.reportMessageQueueName =
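
The properties ship empty; a concrete deployment would fill them in along these lines (all values hypothetical):

    # hypothetical values for a local development setup
    dnet.openaire.messageManager.host = broker.example.org
    dnet.openaire.messageManager.username = dnet
    dnet.openaire.messageManager.password = changeme
    dnet.openaire.messageManager.ongoingMessageQueueName = dhp.ongoing
    dnet.openaire.messageManager.reportMessageQueueName = dhp.report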
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/applicationContext-msro-openaire-message.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="dnetMessageManager" class="eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager"
          p:messageQueueServer="${dnet.openaire.messageManager.host}"
          p:username="${dnet.openaire.messageManager.username}"
          p:password="${dnet.openaire.messageManager.password}"
          init-method="startListeningMessage"/>

</beans>
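
With init-method="startListeningMessage", the consumer thread is started as soon as Spring instantiates the bean. The two queue names are not injected here because DnetMessageManager resolves them itself through its @Value-annotated fields, bound to the same properties file as the host and credentials above.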
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml
In the portion shown, the change is formatting only: the node bean definitions are re-indented (tab indentation replaced with spaces, attribute alignment updated), with no functional change.

        xmlns="http://www.springframework.org/schema/beans"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

-	<bean id="wfNodeUpdateActionSets"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.UpdateSetsJobNode"
-		scope="prototype" />
+    <bean id="wfNodeUpdateActionSets"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.UpdateSetsJobNode"
+          scope="prototype"/>

-	<bean id="wfNodePrepareActionSets"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.PrepareActionSetsJobNode"
-		scope="prototype" />
+    <bean id="wfNodePrepareActionSets"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.PrepareActionSetsJobNode"
+          scope="prototype"/>

-	<bean id="wfNodeCleanActionSetsProfile"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.CleanActionSetsProfileJobNode"
-		p:xupdate="${dnet.actionmanager.garbage.sets.xupdate}"
-		scope="prototype" />
+    <bean id="wfNodeCleanActionSetsProfile"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.CleanActionSetsProfileJobNode"
+          p:xupdate="${dnet.actionmanager.garbage.sets.xupdate}"
+          scope="prototype"/>

-	<bean id="wfNodeGarbageSets"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.GarbageSetsJobNode"
-		scope="prototype" />
+    <bean id="wfNodeGarbageSets"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.GarbageSetsJobNode"
+          scope="prototype"/>

-	<bean id="wfNodeGarbageSetsHDFS"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.GarbageSetsHDFSJobNode"
-		scope="prototype" />
+    <bean id="wfNodeGarbageSetsHDFS"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.GarbageSetsHDFSJobNode"
+          scope="prototype"/>

-	<bean id="wfNodePromoteActionsHDFS"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.PromoteActionsHDFSJobNode"
-		scope="prototype" />
+    <bean id="wfNodePromoteActionsHDFS"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.PromoteActionsHDFSJobNode"
+          scope="prototype"/>

-	<bean id="wfNodePromoteActions"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.PromoteActionsJobNode"
-		scope="prototype" />
+    <bean id="wfNodePromoteActions"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.actions.PromoteActionsJobNode"
+          scope="prototype"/>

-	<bean id="wfNodePrepareCopyTable"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hbase.PrepareCopyTableJobNode"
-		scope="prototype" />
+    <bean id="wfNodePrepareCopyTable"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.hbase.PrepareCopyTableJobNode"
+          scope="prototype"/>

-	<bean id="wfNodeIncrementalTransformation"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.IncrementalTransformationJobNode"

... This diff was truncated because it exceeds the maximum size that can be displayed.
