Revision 55265

complete wf of collection
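This changeset completes the collection workflow on the dnet-hadoop branch. DnetMessageManager now accumulates a List of report messages per workflow id (previously a single message keyed by wfId::jobName) and consumes the dev_ongoing queue with the same flags as dev_report. A new PrepareEnvCollectHadoopJobNode resolves the datasource API descriptor, mdstore paths and provenance into the workflow environment; SubmitDnetHadoopJobNode drops its own API-descriptor construction, polls ongoing messages for progress, and copies all collected reports into the env on completion. collection.wf.st inserts a PREPARE_ENV_COLLECTION step before COLLECT_HADOOP, which now submits the dnetHadoopCollection job to the DHP cluster with an envParams mapping, and the new node is registered in applicationContext-msro-openaire-nodes.xml.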

modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/message/DnetMessageManager.java
@@ -1,116 +1,135 @@
 package eu.dnetlib.msro.message;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import eu.dnetlib.message.Message;
+import eu.dnetlib.message.MessageManager;
+import eu.dnetlib.message.MessageType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.beans.factory.annotation.Required;
 
-import eu.dnetlib.message.Message;
-import eu.dnetlib.message.MessageManager;
-import eu.dnetlib.message.MessageType;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public class DnetMessageManager {
+    private static final Log log = LogFactory.getLog(DnetMessageManager.class);
 
-	private static final Log log = LogFactory.getLog(DnetMessageManager.class);
+    private MessageManager manager;
 
-	private MessageManager manager;
+    private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
 
-	private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
 
-	private final Map<String, Message> onGonginMessages = new HashMap<>();
+    private final Map<String, Message> onGonginMessages = new HashMap<>();
 
-	private final Map<String, Message> reportMessages = new HashMap<>();
+    private final Map<String, List<Message>> reportMessages = new HashMap<>();
 
-	private String messageQueueServer;
+    private String messageQueueServer;
 
-	private String username;
+    private String username;
 
-	private String password;
+    private String password;
 
-	public DnetMessageManager() {
 
-	}
+    public DnetMessageManager() {
 
-	private String createReportId(final String wfId, final String jobName) {
-		return String.format("%s::%s", wfId, jobName);
-	}
+    }
 
-	public void startListeningMessage() throws Exception {
-		if (manager == null) {
-			manager = new MessageManager(messageQueueServer, username, password, messages);
-			manager.startConsumingMessage("dev_ongoing", false, false);
-			manager.startConsumingMessage("dev_report", true, false);
 
-			final Runnable r = () -> {
-				while (true) {
-					try {
-						final Message currentMessage = messages.take();
+    private String createReportId(final String wfId, final String jobName) {
+        return String.format("%s::%s", wfId, jobName);
+    }
 
-						if (currentMessage.getType() == MessageType.ONGOING) {
-							synchronized (onGonginMessages) {
-								onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
-							}
-						} else {
-							synchronized (reportMessages) {
-								reportMessages.put(createReportId(currentMessage.getWorkflowId(), currentMessage.getJobName()), currentMessage);
-							}
-						}
-					} catch (final InterruptedException e) {
-						log.error("An error occured on retrieving messages from the blocking queue", e);
-						throw new RuntimeException("An error occured on retrieving messages from the blocking queue", e);
-					}
+    public void startListeningMessage() throws Exception {
+        if (manager == null) {
+            manager = new MessageManager(messageQueueServer, username, password, messages);
+            manager.startConsumingMessage("dev_ongoing", true, false);
+            manager.startConsumingMessage("dev_report", true, false);
 
-				}
-			};
-			new Thread(r).start();
-		}
-	}
+            Runnable r = () -> {
+                while (true) {
+                    try {
+                        Message currentMessage = messages.take();
 
-	public Message getReport(final String workflowId, final String jobName) {
-		final String reportId = createReportId(workflowId, jobName);
-		return getMessage(reportMessages, reportId);
-	}
+                        if (currentMessage.getType() == MessageType.ONGOING) {
+                            synchronized (onGonginMessages) {
+                                onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
+                            }
+                        } else {
+                            synchronized (reportMessages) {
+                                if (!reportMessages.containsKey(currentMessage.getWorkflowId()))
+                                {
+                                    reportMessages.put(currentMessage.getWorkflowId(), new ArrayList<>());
+                                }
+                                reportMessages.get(currentMessage.getWorkflowId()).add(currentMessage);
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        log.error("An error occured on retrieving messages from the blocking queue",e);
+                        throw new RuntimeException("An error occured on retrieving messages from the blocking queue",e);
+                    }
 
-	private Message getMessage(final Map<String, Message> messageMap, final String reportId) {
-		if (messageMap.containsKey(reportId)) {
-			final Message m = messageMap.get(reportId);
-			messageMap.remove(reportId);
-			return m;
-		}
-		return null;
-	}
+                }
+            };
+            new Thread(r).start();
+        }
+    }
 
-	public Message getOnGoingMessages(final String workflowId) {
-		return getMessage(onGonginMessages, workflowId);
-	}
+    public List<Message> getReport(final String workflowId) {
 
-	public String getMessageQueueServer() {
-		return messageQueueServer;
-	}
+        return getMessages(reportMessages, workflowId);
+    }
 
-	@Required
-	public void setMessageQueueServer(final String messageQueueServer) {
-		this.messageQueueServer = messageQueueServer;
-	}
+    private List<Message> getMessages(final Map<String, List<Message>> messageMap, String reportId) {
+        if (messageMap.containsKey(reportId)) {
+            List<Message> m = messageMap.get(reportId);
+            messageMap.remove(reportId);
+            return m;
+        }
+        return null;
+    }
 
-	public String getUsername() {
-		return username;
-	}
 
-	@Required
-	public void setUsername(final String username) {
-		this.username = username;
-	}
+    private Message getMessage(final Map<String, Message> messageMap, String reportId) {
+        if (messageMap.containsKey(reportId)) {
+            Message m = messageMap.get(reportId);
+            messageMap.remove(reportId);
+            return m;
+        }
+        return null;
+    }
 
-	public String getPassword() {
-		return password;
-	}
 
-	@Required
-	public void setPassword(final String password) {
-		this.password = password;
-	}
+    public Message getOnGoingMessages(final String workflowId) {
+        return getMessage(onGonginMessages, workflowId);
+    }
+
+
+    public String getMessageQueueServer() {
+        return messageQueueServer;
+    }
+
+    @Required
+    public void setMessageQueueServer(String messageQueueServer) {
+        this.messageQueueServer = messageQueueServer;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    @Required
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    @Required
+    public void setPassword(String password) {
+        this.password = password;
+    }
 }
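
The substantive change here is that report messages now accumulate in a List per workflow id; the old map kept a single Message keyed by wfId::jobName, so a job that emitted several reports overwrote the earlier ones. (The misspelled field onGonginMessages, for "onGoing", survives the revision unchanged.) Below is a minimal, self-contained sketch of the new accumulation logic; DemoMessage and the class name are illustrative stand-ins, not the eu.dnetlib.message API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class MessageSorterSketch {

    // Illustrative stand-in for eu.dnetlib.message.Message.
    static class DemoMessage {
        final String workflowId;
        final boolean ongoing; // true = ONGOING, false = REPORT
        DemoMessage(String workflowId, boolean ongoing) {
            this.workflowId = workflowId;
            this.ongoing = ongoing;
        }
    }

    private final LinkedBlockingQueue<DemoMessage> queue = new LinkedBlockingQueue<>();
    private final Map<String, DemoMessage> ongoingMessages = new HashMap<>();
    private final Map<String, List<DemoMessage>> reportMessages = new HashMap<>();

    public void startDraining() {
        final Thread t = new Thread(() -> {
            while (true) {
                try {
                    final DemoMessage m = queue.take(); // blocks until a message arrives
                    if (m.ongoing) {
                        // ONGOING messages overwrite each other: only the latest counts
                        synchronized (ongoingMessages) {
                            ongoingMessages.put(m.workflowId, m);
                        }
                    } else {
                        // REPORT messages accumulate: a workflow may emit several reports
                        synchronized (reportMessages) {
                            reportMessages.computeIfAbsent(m.workflowId, k -> new ArrayList<>()).add(m);
                        }
                    }
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop draining
                    return;
                }
            }
        });
        t.setDaemon(true); // do not keep the JVM alive for the drainer
        t.start();
    }
}

computeIfAbsent replaces the explicit containsKey/put sequence of the committed code; the behavior is the same.
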
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/PrepareEnvCollectHadoopJobNode.java
(new file)
@@ -0,0 +1,90 @@
+package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
+
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import eu.dnetlib.dhp.model.mdstore.Provenance;
+import eu.dnetlib.enabling.datasources.common.Datasource;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.google.gson.Gson;
+import com.googlecode.sarasvati.Arc;
+import com.googlecode.sarasvati.NodeToken;
+
+import eu.dnetlib.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.enabling.datasources.common.ApiParam;
+import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
+import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
+import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
+
+public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
+
+	private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
+
+	@Autowired
+	private LocalDatasourceManager<?, ?> dsManager;
+
+	private String hdfsBasePath;
+
+	@Override
+	protected String execute(final NodeToken token) throws Exception {
+
+		// param 1 : hdfs path
+		// param 2 : api descriptor (json)
+		// param 3 : nameNode
+
+		final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
+		final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
+		final String mdId = token.getEnv().getAttribute("mdId");
+		final String versionId = token.getEnv().getAttribute("versionId");
+
+		final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
+				.stream()
+				.filter(a -> a.getId().equals(apiId))
+				.map(a -> {
+					final ApiDescriptor res = new ApiDescriptor();
+					res.setBaseUrl(a.getBaseurl());
+					res.setId(a.getId());
+					res.setProtocol(a.getProtocol());
+					res.getParams().put("metadata_identifier_path", a.getMetadataIdentifierPath());
+					res.getParams().putAll(a.getApiParams()
+							.stream()
+							.map(o -> (ApiParam) o)
+							.collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
+					return res;
+				})
+				.findFirst();
+
+		if (opt.isPresent()) {
+			final ApiDescriptor api = opt.get();
+			final String hdfsPath = String.format("%s/%s/%s/store", hdfsBasePath, mdId, versionId);
+			final String seqFilePath = String.format("%s/%s/%s/seqFile", hdfsBasePath, mdId, versionId);
+			token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
+			token.getEnv().setAttribute("mdStorePath", hdfsPath);
+			token.getEnv().setAttribute("sequenceFilePath", seqFilePath);
+			final Provenance provenance = new Provenance();
+			provenance.setDatasourceId(dsId);
+			final Datasource<?, ?> ds = dsManager.getDs(dsId);
+			provenance.setDatasourceName(ds.getOfficialname());
+			provenance.setNsPrefix(ds.getNamespaceprefix());
+			token.getEnv().setAttribute("dataSourceInfo", new Gson().toJson(provenance));
+			token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
+			token.getEnv().setAttribute("identifierPath", api.getParams().get("metadata_identifier_path"));
+			token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
+			return Arc.DEFAULT_ARC;
+		} else {
+			return "abort";
+		}
+
+	}
+
+	public String getHdfsBasePath() {
+		return hdfsBasePath;
+	}
+
+	public void setHdfsBasePath(String hdfsBasePath) {
+		this.hdfsBasePath = hdfsBasePath;
+	}
+}
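
The core of this new node is the stream pipeline that selects the datasource API matching apiId and flattens its parameter list into the ApiDescriptor's param map. A compact stand-alone sketch of the same pattern follows, with Api and Param as hypothetical stand-ins for the dnet datasource beans.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class ApiLookupSketch {

    static class Param {
        final String name, value;
        Param(String name, String value) { this.name = name; this.value = value; }
    }

    static class Api {
        final String id;
        final List<Param> params;
        Api(String id, List<Param> params) { this.id = id; this.params = params; }
    }

    public static void main(String[] args) {
        final List<Api> apis = Arrays.asList(
                new Api("api_1", Arrays.asList(new Param("format", "oai_dc"))),
                new Api("api_2", Collections.emptyList()));

        // Same shape as PrepareEnvCollectHadoopJobNode.execute(): pick the API with
        // the requested id and turn its parameter list into a name -> value map.
        final Optional<Map<String, String>> opt = apis.stream()
                .filter(a -> a.id.equals("api_1"))
                .map(a -> a.params.stream()
                        .collect(Collectors.toMap(p -> p.name, p -> p.value)))
                .findFirst();

        System.out.println(opt.orElse(Collections.emptyMap())); // prints {format=oai_dc}
    }
}

Note that Collectors.toMap throws IllegalStateException on duplicate keys, so both the node and this sketch implicitly assume each API declares a parameter name at most once.
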
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/SubmitDnetHadoopJobNode.java
@@ -1,21 +1,8 @@
 package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
 
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import com.google.gson.Gson;
 import com.googlecode.sarasvati.Engine;
 import com.googlecode.sarasvati.NodeToken;
 import com.googlecode.sarasvati.env.Env;
-
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.enabling.datasources.common.ApiParam;
-import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
 import eu.dnetlib.message.Message;
 import eu.dnetlib.msro.message.DnetMessageManager;
@@ -24,119 +11,91 @@
 import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
 import eu.dnetlib.msro.workflows.util.ProgressProvider;
 import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.List;
+import java.util.Map;
+
 public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
+    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
 
-	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
+    @Autowired
+    DnetMessageManager  dnetMessageManager;
 
-	@Autowired
-	DnetMessageManager dnetMessageManager;
+    private boolean ongoing = true;
 
-	@Autowired
-	private LocalDatasourceManager<?, ?> dsManager;
+    private int currentValue;
 
-	private boolean ongoing = true;
+    private String wfId;
 
-	private int currentValue;
 
-	private String wfId;
 
-	@Override
-	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
+    @Override
+    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
+        this.wfId =token.getProcess().getEnv().getAttribute("system:processId");
 
-		final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
-		final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
-		final String mdId = token.getEnv().getAttribute("mdId");
-		final String versionId = token.getEnv().getAttribute("versionId");
 
-		final Optional<ApiDescriptor> opt = dsManager.getApis(dsId).stream().filter(a -> a.getId().equals(apiId)).map(a -> {
-			final ApiDescriptor res = new ApiDescriptor();
-			res.setBaseUrl(a.getBaseurl());
-			res.setId(a.getId());
-			res.setProtocol(a.getProtocol());
-			res.getParams().putAll(a.getApiParams().stream().map(o -> {
-				return (ApiParam) o;
-			}).collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
-			return res;
-		}).findFirst();
+        Runnable r = () -> {
+          while (ongoing) {
+              Message mess = dnetMessageManager.getOnGoingMessages(wfId);
+              if (mess!= null && mess.getBody()!= null && mess.getBody().containsKey("ongoing")) {
+                  try {
+                      this.currentValue = Integer.parseInt(mess.getBody().get("ongoing"));
+                      Thread.sleep(1000);
+                  } catch (Throwable e) {
+                    log.error("Error ono receiving messages ", e);
+                  }
+              }
+          }
+        };
+        new Thread(r).start();
+        super.prepareJob(job, token);
+    }
 
-		if (!opt.isPresent()) { return; }
 
-		final ApiDescriptor api = opt.get();
-		final String hdfsPath = "hdfs://mdstores/" + mdId + "/" + versionId;
-		final String nameNode = "SubmitDnetHadoop";
+    @Override
+    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
+        return new BlackboardWorkflowJobListener(engine, token) {
 
-		log.info("Collection param 1 : hdfsPath = " + hdfsPath);
-		log.info("Collection param 2 : api descriptor json = " + new Gson().toJson(api));
-		log.info("Collection param 3 : nameNode = " + nameNode);
 
-		wfId = token.getProcess().getEnv().getAttribute("system:processId");
-		job.getParameters().put("wfId", wfId);
-		final Runnable runnable = () -> {
-			while (ongoing) {
-				final Message mess = dnetMessageManager.getOnGoingMessages(wfId);
-				if (mess != null && mess.getBody() != null && mess.getBody().containsKey("progressCount")) {
-					try {
-						currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
-						Thread.sleep(1000);
-					} catch (final Throwable e) {
-						e.printStackTrace();
-					}
-				}
-			}
-		};
+            @Override
+            protected void onFailed(final BlackboardJob job) {
+                ongoing = false;
+                log.warn("Blackboard workflow node FAILED: " + job.getError());
+                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
+                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
+                complete(job, "abort");
+            }
 
-		job.getParameters().put("hdfsPath", hdfsPath);
-		job.getParameters().put("nameNode", nameNode);
-		job.getParameters().put("api", new Gson().toJson(api));
+            @Override
+            protected void populateEnv(Env env, Map<String, String> responseParams) {
+                ongoing = false;
+                List<Message> reports = dnetMessageManager.getReport(wfId);
+                if (reports != null)
+                    reports.forEach(it -> it.getBody().forEach(env::setAttribute));
+            }
+        };
+    }
 
-		new Thread(runnable).start();
-		super.prepareJob(job, token);
-	}
+    @Override
+    public int getTotalValue() {
+        return 0;
+    }
 
-	@Override
-	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
-		return new BlackboardWorkflowJobListener(engine, token) {
+    @Override
+    public int getCurrentValue() {
+        return currentValue;
+    }
 
-			@Override
-			protected void onFailed(final BlackboardJob job) {
-				ongoing = false;
-				log.warn("Blackboard workflow node FAILED: " + job.getError());
-				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
-				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
-				complete(job, "abort");
-			}
+    @Override
+    public boolean isInaccurate() {
+        return false;
+    }
 
-			@Override
-			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
-				ongoing = false;
-
-				final Message report = dnetMessageManager.getReport(wfId, "Collection");
-				if (report != null) {
-					report.getBody().forEach(env::setAttribute);
-				}
-			}
-
-		};
-	}
-
-	@Override
-	public int getTotalValue() {
-		return 0;
-	}
-
-	@Override
-	public int getCurrentValue() {
-		return currentValue;
-	}
-
-	@Override
-	public boolean isInaccurate() {
-		return false;
-	}
-
-	@Override
-	public ProgressProvider getProgressProvider() {
-		return this;
-	}
+    @Override
+    public ProgressProvider getProgressProvider() {
+        return this;
+    }
 }
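
The node now reads everything it needs from the workflow env (prepared by PrepareEnvCollectHadoopJobNode) and keeps only the progress-tracking logic: a background thread polls DnetMessageManager for ONGOING messages and parses the "ongoing" body field into currentValue until the blackboard listener flips the ongoing flag. A minimal sketch of that pattern follows; the map stands in for DnetMessageManager, and the fields are declared volatile here because the flag is written by the listener thread and read by the poller (the committed code uses plain fields, which usually works in practice but carries no visibility guarantee). As in the revision, the loop only sleeps after finding a message, so it busy-spins while none is available.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProgressPollerSketch {

    // volatile: the stop signal is written by the blackboard-listener thread
    // and must be seen by the polling thread.
    private volatile boolean ongoing = true;
    private volatile int currentValue;

    // Stand-in for DnetMessageManager.getOnGoingMessages(wfId): wfId -> message body.
    private final Map<String, Map<String, String>> ongoingBodies = new ConcurrentHashMap<>();

    public void startPolling(final String wfId) {
        new Thread(() -> {
            while (ongoing) {
                final Map<String, String> body = ongoingBodies.get(wfId);
                if (body != null && body.containsKey("ongoing")) {
                    try {
                        currentValue = Integer.parseInt(body.get("ongoing"));
                        Thread.sleep(1000); // poll roughly once per second
                    } catch (InterruptedException | NumberFormatException e) {
                        // swallow and keep polling, as the node does
                    }
                }
            }
        }).start();
    }

    public void stop() { // the listener's onFailed()/populateEnv() set ongoing = false
        ongoing = false;
    }

    public int getCurrentValue() { // feeds the ProgressProvider interface
        return currentValue;
    }
}
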
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st
@@ -19,25 +19,46 @@
 		<ARC to="PREPARE_STORE_VERSION"/>
 	</ARCS>
 </NODE>
-
 <NODE name="PREPARE_STORE_VERSION" type="PrepareMDStoreVersion">
 	<DESCRIPTION>Prepare a new MdStore Version</DESCRIPTION>
 	<PARAMETERS>
 		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
 	</PARAMETERS>
 	<ARCS>
-		<ARC to="COLLECT_HADOOP"/>
-	</ARCS>
+        <ARC to="PREPARE_ENV_COLLECTION"/>
+    </ARCS>
 </NODE>
-
-<NODE name="COLLECT_HADOOP" type="SubmitDnetHadoop">
-	<DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
-	<PARAMETERS/>
-	<ARCS>
-		<ARC to="COMMIT_VERSION"/>
-		<ARC to="ABORT_VERSION" name="abort" />
-	</ARCS>
+<NODE name="PREPARE_ENV_COLLECTION" type="PrepareEnvCollectHadoopJobNode">
+    <DESCRIPTION>Put in the environment all the variable needed to the collection oozie job </DESCRIPTION>
+    <PARAMETERS>
+        <PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"></PARAM>
+    </PARAMETERS>
+    <ARCS>
+        <ARC to="COLLECT_HADOOP"/>
+    </ARCS>
 </NODE>
+<NODE name="COLLECT_HADOOP" type="SubmitDnetHadoopJobNode">
+    <DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
+    <PARAMETERS>
+        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM>
+        <PARAM managedBy="system" name="cluster" required="true" type="string">DHP</PARAM>
+        <PARAM managedBy="system" name="envParams" required="true" type="string">
+            {
+                "apiDescription":"apiDescription",
+                "mdStorePath":"mdStorePath",
+                "sequenceFilePath":"sequenceFilePath",
+                "dataSourceInfo":"dataSourceInfo",
+                "timestamp":"timestamp",
+                "identifierPath":"identifierPath",
+                "workflowId":"workflowId"
+            }
+        </PARAM>
+    </PARAMETERS>
+    <ARCS>
+        <ARC to="COMMIT_VERSION"/>
+        <ARC name="abort" to="ABORT_VERSION"/>
+    </ARCS>
+</NODE>
 
 <NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
 	<DESCRIPTION>Commit the mdstore version</DESCRIPTION>
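
The envParams parameter of COLLECT_HADOOP is a JSON object mapping job-parameter names to the workflow-env attributes that PREPARE_ENV_COLLECTION populates. How the submitter resolves this mapping is not part of this changeset; the following is a plausible sketch of the resolution step, using the Gson dependency these modules already import (the attribute values are invented for the example).

import java.util.HashMap;
import java.util.Map;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

public class EnvParamsSketch {
    public static void main(String[] args) {
        // The envParams PARAM of COLLECT_HADOOP, as declared in collection.wf.st:
        // each key is a job parameter name, each value the env attribute to read.
        final String envParams = "{ \"apiDescription\":\"apiDescription\", "
                + "\"mdStorePath\":\"mdStorePath\", \"sequenceFilePath\":\"sequenceFilePath\" }";

        final Map<String, String> env = new HashMap<>(); // stand-in for the token env
        env.put("apiDescription", "{\"id\":\"api_1\"}");
        env.put("mdStorePath", "/data/mdstores/md_1/v1/store");
        env.put("sequenceFilePath", "/data/mdstores/md_1/v1/seqFile");

        final Map<String, String> mapping = new Gson().fromJson(envParams,
                new TypeToken<Map<String, String>>() {}.getType());

        // Copy each referenced env attribute into the Hadoop job configuration.
        final Map<String, String> jobParams = new HashMap<>();
        mapping.forEach((param, attr) -> jobParams.put(param, env.get(attr)));
        System.out.println(jobParams);
    }
}
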
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml
@@ -373,6 +373,10 @@
 		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.PrepareMDStoreVersionJobNode"
 		p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" />
 
+	<bean id="wfNodePrepareEnvCollectHadoopJobNode"
+		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.PrepareEnvCollectHadoopJobNode"
+		scope="prototype" />
+
 	<bean id="wfNodeCommitMDStoreVersion"
 		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.CommitMDStoreVersionJobNode"
 		p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" />
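
Like the neighbouring workflow-node beans, the new bean is declared with scope="prototype", so Spring creates a fresh node instance for every workflow execution instead of sharing a singleton; its id also follows the wfNode + class-name convention (wfNodePrepareEnvCollectHadoopJobNode) visible in the surrounding definitions.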
