Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.objectStore;
2

    
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Map;
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7
import javax.xml.xpath.XPath;
8
import javax.xml.xpath.XPathFactory;
9

    
10
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
11
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
12
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
13
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
15
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
16
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
17
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
18
import eu.dnetlib.msro.workflows.procs.Env;
19
import eu.dnetlib.msro.workflows.procs.ProcessAware;
20
import eu.dnetlib.msro.workflows.procs.Token;
21
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
22
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
23
import eu.dnetlib.msro.workflows.util.ProgressProvider;
24
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
25
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
26
import org.apache.commons.logging.Log;
27
import org.apache.commons.logging.LogFactory;
28
import org.xml.sax.InputSource;
29

    
30
public class DownloadIntoObjectStoreJobNode extends BlackboardJobNode implements ProgressJobNode, ProcessAware {
31

    
32
	private static final Log log = LogFactory.getLog(DownloadIntoObjectStoreJobNode.class);
33
	private String eprParam;
34
	private String objectStoreId;
35
	private String idXpath; // "//*[local-name()='objIdentifier']
36
	private String contentDescription;
37
	private String objectIsInsideEpr;
38
	private IterableResultSetFactory iterableResultSetFactory;
39
	private ResultSetClientFactory resultSetClientFactory;
40
	private ResultsetProgressProvider progressProvider;
41
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
42
	private WorkflowProcess process;
43

    
44
	@Override
45
	protected String obtainServiceId(final Env env) {
46
		return getServiceLocator().getServiceId(ObjectStoreService.class);
47
	}
48

    
49
	public String getEprParam() {
50
		return eprParam;
51
	}
52

    
53
	public void setEprParam(final String eprParam) {
54
		this.eprParam = eprParam;
55
	}
56

    
57
	public String getObjectStoreId() {
58
		return objectStoreId;
59
	}
60

    
61
	public void setObjectStoreId(final String objectStoreId) {
62
		this.objectStoreId = objectStoreId;
63
	}
64

    
65
	public ProgressProvider getProgressProvider(final Env env) {
66

    
67
		return progressProvider;
68
	}
69

    
70
	@Override
71
	protected void prepareJob(final BlackboardJob job, final Env env) throws Exception {
72

    
73
		job.setAction("FEEDOBJECT");
74
		final String eprS = env.getAttribute(getEprParam(), String.class);
75
		job.getParameters().put("obsID", getObjectStoreId());
76
		job.getParameters().put("mime", getContentDescription());
77
		final Iterator<String> client = resultSetClientFactory.getClient(eprS).iterator();
78

    
79
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() {
80

    
81
			@Override
82
			public Iterator<String> iterator() {
83
				return new MetadataObjectIterator(client, "//*[local-name()='objIdentifier']", "xml");
84
			}
85
		});
86
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(process, epr);
87
		job.getParameters().put("epr", progressProvider.getEpr().toString());
88

    
89
	}
90

    
91
	@Override
92
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Token token) {
93
		return new BlackboardWorkflowJobListener(token) {
94

    
95
			@Override
96
			protected void responseToEnv(final Env env, final Map<String, String> responseParams) {
97
				log.info("Number of stored records: " + responseParams.get("total"));
98
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
99
			}
100
		};
101
	}
102

    
103
	public String getObjectIsInsideEpr() {
104
		return objectIsInsideEpr;
105
	}
106

    
107
	public void setObjectIsInsideEpr(final String objectIsInsideEpr) {
108
		this.objectIsInsideEpr = objectIsInsideEpr;
109
	}
110

    
111
	@Override
112
	public ResultsetProgressProvider getProgressProvider() {
113
		return progressProvider;
114
	}
115

    
116
	public void setProgressProvider(final ResultsetProgressProvider progressProvider) {
117
		this.progressProvider = progressProvider;
118
	}
119

    
120
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
121
		return processCountingResultSetFactory;
122
	}
123

    
124
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
125
		this.processCountingResultSetFactory = processCountingResultSetFactory;
126
	}
127

    
128
	public ResultSetClientFactory getResultSetClientFactory() {
129
		return resultSetClientFactory;
130
	}
131

    
132
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
133
		this.resultSetClientFactory = resultSetClientFactory;
134
	}
135

    
136
	public IterableResultSetFactory getIterableResultSetFactory() {
137
		return iterableResultSetFactory;
138
	}
139

    
140
	public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) {
141
		this.iterableResultSetFactory = iterableResultSetFactory;
142
	}
143

    
144
	public String getIdXpath() {
145
		return idXpath;
146
	}
147

    
148
	public void setIdXpath(final String idXpath) {
149
		this.idXpath = idXpath;
150
	}
151

    
152
	public String getContentDescription() {
153
		return contentDescription;
154
	}
155

    
156
	public void setContentDescription(final String contentDescription) {
157
		this.contentDescription = contentDescription;
158
	}
159

    
160
	@Override
161
	public void setProcess(final WorkflowProcess process) {
162
		this.process = process;
163
	}
164

    
165
	class MetadataObjectIterator implements Iterator<String> {
166

    
167
		private Iterator<String> inputIterator;
168

    
169
		private String mime;
170

    
171
		public MetadataObjectIterator(final Iterator<String> inputIterator, final String xpath, final String mime) {
172
			this.inputIterator = inputIterator;
173
		}
174

    
175
		@Override
176
		public boolean hasNext() {
177
			return inputIterator.hasNext();
178
		}
179

    
180
		@Override
181
		public String next() {
182
			try {
183
				String record = inputIterator.next();
184
				XPath xpath = XPathFactory.newInstance().newXPath();
185
				InputSource doc = new InputSource(new StringReader(record));
186
				String identifier = xpath.evaluate(getIdXpath(), doc);
187
				MetadataObjectRecord objectrecord = new MetadataObjectRecord(identifier, record, mime);
188
				return objectrecord.toJSON();
189
			} catch (Exception e) {
190
				return null;
191
			}
192
		}
193

    
194
		@Override
195
		public void remove() {
196

    
197
		}
198

    
199
	}
200

    
201
}
(1-1/4)