Revision 47716

Let's not fail on a non-well-formed claim: we keep going and keep track of it.

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNode.java
 		List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
 		int totalClaims = 0;
 		int totalWrites = 0;
+		int discardedClaims = 0;
 
 		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
 
 		for (Claim claim : claimRels) {
 			log.debug(claim);
-			totalClaims++;
-			String sourceId = fullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
-			String targetId = fullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
-			String value = getValue(sourceId, claim.getSemantics(), targetId,  timestamp);
+			try {
+				String sourceId = fullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
+				String targetId = fullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
+				String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp);
+
 			/*
 			public void addHBaseColumn(final String clusterName,
 			final String tableName,
......
 			final String qualifier,
 			final String value)
 			 */
-			hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
-			totalWrites++;
+				hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
+				totalClaims++;
+				totalWrites++;
 
-			String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
-			String inverseValue = getValue(targetId, inverseSemantics, sourceId,  timestamp);
-			hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue);
-			totalWrites++;
-
+				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
+				String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp);
+				hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue);
+				totalWrites++;
+			}catch(IllegalArgumentException e){
+				log.error("Discarding claim "+claim+". Cause: "+e.getMessage());
+				discardedClaims++;
+			}
 		}
 
 		log.info("totalClaims: " + totalClaims);
 		token.getEnv().setAttribute("claimSize", totalClaims);
 		log.info("writeOps: " + totalWrites);
 		token.getEnv().setAttribute("writeOps", totalWrites);
+		log.info("discardedClaims: " + discardedClaims);
+		token.getEnv().setAttribute("discardedClaims", discardedClaims);
 
 		return Arc.DEFAULT_ARC;
 	}
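
For readers who want the net effect rather than the diff, here is a minimal, self-contained sketch of the pattern this revision introduces: the per-claim work moves inside a try/catch on IllegalArgumentException, so one malformed claim no longer aborts the whole run; the failure is logged, counted in discardedClaims, and the loop continues. All types and helpers in the sketch (FakeClaim, requireWellFormed, inverseOf, write) are illustrative stand-ins, not the actual ApplyClaimRelsJobNode collaborators (HadoopService, OntologyLoader, etc.).

import java.util.List;

public class DiscardOnErrorSketch {

	// Illustrative stand-in for the Claim bean returned by the claims database.
	record FakeClaim(String source, String target, String semantics) {}

	public static void main(String[] args) {
		List<FakeClaim> claims = List.of(
				new FakeClaim("50|od_1::abc", "40|corda::123", "isProducedBy"),
				new FakeClaim("not-a-valid-id", "40|corda::123", "isProducedBy"));

		int totalClaims = 0;
		int totalWrites = 0;
		int discardedClaims = 0;

		for (FakeClaim claim : claims) {
			try {
				// Anything that can reject a malformed claim (id building,
				// ontology lookup, ...) stays inside the try block.
				String sourceId = requireWellFormed(claim.source());
				String targetId = requireWellFormed(claim.target());

				write(sourceId, claim.semantics(), targetId);             // direct relation
				totalClaims++;
				totalWrites++;

				write(targetId, inverseOf(claim.semantics()), sourceId);  // inverse relation
				totalWrites++;
			} catch (IllegalArgumentException e) {
				// Do not fail the whole job: log, count, move on.
				System.err.println("Discarding claim " + claim + ". Cause: " + e.getMessage());
				discardedClaims++;
			}
		}

		System.out.printf("totalClaims: %d, writeOps: %d, discardedClaims: %d%n",
				totalClaims, totalWrites, discardedClaims);
	}

	private static String requireWellFormed(String id) {
		// Stand-in for fullId(getOpenAIREType(...), ...): rejects ids without a type prefix.
		if (!id.matches("\\d{2}\\|.+")) {
			throw new IllegalArgumentException("malformed identifier: " + id);
		}
		return id;
	}

	private static String inverseOf(String semantics) {
		// Stand-in for OntologyLoader.fetchInverse(...).
		return "isProducedBy".equals(semantics) ? "produces" : semantics + "_inverse";
	}

	private static void write(String row, String qualifier, String target) {
		// Stand-in for hadoopService.addHBaseColumn(...): a no-op here.
	}
}

One side effect of the change worth noting: totalClaims is now incremented only after the direct relation has been written, so it counts successfully processed claims rather than every claim returned by the query; malformed claims show up in discardedClaims instead.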
