Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.util.List;
4
import java.util.Map;
5

    
6
import com.google.common.collect.Lists;
7
import com.google.common.collect.Maps;
8
import com.google.gson.Gson;
9
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
10
import eu.dnetlib.msro.workflows.graph.Arc;
11
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
12
import eu.dnetlib.msro.workflows.procs.Env;
13
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import eu.dnetlib.rmi.common.ResultSet;
15
import eu.dnetlib.rmi.manager.MSROException;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.apache.http.HttpResponse;
19
import org.apache.http.NameValuePair;
20
import org.apache.http.client.HttpClient;
21
import org.apache.http.client.entity.UrlEncodedFormEntity;
22
import org.apache.http.client.methods.HttpPost;
23
import org.apache.http.impl.client.HttpClients;
24
import org.apache.http.message.BasicNameValuePair;
25
import org.springframework.beans.factory.annotation.Autowired;
26

    
27
public class PublishVirtuosoJobNode extends SimpleJobNode {
28

    
29
	private static final Log log = LogFactory.getLog(PublishVirtuosoJobNode.class);
30

    
31
	private String inputEprParam;
32

    
33
	@Autowired
34
	private ResultSetClient resultSetClient;
35

    
36
	private String publisherEndpoint;
37

    
38

    
39
	@Override
40
	protected String execute(final Env env) throws Exception {
41

    
42
		final ResultSet<?> rsIn = env.getAttribute(getInputEprParam(), ResultSet.class);
43
		if ((rsIn == null)) { throw new MSROException("InputEprParam (" + getInputEprParam()+ ") not found in ENV"); }
44

    
45
		int countAll = 0;
46
		int countOk = 0;
47
		Map<Integer,Integer> errors = Maps.newHashMap();
48
		log.info("Publisher endpoint: "+getPublisherEndpoint());
49

    
50
		//let's start sequentially
51
		for(String record : getResultSetClient().iter(rsIn, String.class)){
52

    
53
			HttpPost post = new HttpPost(getPublisherEndpoint());
54
			List<NameValuePair> params = Lists.newArrayList();
55
			params.add(new BasicNameValuePair("record", record));
56
			params.add(new BasicNameValuePair("parthenosTarget", "VIRTUOSO"));
57
			UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
58
			post.setEntity(ent);
59
			HttpClient client = HttpClients.createDefault();
60
			HttpResponse responsePOST = client.execute(post);
61
			countAll++;
62
			int statusCode = responsePOST.getStatusLine().getStatusCode();
63
			switch(statusCode){
64
			case 200: countOk++;
65
			default:
66
				errors.merge(statusCode, 1, Integer::sum);
67
			}
68

    
69
		}
70
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX+"countOk", countOk);
71
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX+"countAll", countAll);
72
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX+"errorsMap", new Gson().toJson(errors));
73

    
74
		log.debug("countOK: "+countOk);
75

    
76
		return Arc.DEFAULT_ARC;
77

    
78
	}
79

    
80
	public String getInputEprParam() {
81
		return this.inputEprParam;
82
	}
83

    
84
	public void setInputEprParam(final String inputEprParam) {
85
		this.inputEprParam = inputEprParam;
86
	}
87

    
88
	public String getPublisherEndpoint() {
89
		return publisherEndpoint;
90
	}
91

    
92
	public void setPublisherEndpoint(final String publisherEndpoint) {
93
		this.publisherEndpoint = publisherEndpoint;
94
	}
95

    
96
	public ResultSetClient getResultSetClient() {
97
		return resultSetClient;
98
	}
99

    
100
	public void setResultSetClient(final ResultSetClient resultSetClient) {
101
		this.resultSetClient = resultSetClient;
102
	}
103

    
104
}
(1-1/3)