Revision 60359
Added by Michele Artini over 3 years ago
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/SubmitDnetHadoopJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp; |
2 | 2 |
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 | 3 |
import org.apache.commons.logging.Log; |
7 | 4 |
import org.apache.commons.logging.LogFactory; |
8 | 5 |
import org.springframework.beans.factory.annotation.Autowired; |
9 | 6 |
|
10 | 7 |
import com.googlecode.sarasvati.Engine; |
11 | 8 |
import com.googlecode.sarasvati.NodeToken; |
12 |
import com.googlecode.sarasvati.env.Env; |
|
13 | 9 |
|
14 | 10 |
import eu.dnetlib.dhp.message.Message; |
15 | 11 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
... | ... | |
75 | 71 |
complete(job, "abort"); |
76 | 72 |
} |
77 | 73 |
|
78 |
@Override |
|
79 |
protected void populateEnv(final Env env, final Map<String, String> responseParams) { |
|
80 |
List<Message> reports = dnetMessageManager.getReports(wfId, true); |
|
81 |
if (reports.isEmpty()) { |
|
82 |
int numberOftries = 0; |
|
83 |
try { |
|
84 |
while (reports.isEmpty() && numberOftries < 3) { |
|
85 |
reports = dnetMessageManager.getReports(wfId, true); |
|
86 |
Thread.sleep(3000 * numberOftries++); |
|
87 |
} |
|
88 |
} catch (final InterruptedException e) { |
|
89 |
log.error("Error on waiting report", e); |
|
90 |
} |
|
91 |
} |
|
92 |
|
|
93 |
if (reports.isEmpty()) { throw new RuntimeException("Unable to get report for WorklowId " + wfId); } |
|
94 |
|
|
95 |
reports.forEach(it -> it.getBody().forEach(env::setAttribute)); |
|
96 |
} |
|
97 | 74 |
}; |
98 | 75 |
} |
99 | 76 |
|
Also available in: Unified diff
Removed MessageType from Message