Project

General

Profile

1 35869 claudio.at
package eu.dnetlib.msro.workflows.dedup;
2 35834 claudio.at
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.Engine;
8
import com.googlecode.sarasvati.NodeToken;
9
10
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12
13 36240 claudio.at
public class DedupGrouperJobNode extends DedupConfigurationAwareJobNode {
14 35834 claudio.at
15
	// TODO factor out this constant, it should be a configuration parameter
16
	public static final int DEDUP_GROUPER_MAX_LOOPS = 10;
17
18
	public static final String DEDUP_GROUPER_LOOPER = "dedup.grouper.looper";
19
	public static final String DEDUP_GROUPER_CURR_WRITTEN_RELS = "dedup.grouper.written.rels";
20
	public static final String DEDUP_GROUPER_PREV_WRITTEN_RELS = "dedup.grouper.prev.written.rels";
21
22
	private static final Log log = LogFactory.getLog(DedupGrouperJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
23
24
	private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener {
25
26
		public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) {
27
			super(engine, token);
28
		}
29
30
		@Override
31
		protected void onDone(final BlackboardJob job) {
32
33
			final int times = currentIteration(getToken());
34
			final String curr = job.getParameters().get(DEDUP_GROUPER_CURR_WRITTEN_RELS);
35
36
			if (times == 0) {
37
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, -1);
38
			}
39
40
			if ((times >= DEDUP_GROUPER_MAX_LOOPS) || isStable(getToken(), curr)) {
41
				super.complete(job, "done");
42
			} else {
43
				log.info("incrementing dedup.grouper.looper to " + (times + 1));
44
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_LOOPER, times + 1);
45
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, curr);
46
				super.complete(job, Arc.DEFAULT_ARC);
47
			}
48
		}
49
	}
50
51
	private int currentIteration(final NodeToken token) {
52
		try {
53
			final String sTimes = token.getFullEnv().getAttribute(DEDUP_GROUPER_LOOPER);
54
			log.info("read dedup.grouper.looper from fullEnv: '" + sTimes + "'");
55
			return Integer.parseInt(sTimes);
56
		} catch (final NumberFormatException e) {
57
			log.info("got empty dedup.grouper.looper, initializing to 0");
58
			return 0;
59
		}
60
	}
61
62
	private boolean isStable(final NodeToken token, final String sCurr) {
63
		final String sPrev = token.getFullEnv().getAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS);
64
65
		log.info("Comparing written rels, prev=" + sPrev + ", curr=" + sCurr);
66
		try {
67
			final boolean b = Integer.parseInt(sCurr) == Integer.parseInt(sPrev);
68
			if (b) {
69
				log.info("  --- The number of written rels is STABLE");
70
			}
71
			return b;
72
		} catch (final Exception e) {
73
			log.error("Invalid parsing of written rels counters - curr: " + sCurr + ", prev: " + sPrev);
74
			return false;
75
		}
76
	}
77
78
	@Override
79
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
80
		return new DedupBlackboardWorkflowJobListener(engine, token);
81
	}
82
83
}