Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.objectStore;
2

    
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Map;
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7
import javax.xml.xpath.XPath;
8
import javax.xml.xpath.XPathFactory;
9

    
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
14
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
15
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
16
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
17
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
18
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
19
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
20
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
21
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
22
import eu.dnetlib.msro.workflows.util.ProgressProvider;
23
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
24
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
25
import org.apache.commons.logging.Log;
26
import org.apache.commons.logging.LogFactory;
27
import org.xml.sax.InputSource;
28

    
29
public class DownloadIntoObjectStoreJobNode extends BlackboardJobNode implements ProgressJobNode {
30

    
31
	private static final Log log = LogFactory.getLog(DownloadIntoObjectStoreJobNode.class);
32
	private String eprParam;
33
	private String objectStoreId;
34
	private String idXpath; // "//*[local-name()='objIdentifier']
35
	private String mimeType;
36
	private String objectIsInsideEpr;
37
	private String storageType;
38
	private IterableResultSetFactory iterableResultSetFactory;
39
	private ResultSetClientFactory resultSetClientFactory;
40
	private ResultsetProgressProvider progressProvider;
41
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
42

    
43
	@Override
44
	protected String obtainServiceId(final NodeToken token) {
45
		return getServiceLocator().getServiceId(ObjectStoreService.class);
46
	}
47

    
48
	public String getEprParam() {
49
		return eprParam;
50
	}
51

    
52
	public void setEprParam(final String eprParam) {
53
		this.eprParam = eprParam;
54
	}
55

    
56
	public String getObjectStoreId() {
57
		return objectStoreId;
58
	}
59

    
60
	public void setObjectStoreId(final String objectStoreId) {
61
		this.objectStoreId = objectStoreId;
62
	}
63

    
64
	public ProgressProvider getProgressProvider(final NodeToken token) {
65

    
66
		return progressProvider;
67
	}
68

    
69
	@Override
70
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
71

    
72
		job.setAction("FEEDOBJECT");
73
		final String eprS = token.getEnv().getAttribute(getEprParam());
74
		job.getParameters().put("obsID", getObjectStoreId());
75
		job.getParameters().put("mime", getMimeType());
76
		final Iterator<String> client = resultSetClientFactory.getClient(eprS).iterator();
77

    
78
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(
79
				() -> new MetadataObjectIterator(client, "//*[local-name()='objIdentifier']", "xml"));
80
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
81
		job.getParameters().put("epr", progressProvider.getEpr().toString());
82

    
83
	}
84

    
85
	@Override
86
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
87
		return new BlackboardWorkflowJobListener(engine, token) {
88

    
89
			@Override
90
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
91
				log.info("Number of stored records: " + responseParams.get("total"));
92
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
93
			}
94
		};
95
	}
96

    
97
	public String getObjectIsInsideEpr() {
98
		return objectIsInsideEpr;
99
	}
100

    
101
	public void setObjectIsInsideEpr(final String objectIsInsideEpr) {
102
		this.objectIsInsideEpr = objectIsInsideEpr;
103
	}
104

    
105
	@Override
106
	public ResultsetProgressProvider getProgressProvider() {
107
		return progressProvider;
108
	}
109

    
110
	public void setProgressProvider(final ResultsetProgressProvider progressProvider) {
111
		this.progressProvider = progressProvider;
112
	}
113

    
114
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
115
		return processCountingResultSetFactory;
116
	}
117

    
118
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
119
		this.processCountingResultSetFactory = processCountingResultSetFactory;
120
	}
121

    
122
	public ResultSetClientFactory getResultSetClientFactory() {
123
		return resultSetClientFactory;
124
	}
125

    
126
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
127
		this.resultSetClientFactory = resultSetClientFactory;
128
	}
129

    
130
	public IterableResultSetFactory getIterableResultSetFactory() {
131
		return iterableResultSetFactory;
132
	}
133

    
134
	public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) {
135
		this.iterableResultSetFactory = iterableResultSetFactory;
136
	}
137

    
138
	public String getIdXpath() {
139
		return idXpath;
140
	}
141

    
142
	public void setIdXpath(final String idXpath) {
143
		this.idXpath = idXpath;
144
	}
145

    
146
	public String getMimeType() {
147
		return mimeType;
148
	}
149

    
150
	public void setMimeType(final String mimeType) {
151
		this.mimeType = mimeType;
152
	}
153

    
154
	public String getStorageType() {
155
		return storageType;
156
	}
157

    
158
	public void setStorageType(final String storageType) {
159
		this.storageType = storageType;
160
	}
161

    
162
	class MetadataObjectIterator implements Iterator<String> {
163

    
164
		private Iterator<String> inputIterator;
165

    
166
		private String mime;
167

    
168
		public MetadataObjectIterator(final Iterator<String> inputIterator, final String xpath, final String mime) {
169
			this.inputIterator = inputIterator;
170
		}
171

    
172
		@Override
173
		public boolean hasNext() {
174
			return inputIterator.hasNext();
175
		}
176

    
177
		@Override
178
		public String next() {
179
			try {
180
				String record = inputIterator.next();
181
				XPath xpath = XPathFactory.newInstance().newXPath();
182
				InputSource doc = new InputSource(new StringReader(record));
183
				String identifier = xpath.evaluate(getIdXpath(), doc);
184
				MetadataObjectRecord objectrecord = new MetadataObjectRecord(identifier, record, mime);
185
				return objectrecord.toJSON();
186
			} catch (Exception e) {
187
				return null;
188
			}
189
		}
190

    
191
		@Override
192
		public void remove() {
193

    
194
		}
195

    
196
	}
197

    
198
}
(1-1/5)