Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.msro.workflows.nodes.objectStore;
2
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Map;
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7
import javax.xml.xpath.XPath;
8
import javax.xml.xpath.XPathFactory;
9
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
14 32877 michele.ar
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
15 26600 sandro.lab
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
16
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
17
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
18
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
19
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
20
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
21
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
22
import eu.dnetlib.msro.workflows.util.ProgressProvider;
23
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
24 27846 michele.ar
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
25 41310 sandro.lab
import org.apache.commons.logging.Log;
26
import org.apache.commons.logging.LogFactory;
27
import org.xml.sax.InputSource;
28 26600 sandro.lab
29
public class DownloadIntoObjectStoreJobNode extends BlackboardJobNode implements ProgressJobNode {
30
31
	private static final Log log = LogFactory.getLog(DownloadIntoObjectStoreJobNode.class);
32
	private String eprParam;
33
	private String objectStoreId;
34
	private String idXpath; // "//*[local-name()='objIdentifier']
35 41310 sandro.lab
	private String mimeType;
36 26600 sandro.lab
	private String objectIsInsideEpr;
37 41310 sandro.lab
	private String storageType;
38 26600 sandro.lab
	private IterableResultSetFactory iterableResultSetFactory;
39
	private ResultSetClientFactory resultSetClientFactory;
40
	private ResultsetProgressProvider progressProvider;
41
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
42
43 32877 michele.ar
	@Override
44
	protected String obtainServiceId(final NodeToken token) {
45
		return getServiceLocator().getServiceId(ObjectStoreService.class);
46
	}
47
48 26600 sandro.lab
	public String getEprParam() {
49
		return eprParam;
50
	}
51
52
	public void setEprParam(final String eprParam) {
53
		this.eprParam = eprParam;
54
	}
55
56
	public String getObjectStoreId() {
57
		return objectStoreId;
58
	}
59
60
	public void setObjectStoreId(final String objectStoreId) {
61
		this.objectStoreId = objectStoreId;
62
	}
63
64
	public ProgressProvider getProgressProvider(final NodeToken token) {
65
66
		return progressProvider;
67
	}
68
69
	@Override
70
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
71
72
		job.setAction("FEEDOBJECT");
73
		final String eprS = token.getEnv().getAttribute(getEprParam());
74
		job.getParameters().put("obsID", getObjectStoreId());
75 41310 sandro.lab
		job.getParameters().put("mime", getMimeType());
76 26600 sandro.lab
		final Iterator<String> client = resultSetClientFactory.getClient(eprS).iterator();
77
78 50231 claudio.at
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(
79
				() -> new MetadataObjectIterator(client, "//*[local-name()='objIdentifier']", "xml"));
80 26600 sandro.lab
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
81
		job.getParameters().put("epr", progressProvider.getEpr().toString());
82
83
	}
84
85
	@Override
86
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
87
		return new BlackboardWorkflowJobListener(engine, token) {
88
89
			@Override
90
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
91
				log.info("Number of stored records: " + responseParams.get("total"));
92 27846 michele.ar
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
93 26600 sandro.lab
			}
94
		};
95
	}
96
97
	public String getObjectIsInsideEpr() {
98
		return objectIsInsideEpr;
99
	}
100
101
	public void setObjectIsInsideEpr(final String objectIsInsideEpr) {
102
		this.objectIsInsideEpr = objectIsInsideEpr;
103
	}
104
105
	@Override
106
	public ResultsetProgressProvider getProgressProvider() {
107
		return progressProvider;
108
	}
109
110
	public void setProgressProvider(final ResultsetProgressProvider progressProvider) {
111
		this.progressProvider = progressProvider;
112
	}
113
114
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
115
		return processCountingResultSetFactory;
116
	}
117
118
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
119
		this.processCountingResultSetFactory = processCountingResultSetFactory;
120
	}
121
122
	public ResultSetClientFactory getResultSetClientFactory() {
123
		return resultSetClientFactory;
124
	}
125
126
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
127
		this.resultSetClientFactory = resultSetClientFactory;
128
	}
129
130
	public IterableResultSetFactory getIterableResultSetFactory() {
131
		return iterableResultSetFactory;
132
	}
133
134
	public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) {
135
		this.iterableResultSetFactory = iterableResultSetFactory;
136
	}
137
138
	public String getIdXpath() {
139
		return idXpath;
140
	}
141
142
	public void setIdXpath(final String idXpath) {
143
		this.idXpath = idXpath;
144
	}
145
146 41310 sandro.lab
	public String getMimeType() {
147
		return mimeType;
148 26600 sandro.lab
	}
149
150 41310 sandro.lab
	public void setMimeType(final String mimeType) {
151
		this.mimeType = mimeType;
152 26600 sandro.lab
	}
153
154 41310 sandro.lab
	public String getStorageType() {
155
		return storageType;
156
	}
157
158
	public void setStorageType(final String storageType) {
159
		this.storageType = storageType;
160
	}
161
162
	class MetadataObjectIterator implements Iterator<String> {
163
164
		private Iterator<String> inputIterator;
165
166
		private String mime;
167
168
		public MetadataObjectIterator(final Iterator<String> inputIterator, final String xpath, final String mime) {
169
			this.inputIterator = inputIterator;
170
		}
171
172
		@Override
173
		public boolean hasNext() {
174
			return inputIterator.hasNext();
175
		}
176
177
		@Override
178
		public String next() {
179
			try {
180
				String record = inputIterator.next();
181
				XPath xpath = XPathFactory.newInstance().newXPath();
182
				InputSource doc = new InputSource(new StringReader(record));
183
				String identifier = xpath.evaluate(getIdXpath(), doc);
184
				MetadataObjectRecord objectrecord = new MetadataObjectRecord(identifier, record, mime);
185
				return objectrecord.toJSON();
186
			} catch (Exception e) {
187
				return null;
188
			}
189
		}
190
191
		@Override
192
		public void remove() {
193
194
		}
195
196
	}
197
198 26600 sandro.lab
}