Revision 62147
Added by Michele Artini over 2 years ago

[maven-release-plugin] copy for tag dnet-deduplication-2.0.0
modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/DropHBaseTableJobNode.java

package eu.dnetlib.msro.workflows.hadoop.hbase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopService;

/**
 * The Class DropHBaseTableJobNode.
 */
public class DropHBaseTableJobNode extends AbstractHBaseAdminJobNode {

    /** The Constant log. */
    private static final Log log = LogFactory.getLog(DropHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    /*
     * (non-Javadoc)
     *
     * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
     */
    @Override
    protected String execute(final NodeToken token) throws Exception {

        final String tableName = tableName(token);
        final String cluster = cluster(token);

        log.info("Dropping hbase table '" + tableName + "' on cluster: '" + cluster + "'");

        getServiceLocator().getService(HadoopService.class).dropHbaseTable(cluster, tableName);

        return Arc.DEFAULT_ARC;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/DefineHBaseOpenaireSchemaJobNode.java

package eu.dnetlib.msro.workflows.hadoop.hbase;

import java.util.Set;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DefineHBaseOpenaireSchemaJobNode extends AbstractHBaseAdminJobNode {

    private static final Log log = LogFactory.getLog(DefineHBaseOpenaireSchemaJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    private String schema;

    @Override
    protected String execute(final NodeToken token) throws Exception {

        final String schemaOverride = StringUtils.isNotBlank(getSchema()) ? getSchema() : asCSV(HBaseTableUtils.listAllColumns());
        log.info("table definition: " + schemaOverride);
        token.getEnv().setAttribute(getTableColumnsParamName(), schemaOverride);

        return Arc.DEFAULT_ARC;
    }

    public String getSchema() {
        return schema;
    }

    public void setSchema(final String schema) {
        this.schema = schema;
    }
}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/PrepareMDStoreImportJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import java.io.IOException;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class PrepareMDStoreImportJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(PrepareMDStoreImportJobNode.class);

    @Autowired
    private UniqueServiceLocator serviceLocator;

    private String hdfsPathParam;

    private String hdfsPath;

    private String mappingParam;

    private String mapping;

    @Override
    protected String execute(final NodeToken token) throws Exception {

        token.getEnv().setAttribute(getHdfsPathParam(), getHdfsPath());
        token.getEnv().setAttribute(getMappingParam(), readXslt(getMapping()));

        return Arc.DEFAULT_ARC;
    }

    private String readXslt(final String profileId) throws IOException, MSROException, ISLookUpException {
        if (StringUtils.isBlank(profileId)) throw new MSROException("missing profile id");

        log.info("loading mapping from profile id: " + profileId);

        final String xquery =
                String.format("/RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s']/BODY/CONFIGURATION/SCRIPT/CODE/*[local-name()='stylesheet']", profileId);
        return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
    }

    public String getHdfsPathParam() {
        return hdfsPathParam;
    }

    public void setHdfsPathParam(final String hdfsPathParam) {
        this.hdfsPathParam = hdfsPathParam;
    }

    public String getHdfsPath() {
        return hdfsPath;
    }

    public void setHdfsPath(final String hdfsPath) {
        this.hdfsPath = hdfsPath;
    }

    public String getMapping() {
        return mapping;
    }

    public void setMapping(final String mapping) {
        this.mapping = mapping;
    }

    public String getMappingParam() {
        return mappingParam;
    }

    public void setMappingParam(final String mappingParam) {
        this.mappingParam = mappingParam;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/CreateHdfsDirectoryJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;

public class CreateHdfsDirectoryJobNode extends BlackboardJobNode {

    private String cluster;

    private boolean force = false;

    @Override
    protected String obtainServiceId(final NodeToken token) {
        return getServiceLocator().getServiceId(HadoopService.class);
    }

    @Override
    protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {

        job.setAction(HadoopBlackboardActions.CREATE_HDFS_DIR.toString());
        job.getParameters().put("cluster", getCluster());
        job.getParameters().put("force", String.valueOf(isForce()));

        // The "path" parameter is set by the following call
        job.getParameters().putAll(parseJsonParameters(token));
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(final String cluster) {
        this.cluster = cluster;
    }

    public boolean isForce() {
        return force;
    }

    public void setForce(boolean force) {
        this.force = force;
    }
}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/AbstractHBaseJobNode.java

package eu.dnetlib.msro.workflows.hadoop.hbase;

import java.io.IOException;
import java.util.Map;

import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import com.googlecode.sarasvati.env.Env;
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * Created by claudio on 08/04/16.
 */
public abstract class AbstractHBaseJobNode extends BlackboardJobNode implements ProgressJobNode {

    private static final Log log = LogFactory.getLog(StoreHBaseRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    private final String INPUT_HBASE_TABLE_PARAM = "hbaseTable";
    private final String INPUT_EPR_PARAM = "input_epr";
    private final String INPUT_CLUSTER_PARAM = "cluster";

    private final String XSLT_PARAM = "xslt";

    private final String OUTPUT_HBASE_TABLE_PARAM = "table";
    private final String OUTPUT_CLUSTER_PARAM = "cluster";
    private final String SIMULATION_PARAM = "simulation";

    @Autowired
    protected UniqueServiceLocator serviceLocator;

    protected String inputEprParam;
    protected String hbaseTableProperty;
    protected String cluster;
    protected String mapping;

    protected boolean simulation = false;

    protected ProgressProvider progressProvider;

    protected ProcessCountingResultSetFactory processCountingResultSetFactory;

    protected abstract HadoopBlackboardActions getAction();

    @Override
    protected String obtainServiceId(final NodeToken token) {
        return getServiceLocator().getServiceId(HadoopService.class);
    }

    @Override
    protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
        log.info("Invoking blackboard method: " + getAction().toString());

        job.setAction(getAction().toString());
        job.getParameters().put(INPUT_EPR_PARAM, DnetXsltFunctions.encodeBase64(prepareEpr(token)));
        job.getParameters().put(XSLT_PARAM, DnetXsltFunctions.encodeBase64(readXslt(getMapping())));
        job.getParameters().put(OUTPUT_HBASE_TABLE_PARAM, tableName(token));
        job.getParameters().put(OUTPUT_CLUSTER_PARAM, cluster(token));
        job.getParameters().put(SIMULATION_PARAM, String.valueOf(isSimulation()));
    }

    @Override
    protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
        return new BlackboardWorkflowJobListener(engine, token) {

            @Override
            protected void populateEnv(final Env env, final Map<String, String> responseParams) {
                final String count = responseParams.get("count");
                log.info(String.format("%s %s objects to HBase table %s, cluster %s", getAction().toString(), count, tableName(token), cluster(token)));
                env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", count);
            }
        };
    }

    protected String tableName(final NodeToken token) {
        if (token.getEnv().hasAttribute(INPUT_HBASE_TABLE_PARAM)) {
            String table = token.getEnv().getAttribute(INPUT_HBASE_TABLE_PARAM);
            log.debug("found override value in wfEnv for 'hbaseTable' param: " + table);
            return table;
        }
        return getPropertyFetcher().getProperty(getHbaseTableProperty());
    }

    protected String cluster(final NodeToken token) {
        if (token.getEnv().hasAttribute(INPUT_CLUSTER_PARAM)) {
            String cluster = token.getEnv().getAttribute("cluster");
            log.debug("found override value in wfEnv for 'cluster' param: " + cluster);
            return cluster;
        }
        return getCluster();
    }

    private String prepareEpr(final NodeToken token) throws ResultSetException {
        final String epr = token.getEnv().getAttribute(inputEprParam);
        final ResultsetProgressProvider resultsetProgressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);

        setProgressProvider(resultsetProgressProvider);

        return resultsetProgressProvider.getEpr().toString();
    }

    private String readXslt(final String profileId) throws IOException, MSROException, ISLookUpException {
        if (StringUtils.isBlank(profileId)) throw new MSROException("missing profile id");

        log.info("loading mapping from profile id: " + profileId);

        final String xquery =
                String.format("/RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s']/BODY/CONFIGURATION/SCRIPT/CODE/*[local-name()='stylesheet']", profileId);
        return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
    }

    public String getInputEprParam() {
        return inputEprParam;
    }

    public void setInputEprParam(final String inputEprParam) {
        this.inputEprParam = inputEprParam;
    }

    public String getHbaseTableProperty() {
        return hbaseTableProperty;
    }

    public void setHbaseTableProperty(final String hbaseTableProperty) {
        this.hbaseTableProperty = hbaseTableProperty;
    }

    @Override
    public ProgressProvider getProgressProvider() {
        return progressProvider;
    }

    public void setProgressProvider(final ProgressProvider progressProvider) {
        this.progressProvider = progressProvider;
    }

    public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
        return processCountingResultSetFactory;
    }

    @Required
    public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
        this.processCountingResultSetFactory = processCountingResultSetFactory;
    }

    public String getMapping() {
        return mapping;
    }

    public void setMapping(final String mapping) {
        this.mapping = mapping;
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(final String cluster) {
        this.cluster = cluster;
    }

    public boolean isSimulation() {
        return simulation;
    }

    public void setSimulation(final boolean simulation) {
        this.simulation = simulation;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/SetClusterAndTableJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import javax.annotation.Resource;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Value;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;

/**
 * The Class SetClusterAndTableJobNode.
 */
public class SetClusterAndTableJobNode extends AsyncJobNode {

    /** The cluster. */
    private String cluster;

    /** The table. */
    @Value("${hbase.mapred.datatable}")
    private String table;

    /** The table param. */
    private String tableParam;

    /** The service locator. */
    @Resource
    private UniqueServiceLocator serviceLocator;

    /*
     * (non-Javadoc)
     *
     * @see eu.dnetlib.msro.workflows.nodes.AsyncJobNode#execute(com.googlecode.sarasvati.NodeToken)
     */
    @Override
    protected String execute(final NodeToken token) throws Exception {

        if (StringUtils.isBlank(getCluster())) throw new IllegalArgumentException("missing cluster name parameter");
        if (StringUtils.isBlank(getTable())) throw new IllegalArgumentException("missing table name parameter");
        if (!serviceLocator.getService(HadoopService.class).existHbaseTable(getCluster(), getTable()))
            throw new IllegalArgumentException(String.format("unexisting table %s on cluster %s", getTable(), getCluster()));

        token.getEnv().setAttribute("cluster", getCluster());
        token.getEnv().setAttribute(getTableParam(), getTable());

        return Arc.DEFAULT_ARC;
    }

    /**
     * Gets the cluster.
     *
     * @return the cluster
     */
    public String getCluster() {
        return cluster;
    }

    /**
     * Sets the cluster.
     *
     * @param cluster
     *            the new cluster
     */
    public void setCluster(final String cluster) {
        this.cluster = cluster;
    }

    /**
     * Gets the table param.
     *
     * @return the table param
     */
    public String getTableParam() {
        return tableParam;
    }

    /**
     * Sets the table param.
     *
     * @param tableParam
     *            the new table param
     */
    public void setTableParam(final String tableParam) {
        this.tableParam = tableParam;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/ExistHBaseTableJobNode.java

package eu.dnetlib.msro.workflows.hadoop.hbase;

import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ExistHBaseTableJobNode extends AbstractHBaseAdminJobNode {

    private static final Log log = LogFactory.getLog(ExistHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    private String existOutNode;

    private String dontExistOutNode;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String tableName = tableName(token);
        final String cluster = cluster(token);

        log.info("checking table existance: '" + tableName + "' on cluster: '" + cluster + "'");

        final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
        boolean exists = hadoopService.existHbaseTable(cluster, tableName);

        log.info("table '" + tableName + "' exists: " + exists);

        if (exists) {
            final String tableDesc = hadoopService.describeHBaseTableConfiguration(cluster, tableName);
            token.getEnv().setAttribute(getTableConfigurationParamName(), tableDesc);
        }

        return exists ? getExistOutNode() : getDontExistOutNode();
    }

    public String getExistOutNode() {
        return existOutNode;
    }

    public void setExistOutNode(final String existOutNode) {
        this.existOutNode = existOutNode;
    }

    public String getDontExistOutNode() {
        return dontExistOutNode;
    }

    public void setDontExistOutNode(final String dontExistOutNode) {
        this.dontExistOutNode = dontExistOutNode;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/FetchRelClassesJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.gson.Gson;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class FetchRelClassesJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(FetchRelClassesJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    private String relClassesProperty;

    private String relClassesName;

    @Resource
    private UniqueServiceLocator serviceLocator;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        String relClasses = new Gson().toJson(fetchRelClasses());

        log.info("loaded relClasses: " + relClasses);

        token.getEnv().setAttribute(getRelClassesName(), relClasses);

        return Arc.DEFAULT_ARC;
    }

    private List<String> fetchRelClasses() throws ISLookUpException {

        String xquery = getPropertyFetcher().getProperty(getRelClassesProperty());

        log.info("Loading relClasses with xquery: " + xquery);

        return serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
    }

    public String getRelClassesProperty() {
        return relClassesProperty;
    }

    public void setRelClassesProperty(final String relClassesProperty) {
        this.relClassesProperty = relClassesProperty;
    }

    public String getRelClassesName() {
        return relClassesName;
    }

    public void setRelClassesName(final String relClassesName) {
        this.relClassesName = relClassesName;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/AbstractHBaseAdminJobNode.java

package eu.dnetlib.msro.workflows.hadoop.hbase;

import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Resource;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public abstract class AbstractHBaseAdminJobNode extends SimpleJobNode {

    /** The Constant log. */
    private static final Log log = LogFactory.getLog(AbstractHBaseAdminJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    private String tableColumnsParamName = "columns";
    private String tableConfigurationParamName = "tableConf";
    private String hbaseTableProperty;
    private String cluster;

    @Resource
    private UniqueServiceLocator serviceLocator;

    @Override
    protected void beforeStart(final NodeToken token) {
        for (Entry<String, String> e : parseJsonParameters(token).entrySet()) {
            token.getEnv().setAttribute(e.getKey(), e.getValue());
        }
    }

    protected String tableName(final NodeToken token) {
        if (token.getEnv().hasAttribute("hbaseTable")) {
            String table = token.getEnv().getAttribute("hbaseTable");
            log.debug("found override value in wfEnv for 'hbaseTable' param: " + table);
            return table;
        }
        return getPropertyFetcher().getProperty(getHbaseTableProperty());
    }

    protected String cluster(final NodeToken token) {
        if (token.getEnv().hasAttribute("cluster")) {
            String cluster = token.getEnv().getAttribute("cluster");
            log.debug("found override value in wfEnv for 'cluster' param: " + cluster);
            return cluster;
        }
        return getCluster();
    }

    protected Set<String> getColumns(final NodeToken token) throws MSROException {
        String envCols = token.getEnv().getAttribute(getTableColumnsParamName());
        if (StringUtils.isBlank(envCols)) { throw new MSROException("cannot find table description"); }
        log.debug("using columns from env: " + envCols);
        return Sets.newHashSet(Splitter.on(",").omitEmptyStrings().split(envCols));
    }

    protected String asCSV(final Iterable<String> columns) {
        return Joiner.on(",").skipNulls().join(columns);
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(final String cluster) {
        this.cluster = cluster;
    }

    public String getHbaseTableProperty() {
        return hbaseTableProperty;
    }

    public void setHbaseTableProperty(final String hbaseTableProperty) {
        this.hbaseTableProperty = hbaseTableProperty;
    }

    public String getTableColumnsParamName() {
        return tableColumnsParamName;
    }

    public void setTableColumnsParamName(final String tableColumnsParamName) {
        this.tableColumnsParamName = tableColumnsParamName;
    }

    public UniqueServiceLocator getServiceLocator() {
        return serviceLocator;
    }

    public String getTableConfigurationParamName() {
        return tableConfigurationParamName;
    }

    public void setTableConfigurationParamName(final String tableConfigurationParamName) {
        this.tableConfigurationParamName = tableConfigurationParamName;
    }
}

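The admin nodes above exchange the HBase column set through the workflow environment as a plain CSV string (see asCSV and getColumns). The following is a minimal, self-contained sketch of that round trip using only the Guava calls visible in the class; the class name ColumnCsvRoundTripDemo and the sample column families are illustrative and not part of the module.

import java.util.Set;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;

public class ColumnCsvRoundTripDemo {

    public static void main(final String[] args) {
        // Sample column families standing in for whatever HBaseTableUtils.listAllColumns() would return
        final Set<String> columns = Sets.newHashSet("result", "organization", "project");

        // Serialize to CSV before storing it in the workflow env (cf. asCSV)
        final String csv = Joiner.on(",").skipNulls().join(columns);
        System.out.println("env value: " + csv);

        // Parse it back into a set when a later node reads the env (cf. getColumns)
        final Set<String> parsed = Sets.newHashSet(Splitter.on(",").omitEmptyStrings().split(csv));
        System.out.println("parsed columns: " + parsed);
    }
}
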
modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/GetHBaseTableDescriptionJobNode.java

package eu.dnetlib.msro.workflows.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopService;

public class GetHBaseTableDescriptionJobNode extends AbstractHBaseAdminJobNode {

    private static final Log log = LogFactory.getLog(GetHBaseTableDescriptionJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    @Override
    protected String execute(final NodeToken token) throws Exception {
        final String tableName = tableName(token);
        final String cluster = cluster(token);

        log.info("getting table description: '" + tableName + "' on cluster: '" + cluster + "'");

        final List<String> columns = getServiceLocator().getService(HadoopService.class).describeHbaseTable(cluster, tableName);
        log.debug(String.format("table '%s': " + columns, tableName));
        token.getEnv().setAttribute(getTableColumnsParamName(), asCSV(columns));

        return Arc.DEFAULT_ARC;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/IndexDSUpdateJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;

public class IndexDSUpdateJobNode extends AsyncJobNode {

    private static final Log log = LogFactory.getLog(IndexDSUpdateJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    @Resource
    private UniqueServiceLocator serviceLocator;

    @Override
    public String execute(final NodeToken token) throws Exception {

        final String dsId = token.getEnv().getAttribute("index_id");
        final String version = token.getEnv().getAttribute("index.feed.timestamp");
        log.info("updating indexDS: " + dsId + " version: " + version);
        updateIndexDS(dsId, version);

        return Arc.DEFAULT_ARC;
    }

    /**
     * method updates the given indexDataStructureId INDEX_SIZE, INDEX_LAST_UPDATE
     *
     * @param dsId
     * @param version
     * @return true if the update was performed successfully, false otherwise
     * @throws ISRegistryException
     */
    private boolean updateIndexDS(final String dsId, final String version) throws ISRegistryException {

        final String xquery = "for $x in collection('')/RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + dsId + "']"
                + " return update value $x//INDEX_LAST_UPDATE with '" + version + "'";

        log.debug("\n\n updating indexDataStructure: " + xquery + "\n\n");

        return serviceLocator.getService(ISRegistryService.class).executeXUpdate(xquery);
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <parent>
        <groupId>eu.dnetlib</groupId>
        <artifactId>dnet45-parent</artifactId>
        <version>1.0.0</version>
        <relativePath />
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>eu.dnetlib</groupId>
    <artifactId>dnet-deduplication</artifactId>
    <packaging>jar</packaging>
    <version>2.0.0</version>
    <scm>
        <developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-deduplication/tags/dnet-deduplication-2.0.0</developerConnection>
    </scm>
    <dependencies>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-msro-service</artifactId>
            <version>[4.0.0,5.0.0)</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-hadoop-service-rmi</artifactId>
            <version>[1.0.0,2.0.0)</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-actionmanager-api</artifactId>
            <version>[4.0.0,5.0.0)</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-modular-ui</artifactId>
            <version>[3.0.0,4.0.0)</version>
        </dependency>

        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-index-client</artifactId>
            <version>[2.3.4,3.0.0)</version>
        </dependency>

        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-openaireplus-mapping-utils</artifactId>
            <version>[6.3.24,7.0.0)</version>
            <exclusions>
                <exclusion>
                    <groupId>eu.dnetlib</groupId>
                    <artifactId>dnet-hadoop-commons</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>${javax.servlet.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${google.guava.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/utils/Similarity.java

package eu.dnetlib.msro.workflows.hadoop.utils;

import com.google.gson.Gson;

import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.miscutils.collections.Pair;

public class Similarity {

    private Pair<String, String> pair;
    private Type type;

    public Similarity(final Pair<String, String> pair, final Type type) {
        super();
        this.setPair(pair);
        this.setType(type);
    }

    public Pair<String, String> getPair() {
        return pair;
    }

    public void setPair(final Pair<String, String> pair) {
        this.pair = pair;
    }

    public Type getType() {
        return type;
    }

    public void setType(final Type type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return new Gson().toJson(this, Similarity.class);
    }
}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/MDStoreDatasourceResolverJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

/**
 * This jobNode resolves the original datasource ids to the list of md store ids holding the relative cleaned records.
 *
 * @author claudio
 *
 */
public class MDStoreDatasourceResolverJobNode extends SimpleJobNode {

    private static final Log log = LogFactory.getLog(MDStoreDatasourceResolverJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

    private String originalDatasourceIdsCSV;

    @Value("${dnet.openaire.dataload.datasource.mdstores.xquery}")
    private String xqueryTemplate;

    @Resource
    private UniqueServiceLocator serviceLocator;

    @Override
    public String execute(final NodeToken token) throws Exception {
        log.info("resolving MDStore ids for datasources: " + getOriginalDatasourceIdsCSV());

        final List<String> mdIds = Lists.newArrayList();
        for (final String originalId : splitter().split(getOriginalDatasourceIdsCSV())) {
            mdIds.addAll(resolveMdIds(originalId));
        }

        log.info(String.format("adding %s mdStore ids in wf env", mdIds.size()));

        token.getEnv().setAttribute("mdId", new Gson().toJson(mdIds));

        return Arc.DEFAULT_ARC;
    }

    // @Cacheable(value = "mdIds", key = "#acronym")
    public List<String> resolveMdIds(final String id) throws ISLookUpException {
        log.info("Resolving mdID for " + id + ". Cache not used.");
        final String xQuery = String.format(xqueryTemplate, id);
        final List<String> mdIds = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xQuery);
        return mdIds;
    }

    private Splitter splitter() {
        return Splitter.on(",").trimResults().omitEmptyStrings();
    }

    public String getOriginalDatasourceIdsCSV() {
        return originalDatasourceIdsCSV;
    }

    public void setOriginalDatasourceIdsCSV(final String originalDatasourceIdsCSV) {
        this.originalDatasourceIdsCSV = originalDatasourceIdsCSV;
    }

}

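The resolver above splits a comma-separated list of datasource identifiers and fills each one into an XQuery template read from the dnet.openaire.dataload.datasource.mdstores.xquery property, then runs one ISLookUpService query per id. The sketch below isolates just that string handling; the template and the ids are made up for illustration, and the lookup call is replaced by a println.

import com.google.common.base.Splitter;

public class MdStoreResolverDemo {

    public static void main(final String[] args) {
        // Hypothetical template; the real one is injected from the dnet.openaire.dataload.datasource.mdstores.xquery property
        final String xqueryTemplate = "for $x in //RESOURCE_PROFILE[.//DATASOURCE_ORIGINAL_ID = '%s'] return $x//MDSTORE_ID/string()";

        // Hypothetical CSV of original datasource ids, as it would be passed to setOriginalDatasourceIdsCSV(...)
        final String originalDatasourceIdsCSV = "ds_0001, ds_0002, ,ds_0003";

        for (final String originalId : Splitter.on(",").trimResults().omitEmptyStrings().split(originalDatasourceIdsCSV)) {
            // One query per datasource id; here we only print it instead of calling ISLookUpService.quickSearchProfile(...)
            System.out.println(String.format(xqueryTemplate, originalId));
        }
    }
}
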
modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/test/java/eu/dnetlib/msro/workflows/dedup/conf/DedupConfigurationOrchestrationTest.java

package eu.dnetlib.msro.workflows.dedup.conf;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Queue;

import org.apache.commons.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.Ignore;

import com.google.common.collect.Lists;

import eu.dnetlib.pace.config.DedupConfig;

@Ignore
public class DedupConfigurationOrchestrationTest {

    public DedupConfigurationOrchestration dco;

    @Before
    public void setUp() throws IOException {

        final Entity e = new Entity("result", "50", "Publication");

        final String actionSetId = "001";
        final Queue<DedupConfig> configurations = Lists.newLinkedList();

        configurations.add(DedupConfig.loadDefault());

        dco = new DedupConfigurationOrchestration(e, actionSetId, configurations);
        assertNotNull(dco);
        assertNotNull(dco.getActionSetId());
        assertNotNull(dco.getEntity());
        assertNotNull(dco.getConfigurations());
    }

    @Test
    public void testSerialization() {

        final String json = dco.toString();
        final DedupConfigurationOrchestration anotherDco = DedupConfigurationOrchestration.fromJSON(json);
        assertNotNull(anotherDco);
        assertTrue(json.equals(anotherDco.toString()));
    }

    @Test
    public void testSerializationOrgs() throws IOException {

        final Entity e = new Entity("organization", "20", "Organization");

        final String actionSetId = "001";
        final Queue<DedupConfig> configurations = Lists.newLinkedList();

        configurations.add(DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("organisation.conf.json"), Charset.forName("UTF-8"))));
        dco = new DedupConfigurationOrchestration(e, actionSetId, configurations);

        System.out.println(dco.toString());

    }
}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/DeleteHdfsPathJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;

public class DeleteHdfsPathJobNode extends BlackboardJobNode {

    private String cluster;

    @Override
    protected String obtainServiceId(final NodeToken token) {
        return getServiceLocator().getServiceId(HadoopService.class);
    }

    @Override
    protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {

        job.setAction(HadoopBlackboardActions.DELETE_HDFS_PATH.toString());
        job.getParameters().put("cluster", getCluster());

        // The "path" parameter is set by the following call
        job.getParameters().putAll(parseJsonParameters(token));
    }

    public String getCluster() {
        return cluster;
    }

    public void setCluster(final String cluster) {
        this.cluster = cluster;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/test/java/eu/dnetlib/msro/workflows/dedup/SerializationTest.java

package eu.dnetlib.msro.workflows.dedup;

import java.io.IOException;

import com.google.common.collect.Iterables;
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

/**
 * Created by claudio on 05/04/16.
 */
public class SerializationTest {

    private static final Log log = LogFactory.getLog(SerializationTest.class);

    @Test
    public void test() throws IOException {

        final String data = Iterables.getFirst(IOUtils.readLines(getClass().getResourceAsStream("oaf_data.base64")), "");

        final byte[] oafBytes = Base64.decodeBase64(data);

        Oaf oaf = Oaf.parseFrom(oafBytes);

        JsonFormat jsonFormat = new JsonFormat();
        String asJson = jsonFormat.printToString(oaf);

        log.info(asJson);

    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/hadoop/ReuseHdfsRecordsJobNode.java

package eu.dnetlib.msro.workflows.hadoop;

import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class ReuseHdfsRecordsJobNode extends SimpleJobNode {

    private boolean reuseMdRecords;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        return String.valueOf(isReuseMdRecords());
    }

    public boolean isReuseMdRecords() {
        return reuseMdRecords;
    }

    public void setReuseMdRecords(final boolean reuseMdRecords) {
        this.reuseMdRecords = reuseMdRecords;
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/test/java/eu/dnetlib/msro/workflows/dedup/SimilarityMeshBuilderTest.java

package eu.dnetlib.msro.workflows.dedup;

import java.util.List;

import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.Lists;

import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity;
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder;

public class SimilarityMeshBuilderTest {

    private List<String> list;

    @Before
    public void setUp() throws Exception {
        list = Lists.newArrayList();
        for (int i = 0; i < 10; i++) {
            list.add(i + "");
        }
    }

    @Test
    public void test() {
        final List<Similarity> combinations = SimilarityMeshBuilder.build(Type.result, list);

        System.out.println(combinations);
        System.out.println(combinations.size());

    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/data/dedup/DedupUserActionsDAOImpl.java

package eu.dnetlib.data.dedup;

import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.google.common.collect.Maps;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
import eu.dnetlib.miscutils.datetime.DateUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class DedupUserActionsDAOImpl extends AbstractDedupDAO implements DedupUserActionsDAO {

    private static final Log log = LogFactory.getLog(DedupUserActionsDAOImpl.class);

    @Autowired
    private DedupDbDAO dbDAO;

    @Autowired
    private DedupIndexDAO indexDAO;

    @Override
    public Map<String, List<String>> listConfigurations() throws ISLookUpException {
        final Map<String, List<String>> res = Maps.newHashMap();

        final ISLookUpService lookUpService = serviceLocator.getService(ISLookUpService.class);
        final String listEntityTypesXQuery =
                "distinct-values(for $x in //RESOURCE_PROFILE["
                        + ".//RESOURCE_TYPE/@value = 'DedupOrchestrationDSResourceType' and "
                        + ".//CONFIGURATION/@enabled='true'] return $x//ENTITY/@name/string())";

        for (final String entityType : lookUpService.quickSearchProfile(listEntityTypesXQuery)) {
            final String xquery =
                    String.format(
                            "for $x in //RESOURCE_PROFILE[" +
                                    ".//RESOURCE_TYPE/@value = 'DedupOrchestrationDSResourceType' and .//ENTITY/@name='%s' ] " +
                                    "return $x//ACTION_SET/@id/string()", entityType);
            res.put(entityType, lookUpService.quickSearchProfile(xquery));
        }

        if (log.isDebugEnabled()) {
            log.debug("found configurations: " + res);
        }
        return res;
    }

    @Override
    public OafResult search(final String type, final String userQuery, final String actionSet, final int start, final int rows, final String fields)
            throws Exception {

        return indexDAO.search(type, userQuery, actionSet, start, rows, fields);
    }

    @Override
    public OafResult searchById(final String actionSet, final String type, final String objidentifier, final List<String> fields) throws Exception {

        return indexDAO.searchById(actionSet, type, objidentifier, fields);
    }

    @Override
    public boolean commit(final SimilarityGroup group) throws Exception {
        if (StringUtils.isBlank(group.getActionSet())) throw new IllegalArgumentException("missing actionset");

        group.setId(UUID.randomUUID().toString());
        group.setDate(InputDocumentFactory.getParsedDateField(DateUtils.now_ISO8601()));

        return dbDAO.commit(group) && indexDAO.commit(group);
    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/test/java/eu/dnetlib/data/dedup/DedupInspectorFunctionsTest.java

package eu.dnetlib.data.dedup;

import com.google.common.collect.Lists;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import com.google.common.base.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Test;

import java.util.Map;

public class DedupInspectorFunctionsTest {

    private static final Log log = LogFactory.getLog(DedupInspectorFunctionsTest.class);

    private String orgFromIndex = "<record>CAESigMIFCK7ARK4ARIlCiNJbnRlcm5hdGlvbmFsIFRlbm5pcyBGZWRlcmF0aW9uIEx0ZCoHCgVm\r\nYWxzZTIHCgVmYWxzZToHCgVmYWxzZUIHCgVmYWxzZUoHCgVmYWxzZVIHCgVmYWxzZVoHCgVmYWxz\r\nZWIHCgVmYWxzZWoHCgVmYWxzZXIHCgVmYWxzZYIBNAoCR0ISDlVuaXRlZCBLaW5nZG9tGg5kbmV0\r\nOmNvdW50cmllcyIOZG5ldDpjb3VudHJpZXNCMnJjdWtfX19fX19fXzo6ODYxOTIxQzMtNjcyMy00\r\nOUMwLUIyQ0UtRDcyODJBOUMxRTk2SkkKMTEwfG9wZW5haXJlX19fXzo6YWIyZDMzMTA3NDFlYTgw\r\nZDNiODcyNmY2NTE1MDI4NTgSFFJlc2VhcmNoIENvdW5jaWxzIFVLWgoyMDE3LTExLTA0YjEyMHxy\r\nY3VrX19fX19fX186OjVjYjFmZTZhYjg1NDcwMTBiMjJkOTAyN2U3MjUyMzVkagoyMDE5LTA1LTE5\r\nIqsBCAEQARoDMC45IiRkZWR1cC1zaW1pbGFyaXR5LW9yZ2FuaXphdGlvbi1zaW1wbGUqegoic3lz\r\naW1wb3J0OmNyb3Nzd2FsazplbnRpdHlyZWdpc3RyeRIic3lzaW1wb3J0OmNyb3Nzd2FsazplbnRp\r\ndHlyZWdpc3RyeRoXZG5ldDpwcm92ZW5hbmNlX2FjdGlvbnMiF2RuZXQ6cHJvdmVuYW5jZV9hY3Rp\r\nb25z\r\n\n</record>";

    private DedupIndexDAO dao;

    @Before
    public void setUp() {
        dao = new DedupIndexDAO();
    }

    @Test
    public void test_1() {

        Oaf oaf = dao.getXml2OafFunction().apply(orgFromIndex);
        System.out.println(oaf);

        Map<String, String> map = dao.getOaf2FieldMapFunction("organization", Lists.newArrayList("legalname", "legalshortname", "country", "websiteurl", "provenance")).apply(oaf);

        System.out.println(map);

    }

}

modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/functionality/modular/ui/workflows/values/ListHBaseMappingTitleValues.java

package eu.dnetlib.functionality.modular.ui.workflows.values;

import java.util.List;
import java.util.Map;
import javax.annotation.Resource;

import com.google.common.collect.Lists;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.util.ValidNodeValuesFetcher;
import org.springframework.beans.factory.annotation.Required;

public class ListHBaseMappingTitleValues extends ValidNodeValuesFetcher {

    @Resource
    private UniqueServiceLocator serviceLocator;

    private String sourceFormat;

    @Override
    protected List<DnetParamValue> obtainValues(final Map<String, String> params) throws Exception {

        final String xquery =
                "for $x in /RESOURCE_PROFILE["
                        + ".//RESOURCE_TYPE/@value='TransformationRuleDSResourceType' and "
                        + ".//SOURCE_METADATA_FORMAT/@name = '" + getSourceFormat() + "' and "
                        + ".//SOURCE_METADATA_FORMAT/@layout = 'store' and "
                        + ".//SOURCE_METADATA_FORMAT/@interpretation = 'cleaned'] "
                        + "return concat($x//RESOURCE_IDENTIFIER/@value, ' @@@ ', $x//SCRIPT/TITLE/text())";

        final List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
        final List<DnetParamValue> values = Lists.newArrayList();

        for (final String s : result) {
            final String[] arr = s.split("@@@");
            values.add(new DnetParamValue(arr[0].trim(), arr[1].trim()));
        }

        return values;
    }

    public String getSourceFormat() {
        return sourceFormat;
    }

    @Required
    public void setSourceFormat(final String sourceFormat) {
        this.sourceFormat = sourceFormat;
    }

}

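The fetcher above asks the ISLookUpService for strings of the form "profile id @@@ title" and splits them on the "@@@" marker to build its parameter values. A tiny standalone sketch of that parsing step follows; the sample strings and the class name ProfileTitleSplitDemo are invented for illustration.

public class ProfileTitleSplitDemo {

    public static void main(final String[] args) {
        // Invented examples of the "id @@@ title" strings returned by the XQuery above
        final String[] rows = {
                "profile::0001 @@@ OAF store mapping",
                "profile::0002 @@@ ODF store mapping"
        };

        for (final String s : rows) {
            final String[] arr = s.split("@@@");
            // arr[0] is the resource identifier, arr[1] is the human-readable title
            System.out.println(arr[0].trim() + " -> " + arr[1].trim());
        }
    }
}
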
modules/dnet-deduplication/tags/dnet-deduplication-2.0.0/src/main/java/eu/dnetlib/msro/workflows/actions/PrepareConfiguredActionSetJobNode.java

package eu.dnetlib.msro.workflows.actions;

import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.actionmanager.set.RawSet;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * The Class PrepareConfiguredActionSetJobNode.
 */
public class PrepareConfiguredActionSetJobNode extends SimpleJobNode {

    /**
     * logger.
     */
    private static final Log log = LogFactory.getLog(PrepareConfiguredActionSetJobNode.class);

    /**