Revision 56985
Added by Alessia Bardi over 4 years ago
modules/dnet-parthenos/trunk/src/main/java/eu/dnetlib/parthenos/workflows/nodes/PublishAbstractJobNode.java | ||
---|---|---|
24 | 24 |
import org.apache.http.NameValuePair; |
25 | 25 |
import org.apache.http.client.HttpClient; |
26 | 26 |
import org.apache.http.client.entity.UrlEncodedFormEntity; |
27 |
import org.apache.http.client.methods.CloseableHttpResponse; |
|
27 | 28 |
import org.apache.http.client.methods.HttpPost; |
29 |
import org.apache.http.impl.client.CloseableHttpClient; |
|
28 | 30 |
import org.apache.http.impl.client.HttpClients; |
31 |
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; |
|
29 | 32 |
import org.apache.http.message.BasicNameValuePair; |
30 | 33 |
import org.springframework.beans.factory.annotation.Autowired; |
31 | 34 |
|
... | ... | |
62 | 65 |
int partial = 0; |
63 | 66 |
Map<Integer, Integer> errors = Maps.newHashMap(); |
64 | 67 |
log.info("Publisher endpoint: " + getPublisherEndpoint()); |
68 |
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); |
|
69 |
cm.setMaxTotal(nThreads); |
|
70 |
|
|
71 |
CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build(); |
|
65 | 72 |
//need to slow down the producer to avoid OOM errors due to many tasks in the queue of the executor |
66 | 73 |
//see for example here: https://stackoverflow.com/questions/42108351/executorservice-giving-out-of-memory-error |
67 | 74 |
//let's stop and wait after submission of nLatch tasks |
... | ... | |
80 | 87 |
} |
81 | 88 |
partial++; |
82 | 89 |
Future<Integer> res = executorService.submit( () -> { |
90 |
CloseableHttpResponse responsePOST = null; |
|
83 | 91 |
try { |
84 | 92 |
HttpPost post = new HttpPost(getPublisherEndpoint()); |
85 | 93 |
List<NameValuePair> params = Lists.newArrayList(); |
86 | 94 |
params.add(new BasicNameValuePair("record", record)); |
87 | 95 |
params.add(new BasicNameValuePair("parthenosTarget", getTarget())); |
88 |
UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); |
|
96 |
UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8"); |
|
97 |
responsePOST = client.execute(post); |
|
89 | 98 |
post.setEntity(ent); |
90 |
HttpClient client = HttpClients.createDefault(); |
|
91 |
HttpResponse responsePOST = client.execute(post); |
|
92 | 99 |
int statusCode = responsePOST.getStatusLine().getStatusCode(); |
93 | 100 |
switch (statusCode) { |
94 |
case 200: |
|
95 |
return statusCode; |
|
96 |
default: |
|
97 |
log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase()); |
|
98 |
log.error("Source record causing error: " + record); |
|
99 |
errors.merge(statusCode, 1, Integer::sum); |
|
100 |
return statusCode; |
|
101 |
case 200:
|
|
102 |
return statusCode;
|
|
103 |
default:
|
|
104 |
log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
|
|
105 |
log.error("Source record causing error: " + record);
|
|
106 |
errors.merge(statusCode, 1, Integer::sum);
|
|
107 |
return statusCode;
|
|
101 | 108 |
} |
102 |
} catch (IOException e) {
|
|
109 |
} catch (Exception e) { |
|
103 | 110 |
log.error(e.getMessage()); |
104 | 111 |
errors.merge(-1, 1, Integer::sum); |
112 |
return -1; |
|
105 | 113 |
} |
106 |
return -1; |
|
114 |
finally{ |
|
115 |
if(responsePOST != null) responsePOST.close(); |
|
116 |
} |
|
107 | 117 |
}); |
108 | 118 |
resList.add(res); |
109 | 119 |
} |
... | ... | |
114 | 124 |
for(Future<Integer> res : resList){ |
115 | 125 |
if(res.get() == 200) countOk++; |
116 | 126 |
} |
127 |
client.close(); |
|
117 | 128 |
log.info(String.format("Got all responses. Ok %s/%s", countOk, countAll)); |
118 | 129 |
|
119 | 130 |
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk); |
Also available in: Unified diff
Using connectionPool for HttpClient to limit overhead