Revision 48027

Node for writing context updates on HBase and refactoring

View differences:

ApplyClaimRelsJobNode.java
 import com.googlecode.sarasvati.Arc;
 import com.googlecode.sarasvati.NodeToken;
 import eu.dnetlib.data.hadoop.rmi.HadoopService;
-import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
 import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
 import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
 import eu.dnetlib.data.proto.KindProtos.Kind;
......
 import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
 import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
 import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
-import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
-import eu.dnetlib.enabling.locators.UniqueServiceLocator;
 import eu.dnetlib.msro.rmi.MSROException;
-import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
-import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
-import eu.dnetlib.msro.workflows.util.ProgressProvider;
 import eu.dnetlib.utils.ontologies.OntologyLoader;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 
 /**
  * Created by alessia on 23/10/15.
  */
-public class ApplyClaimRelsJobNode extends SimpleJobNode implements ProgressJobNode {
+public class ApplyClaimRelsJobNode extends AbstractClaimsToHBASE {
 
 	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);
 
 	private final String SEPARATOR = "_";
 
-	@Autowired
-	private UniqueServiceLocator serviceLocator;
 
-	@Autowired
-	private ClaimDatabaseUtils claimDatabaseUtils;
-
-	private String sql;
-	private String countQuery;
-	private int total = 0;
-	private int processed = 0;
-
-	private String clusterName;
-
-	private String tableName;
-
-//	private String fetchSqlAsText(final String path) throws IOException {
-//		return IOUtils.toString(getClass().getResourceAsStream(path));
-//	}
-
 	@Override
 	protected String execute(NodeToken token) throws Exception {
 		//TODO: use claim.claim_date from the claim db
 		long timestamp = System.currentTimeMillis();
-		total = this.claimDatabaseUtils.count(countQuery);
-		List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
+		setTotal(getClaimDatabaseUtils().count(getCountQuery()));
+		List<Claim> claimRels = getClaimDatabaseUtils().query(getSql());
 		int totalWrites = 0;
 		int valid = 0;
 		int discardedClaims = 0;
 
-		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
+		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
 
 		for (Claim claim : claimRels) {
 			log.debug(claim);
 			try {
-				String sourceId = fullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
-				String targetId = fullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
+				String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
+				String targetId = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
 				String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp);
-
-			/*
-			public void addHBaseColumn(final String clusterName,
-			final String tableName,
-			final String rowKey,
-			final String columnFamily,
-			final String qualifier,
-			final String value)
-			 */
-				hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
-				processed++;
+				// adding relationship
+				hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value);
 				totalWrites++;
 
 				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
 				String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp);
-				hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue);
+				//adding inverse relationship
+				hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue);
 				totalWrites++;
-			}catch(IllegalArgumentException e){
-				log.error("Discarding claim "+claim+". Cause: "+e.getMessage());
+				incrementProcessed();
+			} catch (IllegalArgumentException e) {
+				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
 				discardedClaims++;
 			}
 		}
 
-		log.info("totalClaims: " + total);
-		token.getEnv().setAttribute("claimSize", total);
-		log.info("writeOps: " + totalWrites);
-		token.getEnv().setAttribute("writeOps", totalWrites);
-		log.info("validClaims: " + valid);
-		token.getEnv().setAttribute("validClaims", valid);
-		log.info("discardedClaims: " + discardedClaims);
-		token.getEnv().setAttribute("discardedClaims", discardedClaims);
+		log.info("totalClaimRels: " + getTotal());
+		token.getEnv().setAttribute("claimRelsSize", getTotal());
+		log.info("claim rels writeOps: " + totalWrites);
+		token.getEnv().setAttribute("claimRelsWriteOps", totalWrites);
+		log.info("validClaimRels: " + valid);
+		token.getEnv().setAttribute("validClaimRels", valid);
+		log.info("discardedClaimRels: " + discardedClaims);
+		token.getEnv().setAttribute("discardedClaimRels", discardedClaims);
 
 		return Arc.DEFAULT_ARC;
 	}
 
-	protected String getOpenAIREType(final String type){
-		switch(type){
-		case "publication":
-		case "dataset":
-			return "result";
-		default:
-			return type;
-		}
-	}
-
-
-	public String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException{
+	protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException {
 		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
 		String[] relInfo = semantics.split(SEPARATOR);
-		if(relInfo.length < 3){
-			throw new MSROException("Semantics "+semantics+" not supported: must be splittable in 3 by '_'");
+		if (relInfo.length < 3) {
+			throw new MSROException("Semantics " + semantics + " not supported: must be splittable in 3 by '_'");
 		}
 		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);
 
......
 							))));
 			break;
 		default:
-			throw new MSROException("Semantics "+relInfo[0]+" not supported");
+			throw new MSROException("Semantics " + relInfo[0] + " not supported");
 		}
 
 		builder.setRel(relBuilder);
 		return Base64.encodeBase64String(builder.build().toByteArray());
 	}
-
-	private String fullId(final String type, final String id) {
-		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
-		return OafRowKeyDecoder.decode(fullId).getKey();
-	}
-
-	public String getClusterName() {
-		return clusterName;
-	}
-
-	public void setClusterName(final String clusterName) {
-		this.clusterName = clusterName;
-	}
-
-	public String getTableName() {
-		return tableName;
-	}
-
-	public void setTableName(final String tableName) {
-		this.tableName = tableName;
-	}
-
-	public UniqueServiceLocator getServiceLocator() {
-		return serviceLocator;
-	}
-
-	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
-		this.serviceLocator = serviceLocator;
-	}
-
-	public ClaimDatabaseUtils getClaimDatabaseUtils() {
-		return claimDatabaseUtils;
-	}
-
-	public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) {
-		this.claimDatabaseUtils = claimDatabaseUtils;
-	}
-
-	public String getSql() {
-		return sql;
-	}
-
-	public void setSql(final String sql) {
-		this.sql = sql;
-	}
-
-	public String getCountQuery() {
-		return countQuery;
-	}
-
-	public void setCountQuery(final String countQuery) {
-		this.countQuery = countQuery;
-	}
-
-	@Override
-	public ProgressProvider getProgressProvider() {
-		return new ProgressProvider() {
-
-			@Override
-			public boolean isInaccurate() {
-				return false;
-			}
-
-			@Override
-			public int getTotalValue() {
-				return total;
-			}
-
-			@Override
-			public int getCurrentValue() {
-				return processed;
-			}
-		};
-	}
 }
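
Note on the refactoring: the members deleted above (the autowired UniqueServiceLocator and ClaimDatabaseUtils, the sql/countQuery strings, the total/processed counters, clusterName and tableName with their getters and setters, the fullId() and getOpenAIREType() helpers, and the ProgressProvider implementation) move into the new AbstractClaimsToHBASE superclass, which is not part of this diff. Below is a minimal sketch of what that class presumably provides, inferred from the accessors the new code calls (setTotal, getTotal, incrementProcessed, getFullId, getServiceLocator, getClaimDatabaseUtils, getSql, getCountQuery, getClusterName, getTableName); it is a hypothetical reconstruction, not the actual source:

import org.springframework.beans.factory.annotation.Autowired;

import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import eu.dnetlib.msro.workflows.util.ProgressProvider;

// Hypothetical sketch: ClaimDatabaseUtils is assumed to live in the same package.
public abstract class AbstractClaimsToHBASE extends SimpleJobNode implements ProgressJobNode {

	@Autowired
	private UniqueServiceLocator serviceLocator;

	@Autowired
	private ClaimDatabaseUtils claimDatabaseUtils;

	private String sql;
	private String countQuery;
	private String clusterName;
	private String tableName;

	private int total = 0;
	private int processed = 0;

	// Publications and datasets are both "result" entities in the graph.
	protected String getOpenAIREType(final String type) {
		switch (type) {
		case "publication":
		case "dataset":
			return "result";
		default:
			return type;
		}
	}

	// Builds the full OpenAIRE identifier for (type, id) and normalizes it
	// into the HBase row key, as the deleted fullId() helper did.
	protected String getFullId(final String type, final String id) {
		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
		return OafRowKeyDecoder.decode(fullId).getKey();
	}

	protected void incrementProcessed() {
		processed++;
	}

	public int getTotal() {
		return total;
	}

	public void setTotal(final int total) {
		this.total = total;
	}

	// Same progress reporting the subclass used to implement inline.
	@Override
	public ProgressProvider getProgressProvider() {
		return new ProgressProvider() {

			@Override
			public boolean isInaccurate() {
				return false;
			}

			@Override
			public int getTotalValue() {
				return total;
			}

			@Override
			public int getCurrentValue() {
				return processed;
			}
		};
	}

	// ... plus plain getters and setters for serviceLocator, claimDatabaseUtils,
	// sql, countQuery, clusterName and tableName, as removed from the subclass.
}

Pulling this plumbing into a shared superclass presumably lets sibling workflow nodes (the commit message also mentions writing context updates) reuse the same claim counting, row-key construction and progress machinery.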

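For reference, getValue() expects the claim semantics to be an underscore-separated triple (relType_subRelType_relClass): the first token selects the branch of the relation switch, the third becomes the classid/classname of the relation Qualifier, and the resulting relation is serialized as a Base64-encoded protobuf. A small self-contained sketch, using a hypothetical semantics value purely for illustration:

// Hypothetical example; real semantics terms come from the claim database.
public class SemanticsExample {
	public static void main(String[] args) {
		final String semantics = "resultProject_outcome_isProducedBy";

		final String[] relInfo = semantics.split("_"); // SEPARATOR in the node is "_"
		if (relInfo.length < 3) {
			// getValue() throws MSROException in this case
			throw new IllegalArgumentException("must be splittable in 3 by '_'");
		}

		System.out.println("relType:    " + relInfo[0]); // "resultProject" -> switch branch in getValue()
		System.out.println("subRelType: " + relInfo[1]); // "outcome"
		System.out.println("relClass:   " + relInfo[2]); // "isProducedBy" -> Qualifier classid/classname
	}
}

Each valid claim then produces two addHBaseColumn() writes: one under the source row, with the semantics term as column family and the target row key as qualifier, and a mirrored one under the target row using the inverse term returned by OntologyLoader.fetchInverse(); in both cases the cell value is the Base64-encoded payload built by getValue().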