Revision 60316
Added by Claudio Atzori about 3 years ago
PrepareEnvCollectHadoopJobNode.java | ||
---|---|---|
5 | 5 |
|
6 | 6 |
import eu.dnetlib.dhp.model.mdstore.Provenance; |
7 | 7 |
import eu.dnetlib.enabling.datasources.common.Datasource; |
8 |
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException; |
|
8 | 9 |
import org.apache.commons.logging.Log; |
9 | 10 |
import org.apache.commons.logging.LogFactory; |
10 | 11 |
import org.springframework.beans.factory.annotation.Autowired; |
... | ... | |
22 | 23 |
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode { |
23 | 24 |
|
24 | 25 |
private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class); |
26 |
public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path"; |
|
25 | 27 |
|
26 | 28 |
@Autowired |
27 | 29 |
private LocalDatasourceManager<?, ?> dsManager; |
28 | 30 |
|
29 |
private String hdfsBasePath; |
|
31 |
/** |
|
32 |
* MDStore identifier |
|
33 |
*/ |
|
34 |
private String mdId; |
|
30 | 35 |
|
36 |
/** |
|
37 |
* REFRESH | INCREMENTAL |
|
38 |
*/ |
|
39 |
private String collectionMode; |
|
40 |
|
|
31 | 41 |
@Override |
32 | 42 |
protected String execute(final NodeToken token) throws Exception { |
33 | 43 |
|
... | ... | |
36 | 46 |
// param 3 : nameNode |
37 | 47 |
|
38 | 48 |
final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID); |
49 |
log.info("dsId: " + dsId); |
|
39 | 50 |
final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE); |
40 |
final String mdId = token.getEnv().getAttribute("mdId"); |
|
41 |
final String versionId = token.getEnv().getAttribute("versionId"); |
|
51 |
log.info("apiId: " + apiId); |
|
42 | 52 |
|
43 | 53 |
final Optional<ApiDescriptor> opt = dsManager.getApis(dsId) |
44 | 54 |
.stream() |
... | ... | |
48 | 58 |
res.setBaseUrl(a.getBaseurl()); |
49 | 59 |
res.setId(a.getId()); |
50 | 60 |
res.setProtocol(a.getProtocol()); |
51 |
res.getParams().put("metadata_identifier_path", a.getMetadataIdentifierPath());
|
|
61 |
res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
|
|
52 | 62 |
res.getParams().putAll(a.getApiParams() |
53 | 63 |
.stream() |
54 | 64 |
.map(o -> (ApiParam) o) |
... | ... | |
58 | 68 |
.findFirst(); |
59 | 69 |
|
60 | 70 |
if (opt.isPresent()) { |
71 |
token.getEnv().setAttribute("mdId", getMdId()); |
|
72 |
token.getEnv().setAttribute("collectionMode", getCollectionMode()); |
|
73 |
|
|
61 | 74 |
final ApiDescriptor api = opt.get(); |
62 |
final String hdfsPath = String.format("%s/%s/%s/store", hdfsBasePath, mdId, versionId); |
|
63 |
final String seqFilePath = String.format("%s/%s/%s/seqFile", hdfsBasePath, mdId, versionId); |
|
64 | 75 |
token.getEnv().setAttribute("apiDescription", new Gson().toJson(api)); |
65 |
token.getEnv().setAttribute("mdStorePath", hdfsPath); |
|
66 |
token.getEnv().setAttribute("sequenceFilePath", seqFilePath); |
|
76 |
|
|
67 | 77 |
final Provenance provenance = new Provenance(); |
68 | 78 |
provenance.setDatasourceId(dsId); |
69 | 79 |
final Datasource<?, ?> ds = dsManager.getDs(dsId); |
70 | 80 |
provenance.setDatasourceName(ds.getOfficialname()); |
71 | 81 |
provenance.setNsPrefix(ds.getNamespaceprefix()); |
72 |
token.getEnv().setAttribute("dataSourceInfo", new Gson().toJson(provenance)); |
|
82 |
final String dsProvenance = new Gson().toJson(provenance); |
|
83 |
log.info("datasource provenance: " + dsProvenance); |
|
84 |
|
|
85 |
token.getEnv().setAttribute("dataSourceInfo", dsProvenance); |
|
73 | 86 |
token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis()); |
74 |
token.getEnv().setAttribute("identifierPath",api.getParams().get("metadata_identifier_path"));
|
|
87 |
token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
|
|
75 | 88 |
token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId")); |
76 | 89 |
|
77 | 90 |
token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl()); |
... | ... | |
85 | 98 |
|
86 | 99 |
return Arc.DEFAULT_ARC; |
87 | 100 |
} else { |
88 |
return "abort";
|
|
101 |
throw new DsmNotFoundException("cannot find ds interface: " + apiId);
|
|
89 | 102 |
} |
103 |
} |
|
90 | 104 |
|
105 |
public String getMdId() { |
|
106 |
return mdId; |
|
91 | 107 |
} |
92 | 108 |
|
93 |
public String getHdfsBasePath() {
|
|
94 |
return hdfsBasePath;
|
|
109 |
public void setMdId(String mdId) {
|
|
110 |
this.mdId = mdId;
|
|
95 | 111 |
} |
96 | 112 |
|
97 |
public void setHdfsBasePath(String hdfsBasePath) {
|
|
98 |
this.hdfsBasePath = hdfsBasePath;
|
|
113 |
public String getCollectionMode() {
|
|
114 |
return collectionMode;
|
|
99 | 115 |
} |
116 |
|
|
117 |
public void setCollectionMode(String collectionMode) { |
|
118 |
this.collectionMode = collectionMode; |
|
119 |
} |
|
100 | 120 |
} |
Also available in: Unified diff
Updated Hadoop-specific aggregation workflows