Project

General

Profile

1
package eu.dnetlib.msro.workflows.dedup;
2

    
3
import java.util.Queue;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Autowired;
9

    
10
import com.google.common.base.Function;
11
import com.google.common.base.Splitter;
12
import com.google.common.collect.Iterables;
13
import com.google.common.collect.Lists;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

    
17
import eu.dnetlib.msro.rmi.MSROException;
18
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
19
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader;
20
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
21

    
22
public class DedupCheckEntitySequenceJobNode extends AsyncJobNode {
23

    
24
	private static final Log log = LogFactory.getLog(DedupCheckEntitySequenceJobNode.class);
25

    
26
	@Autowired
27
	private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader;
28

    
29
	private String dedupConfigSequenceParam;
30

    
31
	private String entitySequence;
32

    
33
	@Override
34
	protected String execute(final NodeToken token) throws Exception {
35

    
36
		if (StringUtils.isBlank(getEntitySequence())) throw new MSROException("missing entity sequence, e.g. a csv: organization,person,result");
37

    
38
		if (token.getFullEnv().hasAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER)) {
39
			log.info("reset env variable: " + DedupGrouperJobNode.DEDUP_GROUPER_LOOPER + " to zero");
40
			token.getFullEnv().setAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER, 0);
41
		}
42

    
43
		if (!token.getEnv().hasAttribute("entitySequence")) {
44

    
45
			log.info("parsing config sequence: " + getEntitySequence());
46

    
47
			token.getEnv().setAttribute("entitySequence", getEntitySequence());
48

    
49
			final Iterable<String> sequence = Splitter.on(",").omitEmptyStrings().split(getEntitySequence());
50
			final Queue<DedupConfigurationOrchestration> q =
51
					Lists.newLinkedList(Iterables.transform(sequence, new Function<String, DedupConfigurationOrchestration>() {
52

    
53
						@Override
54
						public DedupConfigurationOrchestration apply(final String entityName) {
55
							try {
56
								final DedupConfigurationOrchestration dco = Iterables.getFirst(dedupOrchestrationLoader.loadByEntityName(entityName), null);
57
								if (dco == null) throw new RuntimeException("unable to find DedupOrchestration profile for entity type: " + entityName);
58
								return dco;
59
							} catch (final Throwable e) {
60
								throw new RuntimeException("", e);
61
							}
62
						}
63
					}));
64

    
65
			log.info("built sequence of dedup orchestration profiles, size: " + q.size());
66
			final DedupConfigurationOrchestration dco = q.remove();
67
			log.info("closing mesh for entity: " + dco.getEntity().getName());
68
			setDedupConfParams(token, dco);
69
			token.getEnv().setTransientAttribute("entitySequenceQueue", q);
70

    
71
			return Arc.DEFAULT_ARC;
72
		}
73

    
74
		@SuppressWarnings("unchecked")
75
		final Queue<DedupConfigurationOrchestration> q = (Queue<DedupConfigurationOrchestration>) token.getEnv().getTransientAttribute("entitySequenceQueue");
76

    
77
		if (!q.isEmpty()) {
78
			log.info("remaining dedup orchestration profiles: " + q.size());
79
			final DedupConfigurationOrchestration dco = q.remove();
80
			log.info("closing mesh for entity: " + dco.getEntity().getName());
81

    
82
			setDedupConfParams(token, dco);
83
			return Arc.DEFAULT_ARC;
84
		}
85

    
86
		log.info("completed closing mesh for entities: " + getEntitySequence());
87
		return "done";
88

    
89
	}
90

    
91
	private void setDedupConfParams(final NodeToken token, final DedupConfigurationOrchestration dco) {
92
		token.getEnv().setAttribute("entityType", dco.getEntity().getName());
93
		token.getEnv().setAttribute("entityTypeId", dco.getEntity().getCode());
94
		token.getEnv().setAttribute(getDedupConfigSequenceParam(), dco.toString());
95
	}
96

    
97
	public String getEntitySequence() {
98
		return entitySequence;
99
	}
100

    
101
	public void setEntitySequence(final String entitySequence) {
102
		this.entitySequence = entitySequence;
103
	}
104

    
105
	public String getDedupConfigSequenceParam() {
106
		return dedupConfigSequenceParam;
107
	}
108

    
109
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
110
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
111
	}
112

    
113
}
(4-4/15)