Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.objectStore;
2

    
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Map;
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7
import javax.xml.xpath.XPath;
8
import javax.xml.xpath.XPathFactory;
9

    
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
14
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
15
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
16
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
17
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
18
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
19
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
20
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
21
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
22
import eu.dnetlib.msro.workflows.util.ProgressProvider;
23
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
24
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
25
import org.apache.commons.logging.Log;
26
import org.apache.commons.logging.LogFactory;
27
import org.xml.sax.InputSource;
28

    
29
public class DownloadIntoObjectStoreJobNode extends BlackboardJobNode implements ProgressJobNode {
30

    
31
	private static final Log log = LogFactory.getLog(DownloadIntoObjectStoreJobNode.class);
32
	private String eprParam;
33
	private String objectStoreId;
34
	private String idXpath; // "//*[local-name()='objIdentifier']
35
	private String mimeType;
36
	private String objectIsInsideEpr;
37
	private String storageType;
38
	private IterableResultSetFactory iterableResultSetFactory;
39
	private ResultSetClientFactory resultSetClientFactory;
40
	private ResultsetProgressProvider progressProvider;
41
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
42

    
43
	@Override
44
	protected String obtainServiceId(final NodeToken token) {
45
		return getServiceLocator().getServiceId(ObjectStoreService.class);
46
	}
47

    
48
	public String getEprParam() {
49
		return eprParam;
50
	}
51

    
52
	public void setEprParam(final String eprParam) {
53
		this.eprParam = eprParam;
54
	}
55

    
56
	public String getObjectStoreId() {
57
		return objectStoreId;
58
	}
59

    
60
	public void setObjectStoreId(final String objectStoreId) {
61
		this.objectStoreId = objectStoreId;
62
	}
63

    
64
	public ProgressProvider getProgressProvider(final NodeToken token) {
65

    
66
		return progressProvider;
67
	}
68

    
69
	@Override
70
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
71

    
72
		job.setAction("FEEDOBJECT");
73
		final String eprS = token.getEnv().getAttribute(getEprParam());
74
		job.getParameters().put("obsID", getObjectStoreId());
75
		job.getParameters().put("mime", getMimeType());
76
		final Iterator<String> client = resultSetClientFactory.getClient(eprS).iterator();
77

    
78
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() {
79

    
80
			@Override
81
			public Iterator<String> iterator() {
82
				return new MetadataObjectIterator(client, "//*[local-name()='objIdentifier']", "xml");
83
			}
84
		});
85
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
86
		job.getParameters().put("epr", progressProvider.getEpr().toString());
87

    
88
	}
89

    
90
	@Override
91
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
92
		return new BlackboardWorkflowJobListener(engine, token) {
93

    
94
			@Override
95
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
96
				log.info("Number of stored records: " + responseParams.get("total"));
97
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
98
			}
99
		};
100
	}
101

    
102
	public String getObjectIsInsideEpr() {
103
		return objectIsInsideEpr;
104
	}
105

    
106
	public void setObjectIsInsideEpr(final String objectIsInsideEpr) {
107
		this.objectIsInsideEpr = objectIsInsideEpr;
108
	}
109

    
110
	@Override
111
	public ResultsetProgressProvider getProgressProvider() {
112
		return progressProvider;
113
	}
114

    
115
	public void setProgressProvider(final ResultsetProgressProvider progressProvider) {
116
		this.progressProvider = progressProvider;
117
	}
118

    
119
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
120
		return processCountingResultSetFactory;
121
	}
122

    
123
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
124
		this.processCountingResultSetFactory = processCountingResultSetFactory;
125
	}
126

    
127
	public ResultSetClientFactory getResultSetClientFactory() {
128
		return resultSetClientFactory;
129
	}
130

    
131
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
132
		this.resultSetClientFactory = resultSetClientFactory;
133
	}
134

    
135
	public IterableResultSetFactory getIterableResultSetFactory() {
136
		return iterableResultSetFactory;
137
	}
138

    
139
	public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) {
140
		this.iterableResultSetFactory = iterableResultSetFactory;
141
	}
142

    
143
	public String getIdXpath() {
144
		return idXpath;
145
	}
146

    
147
	public void setIdXpath(final String idXpath) {
148
		this.idXpath = idXpath;
149
	}
150

    
151
	public String getMimeType() {
152
		return mimeType;
153
	}
154

    
155
	public void setMimeType(final String mimeType) {
156
		this.mimeType = mimeType;
157
	}
158

    
159
	public String getStorageType() {
160
		return storageType;
161
	}
162

    
163
	public void setStorageType(final String storageType) {
164
		this.storageType = storageType;
165
	}
166

    
167
	class MetadataObjectIterator implements Iterator<String> {
168

    
169
		private Iterator<String> inputIterator;
170

    
171
		private String mime;
172

    
173
		public MetadataObjectIterator(final Iterator<String> inputIterator, final String xpath, final String mime) {
174
			this.inputIterator = inputIterator;
175
		}
176

    
177
		@Override
178
		public boolean hasNext() {
179
			return inputIterator.hasNext();
180
		}
181

    
182
		@Override
183
		public String next() {
184
			try {
185
				String record = inputIterator.next();
186
				XPath xpath = XPathFactory.newInstance().newXPath();
187
				InputSource doc = new InputSource(new StringReader(record));
188
				String identifier = xpath.evaluate(getIdXpath(), doc);
189
				MetadataObjectRecord objectrecord = new MetadataObjectRecord(identifier, record, mime);
190
				return objectrecord.toJSON();
191
			} catch (Exception e) {
192
				return null;
193
			}
194
		}
195

    
196
		@Override
197
		public void remove() {
198

    
199
		}
200

    
201
	}
202

    
203
}
(1-1/5)