Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.concurrent.CountDownLatch;
7
import java.util.concurrent.ExecutorService;
8
import java.util.concurrent.Executors;
9
import java.util.concurrent.Future;
10

    
11
import com.google.common.collect.Lists;
12
import com.google.common.collect.Maps;
13
import com.google.gson.Gson;
14
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
15
import eu.dnetlib.msro.workflows.graph.Arc;
16
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
17
import eu.dnetlib.msro.workflows.procs.Env;
18
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
19
import eu.dnetlib.rmi.common.ResultSet;
20
import eu.dnetlib.rmi.manager.MSROException;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23
import org.apache.http.HttpResponse;
24
import org.apache.http.NameValuePair;
25
import org.apache.http.client.HttpClient;
26
import org.apache.http.client.entity.UrlEncodedFormEntity;
27
import org.apache.http.client.methods.CloseableHttpResponse;
28
import org.apache.http.client.methods.HttpPost;
29
import org.apache.http.impl.client.CloseableHttpClient;
30
import org.apache.http.impl.client.HttpClients;
31
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
32
import org.apache.http.message.BasicNameValuePair;
33
import org.springframework.beans.factory.annotation.Autowired;
34

    
35
/**
36
 * Created by Alessia Bardi on 09/10/2017.
37
 *
38
 * @author Alessia Bardi
39
 */
40
public abstract class PublishAbstractJobNode extends AsyncJobNode {
41

    
42
	private static final Log log = LogFactory.getLog(PublishAbstractJobNode.class);
43

    
44
	private String inputEprParam;
45

    
46
	@Autowired
47
	private ResultSetClient resultSetClient;
48

    
49
	private String publisherEndpoint;
50

    
51
	//for parallel requests to the publisher endpoint
52
	private int nThreads = 5;
53
	private int nTasks = 150;
54
	private ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
55
	private List<Future<Integer>> resList = Lists.newArrayList();
56

    
57
	@Override
58
	protected String execute(final Env env) throws Exception {
59

    
60
		final ResultSet<?> rsIn = env.getAttribute(getInputEprParam(), ResultSet.class);
61
		if ((rsIn == null)) { throw new MSROException("InputEprParam (" + getInputEprParam() + ") not found in ENV"); }
62

    
63
		int countAll = 0;
64
		int countOk = 0;
65
		int partial = 0;
66
		Map<Integer, Integer> errors = Maps.newHashMap();
67
		log.info("Publisher endpoint: " + getPublisherEndpoint());
68
		PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
69
		cm.setMaxTotal(nThreads);
70

    
71
		CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
72
		//need to slow down the producer to avoid OOM errors due to many tasks in the queue of the executor
73
		//see for example here: https://stackoverflow.com/questions/42108351/executorservice-giving-out-of-memory-error
74
		//let's stop and wait after submission of nLatch tasks
75
		for (String record : getResultSetClient().iter(rsIn, String.class)) {
76
			countAll++;
77
			if(partial == nTasks) {
78
				log.debug("Waiting for tasks to complete before resubmitting to executor (countAll = "+countAll+") . . . ");
79
				log.debug("Getting replies");
80
				long startWait = System.currentTimeMillis();
81
				for(Future<Integer> res : resList){
82
					if(res.get() == 200) countOk++;
83
				}
84
				resList.clear();
85
				partial = 0;
86
				log.debug(". . . Ready to submit again after "+(System.currentTimeMillis() - startWait)+" ms" );
87
			}
88
			partial++;
89
			Future<Integer> res = executorService.submit( () -> {
90
				CloseableHttpResponse responsePOST = null;
91
				try {
92
					HttpPost post = new HttpPost(getPublisherEndpoint());
93
					List<NameValuePair> params = Lists.newArrayList();
94
					params.add(new BasicNameValuePair("record", record));
95
					params.add(new BasicNameValuePair("parthenosTarget", getTarget()));
96
					UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
97
					post.setEntity(ent);
98
					responsePOST = client.execute(post);
99
					int statusCode = responsePOST.getStatusLine().getStatusCode();
100
					switch (statusCode) {
101
						case 200:
102
							return statusCode;
103
						default:
104
							log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
105
							log.debug("Source record causing error: " + record);
106
							errors.merge(statusCode, 1, Integer::sum);
107
							return statusCode;
108
					}
109
				} catch (Exception e) {
110
					log.error(e.getMessage());
111
					errors.merge(-1, 1, Integer::sum);
112
					return -1;
113
				}
114
				finally{
115
					if(responsePOST != null) responsePOST.close();
116
				}
117
			});
118
			resList.add(res);
119
		}
120
		executorService.shutdown();
121

    
122
		//now let's wait for the results. We can block ourselves here: we have nothing else to do
123
		log.info("Waiting for responses");
124
		for(Future<Integer> res : resList){
125
			if(res.get() == 200) countOk++;
126
		}
127
		client.close();
128
		cm.shutdown();
129
		log.info(String.format("Got all responses. Ok %s/%s", countOk, countAll));
130

    
131
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);
132
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countAll", countAll);
133
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "errorsMap", new Gson().toJson(errors));
134

    
135
		log.info("publishing completed");
136
		if (!errors.isEmpty()) {
137
			log.warn("Problems in publishing on "+getTarget()+": "+countOk+"/"+countAll+" see error maps for details");
138
		}
139
		if(countAll == 0) log.warn("0 resources to publish");
140
		return Arc.DEFAULT_ARC;
141
	}
142

    
143
	public abstract String getTarget();
144

    
145
	public String getInputEprParam() {
146
		return this.inputEprParam;
147
	}
148

    
149
	public void setInputEprParam(final String inputEprParam) {
150
		this.inputEprParam = inputEprParam;
151
	}
152

    
153
	public String getPublisherEndpoint() {
154
		return publisherEndpoint;
155
	}
156

    
157
	public void setPublisherEndpoint(final String publisherEndpoint) {
158
		this.publisherEndpoint = publisherEndpoint;
159
	}
160

    
161
	public ResultSetClient getResultSetClient() {
162
		return resultSetClient;
163
	}
164

    
165
	public void setResultSetClient(final ResultSetClient resultSetClient) {
166
		this.resultSetClient = resultSetClient;
167
	}
168

    
169
}
(3-3/13)