Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.objectStore;
2

    
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Map;
6

    
7
import javax.xml.ws.wsaddressing.W3CEndpointReference;
8
import javax.xml.xpath.XPath;
9
import javax.xml.xpath.XPathFactory;
10

    
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.xml.sax.InputSource;
14

    
15
import com.googlecode.sarasvati.Engine;
16
import com.googlecode.sarasvati.NodeToken;
17
import com.googlecode.sarasvati.env.Env;
18

    
19
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
20
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
21
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
22
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
23
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
24
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
25
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
26
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
27
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
28
import eu.dnetlib.msro.workflows.util.ProgressProvider;
29
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
30
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
31

    
32
public class DownloadIntoObjectStoreJobNode extends BlackboardJobNode implements ProgressJobNode {
33

    
34
	private static final Log log = LogFactory.getLog(DownloadIntoObjectStoreJobNode.class);
35

    
36
	class MetadataObjectIterator implements Iterator<String> {
37

    
38
		private Iterator<String> inputIterator;
39

    
40
		private String mime;
41

    
42
		public MetadataObjectIterator(final Iterator<String> inputIterator, final String xpath, final String mime) {
43
			this.inputIterator = inputIterator;
44
		}
45

    
46
		@Override
47
		public boolean hasNext() {
48
			return inputIterator.hasNext();
49
		}
50

    
51
		@Override
52
		public String next() {
53
			try {
54
				String record = inputIterator.next();
55
				XPath xpath = XPathFactory.newInstance().newXPath();
56
				InputSource doc = new InputSource(new StringReader(record));
57
				String identifier = xpath.evaluate(getIdXpath(), doc);
58
				MetadataObjectRecord objectrecord = new MetadataObjectRecord(identifier, record, mime);
59
				return objectrecord.toJSON();
60
			} catch (Exception e) {
61
				return null;
62
			}
63
		}
64

    
65
		@Override
66
		public void remove() {
67

    
68
		}
69

    
70
	}
71

    
72
	private String eprParam;
73

    
74
	private String objectStoreId;
75

    
76
	private String idXpath; // "//*[local-name()='objIdentifier']
77

    
78
	private String contentDescription;
79

    
80
	private String objectIsInsideEpr;
81

    
82
	private IterableResultSetFactory iterableResultSetFactory;
83
	private ResultSetClientFactory resultSetClientFactory;
84

    
85
	private ResultsetProgressProvider progressProvider;
86
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
87

    
88
	@Override
89
	protected String obtainServiceId(final NodeToken token) {
90
		return getServiceLocator().getServiceId(ObjectStoreService.class);
91
	}
92

    
93
	public String getEprParam() {
94
		return eprParam;
95
	}
96

    
97
	public void setEprParam(final String eprParam) {
98
		this.eprParam = eprParam;
99
	}
100

    
101
	public String getObjectStoreId() {
102
		return objectStoreId;
103
	}
104

    
105
	public void setObjectStoreId(final String objectStoreId) {
106
		this.objectStoreId = objectStoreId;
107
	}
108

    
109
	public ProgressProvider getProgressProvider(final NodeToken token) {
110

    
111
		return progressProvider;
112
	}
113

    
114
	@Override
115
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
116

    
117
		job.setAction("FEEDOBJECT");
118
		final String eprS = token.getEnv().getAttribute(getEprParam());
119
		job.getParameters().put("obsID", getObjectStoreId());
120
		job.getParameters().put("mime", getContentDescription());
121
		final Iterator<String> client = resultSetClientFactory.getClient(eprS).iterator();
122

    
123
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() {
124

    
125
			@Override
126
			public Iterator<String> iterator() {
127
				return new MetadataObjectIterator(client, "//*[local-name()='objIdentifier']", "xml");
128
			}
129
		});
130
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
131
		job.getParameters().put("epr", progressProvider.getEpr().toString());
132

    
133
	}
134

    
135
	@Override
136
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
137
		return new BlackboardWorkflowJobListener(engine, token) {
138

    
139
			@Override
140
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
141
				log.info("Number of stored records: " + responseParams.get("total"));
142
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
143
			}
144
		};
145
	}
146

    
147
	public String getObjectIsInsideEpr() {
148
		return objectIsInsideEpr;
149
	}
150

    
151
	public void setObjectIsInsideEpr(final String objectIsInsideEpr) {
152
		this.objectIsInsideEpr = objectIsInsideEpr;
153
	}
154

    
155
	@Override
156
	public ResultsetProgressProvider getProgressProvider() {
157
		return progressProvider;
158
	}
159

    
160
	public void setProgressProvider(final ResultsetProgressProvider progressProvider) {
161
		this.progressProvider = progressProvider;
162
	}
163

    
164
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
165
		return processCountingResultSetFactory;
166
	}
167

    
168
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
169
		this.processCountingResultSetFactory = processCountingResultSetFactory;
170
	}
171

    
172
	public ResultSetClientFactory getResultSetClientFactory() {
173
		return resultSetClientFactory;
174
	}
175

    
176
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
177
		this.resultSetClientFactory = resultSetClientFactory;
178
	}
179

    
180
	public IterableResultSetFactory getIterableResultSetFactory() {
181
		return iterableResultSetFactory;
182
	}
183

    
184
	public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) {
185
		this.iterableResultSetFactory = iterableResultSetFactory;
186
	}
187

    
188
	public String getIdXpath() {
189
		return idXpath;
190
	}
191

    
192
	public void setIdXpath(final String idXpath) {
193
		this.idXpath = idXpath;
194
	}
195

    
196
	public String getContentDescription() {
197
		return contentDescription;
198
	}
199

    
200
	public void setContentDescription(final String contentDescription) {
201
		this.contentDescription = contentDescription;
202
	}
203

    
204
}
(1-1/4)