Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.util.List;
4
import java.util.Map;
5

    
6
import com.google.common.collect.Lists;
7
import com.google.common.collect.Maps;
8
import com.google.gson.Gson;
9
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
10
import eu.dnetlib.msro.workflows.graph.Arc;
11
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
12
import eu.dnetlib.msro.workflows.procs.Env;
13
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import eu.dnetlib.rmi.common.ResultSet;
15
import eu.dnetlib.rmi.manager.MSROException;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.apache.http.HttpResponse;
19
import org.apache.http.NameValuePair;
20
import org.apache.http.client.HttpClient;
21
import org.apache.http.client.entity.UrlEncodedFormEntity;
22
import org.apache.http.client.methods.HttpPost;
23
import org.apache.http.impl.client.HttpClients;
24
import org.apache.http.message.BasicNameValuePair;
25
import org.springframework.beans.factory.annotation.Autowired;
26

    
27
/**
28
 * Created by Alessia Bardi on 09/10/2017.
29
 *
30
 * @author Alessia Bardi
31
 */
32
public abstract class PublishAbstractJobNode extends SimpleJobNode{
33

    
34
	private static final Log log = LogFactory.getLog(PublishAbstractJobNode.class);
35

    
36
	private String inputEprParam;
37

    
38
	@Autowired
39
	private ResultSetClient resultSetClient;
40

    
41
	private String publisherEndpoint;
42

    
43
	@Override
44
	protected String execute(final Env env) throws Exception {
45

    
46
		final ResultSet<?> rsIn = env.getAttribute(getInputEprParam(), ResultSet.class);
47
		if ((rsIn == null)) { throw new MSROException("InputEprParam (" + getInputEprParam() + ") not found in ENV"); }
48

    
49
		int countAll = 0;
50
		int countOk = 0;
51
		Map<Integer, Integer> errors = Maps.newHashMap();
52
		log.info("Publisher endpoint: " + getPublisherEndpoint());
53

    
54
		//let's start sequentially
55
		for (String record : getResultSetClient().iter(rsIn, String.class)) {
56

    
57
			HttpPost post = new HttpPost(getPublisherEndpoint());
58
			List<NameValuePair> params = Lists.newArrayList();
59
			params.add(new BasicNameValuePair("record", record));
60
			params.add(new BasicNameValuePair("parthenosTarget", getTarget()));
61
			UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
62
			post.setEntity(ent);
63
			HttpClient client = HttpClients.createDefault();
64
			HttpResponse responsePOST = client.execute(post);
65
			countAll++;
66
			int statusCode = responsePOST.getStatusLine().getStatusCode();
67
			switch (statusCode) {
68
			case 200:
69
				countOk++;
70
				break;
71
			default:
72
				log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
73
				errors.merge(statusCode, 1, Integer::sum);
74
			}
75

    
76
		}
77
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
78
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
79
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors));
80

    
81
		if (!errors.isEmpty()) {
82
			throw new MSROException("Problems in publishing on "+getTarget());
83
		} else return Arc.DEFAULT_ARC;
84

    
85
	}
86

    
87
	public abstract String getTarget();
88

    
89
	public String getInputEprParam() {
90
		return this.inputEprParam;
91
	}
92

    
93
	public void setInputEprParam(final String inputEprParam) {
94
		this.inputEprParam = inputEprParam;
95
	}
96

    
97
	public String getPublisherEndpoint() {
98
		return publisherEndpoint;
99
	}
100

    
101
	public void setPublisherEndpoint(final String publisherEndpoint) {
102
		this.publisherEndpoint = publisherEndpoint;
103
	}
104

    
105
	public ResultSetClient getResultSetClient() {
106
		return resultSetClient;
107
	}
108

    
109
	public void setResultSetClient(final ResultSetClient resultSetClient) {
110
		this.resultSetClient = resultSetClient;
111
	}
112

    
113
}
(1-1/5)