Project

General

Profile

« Previous | Next » 

Revision 56985

Using connectionPool for HttpClient to limit overhead

View differences:

modules/dnet-parthenos/trunk/src/main/java/eu/dnetlib/parthenos/workflows/nodes/PublishAbstractJobNode.java
24 24
import org.apache.http.NameValuePair;
25 25
import org.apache.http.client.HttpClient;
26 26
import org.apache.http.client.entity.UrlEncodedFormEntity;
27
import org.apache.http.client.methods.CloseableHttpResponse;
27 28
import org.apache.http.client.methods.HttpPost;
29
import org.apache.http.impl.client.CloseableHttpClient;
28 30
import org.apache.http.impl.client.HttpClients;
31
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
29 32
import org.apache.http.message.BasicNameValuePair;
30 33
import org.springframework.beans.factory.annotation.Autowired;
31 34

  
......
62 65
		int partial = 0;
63 66
		Map<Integer, Integer> errors = Maps.newHashMap();
64 67
		log.info("Publisher endpoint: " + getPublisherEndpoint());
68
		PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
69
		cm.setMaxTotal(nThreads);
70

  
71
		CloseableHttpClient client = HttpClients.custom().setConnectionManager(cm).build();
65 72
		//need to slow down the producer to avoid OOM errors due to many tasks in the queue of the executor
66 73
		//see for example here: https://stackoverflow.com/questions/42108351/executorservice-giving-out-of-memory-error
67 74
		//let's stop and wait after submission of nLatch tasks
......
80 87
			}
81 88
			partial++;
82 89
			Future<Integer> res = executorService.submit( () -> {
90
				CloseableHttpResponse responsePOST = null;
83 91
				try {
84 92
					HttpPost post = new HttpPost(getPublisherEndpoint());
85 93
					List<NameValuePair> params = Lists.newArrayList();
86 94
					params.add(new BasicNameValuePair("record", record));
87 95
					params.add(new BasicNameValuePair("parthenosTarget", getTarget()));
88
					UrlEncodedFormEntity ent =  new UrlEncodedFormEntity(params, "UTF-8");
96
					UrlEncodedFormEntity ent = new UrlEncodedFormEntity(params, "UTF-8");
97
					responsePOST = client.execute(post);
89 98
					post.setEntity(ent);
90
					HttpClient client = HttpClients.createDefault();
91
					HttpResponse responsePOST = client.execute(post);
92 99
					int statusCode = responsePOST.getStatusLine().getStatusCode();
93 100
					switch (statusCode) {
94
					case 200:
95
						return statusCode;
96
					default:
97
						log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
98
						log.error("Source record causing error: " + record);
99
						errors.merge(statusCode, 1, Integer::sum);
100
						return statusCode;
101
						case 200:
102
							return statusCode;
103
						default:
104
							log.error(responsePOST.getStatusLine().getStatusCode() + ": " + responsePOST.getStatusLine().getReasonPhrase());
105
							log.error("Source record causing error: " + record);
106
							errors.merge(statusCode, 1, Integer::sum);
107
							return statusCode;
101 108
					}
102
				} catch (IOException e) {
109
				} catch (Exception e) {
103 110
					log.error(e.getMessage());
104 111
					errors.merge(-1, 1, Integer::sum);
112
					return -1;
105 113
				}
106
				return -1;
114
				finally{
115
					if(responsePOST != null) responsePOST.close();
116
				}
107 117
			});
108 118
			resList.add(res);
109 119
		}
......
114 124
		for(Future<Integer> res : resList){
115 125
			if(res.get() == 200) countOk++;
116 126
		}
127
		client.close();
117 128
		log.info(String.format("Got all responses. Ok %s/%s", countOk, countAll));
118 129

  
119 130
		env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "countOk", countOk);

Also available in: Unified diff