Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.Future;
9

    
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Maps;
12
import com.google.gson.Gson;
13
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
14
import eu.dnetlib.msro.workflows.graph.Arc;
15
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
16
import eu.dnetlib.msro.workflows.procs.Env;
17
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
18
import eu.dnetlib.rmi.common.ResultSet;
19
import eu.dnetlib.rmi.manager.MSROException;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.http.HttpResponse;
23
import org.apache.http.NameValuePair;
24
import org.apache.http.client.HttpClient;
25
import org.apache.http.client.entity.UrlEncodedFormEntity;
26
import org.apache.http.client.methods.HttpPost;
27
import org.apache.http.impl.client.HttpClients;
28
import org.apache.http.message.BasicNameValuePair;
29
import org.springframework.beans.factory.annotation.Autowired;
30

    
31
/**
32
 * Created by Alessia Bardi on 09/10/2017.
33
 *
34
 * @author Alessia Bardi
35
 */
36
public abstract class PublishAbstractJobNode extends AsyncJobNode {
37

    
38
	private static final Log log = LogFactory.getLog(PublishAbstractJobNode.class);
39

    
40
	private String inputEprParam;
41

    
42
	@Autowired
43
	private ResultSetClient resultSetClient;
44

    
45
	private String publisherEndpoint;
46

    
47
	//for parallel requests to the publisher endpoint
48
	private int nThreads = 5;
49
	private ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
50
	private List<Future<Integer>> resList = Lists.newArrayList();
51

    
52
	@Override
53
	protected String execute(final Env env) throws Exception {
54

    
55
		final ResultSet<?> rsIn = env.getAttribute(getInputEprParam(), ResultSet.class);
56
		if ((rsIn == null)) { throw new MSROException("InputEprParam (" + getInputEprParam() + ") not found in ENV"); }
57

    
58
		int countAll = 0;
59
		int countOk = 0;
60
		Map<Integer, Integer> errors = Maps.newHashMap();
61
		log.info("Publisher endpoint: " + getPublisherEndpoint());
62

    
63
		for (String record : getResultSetClient().iter(rsIn, String.class)) {
64
			countAll++;
65
			Future<Integer> res = executorService.submit( () -> {
66
				try {
67
					HttpPost post = new HttpPost(getPublisherEndpoint());
68
					List<NameValuePair> params = Lists.newArrayList();
69
					params.add(new BasicNameValuePair("record", record));
70
					params.add(new BasicNameValuePair("parthenosTarget", getTarget()));
71
					UrlEncodedFormEntity ent =  new UrlEncodedFormEntity(params, "UTF-8");
72
					post.setEntity(ent);
73
					HttpClient client = HttpClients.createDefault();
74
					HttpResponse responsePOST = client.execute(post);
75
					int statusCode = responsePOST.getStatusLine().getStatusCode();
76
					switch (statusCode) {
77
					case 200:
78
						return statusCode;
79
					default:
80
						log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
81
						log.error("Source record causing error: " + record);
82
						errors.merge(statusCode, 1, Integer::sum);
83
						return statusCode;
84
					}
85
				} catch (IOException e) {
86
					e.printStackTrace();
87
					errors.merge(-1, 1, Integer::sum);
88
				}
89
				return -1;
90
			});
91
			resList.add(res);
92
		}
93
		executorService.shutdown();
94

    
95
		//now let's wait for the results. We can block ourselves here: we have nothing else to do
96
		log.info("Waiting for responses");
97
		for(Future<Integer> res : resList){
98
			if(res.get() == 200) countOk++;
99
		}
100
		log.info(String.format("Got all responses. Ok %s/%s", countOk, countAll));
101

    
102
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
103
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
104
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors));
105

    
106
		log.info("publishing completed");
107
		if (!errors.isEmpty() || countAll == 0) {
108
			throw new MSROException("Problems in publishing on "+getTarget()+": "+countOk+"/"+countAll+" see error maps for details");
109
		} else return Arc.DEFAULT_ARC;
110

    
111
	}
112

    
113
	public abstract String getTarget();
114

    
115
	public String getInputEprParam() {
116
		return this.inputEprParam;
117
	}
118

    
119
	public void setInputEprParam(final String inputEprParam) {
120
		this.inputEprParam = inputEprParam;
121
	}
122

    
123
	public String getPublisherEndpoint() {
124
		return publisherEndpoint;
125
	}
126

    
127
	public void setPublisherEndpoint(final String publisherEndpoint) {
128
		this.publisherEndpoint = publisherEndpoint;
129
	}
130

    
131
	public ResultSetClient getResultSetClient() {
132
		return resultSetClient;
133
	}
134

    
135
	public void setResultSetClient(final ResultSetClient resultSetClient) {
136
		this.resultSetClient = resultSetClient;
137
	}
138

    
139
}
(3-3/13)