Project

General

Profile

« Previous | Next » 

Revision 43444

Added workflows for entity registy

View differences:

webapps/dnet-wds-container/trunk/src/main/resources/eu/dnetlib/cnr-site.properties
32 32

  
33 33
hbase.mapred.datatable 							=	db_openaireplus
34 34
hbase.actions.table 							=	db_actions
35
dnet.datasourcemanager.db.name=dnet_wds
36
services.datasourceManager.core=dnetPostgresDSManagerCore
35 37

  
36 38
# Mail configuration
37 39

  
webapps/dnet-wds-container/trunk/pom.xml
90 90
			<version>1.0.0-SNAPSHOT</version>
91 91
		</dependency>
92 92

  
93
                <dependency>
94
                        <groupId>eu.dnetlib</groupId>
95
                        <artifactId>dnet-openaire</artifactId>
96
                        <version>1.0.0-SNAPSHOT</version>
97
                </dependency>
93
		<dependency>
94
			<groupId>eu.dnetlib</groupId>
95
			<artifactId>dnet-wds</artifactId>
96
			<version>1.0.0-SNAPSHOT</version>
97
		</dependency>
98 98

  
99 99
		<dependency>
100 100
			<groupId>xerces</groupId>
modules/dnet-hadoop-services/trunk/src/main/java/eu/dnetlib/data/actionmanager/is/ISClient.java
10 10
import com.google.common.collect.Iterables;
11 11
import com.google.common.collect.Lists;
12 12
import com.google.common.collect.Sets;
13

  
14 13
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
15 14
import eu.dnetlib.miscutils.datetime.DateUtils;
16 15
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerException;
......
21 20
import eu.dnetlib.rmi.enabling.ISLookUpService;
22 21
import eu.dnetlib.rmi.enabling.ISRegistryException;
23 22
import eu.dnetlib.rmi.enabling.ISRegistryService;
24
import eu.dnetlib.soap.EndpointReferenceBuilder;
25 23
import org.antlr.stringtemplate.StringTemplate;
26 24
import org.apache.commons.logging.Log;
27 25
import org.apache.commons.logging.LogFactory;
......
34 32
public class ISClient {
35 33

  
36 34
	private static final Log log = LogFactory.getLog(ISClient.class); // NOPMD by marko on 11/24/08 5:02 PM
35
	/**
36
	 * endpoint builder.
37
	 */
38
	private static final String ENDPOINT_TEMPLATE = "http://%s:%s/%s/services/actionManager";
37 39
	@Resource
38 40
	private UniqueServiceLocator serviceLocator;
39 41
	/**
......
44 46
	 * service endpoint.
45 47
	 */
46 48
	private Endpoint endpoint;
47
	/**
48
	 * endpoint builder.
49
	 */
50
	private EndpointReferenceBuilder<Endpoint> eprBuilder;
49
	private String hostname;
50
	private String port;
51
	private String context;
51 52

  
52 53
	public String registerSetProfile(final ActionManagerSet set) throws ActionManagerException {
53 54
		if (existsSet(set.getId())) { throw new ActionManagerException("Set " + set.getId() + " already registered"); }
54 55
		try {
55 56
			StringTemplate template = new StringTemplate(actionManagerSetDsTemplate.getTemplate());
56
			template.setAttribute("serviceUri", eprBuilder.getAddress(endpoint));
57
			template.setAttribute("serviceUri", String.format(ENDPOINT_TEMPLATE, hostname, port, context));
57 58
			template.setAttribute("set", set);
58 59
			template.setAttribute("latest", RawSet.newInstance());
59 60
			return serviceLocator.getService(ISRegistryService.class).registerProfile(template.toString());
......
284 285
		this.endpoint = endpoint;
285 286
	}
286 287

  
287
	public EndpointReferenceBuilder<Endpoint> getEprBuilder() {
288
		return eprBuilder;
288
	public String getHostname() {
289
		return hostname;
289 290
	}
290 291

  
291 292
	@Required
292
	public void setEprBuilder(final EndpointReferenceBuilder<Endpoint> eprBuilder) {
293
		this.eprBuilder = eprBuilder;
293
	public void setHostname(final String hostname) {
294
		this.hostname = hostname;
294 295
	}
295 296

  
297
	public String getPort() {
298
		return port;
299
	}
300

  
301
	@Required
302
	public void setPort(final String port) {
303
		this.port = port;
304
	}
305

  
306
	public String getContext() {
307
		return context;
308
	}
309

  
310
	@Required
311
	public void setContext(final String context) {
312
		this.context = context;
313
	}
296 314
}
modules/dnet-hadoop-services/trunk/src/main/resources/eu/dnetlib/data/actionmanager/applicationContext-dnet-actionmanager-service.xml
1 1
<?xml version="1.0" encoding="UTF-8"?>
2 2
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3
       xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:sec="http://cxf.apache.org/configuration/security"
4
       xmlns:wsa="http://cxf.apache.org/ws/addressing" xmlns:p="http://www.springframework.org/schema/p"
5
       xmlns:http="http://cxf.apache.org/transports/http/configuration" xmlns:t="http://dnetlib.eu/springbeans/t"
6
       xmlns:template="http://dnetlib.eu/springbeans/template" xmlns:util="http://www.springframework.org/schema/util"
3
       xmlns:jaxws="http://cxf.apache.org/jaxws"
4
       xmlns:p="http://www.springframework.org/schema/p"
5
       xmlns:http="http://cxf.apache.org/transports/http/configuration"
7 6
       xmlns="http://www.springframework.org/schema/beans"
8 7
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
9
                                    http://cxf.apache.org/ws/addressing http://cxf.apache.org/schemas/ws-addr-conf.xsd
10
                                    http://cxf.apache.org/configuration/security http://cxf.apache.org/schemas/configuration/security.xsd
8

  
9

  
11 10
                                    http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd
12
                            http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
13
                            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
14
                            http://dnetlib.eu/springbeans/template http://dnetlib.eu/springbeans/template.xsd">
11
                            http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd">
15 12

  
16 13
	<bean id="actionManagerService" class="eu.dnetlib.data.actionmanager.ActionManagerServiceImpl"
17 14
	      p:notificationHandler-ref="actionManagerNotificationHandler"
......
69 66
	<!-- Information Service Client -->
70 67
	<bean id="actionmanagerISClient" class="eu.dnetlib.data.actionmanager.is.ISClient"
71 68
	      p:actionManagerSetDsTemplate-ref="actionManagerSetDsTemplate"
72
	      p:endpoint-ref="actionManagerServiceEndpoint" p:eprBuilder-ref="jaxwsEndpointReferenceBuilder" />
69
	      p:endpoint-ref="actionManagerServiceEndpoint" p:hostname="${container.hostname}"
70
	      p:context="${container.context}"
71
	      p:port="${container.port}"/>
73 72

  
74 73
	<bean id="actionManagerSetDsTemplate"
75 74
	      class="eu.dnetlib.springutils.stringtemplate.StringTemplateFactory"
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/stats/BackupStatsCacheJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.stats;
2

  
3
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
4
import eu.dnetlib.msro.workflows.procs.Token;
5
import org.apache.commons.lang3.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
/**
10
 * Ask the StatsManagerService to backup the stats cache of the portal specified by the additional BB parameter
11
 * <code>StatsManagerServiceBBAction.BACKUP_CACHE.getTargetPortalParamName()</code>. The cache can be restored with the last generated
12
 * backup by sending a "restore" BB message.
13
 *
14
 * @author alessia
15
 * @see RestoreStatsCacheJobNode
16
 */
17
public class BackupStatsCacheJobNode extends AbstractStatsJobNode {
18

  
19
	private static final Log log = LogFactory.getLog(BackupStatsCacheJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
20

  
21
	@Override
22
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
23
		job.setAction(StatsManagerServiceBBAction.BACKUP_CACHE.action());
24
		final String portal = getPortalName(token.getEnv());
25
		if (StringUtils.isNotBlank(portal)) {
26
			job.getParameters().put(StatsManagerServiceBBAction.BACKUP_CACHE.getTargetPortalParamName(), portal);
27
		} else {
28
			log.warn(StatsManagerServiceBBAction.BACKUP_CACHE.getTargetPortalParamName() + " not set. The StatsManagerService will use its default.");
29
		}
30
	}
31
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/stats/StatsManagerServiceBBAction.java
1
package eu.dnetlib.msro.workflows.nodes.stats;
2

  
3
public enum StatsManagerServiceBBAction {
4
	PROMOTE_SHADOW_STATS {
5
		@Override
6
		public String action() {
7
			return "promoteShadow";
8
		}
9
	},
10
	PROMOTE_SHADOW_CACHE {
11
		@Override
12
		public String action() {
13
			return "promoteCache";
14
		}
15
	},
16
	REFRESH_SHADOW_CACHE {
17
		@Override
18
		public String action() {
19
			return "refreshCache";
20
		}
21

  
22
	},
23
	VALIDATE_SHADOW_STATS {
24
		@Override
25
		public String action() {
26
			return "validate";
27
		}
28

  
29
	},
30
	BACKUP_CACHE {
31
		@Override
32
		public String action() {
33
			return "backup";
34
		}
35

  
36
	},
37
	RESTORE_CACHE {
38
		@Override
39
		public String action() {
40
			return "restore";
41
		}
42

  
43
	},
44
	MIGRATE_CACHE {
45
		@Override
46
		public String action() {
47
			return "migrate";
48
		}
49

  
50
		@Override
51
		public String getTargetPortalParamName() {
52
			return "targetCache";
53
		}
54

  
55
	};
56

  
57
	private static String DEFAULT_PORTAL_PARAM_NAME = "cache";
58

  
59
	public abstract String action();
60

  
61
	public String getSourcePortalParamName() {
62
		return DEFAULT_PORTAL_PARAM_NAME;
63
	}
64

  
65
	public String getTargetPortalParamName() {
66
		return DEFAULT_PORTAL_PARAM_NAME;
67
	}
68
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/utils/CleaningXsltFunctions.java
1
package eu.dnetlib.msro.workflows.nodes.utils;
2

  
3
import java.text.Normalizer;
4

  
5
/**
6
 * Created by claudio on 14/06/16.
7
 */
8
public class CleaningXsltFunctions {
9

  
10
	public static String clean(final String s) {
11
		return Normalizer.normalize(s, Normalizer.Form.NFD)
12
				.replaceAll("\\(.+\\)", "")
13
				.replaceAll("(\\W|\\p{InCombiningDiacriticalMarks}|\\p{Punct}|\\n|\\s)+", "")
14
				.toLowerCase()
15
				.trim();
16
	}
17
}
modules/dnet-wds/trunk/test/java/eu/dnetlib/wds/collector/plugins/CMRIteratorTest.java
2 2

  
3 3
import java.util.Iterator;
4 4

  
5
import org.antlr.stringtemplate.StringTemplate;
6
import org.apache.commons.io.IOUtils;
5 7
import org.junit.Test;
6 8
import org.springframework.core.io.ClassPathResource;
7 9

  
......
15 17
	@Test
16 18
	public void test() throws Exception {
17 19

  
18
		CMRCollectorPlugin plugin = new CMRCollectorPlugin();
20
		ClassPathResource resource = new ClassPathResource("eu/dnetlib/wds/collector/plugins/CMR2Datasource.st");
19 21

  
20
		ClassPathResource resource = new ClassPathResource("eu/dnetlib/wds/collector/plugins/CMR2Datacite.st");
22
		StringTemplate st = new StringTemplate(IOUtils.toString(resource.getInputStream()));
21 23

  
22
		plugin.setClassPathTemplate(resource);
23
		final Iterable<String> collect = plugin.collect(null, null, null);
24 24

  
25 25
		CMRDatasourcePlugin datasourcePlugin = new CMRDatasourcePlugin();
26 26

  
27
		datasourcePlugin.setXmlTemplate(st);
28

  
27 29
		final Iterator<String> iterator = datasourcePlugin.collect(null, null, null).iterator();
28 30

  
29
		for (String s : datasourcePlugin.collect(null, null, null)) {
30
			System.out.println("s = " + s);
31
		}
31
		System.out.println(iterator.next());
32
		System.out.println(iterator.next());
33
		System.out.println(iterator.next());
34
		System.out.println(iterator.next());
35
		System.out.println(iterator.next());
36

  
37

  
38

  
32 39
	}
33 40

  
34 41
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/dedup/PrepareConfiguredActionSetJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

  
3
import java.util.List;
4
import java.util.Map;
5

  
6
import com.google.common.collect.Lists;
7
import com.google.common.collect.Maps;
8
import com.google.gson.Gson;
9
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
10
import eu.dnetlib.miscutils.datetime.DateUtils;
11
import eu.dnetlib.msro.workflows.graph.Arc;
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13
import eu.dnetlib.msro.workflows.procs.Env;
14
import eu.dnetlib.rmi.data.hadoop.actionmanager.RawSet;
15
import eu.dnetlib.rmi.enabling.ISLookUpDocumentNotFoundException;
16
import eu.dnetlib.rmi.enabling.ISLookUpService;
17
import eu.dnetlib.rmi.manager.MSROException;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.springframework.beans.factory.annotation.Autowired;
22

  
23
/**
24
 * The Class PrepareConfiguredActionSetJobNode.
25
 */
26
public class PrepareConfiguredActionSetJobNode extends SimpleJobNode {
27

  
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(PrepareConfiguredActionSetJobNode.class);
32

  
33
	/**
34
	 * The dedup config sequence param.
35
	 */
36
	private String dedupConfigSequence;
37

  
38
	/**
39
	 * The job property.
40
	 */
41
	private String jobProperty;
42

  
43
	/**
44
	 * The action set path param name.
45
	 */
46
	private String actionSetPathParam;
47

  
48
	private String setsParam;
49

  
50
	/**
51
	 * The service locator.
52
	 */
53
	@Autowired
54
	private UniqueServiceLocator serviceLocator;
55

  
56
	/*
57
	 * (non-Javadoc)
58
	 *
59
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
60
	 */
61
	@Override
62
	protected String execute(final Env env) throws Exception {
63

  
64
		final List<Map<String, String>> setList = Lists.newArrayList();
65

  
66
		final Map<String, String> set = Maps.newHashMap();
67

  
68
		final String actionSetId = getDedupConfigSequence();
69
		final ISLookUpService isLookUpService = serviceLocator.getService(ISLookUpService.class);
70
		final String basePath = isLookUpService.getResourceProfileByQuery(
71
				"/RESOURCE_PROFILE[./HEADER/RESOURCE_TYPE/@value='ActionManagerServiceResourceType']//SERVICE_PROPERTIES/PROPERTY[@key='basePath']/@value/string()");
72
		if (StringUtils.isBlank(basePath)) {
73
			throw new IllegalStateException("missing basePath in ActionManagerService");
74
		}
75

  
76
		String actionSetDirectory;
77
		try {
78
			actionSetDirectory = isLookUpService.getResourceProfileByQuery(
79
					"/RESOURCE_PROFILE[./HEADER/RESOURCE_TYPE/@value='ActionManagerSetDSResourceType' and .//SET/@id = '" + actionSetId
80
							+ "']//SET/@ directory/string()");
81
		} catch (ISLookUpDocumentNotFoundException e) {
82
			throw new MSROException("missing directory in ActionSet profile: " + actionSetId);
83
		}
84

  
85
		final String rawSetId = RawSet.newInstance().getId();
86
		set.put("rawset", rawSetId);
87
		set.put("creationDate", DateUtils.now_ISO8601());
88
		set.put("set", actionSetId);
89
		set.put("enabled", "true");
90
		set.put("jobProperty", getJobProperty());
91

  
92
		env.setAttribute(set.get("jobProperty"), set.get("rawset"));
93

  
94
		final String path = basePath + "/" + actionSetDirectory + "/" + rawSetId;
95
		log.info("using action set path: " + path);
96
		env.setAttribute(getActionSetPathParam(), path);
97

  
98
		setList.add(set);
99
		final String sets = new Gson().toJson(setList);
100
		log.debug("built set: " + sets);
101

  
102
		env.setAttribute(getSetsParam(), sets);
103

  
104
		return Arc.DEFAULT_ARC;
105
	}
106

  
107
	/**
108
	 * Gets the job property.
109
	 *
110
	 * @return the job property
111
	 */
112
	public String getJobProperty() {
113
		return jobProperty;
114
	}
115

  
116
	/**
117
	 * Sets the job property.
118
	 *
119
	 * @param jobProperty the new job property
120
	 */
121
	public void setJobProperty(final String jobProperty) {
122
		this.jobProperty = jobProperty;
123
	}
124

  
125
	public String getActionSetPathParam() {
126
		return actionSetPathParam;
127
	}
128

  
129
	public void setActionSetPathParam(final String actionSetPathParam) {
130
		this.actionSetPathParam = actionSetPathParam;
131
	}
132

  
133
	public String getDedupConfigSequence() {
134
		return dedupConfigSequence;
135
	}
136

  
137
	public void setDedupConfigSequence(final String dedupConfigSequence) {
138
		this.dedupConfigSequence = dedupConfigSequence;
139
	}
140

  
141
	public String getSetsParam() {
142
		return setsParam;
143
	}
144

  
145
	public void setSetsParam(final String setsParam) {
146
		this.setsParam = setsParam;
147
	}
148
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/profiles/RepositoryXsltFunctions.java
1
package eu.dnetlib.msro.workflows.profiles;
2

  
3
/**
4
 * Created by claudio on 14/06/16.
5
 */
6

  
7
import java.io.IOException;
8
import java.io.StringWriter;
9
import java.util.List;
10
import java.util.Map;
11

  
12
import com.google.common.base.Splitter;
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import org.antlr.stringtemplate.StringTemplate;
16
import org.apache.commons.io.IOUtils;
17
import org.apache.commons.lang3.StringEscapeUtils;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.springframework.core.io.ClassPathResource;
22
import org.springframework.core.io.Resource;
23

  
24
public class RepositoryXsltFunctions {
25

  
26
	private static final Log log = LogFactory.getLog(RepositoryXsltFunctions.class); // NOPMD by marko on 11/24/08 5:02 PM
27

  
28
	private static Resource ifaceTemplate = new ClassPathResource("/eu/dnetlib/msro/openaireplus/workflows/profiles/repo_interface.st");
29

  
30
	private static Resource paramTemplate = new ClassPathResource("/eu/dnetlib/msro/openaireplus/workflows/profiles/repo_param.st");
31

  
32
	public static String buildInterface(final String infopackage) {
33

  
34
		final List<String> formats = Lists.newArrayList();
35
		final List<String> sets = Lists.newArrayList();
36
		final Map<String, String> otherParams = Maps.newHashMap();
37

  
38
		for (String param : parseParams(getValueBetween(infopackage, "===6===", null))) {
39
			StringTemplate p = getTemplate(paramTemplate);
40
			String paramName = StringUtils.substringBefore(param, "=").trim();
41
			String paramValue = StringUtils.substringAfter(param, "=").trim();
42

  
43
			p.setAttribute("param", paramName.toUpperCase());
44
			p.setAttribute("value", paramValue);
45

  
46
			try {
47
				switch (Params.valueOf(paramName.toUpperCase())) {
48
				case FORMAT:
49
					formats.add(p.toString());
50
					break;
51
				case SET:
52
					sets.add(p.toString());
53
					break;
54
				}
55
			} catch (Exception e) {
56
				if ((paramName != null) && !paramName.isEmpty()) {
57
					otherParams.put(paramName, StringEscapeUtils.escapeXml(paramValue));
58
				}
59
			}
60
		}
61

  
62
		final StringTemplate i = getTemplate(ifaceTemplate);
63
		i.setAttribute("id", getValueBetween(infopackage, null, "===1==="));
64
		i.setAttribute("fileMode", getValueBetween(infopackage, "===1===", "===2==="));
65
		i.setAttribute("fileDesc", getValueBetween(infopackage, "===2===", "===3==="));
66
		i.setAttribute("iisWf", getValueBetween(infopackage, "===3===", "===4==="));
67
		i.setAttribute("baseUrl", getBaseUrl(getValueBetween(infopackage, "===4===", "===5===")));
68
		i.setAttribute("accessProtocol", getAccessProtocol(getValueBetween(infopackage, "===5===", "===6==="), otherParams));
69
		i.setAttribute("formats", ensureMinOccurs(formats, "<FORMAT/>"));
70
		i.setAttribute("sets", ensureMinOccurs(sets, "<SET/>"));
71
		if (!otherParams.isEmpty()) {
72
			i.setAttribute("otherParams", otherParams);
73
		}
74

  
75
		final String iface = i.toString();
76
		return iface;
77
	}
78

  
79
	;
80

  
81
	private static String getValueBetween(final String s, final String pre, final String post) {
82
		if ((pre == null) && (post == null)) {
83
			return s;
84
		} else if (pre == null) {
85
			return StringUtils.substringBefore(s, post);
86
		} else if (post == null) {
87
			return StringUtils.substringAfter(s, pre);
88
		} else {
89
			return StringUtils.substringBetween(s, pre, post);
90
		}
91
	}
92

  
93
	private static List<String> ensureMinOccurs(final List<String> list, final String empty) {
94
		if (list.isEmpty()) {
95
			list.add(empty);
96
		}
97
		return list;
98
	}
99

  
100
	private static Iterable<String> parseParams(final String s) {
101
		return Splitter.on("***").omitEmptyStrings().trimResults().split(s);
102
	}
103

  
104
	private static String getBaseUrl(final String url) {
105
		final StringTemplate b = getTemplate(paramTemplate);
106
		b.setAttribute("param", "BASE_URL");
107
		b.setAttribute("value", url);
108
		return b.toString();
109
	}
110

  
111
	private static String getAccessProtocol(final String protocol, final Map<String, String> otherParams) {
112
		final StringTemplate a = getTemplate(paramTemplate);
113
		a.setAttribute("param", "ACCESS_PROTOCOL");
114
		a.setAttribute("value", protocol);
115
		if ((otherParams != null) && !otherParams.isEmpty()) {
116
			a.setAttribute("attrs", otherParams);
117
		}
118
		return a.toString();
119
	}
120

  
121
	private static StringTemplate getTemplate(final Resource res) {
122
		final StringWriter body = new StringWriter();
123
		try {
124
			IOUtils.copy(res.getInputStream(), body);
125
			return new StringTemplate(body.toString());
126
		} catch (IOException e) {
127
			log.error("unable to get template", e);
128
			throw new RuntimeException(e);
129
		}
130
	}
131

  
132
	private enum Params {
133
		FORMAT, SET
134
	}
135

  
136
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/dedup/utils/DedupConfigurationOrchestration.java
1
package eu.dnetlib.msro.workflows.nodes.dedup.utils;
2

  
3
import java.util.Queue;
4

  
5
import com.google.gson.Gson;
6
import com.google.gson.GsonBuilder;
7
import eu.dnetlib.pace.config.DedupConfig;
8

  
9
/**
10
 * The Class DedupConfigurationOrchestration.
11
 */
12
public class DedupConfigurationOrchestration {
13

  
14
	/**
15
	 * The entity.
16
	 */
17
	private Entity entity;
18

  
19
	/**
20
	 * The action set id.
21
	 */
22
	private String actionSetId;
23

  
24
	/**
25
	 * The configurations.
26
	 */
27
	private Queue<DedupConfig> configurations;
28

  
29
	public DedupConfigurationOrchestration() {
30
	}
31

  
32
	/**
33
	 * Instantiates a new dedup configuration orchestration.
34
	 *
35
	 * @param entity         the entity
36
	 * @param actionSetId    the action set id
37
	 * @param configurations the configurations
38
	 */
39
	public DedupConfigurationOrchestration(final Entity entity, final String actionSetId, final Queue<DedupConfig> configurations) {
40
		super();
41
		this.setEntity(entity);
42
		this.setActionSetId(actionSetId);
43
		this.setConfigurations(configurations);
44
	}
45

  
46
	/**
47
	 * From json.
48
	 *
49
	 * @param json the json
50
	 * @return the dedup configuration orchestration
51
	 */
52
	public static DedupConfigurationOrchestration fromJSON(final String json) {
53
		return new Gson().fromJson(json, DedupConfigurationOrchestration.class);
54
	}
55

  
56
	/**
57
	 * Gets the entity.
58
	 *
59
	 * @return the entity
60
	 */
61
	public Entity getEntity() {
62
		return entity;
63
	}
64

  
65
	public void setEntity(final Entity entity) {
66
		this.entity = entity;
67
	}
68

  
69
	/**
70
	 * Gets the action set id.
71
	 *
72
	 * @return the action set id
73
	 */
74
	public String getActionSetId() {
75
		return actionSetId;
76
	}
77

  
78
	public void setActionSetId(final String actionSetId) {
79
		this.actionSetId = actionSetId;
80
	}
81

  
82
	/**
83
	 * Gets the configurations.
84
	 *
85
	 * @return the configurations
86
	 */
87
	public Queue<DedupConfig> getConfigurations() {
88
		return configurations;
89
	}
90

  
91
	public void setConfigurations(final Queue<DedupConfig> configurations) {
92
		this.configurations = configurations;
93
	}
94

  
95
	/*
96
	 * (non-Javadoc)
97
	 * 
98
	 * @see java.lang.Object#toString()
99
	 */
100
	@Override
101
	public String toString() {
102
		return new GsonBuilder().setPrettyPrinting().create().toJson(this);
103
	}
104

  
105
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/hostedby/PatchHostedBy.java
1
package eu.dnetlib.msro.workflows.nodes.hostedby;
2

  
3
import java.io.StringReader;
4
import java.util.Map;
5
import java.util.function.UnaryOperator;
6

  
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.dom4j.Document;
10
import org.dom4j.Element;
11
import org.dom4j.Node;
12
import org.dom4j.io.SAXReader;
13

  
14
/**
15
 * The Class PatchHostedBy.
16
 */
17
public class PatchHostedBy implements UnaryOperator<String> {
18

  
19
	/**
20
	 * The Constant log.
21
	 */
22
	private static final Log log = LogFactory.getLog(PatchHostedBy.class);
23
	/**
24
	 * The set spec hosted by map.
25
	 */
26
	private final Map<String, HostedByEntry> setSpecHostedByMap;
27
	/**
28
	 * The counters.
29
	 */
30
	private final HostedByCounters counters;
31
	/**
32
	 * The xpath.
33
	 */
34
	private final String xpath;
35
	/**
36
	 * The reader.
37
	 */
38
	private final SAXReader reader = new SAXReader();
39

  
40
	/**
41
	 * Instantiates a new patch hosted by.
42
	 *
43
	 * @param setSpecHostedByMap the set spec hosted by map
44
	 * @param xpath              the xpath
45
	 * @param counters           the counters
46
	 */
47
	public PatchHostedBy(final Map<String, HostedByEntry> setSpecHostedByMap, final String xpath, final HostedByCounters counters) {
48
		this.setSpecHostedByMap = setSpecHostedByMap;
49
		this.xpath = xpath;
50
		this.counters = counters;
51
		if (log.isDebugEnabled()) {
52
			log.debug("****************************************");
53
			log.debug("SetSpec/hostedBy map:");
54
			for (final Map.Entry<String, HostedByEntry> e : setSpecHostedByMap.entrySet()) {
55
				log.debug("   " + e.getKey() + " -> " + e.getValue());
56
			}
57
			log.debug("****************************************");
58
		}
59
	}
60

  
61
	/**
62
	 * Evaluate.
63
	 *
64
	 * @param record the record
65
	 * @return the string
66
	 */
67
	@Override
68
	public String apply(final String record) {
69
		try {
70
			final Document doc = this.reader.read(new StringReader(record));
71
			final Element node = (Element) doc.selectSingleNode("//*[local-name()='hostedBy']");
72
			if (node != null) {
73
				final HostedByEntry ds = findHostedBy(doc);
74
				if (ds != null) {
75
					node.addAttribute("id", ds.getId());
76
					node.addAttribute("name", ds.getName());
77
					this.counters.increaseCounter(ds.getId());
78
				}
79
				return doc.asXML();
80
			} else if (log.isDebugEnabled()) {
81
				log.debug(" -- Missing hostedBy --");
82
			}
83
		} catch (final Throwable e) {
84
			log.error("Error adding hosted by to " + record);
85
		}
86
		return record;
87
	}
88

  
89
	/**
90
	 * Find hosted by.
91
	 *
92
	 * @param doc the doc
93
	 * @return the hosted by entry
94
	 */
95
	private HostedByEntry findHostedBy(final Document doc) {
96
		for (final Object o : doc.selectNodes(this.xpath)) {
97
			final String set = ((Node) o).getText().trim();
98
			if (this.setSpecHostedByMap.containsKey(set)) {
99
				if (log.isDebugEnabled()) {
100
					log.debug(set + " -> " + this.setSpecHostedByMap.get(set));
101
				}
102
				return this.setSpecHostedByMap.get(set);
103
			} else if (log.isDebugEnabled()) {
104
				log.debug(set + " -> UNKNOWN REPO");
105
			}
106
		}
107
		return null;
108
	}
109

  
110
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/dedup/DedupDuplicateScanJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

  
3
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
4
import eu.dnetlib.msro.workflows.graph.Arc;
5
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
6
import eu.dnetlib.msro.workflows.nodes.dedup.utils.DedupConfigurationOrchestration;
7
import eu.dnetlib.msro.workflows.procs.Token;
8
import org.apache.commons.collections.CollectionUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

  
12
public class DedupDuplicateScanJobNode extends DedupConfigurationAwareJobNode {
13

  
14
	private static final Log log = LogFactory.getLog(DedupDuplicateScanJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
15

  
16
	@Override
17
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Token token) {
18
		return new DedupBlackboardWorkflowJobListener(token);
19
	}
20

  
21
	private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener {
22

  
23
		public DedupBlackboardWorkflowJobListener(final Token token) {
24
			super(token);
25
		}
26

  
27
		@Override
28
		protected void onDone(final BlackboardJob job) {
29

  
30
			final DedupConfigurationOrchestration confs = getDedupConfigurationOrchestration();
31

  
32
			confs.getConfigurations().poll();
33

  
34
			log.info("checking dedup configs queue, size: " + confs.getConfigurations().size());
35

  
36
			if (CollectionUtils.isEmpty(confs.getConfigurations())) {
37
				log.info("dedup similarity scan done");
38

  
39
				getToken().release("done");
40
			} else {
41
				log.debug("remaining confs: " + confs);
42

  
43
				getToken().release(Arc.DEFAULT_ARC);
44
			}
45
		}
46
	}
47

  
48
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/dedup/BuildSimilarityMeshJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

  
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.List;
6
import java.util.Queue;
7

  
8
import com.google.common.collect.Lists;
9
import com.google.common.collect.Queues;
10
import eu.dnetlib.data.proto.TypeProtos.Type;
11
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
12
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
13
import eu.dnetlib.msro.workflows.graph.Arc;
14
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
15
import eu.dnetlib.msro.workflows.nodes.utils.Similarity;
16
import eu.dnetlib.msro.workflows.nodes.utils.SimilarityMeshBuilder;
17
import eu.dnetlib.msro.workflows.procs.Env;
18
import eu.dnetlib.rmi.common.ResultSet;
19
import org.antlr.stringtemplate.StringTemplate;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.dom4j.Document;
23
import org.dom4j.DocumentException;
24
import org.dom4j.Node;
25
import org.dom4j.io.SAXReader;
26
import org.springframework.beans.factory.annotation.Autowired;
27
import org.springframework.beans.factory.annotation.Required;
28

  
29
public class BuildSimilarityMeshJobNode extends AsyncJobNode {
30

  
31
	private static final Log log = LogFactory.getLog(BuildSimilarityMeshJobNode.class);
32

  
33
	/**
34
	 * The result set client factory.
35
	 */
36
	@Autowired
37
	private ResultSetClient resultSetClient;
38

  
39
	@Autowired
40
	private ResultSetFactory resultSetFactory;
41

  
42
	private StringTemplate similarity;
43

  
44
	private String inputEprParam;
45

  
46
	private String outputEprParam;
47

  
48
	@Override
49
	protected String execute(final Env env) throws Exception {
50

  
51
		final ResultSet<?> inputRs = env.getAttribute(getInputEprParam(), ResultSet.class);
52
		final Iterator<String> it = resultSetClient.iter(inputRs, String.class).iterator();
53

  
54
		final Queue<Object> queue = Queues.newLinkedBlockingQueue();
55
		final SAXReader reader = new SAXReader();
56

  
57
		if (it.hasNext()) {
58
			populateQueue(queue, reader, it.next());
59
		}
60

  
61
		final ResultSet<String> rsOut = resultSetFactory.createResultSet(() -> {
62
			return new Iterator<String>() {
63

  
64
				@Override
65
				public boolean hasNext() {
66
					synchronized (queue) {
67
						return !queue.isEmpty();
68
					}
69
				}
70

  
71
				@Override
72
				public String next() {
73
					synchronized (queue) {
74
						final Object o = queue.poll();
75
						while (queue.isEmpty() && it.hasNext()) {
76
							populateQueue(queue, reader, it.next());
77
						}
78
						return buildSimilarity((Similarity) o);
79
					}
80
				}
81

  
82
				@Override
83
				public void remove() {
84
					throw new UnsupportedOperationException();
85
				}
86
			};
87
		});
88

  
89
		env.setAttribute(getOutputEprParam(), rsOut);
90

  
91
		return Arc.DEFAULT_ARC;
92
	}
93

  
94
	private void populateQueue(final Queue<Object> q, final SAXReader r, final String xml) {
95
		try {
96
			final Document d = r.read(new StringReader(xml));
97
			final String groupid = d.valueOf("//FIELD[@name='id']");
98
			final List<?> items = d.selectNodes("//FIELD[@name='group']/ITEM");
99
			final String entitytype = d.valueOf("//FIELD[@name='entitytype']");
100
			final List<String> group = Lists.newArrayList();
101
			for (final Object id : items) {
102
				group.add(((Node) id).getText());
103
			}
104
			// compute the full mesh
105
			final Type type = Type.valueOf(entitytype);
106

  
107
			final List<Similarity> mesh = SimilarityMeshBuilder.build(type, group);
108
			// total += mesh.size();
109
			if (log.isDebugEnabled()) {
110
				log.debug(String.format("built mesh for group '%s', size %d", groupid, mesh.size()));
111
			}
112
			for (final Similarity s : mesh) {
113
				if (log.isDebugEnabled()) {
114
					log.debug(String.format("adding to queue: %s", s.toString()));
115
				}
116
				q.add(s);
117
			}
118
		} catch (final DocumentException e) {
119
			log.error("invalid document: " + xml);
120
		}
121
	}
122

  
123
	private String buildSimilarity(final Similarity s) {
124
		final StringTemplate template = new StringTemplate(getSimilarity().getTemplate());
125

  
126
		template.setAttribute("source", s.getPair().getKey());
127
		template.setAttribute("target", s.getPair().getValue());
128
		template.setAttribute("type", s.getType().toString());
129

  
130
		final String res = template.toString();
131
		return res;
132
	}
133

  
134
	public String getInputEprParam() {
135
		return inputEprParam;
136
	}
137

  
138
	public void setInputEprParam(final String inputEprParam) {
139
		this.inputEprParam = inputEprParam;
140
	}
141

  
142
	public String getOutputEprParam() {
143
		return outputEprParam;
144
	}
145

  
146
	public void setOutputEprParam(final String outputEprParam) {
147
		this.outputEprParam = outputEprParam;
148
	}
149

  
150
	public StringTemplate getSimilarity() {
151
		return similarity;
152
	}
153

  
154
	@Required
155
	public void setSimilarity(final StringTemplate similarity) {
156
		this.similarity = similarity;
157
	}
158

  
159
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/hostedby/PatchHostedByJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.hostedby;
2

  
3
import java.io.StringReader;
4
import java.util.Map;
5
import java.util.function.UnaryOperator;
6

  
7
import com.google.common.collect.Maps;
8
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
9
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
10
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
11
import eu.dnetlib.msro.workflows.graph.Arc;
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13
import eu.dnetlib.msro.workflows.procs.Env;
14
import eu.dnetlib.rmi.common.ResultSet;
15
import eu.dnetlib.rmi.data.DatabaseException;
16
import eu.dnetlib.rmi.data.DatabaseService;
17
import org.apache.commons.lang3.StringUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.dom4j.Document;
21
import org.dom4j.DocumentException;
22
import org.dom4j.io.SAXReader;
23
import org.springframework.beans.factory.annotation.Autowired;
24
import org.springframework.beans.factory.annotation.Value;
25

  
26
public class PatchHostedByJobNode extends SimpleJobNode {
27

  
28
	private static final Log log = LogFactory.getLog(PatchHostedByJobNode.class);
29
	private String inputEprParam;
30
	private String outputEprParam;
31
	@Value("${dnet.openaire.db.name}")
32
	private String dbName;
33
	private String countersParam;
34
	private String hostedbyMapTable;
35
	private String xpathEntry;
36
	private String overrideDataSourceId;
37
	@Autowired
38
	private UniqueServiceLocator serviceLocator;
39
	@Autowired
40
	private ResultSetFactory resultSetFactory;
41
	@Autowired
42
	private ResultSetClient resultSetClient;
43

  
44
	/**
45
	 * {@inheritDoc}
46
	 */
47
	@Override
48
	protected String execute(final Env env) throws Exception {
49
		final ResultSet<?> inputEpr = env.getAttribute(this.inputEprParam, ResultSet.class);
50
		final HostedByCounters counters = new HostedByCounters();
51
		String datasourceId;
52
		if (StringUtils.isEmpty(this.overrideDataSourceId)) {
53
			datasourceId = env.getAttribute("parentDatasourceId", String.class);
54
		} else {
55
			datasourceId = getOverrideDataSourceId();
56
		}
57

  
58
		final UnaryOperator<String> mapper = new PatchHostedBy(loadHostedByMap(datasourceId), getXpathEntry(), counters);
59
		final ResultSet<String> epr = this.resultSetFactory.map(inputEpr, String.class, mapper);
60

  
61
		env.setAttribute(this.outputEprParam, epr);
62
		env.setAttribute(this.countersParam, counters);
63

  
64
		return Arc.DEFAULT_ARC;
65
	}
66

  
67
	private Map<String, HostedByEntry> loadHostedByMap(final String datasourceId) throws DocumentException, DatabaseException {
68
		final Map<String, HostedByEntry> map = Maps.newHashMap();
69

  
70
		final String sql = "SELECT d.id, d.officialname, p.entry from %s p JOIN datasources d ON (p.datasourceid = d.id) WHERE p.oa_source_id= '%s'";
71

  
72
		final ResultSet<String> epr = this.serviceLocator.getService(DatabaseService.class).searchSQL(getDbName(),
73
				String.format(sql, getHostedbyMapTable(), datasourceId));
74

  
75
		final SAXReader reader = new SAXReader();
76
		for (final String s : this.resultSetClient.iter(epr, String.class)) {
77
			final Document doc = reader.read(new StringReader(s));
78
			final String entry = doc.valueOf("//FIELD[@name='entry']");
79
			final String dsId = doc.valueOf("//FIELD[@name='id']");
80
			final String dsName = doc.valueOf("//FIELD[@name='officialname']");
81
			map.put(entry, new HostedByEntry(dsId, dsName));
82
		}
83

  
84
		log.info(String.format("built hostedByMap from dsId '%s', size: '%s'", datasourceId, map.size()));
85

  
86
		return map;
87
	}
88

  
89
	/**
90
	 * Getter for property 'inputEprParam'.
91
	 *
92
	 * @return Value for property 'inputEprParam'.
93
	 */
94
	public String getInputEprParam() {
95
		return this.inputEprParam;
96
	}
97

  
98
	/**
99
	 * Setter for property 'inputEprParam'.
100
	 *
101
	 * @param inputEprParam Value to set for property 'inputEprParam'.
102
	 */
103
	public void setInputEprParam(final String inputEprParam) {
104
		this.inputEprParam = inputEprParam;
105
	}
106

  
107
	/**
108
	 * Getter for property 'outputEprParam'.
109
	 *
110
	 * @return Value for property 'outputEprParam'.
111
	 */
112
	public String getOutputEprParam() {
113
		return this.outputEprParam;
114
	}
115

  
116
	/**
117
	 * Setter for property 'outputEprParam'.
118
	 *
119
	 * @param outputEprParam Value to set for property 'outputEprParam'.
120
	 */
121
	public void setOutputEprParam(final String outputEprParam) {
122
		this.outputEprParam = outputEprParam;
123
	}
124

  
125
	/**
126
	 * Getter for property 'dbName'.
127
	 *
128
	 * @return Value for property 'dbName'.
129
	 */
130
	public String getDbName() {
131
		return this.dbName;
132
	}
133

  
134
	/**
135
	 * Setter for property 'dbName'.
136
	 *
137
	 * @param dbName Value to set for property 'dbName'.
138
	 */
139
	public void setDbName(final String dbName) {
140
		this.dbName = dbName;
141
	}
142

  
143
	/**
144
	 * Getter for property 'countersParam'.
145
	 *
146
	 * @return Value for property 'countersParam'.
147
	 */
148
	public String getCountersParam() {
149
		return this.countersParam;
150
	}
151

  
152
	/**
153
	 * Setter for property 'countersParam'.
154
	 *
155
	 * @param countersParam Value to set for property 'countersParam'.
156
	 */
157
	public void setCountersParam(final String countersParam) {
158
		this.countersParam = countersParam;
159
	}
160

  
161
	/**
162
	 * @return the hostedbyMapTable
163
	 */
164
	public String getHostedbyMapTable() {
165
		return this.hostedbyMapTable;
166
	}
167

  
168
	/**
169
	 * @param hostedbyMapTable the hostedbyMapTable to set
170
	 */
171
	public void setHostedbyMapTable(final String hostedbyMapTable) {
172
		this.hostedbyMapTable = hostedbyMapTable;
173
	}
174

  
175
	public String getXpathEntry() {
176
		return this.xpathEntry;
177
	}
178

  
179
	public void setXpathEntry(final String xpathEntry) {
180
		this.xpathEntry = xpathEntry;
181
	}
182

  
183
	/**
184
	 * @return the overrideDataSourceId
185
	 */
186
	public String getOverrideDataSourceId() {
187
		return this.overrideDataSourceId;
188
	}
189

  
190
	/**
191
	 * @param overrideDataSourceId the overrideDataSourceId to set
192
	 */
193
	public void setOverrideDataSourceId(final String overrideDataSourceId) {
194
		this.overrideDataSourceId = overrideDataSourceId;
195
	}
196
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/hostedby/UpsertHostedByApisJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.hostedby;
2

  
3
import java.util.Map.Entry;
4
import javax.annotation.Resource;
5

  
6
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
7
import eu.dnetlib.miscutils.datetime.DateUtils;
8
import eu.dnetlib.msro.workflows.graph.Arc;
9
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
10
import eu.dnetlib.msro.workflows.procs.Env;
11
import eu.dnetlib.rmi.datasource.DatasourceDesc;
12
import eu.dnetlib.rmi.datasource.DatasourceManagerService;
13
import eu.dnetlib.rmi.datasource.DatasourceManagerServiceException;
14
import eu.dnetlib.rmi.datasource.IfaceDesc;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17

  
18
public class UpsertHostedByApisJobNode extends SimpleJobNode {
19

  
20
	public static final String extraFieldsForTotal = "last_aggregation_total";
21
	public static final String extraFieldForDate = "last_aggregation_date";
22
	public static final String extraFieldForMdId = "last_aggregation_mdId";
23
	private static final Log log = LogFactory.getLog(UpsertHostedByApisJobNode.class);
24
	private static final String HOSTED_BY_COMPLIANCE = "hostedBy";
25
	@Resource
26
	private UniqueServiceLocator serviceLocator;
27
	private String countersParam;
28
	private String mdId;
29

  
30
	@Override
31
	protected String execute(final Env env) throws Exception {
32
		final HostedByCounters counters = env.getAttribute(this.countersParam, HostedByCounters.class);
33
		final DatasourceManagerService dsManager = this.serviceLocator.getService(DatasourceManagerService.class);
34
		final String date = DateUtils.now_ISO8601();
35
		final String namespacePrefix = env.getAttribute("namespacePrefix", String.class);
36

  
37
		log.info(counters);
38

  
39
		for (final Entry<String, Integer> e : counters.getCounters().entrySet()) {
40
			updateHostedByApi(dsManager, e.getKey(), namespacePrefix, date, e.getValue());
41
		}
42

  
43
		return Arc.DEFAULT_ARC;
44
	}
45

  
46
	private void updateHostedByApi(final DatasourceManagerService dsManager,
47
			final String dsId,
48
			final String namepsacePrefix,
49
			final String date,
50
			final int size) {
51
		log.info("Verifying hostedBy api in ds: " + dsId);
52
		try {
53
			final DatasourceDesc ds = dsManager.getDatasource(dsId);
54

  
55
			for (final IfaceDesc iface : ds.getInterfaces()) {
56
				if (HOSTED_BY_COMPLIANCE.equals(iface.getCompliance())) {
57

  
58
					// dsManager.updateExtraField(dsId, iface.getId(), namepsacePrefix + ":" + extraFieldForDate, date, false);
59
					// dsManager.updateExtraField(dsId, iface.getId(), namepsacePrefix + ":" + extraFieldsForTotal, Integer.toString(size),
60
					// false);
61
					// dsManager.updateExtraField(dsId, iface.getId(), namepsacePrefix + ":" + extraFieldForMdId, mdId, false);
62
					return;
63
				}
64
			}
65

  
66
			final IfaceDesc iface = new IfaceDesc();
67
			iface.setId("api_________::" + dsId + "::hostedBy");
68
			iface.setTypology(ds.getDatasourceClass());
69
			iface.setCompliance(HOSTED_BY_COMPLIANCE);
70
			iface.setAccessProtocol("UNKNOWN");
71
			iface.setContentDescription("metadata");
72
			iface.setBaseUrl("");
73
			iface.setActive(false);
74
			iface.setRemovable(true);
75
			iface.getExtraFields().put(namepsacePrefix + ":" + extraFieldForDate, date);
76
			iface.getExtraFields().put(namepsacePrefix + ":" + extraFieldsForTotal, Integer.toString(size));
77
			iface.getExtraFields().put(namepsacePrefix + ":" + extraFieldForMdId, this.mdId);
78
			dsManager.addInterface(dsId, iface);
79
		} catch (final DatasourceManagerServiceException e) {
80
			log.warn("Error setting hostedBy api of ds: " + dsId, e);
81
		}
82
	}
83

  
84
	public String getMdId() {
85
		return this.mdId;
86
	}
87

  
88
	public void setMdId(final String mdId) {
89
		this.mdId = mdId;
90
	}
91

  
92
	public String getCountersParam() {
93
		return this.countersParam;
94
	}
95

  
96
	public void setCountersParam(final String countersParam) {
97
		this.countersParam = countersParam;
98
	}
99

  
100
}
modules/dnet-wds/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/dedup/MinDistSearchHadoopJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.dedup;
2

  
3
import java.nio.file.FileSystems;
4
import java.nio.file.Path;
5

  
6
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
7
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
8
import eu.dnetlib.msro.workflows.graph.Arc;
9
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
10
import eu.dnetlib.msro.workflows.procs.Env;
11
import eu.dnetlib.msro.workflows.procs.Token;
12
import eu.dnetlib.rmi.data.hadoop.HadoopService;
13
import org.apache.commons.lang3.StringUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17

  
18
/**
19
 * Created by claudio on 14/10/15.
20
 */
21
public class MinDistSearchHadoopJobNode extends DedupConfigurationLoaderJobNode {
22

  
23
	private static final Log log = LogFactory.getLog(MinDistSearchHadoopJobNode.class);
24

  
25
	private final static String StatusParam = "MinDistSearchHadoopJobNode.status";
26
	private final static String DepthParam = "mindist_recursion_depth";
27
	private final static String UpdateCounterParam = "UpdateCounter.UPDATED";
28
	private final static String DebugParam = "mindist_DEBUG";
29

  
30
	@Autowired
31
	private UniqueServiceLocator serviceLocator;
32
	private boolean debug = false;
33
	private String outPathParam;
34

  
35
	private String workDir;
36

  
37
	@Override
38
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
39

  
40
		String depthString = token.getEnv().getAttribute(DepthParam, String.class);
41
		log.debug(String.format("found depthParam: '%s'", depthString));
42
		if (StringUtils.isBlank(depthString)) {
43
			depthString = "0";
44
		}
45

  
46
		int depth = Integer.valueOf(depthString);
47
		final String outputPath = getPath(getWorkDir(), depth);
48

  
49
		final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
50
		switch (getStatusFromEnv(token.getEnv())) {
51

  
52
		case DATALOAD:
53

  
54
			setHadoopJob("dedupSimilarity2GraphJob");
55

  
56
			job.getParameters().put("mapred.output.dir", getPath(getWorkDir(), depth) + "/out");
57

  
58
			hadoopService.createHdfsDirectory(getCluster(), outputPath, true);
59

  
60
			break;
61
		case DEPTH_N:
62

  
63
			setHadoopJob("dedupMinDistGraphJob");
64

  
65
			final String newOutputPath = getPath(getWorkDir(), depth + 1);
66
			hadoopService.createHdfsDirectory(getCluster(), newOutputPath, true);
67

  
68
			job.getParameters().put(DepthParam, String.valueOf(depth));
69
			job.getParameters().put(DebugParam, String.valueOf(isDebug()));
70

  
71
			job.getParameters().put("mapred.input.dir", outputPath + "/out");
72
			job.getParameters().put("mapred.output.dir", newOutputPath + "/out");
73

  
74
			if (log.isDebugEnabled()) {
75
				log.debug(String.format("input job parameters: %s", job.getParameters()));
76
			}
77

  
78
			token.getEnv().setAttribute(DepthParam, String.valueOf(depth + 1));
79
			token.getEnv().setAttribute(getOutPathParam(), newOutputPath + "/out");
80

  
81
			break;
82
		}
83

  
84
		super.prepareJob(job, token);
85
	}
86

  
87
	private String getPath(final String basePath, final int depth) {
88

  
89
		log.info("got basePath: " + basePath);
90

  
91
		Path fsPath = FileSystems.getDefault().getPath(basePath, "depth_" + depth);
92
		final String path = fsPath.toAbsolutePath().toString();
93

  
94
		log.info("built outputPath: " + path);
95

  
96
		return path;
97
	}
98

  
99
	private STATUS getStatusFromEnv(final Env env) {
100
		if (StringUtils.isBlank(env.getAttribute(StatusParam, String.class))) {
101
			return STATUS.DATALOAD;
102
		}
103
		STATUS current = STATUS.DATALOAD;
104
		try {
105
			current = STATUS.valueOf(env.getAttribute(StatusParam, String.class));
106
			log.debug("found status: " + current.toString());
107
		} catch (IllegalArgumentException e) {}
108
		return current;
109
	}
110

  
111
	@Override
112
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Token token) {
113
		return new BlackboardWorkflowJobListener(token) {
114

  
115
			@Override
116
			protected void onDone(final BlackboardJob job) {
117

  
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff