Project

General

Profile

« Previous | Next » 

Revision 51302

[maven-release-plugin] copy for tag dnet-mapreduce-jobs-1.1.1-PROD

View differences:

modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/EnrichmentMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import com.google.common.collect.Iterables;
7
import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName;
11
import eu.dnetlib.data.proto.OafProtos.Oaf;
12
import eu.dnetlib.data.proto.TypeProtos.Type;
13
import org.apache.commons.collections.MapUtils;
14
import org.apache.hadoop.hbase.client.Result;
15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.util.Bytes;
17

  
18
/**
19
 * Created by claudio on 08/07/16.
20
 */
21
public class EnrichmentMapper extends AbstractEnrichmentMapper {
22

  
23
	@Override
24
	protected String counterGroup() {
25
		return "Broker Enrichment";
26
	}
27

  
28
	@Override
29
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
30

  
31
		final String type = OafRowKeyDecoder.decode(key.copyBytes()).getType().toString();
32

  
33
		final Map<byte[], byte[]> result = value.getFamilyMap(Bytes.toBytes(Type.result.name()));
34
		if (MapUtils.isEmpty(result)) {
35
			context.getCounter(counterGroup(), type + ": empty family map").increment(1);
36
			return;
37
		}
38

  
39
		final Oaf body = UpdateMerger.mergeBodyUpdates(context, result);
40

  
41
		if (body == null) {
42
			context.getCounter(counterGroup(), type + ": body null").increment(1);
43
			return;
44
		}
45

  
46
		final String mergedInCF = DedupUtils.getDedupCF_mergedIn(Type.result);
47
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(Bytes.toBytes(mergedInCF));
48

  
49
		final byte[] outKey = getEmitKey(context, key, mergedIn);
50

  
51
		emit(context, outKey, body.toByteArray(), type);
52
	}
53

  
54
	private byte[] getEmitKey(final Context context, final ImmutableBytesWritable key, final Map<byte[], byte[]> mergedIn) {
55
		if (MapUtils.isNotEmpty(mergedIn)) {
56
			context.getCounter(Type.result.name(), RelName.isMergedIn.name()).increment(1);
57
			return Iterables.getOnlyElement(mergedIn.keySet());
58
		} else {
59
			return key.copyBytes();
60
		}
61
	}
62

  
63
}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/ProjectEnrichmentMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

  
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.Map;
6

  
7
import com.google.common.base.Function;
8
import com.google.common.base.Predicate;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Lists;
11
import eu.dnetlib.data.mapreduce.util.DedupUtils;
12
import eu.dnetlib.data.mapreduce.util.OafDecoder;
13
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16
import org.apache.commons.collections.MapUtils;
17
import org.apache.hadoop.hbase.client.Result;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.util.Bytes;
20

  
21
/**
22
 * Created by claudio on 08/07/16.
23
 */
24
public class ProjectEnrichmentMapper extends AbstractEnrichmentMapper {
25

  
26
	@Override
27
	protected String counterGroup() {
28
		return "Broker Enrichment projects";
29
	}
30

  
31
	@Override
32
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
33

  
34
		final Type type = getEntityType(key);
35

  
36
		final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body"));
37

  
38
		if (body == null) {
39
			context.getCounter(counterGroup(), "missing metadata").increment(1);
40
			return;
41
		}
42

  
43
		switch (type) {
44
		case project:
45
			for (final byte[] resultId : listRelatedIds(value, "resultProject_outcome_produces")) {
46
				emit(context, resultId, body, "project");
47
			}
48
			break;
49
		case result:
50

  
51
			final Oaf.Builder oafBuilder = Oaf.newBuilder(OafDecoder.decode(body).getOaf());
52
			for (final String relName : Arrays.asList("resultProject_outcome_isProducedBy")) { // Â TODO add dataset rels
53
				for (final Oaf rel : listRelations(value, relName)) {
54
					oafBuilder.getEntityBuilder().addCachedOafRel(rel);
55
					context.getCounter(counterGroup(), "rel: " + relName).increment(1);
56
				}
57
			}
58

  
59
			final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.result));
60

  
61
			if (MapUtils.isEmpty(mergedIn) & !DedupUtils.isRoot(key)) {
62
				emit(context, key.copyBytes(), oafBuilder.build().toByteArray(), "result not deduped");
63
			} else if (DedupUtils.isRoot(key)) {
64
				emit(context, key.copyBytes(), oafBuilder.build().toByteArray(), "result merges");
65
			} else {
66
				emit(context, getRootId(mergedIn), oafBuilder.build().toByteArray(), "result mergedIn");
67
			}
68

  
69
			break;
70
		default:
71
			throw new IllegalArgumentException("invalid type: " + type);
72
		}
73
	}
74

  
75
	private Iterable<Oaf> listRelations(final Result value, final String relType) {
76

  
77
		//TODO consider only relationshipt not deletedbyinference
78

  
79
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(relType));
80
		if (MapUtils.isNotEmpty(map)) {
81
			return Iterables.filter(Iterables.transform(map.values(), new Function<byte[], Oaf>() {
82

  
83
				@Override
84
				public Oaf apply(final byte[] input) {
85
					return OafDecoder.decode(input).getOaf();
86
				}
87
			}), new Predicate<Oaf>() {
88
				@Override
89
				public boolean apply(final Oaf rel) {
90
					return !rel.getRel().getTarget().contains("unidentified");
91
				}
92
			});
93
		} else {
94
			return Lists.newArrayList();
95
		}
96
	}
97

  
98
	private Iterable<byte[]> listRelatedIds(final Result value, final String relType) {
99

  
100
		//TODO consider only relationshipt not deletedbyinference
101

  
102
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(relType));
103
		if (MapUtils.isNotEmpty(map)) {
104
			return map.keySet();
105
		} else {
106
			return Lists.newArrayList();
107
		}
108
	}
109

  
110
	private Type getEntityType(final ImmutableBytesWritable key) {
111
		return OafRowKeyDecoder.decode(key.copyBytes()).getType();
112
	}
113

  
114
	private byte[] getRootId(final Map<byte[], byte[]> mergedIn) {
115
		return Iterables.getOnlyElement(mergedIn.keySet());
116
	}
117

  
118
}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/AbstractEventFactory.java
1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

  
3
import java.io.IOException;
4
import java.util.List;
5

  
6
import com.google.common.collect.Lists;
7
import eu.dnetlib.broker.objects.OpenAireEventPayload;
8
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
9
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
10
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
11
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
12
import eu.dnetlib.data.proto.OafProtos.Oaf;
13
import org.apache.commons.lang.StringUtils;
14
import org.dom4j.DocumentException;
15

  
16
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
17

  
18
/**
19
 * Created by claudio on 26/07/16.
20
 */
21
public class AbstractEventFactory {
22

  
23
	public static List<EventWrapper> process(final Oaf current, final Oaf other, final float trust)
24
			throws IOException, InterruptedException, DocumentException {
25
		return new AbstractEventFactory().processAbstract(current, other, trust);
26
	}
27

  
28
	private List<EventWrapper> processAbstract(final Oaf current, final Oaf other, final float trust)
29
			throws IOException, InterruptedException, DocumentException {
30

  
31
		final List<EventWrapper> events = Lists.newArrayList();
32

  
33
		if (hasAbstract(other)) {
34
			// doProcessAbstract(context, current, other, Topic.MORE_ABSTRACT);
35

  
36
			if (!hasAbstract(current)) {
37
				events.add(doProcessAbstract(current, other, Topic.ENRICH_MISSING_ABSTRACT, trust));
38
			}
39
		}
40

  
41
		return events;
42
	}
43

  
44
	private EventWrapper doProcessAbstract(final Oaf current, final Oaf other, final Topic topic, final float trust)
45
			throws IOException, InterruptedException, DocumentException {
46
		final Oaf.Builder prototype = Oaf.newBuilder(current);
47
		final List<StringField> descriptionList = other.getEntity().getResult().getMetadata().getDescriptionList();
48
		prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addAllDescription(descriptionList);
49

  
50
		final Oaf oaf = prototype.build();
51

  
52
		final OpenAireEventPayload payload =
53
				HighlightFactory.highlightEnrichAbstract(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), other.getEntity(), trust), descriptionList);
54

  
55
		return EventWrapper.newInstance(
56
				asEvent(oaf.getEntity(), topic, payload, other.getEntity(), trust),
57
				topic.getValue());
58
	}
59

  
60
	private boolean hasAbstract(final Oaf oaf) {
61
		return oaf.getEntity().getResult().getMetadata().getDescriptionList()
62
				.stream()
63
				.anyMatch(s -> StringUtils.isNotBlank(s.getValue()));
64
	}
65

  
66
}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/pom.xml
1
<?xml version="1.0" ?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3
	<parent>
4
		<groupId>eu.dnetlib</groupId>
5
		<artifactId>dnet45-parent</artifactId>
6
		<version>1.0.0</version>
7
		<relativePath />
8
	</parent>
9
	<modelVersion>4.0.0</modelVersion>
10
	<groupId>eu.dnetlib</groupId>
11
	<artifactId>dnet-mapreduce-jobs</artifactId>
12
	<version>1.1.1-PROD</version>
13
	<packaging>jar</packaging>
14
	<scm>
15
		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD</developerConnection>
16
	</scm>
17
	<build>
18
		<plugins>
19
			<plugin>
20
				<artifactId>maven-assembly-plugin</artifactId>
21
				<configuration>
22
					<archive>
23
						<manifest>
24
							<mainClass>eu.dnetlib.data.mapreduce.hbase.dataimport.ImportRecordsJob</mainClass>
25
						</manifest>
26
					</archive>
27
					<descriptorRefs>
28
						<descriptorRef>jar-with-dependencies</descriptorRef>
29
					</descriptorRefs>
30
				</configuration>
31
			</plugin>
32
		</plugins>
33
	</build>
34
	<repositories>
35
		<!-- Cloudera Repositories -->
36
		<repository>
37
			<snapshots>
38
				<enabled>false</enabled>
39
			</snapshots>
40
			<id>cloudera-central</id>
41
			<name>cloudera-libs-release</name>
42
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-central</url>
43
		</repository>
44
		<repository>
45
			<id>cloudera-snapshots</id>
46
			<name>cloudera-libs-snapshot</name>
47
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-snapshots</url>
48
		</repository>
49
	</repositories>
50
	<dependencies>
51
		<dependency>
52
			<groupId>commons-logging</groupId>
53
			<artifactId>commons-logging</artifactId>
54
			<version>${commons.logging.version}</version>
55
		</dependency>
56
		<dependency>
57
			<groupId>junit</groupId>
58
			<artifactId>junit</artifactId>
59
			<version>${junit.version}</version>
60
			<scope>test</scope>
61
		</dependency>
62
		<dependency>
63
			<groupId>com.google.code.gson</groupId>
64
			<artifactId>gson</artifactId>
65
			<version>${google.gson.version}</version>
66
		</dependency>
67
		<dependency>
68
			<groupId>eu.dnetlib</groupId>
69
			<artifactId>dnet-index-solr-common</artifactId>
70
			<version>[1.0.0,2.0.0)</version>
71
		</dependency>
72
		<dependency>
73
			<groupId>eu.dnetlib</groupId>
74
			<artifactId>dnet-openaire-broker-common</artifactId>
75
			<version>[1.0.0,2.0.0)</version>
76
		</dependency>
77
		<dependency>
78
			<groupId>com.mycila</groupId>
79
			<artifactId>xmltool</artifactId>
80
			<version>3.3</version>
81
		</dependency>
82
		<dependency>
83
			<groupId>eu.dnetlib</groupId>
84
			<artifactId>cnr-misc-utils</artifactId>
85
			<version>[1.0.0,2.0.0)</version>
86
			<exclusions>
87
				<exclusion>
88
					<groupId>apache</groupId>
89
					<artifactId>commons-lang</artifactId>
90
				</exclusion>
91
			</exclusions>
92
		</dependency>
93
		<dependency>
94
			<groupId>eu.dnetlib</groupId>
95
			<artifactId>dnet-hadoop-commons</artifactId>
96
			<version>[2.0.0,3.0.0)</version>
97
			<exclusions>
98
				<exclusion>
99
					<artifactId>commons-httpclient</artifactId>
100
					<groupId>commons-httpclient</groupId>
101
				</exclusion>
102
			</exclusions>
103
		</dependency>
104
		<dependency>
105
			<groupId>org.apache.hbase</groupId>
106
			<artifactId>hbase</artifactId>
107
			<version>${apache.hbase.version}</version>
108
			<exclusions>
109
				<exclusion>
110
					<groupId>tomcat</groupId>
111
					<artifactId>jasper-runtime</artifactId>
112
				</exclusion>
113
				<exclusion>
114
					<groupId>tomcat</groupId>
115
					<artifactId>jasper-compiler</artifactId>
116
				</exclusion>
117
				<exclusion>
118
					<artifactId>slf4j-api</artifactId>
119
					<groupId>org.slf4j</groupId>
120
				</exclusion>
121
				<exclusion>
122
					<artifactId>slf4j-log4j12</artifactId>
123
					<groupId>org.slf4j</groupId>
124
				</exclusion>
125
				<exclusion>
126
					<artifactId>commons-lang</artifactId>
127
					<groupId>commons-lang</groupId>
128
				</exclusion>
129
				<exclusion>
130
					<artifactId>commons-httpclient</artifactId>
131
					<groupId>commons-httpclient</groupId>
132
				</exclusion>
133
				<exclusion>
134
					<artifactId>httpclient</artifactId>
135
					<groupId>org.apache.httpcomponents</groupId>
136
				</exclusion>
137
				<exclusion>
138
					<artifactId>httpcore</artifactId>
139
					<groupId>org.apache.httpcomponents</groupId>
140
				</exclusion>
141
			</exclusions>
142
		</dependency>
143
		<dependency>
144
			<groupId>com.googlecode.protobuf-java-format</groupId>
145
			<artifactId>protobuf-java-format</artifactId>
146
			<version>1.2</version>
147
		</dependency>
148
		<dependency>
149
			<groupId>eu.dnetlib</groupId>
150
			<artifactId>dnet-openaireplus-mapping-utils</artifactId>
151
			<version>[6.0.0,6.1.5]</version>
152
		</dependency>
153
		<dependency>
154
			<groupId>org.antlr</groupId>
155
			<artifactId>stringtemplate</artifactId>
156
			<version>3.2</version>
157
		</dependency>
158
		<dependency>
159
			<groupId>org.json</groupId>
160
			<artifactId>json</artifactId>
161
			<version>20140107</version>
162
		</dependency>
163
		<dependency>
164
			<groupId>com.typesafe</groupId>
165
			<artifactId>config</artifactId>
166
			<version>1.2.1</version>
167
		</dependency>
168
		<dependency>
169
			<groupId>eu.dnetlib</groupId>
170
			<artifactId>dnet-pace-core</artifactId>
171
			<version>[2.0.0,3.0.0)</version>
172
		</dependency>
173
		<dependency>
174
			<groupId>org.mongodb</groupId>
175
			<artifactId>mongo-java-driver</artifactId>
176
			<version>${mongodb.driver.version}</version>
177
		</dependency>
178
		<dependency>
179
			<groupId>eu.dnetlib</groupId>
180
			<artifactId>dnet-actionmanager-common</artifactId>
181
			<version>[6.0.0,7.0.0)</version>
182
			<exclusions>
183
				<exclusion>
184
					<groupId>commons-httpclient</groupId>
185
					<artifactId>commons-httpclient</artifactId>
186
				</exclusion>
187
			</exclusions>
188
		</dependency>
189

  
190
		<dependency>
191
			<groupId>org.elasticsearch</groupId>
192
			<artifactId>elasticsearch-hadoop-mr</artifactId>
193
			<version>5.2.0</version>
194
			<exclusions>
195
				<exclusion>
196
					<groupId>tomcat</groupId>
197
					<artifactId>jasper-compiler</artifactId>
198
				</exclusion>
199
				<exclusion>
200
					<groupId>org.antlr</groupId>
201
					<artifactId>antlr-runtime</artifactId>
202
				</exclusion>
203
				<exclusion>
204
					<groupId>org.eclipse.jetty.aggregate</groupId>
205
					<artifactId>jetty-all</artifactId>
206
				</exclusion>
207
				<exclusion>
208
					<groupId>org.slf4j</groupId>
209
					<artifactId>slf4j-log4j12</artifactId>
210
				</exclusion>
211
				<exclusion>
212
					<groupId>org.glassfish.jersey.core</groupId>
213
					<artifactId>jersey-client</artifactId>
214
				</exclusion>
215
				<exclusion>
216
					<groupId>org.glassfish.jersey.core</groupId>
217
					<artifactId>jersey-common</artifactId>
218
				</exclusion>
219
				<exclusion>
220
					<groupId>org.glassfish.jersey.core</groupId>
221
					<artifactId>jersey-server</artifactId>
222
				</exclusion>
223
				<exclusion>
224
					<groupId>org.glassfish.jersey.containers</groupId>
225
					<artifactId>jersey-container-servlet</artifactId>
226
				</exclusion>
227
				<exclusion>
228
					<groupId>org.glassfish.jersey.containers</groupId>
229
					<artifactId>jersey-container-servlet-core</artifactId>
230
				</exclusion>
231

  
232
				<exclusion>
233
					<groupId>org.codehaus.groovy</groupId>
234
					<artifactId>groovy-all</artifactId>
235
				</exclusion>
236

  
237
				<exclusion>
238
					<groupId>org.apache.hive</groupId>
239
					<artifactId>hive-service</artifactId>
240
				</exclusion>
241

  
242
				<exclusion>
243
					<groupId>org.apache.spark</groupId>
244
					<artifactId>spark-core_2.10</artifactId>
245
				</exclusion>
246
				<exclusion>
247
					<groupId>org.apache.spark</groupId>
248
					<artifactId>spark-sql_2.10</artifactId>
249
				</exclusion>
250
				<exclusion>
251
					<groupId>org.apache.spark</groupId>
252
					<artifactId>spark-streaming_2.10</artifactId>
253
				</exclusion>
254
				<exclusion>
255
					<groupId>cascading</groupId>
256
					<artifactId>cascading-hadoop</artifactId>
257
				</exclusion>
258
				<exclusion>
259
					<groupId>cascading</groupId>
260
					<artifactId>cascading-local</artifactId>
261
				</exclusion>
262
				<exclusion>
263
					<groupId>org.apache.storm</groupId>
264
					<artifactId>storm-core</artifactId>
265
				</exclusion>
266
				<exclusion>
267
					<groupId>org.apache.pig</groupId>
268
					<artifactId>pig</artifactId>
269
				</exclusion>
270
			</exclusions>
271

  
272
		</dependency>
273
		<dependency>
274
			<groupId>org.mockito</groupId>
275
			<artifactId>mockito-core</artifactId>
276
			<version>${mockito.version}</version>
277
			<scope>test</scope>
278
		</dependency>
279
		<dependency>
280
			<groupId>eu.dnetlib</groupId>
281
			<artifactId>dnet-openaireplus-profiles</artifactId>
282
			<version>[1.0.0,2.0.0)</version>
283
			<scope>test</scope>
284
		</dependency>
285
		<dependency>
286
			<groupId>com.google.guava</groupId>
287
			<artifactId>guava</artifactId>
288
			<version>${google.guava.version}</version>
289
		</dependency>
290

  
291
	</dependencies>
292
</project>
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/add/AdditionMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.add;
2

  
3
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
4
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;
5
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.listKeys;
6

  
7
import java.io.IOException;
8
import java.util.HashSet;
9
import java.util.Map;
10
import java.util.Set;
11

  
12
import org.apache.commons.collections.MapUtils;
13
import org.apache.commons.lang.StringUtils;
14
import org.apache.commons.lang.math.RandomUtils;
15
import org.apache.hadoop.hbase.client.HTable;
16
import org.apache.hadoop.hbase.client.Result;
17
import org.apache.hadoop.hbase.client.ResultScanner;
18
import org.apache.hadoop.hbase.client.Scan;
19
import org.apache.hadoop.hbase.filter.FilterList;
20
import org.apache.hadoop.hbase.filter.FilterList.Operator;
21
import org.apache.hadoop.hbase.filter.PrefixFilter;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
import org.apache.hadoop.hbase.mapreduce.TableMapper;
24
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
26

  
27
import com.google.common.base.Function;
28
import com.google.common.collect.Iterables;
29
import com.google.common.collect.Maps;
30
import com.google.common.collect.Sets;
31

  
32
import eu.dnetlib.broker.objects.OpenAireEventPayload;
33
import eu.dnetlib.data.mapreduce.hbase.broker.Topic;
34
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
35
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
36
import eu.dnetlib.data.mapreduce.util.OafDecoder;
37
import eu.dnetlib.data.proto.OafProtos.Oaf;
38
import eu.dnetlib.data.proto.OafProtos.OafEntity;
39

  
40
/**
41
 * Created by claudio on 08/07/16.
42
 */
43
public class AdditionMapper extends TableMapper<Text, Text> {
44

  
45
	private Text outKey;
46
	private Text outValue;
47

  
48
	/**
49
	 * Map ProjectID -> Set of Organization info (id, name)
50
	 */
51
	private Map<String, Set<EntityInfo>> projectOrganization;
52

  
53
	/**
54
	 * Map OrganizationID -> Set of Datasource info (id, name)
55
	 */
56
	private Map<String, Set<EntityInfo>> organizationDatasource;
57

  
58
	private Set<String> organizationPrefixBlacklist = Sets.newHashSet();
59

  
60
	// White list for datasource typologies.
61
	private Set<String> dsTypeWhitelist = Sets.newHashSet();
62

  
63
	@Override
64
	protected void setup(final Context context) throws IOException {
65

  
66
		organizationPrefixBlacklist = Sets.newHashSet("nsf_________");
67
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
68

  
69
		projectOrganization = getRelMap(context, "20", "organization", "projectOrganization_participation_isParticipant", organizationPrefixBlacklist);
70
		organizationDatasource = getRelMap(context, "10", "datasource", "datasourceOrganization_provision_provides", dsTypeWhitelist);
71

  
72
		outKey = new Text("");
73
		outValue = new Text();
74
	}
75

  
76
	class EntityInfo {
77

  
78
		private String id;
79
		private String name;
80

  
81
		public EntityInfo(final String id, final String name) {
82
			this.id = id;
83
			this.name = name;
84
		}
85

  
86
		@Override
87
		public int hashCode() {
88
			return getId().hashCode();
89
		}
90

  
91
		@Override
92
		public boolean equals(final Object obj) {
93
			return getId().equals(obj);
94
		}
95

  
96
		public String getId() {
97
			return id;
98
		}
99

  
100
		public void setId(final String id) {
101
			this.id = id;
102
		}
103

  
104
		public String getName() {
105
			return name;
106
		}
107

  
108
		public void setName(final String name) {
109
			this.name = name;
110
		}
111
	}
112

  
113
	@Override
114
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
115
		try {
116
			final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes("result"));
117
			final byte[] bodyB = map.get(Bytes.toBytes("body"));
118

  
119
			if (MapUtils.isEmpty(map) || (bodyB == null)) {
120
				context.getCounter("result", "empty body").increment(1);
121
				return;
122
			}
123

  
124
			final Oaf oaf = Oaf.parseFrom(bodyB);
125

  
126
			if (oaf.getDataInfo().getDeletedbyinference()) {
127
				context.getCounter("result", "deletedbyinference = true").increment(1);
128
				return;
129
			}
130

  
131
			final Set<String> currentDatasourceIds = Sets.newHashSet(listKeys(oaf.getEntity().getCollectedfromList()));
132

  
133
			final Map<byte[], byte[]> resultProject = value.getFamilyMap(Bytes.toBytes("resultProject_outcome_isProducedBy"));
134
			if (!MapUtils.isEmpty(resultProject)) {
135

  
136
				for (final String projectId : asStringID(resultProject.keySet())) {
137

  
138
					final Set<EntityInfo> organizations = projectOrganization.get(projectId);
139

  
140
					if ((organizations != null) && !organizations.isEmpty()) {
141

  
142
						for (final EntityInfo organization : organizations) {
143

  
144
							final Set<EntityInfo> datasources = organizationDatasource.get(organization.getId());
145

  
146
							if ((datasources != null) && !datasources.isEmpty()) {
147

  
148
								for (final EntityInfo datasource : datasources) {
149

  
150
									if (!currentDatasourceIds.contains(datasource.getId())) {
151

  
152
										// emit event for datasourceId
153
										final float trust = RandomUtils.nextFloat();
154
										final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), oaf.getEntity(), trust);
155
										// event.setPayload(HighlightFactory.highlightEnrichPid(payload,
156
										// Lists.newArrayList(pids)).toJSON());
157
										final EventMessage event =
158
												asEvent(oaf.getEntity(), Topic.ADD_BY_PROJECT, payload, datasource.getId(), datasource.getName(), trust);
159

  
160
										emit(event, context);
161

  
162
										context.getCounter("event", Topic.ADD_BY_PROJECT.getValue()).increment(1);
163
									}
164
								}
165
							}
166
						}
167
					}
168
				}
169
			}
170
		} catch (final Exception e) {
171
			throw new RuntimeException(e);
172
		}
173
	}
174

  
175
	private void emit(final EventMessage e, final Context context) throws IOException, InterruptedException {
176
		// tKey.set(e.getMap().get("id"));
177
		outValue.set(e.toString());
178
		context.write(outKey, outValue);
179
	}
180

  
181
	private Iterable<String> asStringID(final Iterable<byte[]> in) {
182
		return Iterables.transform(in, new Function<byte[], String>() {
183

  
184
			@Override
185
			public String apply(final byte[] input) {
186
				return getID(new String(input));
187
			}
188
		});
189
	}
190

  
191
	private Map<String, Set<EntityInfo>> getRelMap(final Context context,
192
			final String prefixFilter,
193
			final String entity,
194
			final String columnFamily,
195
			final Set<String> filter) throws IOException {
196
		System.out.println(String.format("loading %s, %s", entity, columnFamily));
197

  
198
		final Map<String, Set<EntityInfo>> out = Maps.newHashMap();
199
		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
200

  
201
		System.out.println(String.format("table name: '%s'", tableName));
202

  
203
		try (final HTable table = new HTable(context.getConfiguration(), tableName);
204
				final ResultScanner res = scanTable(table, prefixFilter, entity, columnFamily)) {
205

  
206
			for (final Result r : res) {
207

  
208
				final byte[] bodyB = r.getValue(Bytes.toBytes(entity), Bytes.toBytes("body"));
209

  
210
				if (bodyB == null) {
211
					context.getCounter("missing body", entity).increment(1);
212
				} else {
213

  
214
					final OafEntity oafEntity = OafDecoder.decode(bodyB).getEntity();
215
					final EntityInfo kv = getEntityInfo(oafEntity, filter);
216
					if (kv != null) {
217

  
218
						final Map<byte[], byte[]> relMap = r.getFamilyMap(Bytes.toBytes(columnFamily));
219

  
220
						if (MapUtils.isNotEmpty(relMap)) {
221
							for (final String id : asStringID(relMap.keySet())) {
222

  
223
								if (!out.containsKey(id)) {
224
									out.put(id, new HashSet<EntityInfo>());
225
								}
226
								out.get(id).add(kv);
227
							}
228
						} else {
229
							context.getCounter("skipped", entity).increment(1);
230
						}
231
					}
232
				}
233
			}
234
		}
235

  
236
		System.out.println(String.format("loaded map for %s, %s, size: %s", entity, columnFamily, out.size()));
237
		return out;
238
	}
239

  
240
	private EntityInfo getEntityInfo(final OafEntity entity, final Set<String> filter) {
241

  
242
		final String id = getID(entity.getId());
243
		switch (entity.getType()) {
244
		case datasource:
245
			final String dsType = entity.getDatasource().getMetadata().getDatasourcetype().getClassid();
246
			if (!filter.contains(dsType)) { return null; }
247
			return new EntityInfo(id, entity.getDatasource().getMetadata().getOfficialname().getValue());
248
		case organization:
249
			if (filter.contains(prefix(id))) { return null; }
250
			return new EntityInfo(id, entity.getOrganization().getMetadata().getLegalname().getValue());
251
		default:
252
			throw new IllegalArgumentException("invalid entity: " + entity);
253
		}
254
	}
255

  
256
	private ResultScanner scanTable(final HTable table, final String prefixFilter, final String entity, final String columnFamily) throws IOException {
257
		final Scan scan = new Scan();
258
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
259
		fl.addFilter(new PrefixFilter(Bytes.toBytes(prefixFilter)));
260
		scan.setFilter(fl);
261
		scan.addFamily(Bytes.toBytes(entity));
262
		scan.addFamily(Bytes.toBytes(columnFamily));
263

  
264
		return table.getScanner(scan);
265
	}
266

  
267
	private String getID(final String s) {
268
		return StringUtils.substringAfter(s, "|");
269
	}
270

  
271
	private String prefix(final String s) {
272
		return StringUtils.substringBefore(s, "::");
273
	}
274

  
275
}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.List;
5

  
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.transform.OafEntityMerger;
16
import eu.dnetlib.pace.config.DedupConfig;
17
import org.apache.commons.lang.StringUtils;
18
import org.apache.hadoop.hbase.client.Put;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableReducer;
21
import org.apache.hadoop.hbase.util.Bytes;
22
import org.apache.hadoop.io.Text;
23

  
24
/**
 * Reducer of the dedup "build roots" phase.
 * <p>
 * Each reduce key is the row key of a dedup root (representative) record; the values are the Oaf
 * payloads (entities and relations) grouped under that root. The entities are merged into a single
 * root body, while every relation of a duplicate is re-emitted in both directions against the root
 * and the original duplicate relations are flagged as deleted by inference.
 */
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

	// Direction of the relation patching performed by patchRelations().
	private enum OafPatch {
		rootToEntity, entityToRoot
	}

	// Deduplication workflow configuration, loaded from the job configuration in setup().
	private DedupConfig dedupConf;

	// Lookup table mapping each relation class to its inverse class.
	private RelClasses relClasses;

	/** Loads the dedup configuration and the relation-class map from the job configuration. */
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		super.setup(context);
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());

		final String relClassJson = context.getConfiguration().get("relClasses");
		System.out.println("relClassesJson:\n" + relClassJson);
		relClasses = RelClasses.fromJSon(relClassJson);
		System.out.println("relClasses:\n" + relClasses);
	}

	/**
	 * Splits the grouped values into entities (merged into the root body) and relations
	 * (rewired against the root via {@link #handleRels}), then emits the merged body.
	 */
	@Override
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

		// ensures we're dealing with a root, otherwise returns
		if (!DedupUtils.isRoot(key.toString())) {
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
			return;
		}

		final byte[] rowkey = Bytes.toBytes(key.toString());
		final List<Oaf> entities = Lists.newArrayList();

		// entities are collected for merging; relations are handled (re-emitted) immediately
		for (final Oaf oaf : toOaf(values)) {
			switch (oaf.getKind()) {
			case entity:
				entities.add(oaf);
				break;
			case relation:
				handleRels(context, rowkey, oaf);
				break;
			default:
				break;
			}
		}

		// build and emit the root body
		final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
		// group-size counters are left-padded so they sort lexicographically; oversized groups share one counter
		if (entities.size() < JobParams.MAX_COUNTERS) {
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
		} else {
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
		}

		emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");

	}

	// Lazily decodes each raw HBase cell value into an Oaf protobuf.
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
	}

	/**
	 * Re-emits one relation of a duplicate record in four forms: root -> entity,
	 * entity -> root (inverse class), and the two original duplicate relations
	 * flagged as deleted by inference.
	 */
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {

		// emit relation from the root to the related entities
		OafDecoder decoder = rootToEntity(rowkey, oaf);
		emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");

		// emit relation from the related entities to the root
		decoder = entityToRoot(rowkey, oaf);
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
		emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");

		// mark relation from the related entities to the duplicate as deleted
		decoder = markDeleted(oaf, true);
		revKey = Bytes.toBytes(decoder.relSourceId());
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");

		// mark relation from the related entities to the duplicate as deleted
		decoder = markDeleted(oaf, false);
		revKey = Bytes.toBytes(decoder.relSourceId());
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
	}

	// Writes a single cell (family:qualifier = value) for the given row and bumps a per-family counter.
	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
			throws IOException, InterruptedException {
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
		context.write(new ImmutableBytesWritable(rowkey), put);
		context.getCounter(family, label).increment(1);
	}

	// /////////////////

	// Relation copy whose source is replaced by the root id.
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
	}

	// Inverse relation, pointing from the related entity back to the root.
	private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
		return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
	}

	// Relation copy flagged as deleted by inference; reverse=true flags the inverse direction.
	private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
		return deleteRelations(rel, reverse);
	}

	// patches relation objects setting the source field with the root id
	private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
		final String id = new String(rootRowkey);
		final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
		final Oaf.Builder builder = Oaf.newBuilder(rel);
		// patched relations are produced by the dedup inference and are live (not deleted)
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
		switch (patchKind) {
		case rootToEntity:
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
			// keep the original target, just re-point the source to the root
			builder.getRelBuilder().setSource(id);
			break;

		case entityToRoot:
			final String relClass = rel.getRel().getRelClass();
			/*
			if(StringUtils.isBlank(relClass)) {
				throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
			}
			*/
			final String inverse = relClasses.getInverse(relClass);
			if(StringUtils.isBlank(inverse)) {
				throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
			}
			builder.setRel(decoder.setClassId(inverse));
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
			// invert the relation: the former target becomes the source, the root becomes the target
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
			builder.getRelBuilder().setTarget(id);
			break;

		default:
			break;
		}

		return OafDecoder.decode(builder.build());
	}

	// Flags the relation as deleted by inference; when reverse is true the inverse
	// relation (swapped source/target, inverse class) is flagged instead.
	private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
		final Oaf.Builder builder = Oaf.newBuilder(rel);
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
		builder.getDataInfoBuilder().setDeletedbyinference(true);

		if (reverse) {
			final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
			// swap source and target
			final String tmp = builder.getRel().getSource();
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
			builder.getRelBuilder().setTarget(tmp);
		}

		return OafDecoder.decode(builder.build());
	}

	// Left-pads the group size to 5 characters so counter names sort numerically.
	private String lpad(final int s) {
		return StringUtils.leftPad(String.valueOf(s), 5);
	}

}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/MindistSearchReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

  
3
/**
4
 * Created by claudio on 14/10/15.
5
 */
6

  
7
import java.io.IOException;
8

  
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Reducer;
13

  
14
/**
 * Reducer for one iteration of the minimum-label connected component search used by dedup.
 * <p>
 * For each vertex key it combines the "real" vertex (the value carrying the edge list) with the
 * minimal-id messages received from its neighbours, keeping the smallest vertex id seen so far.
 * The UPDATED/SKIPPED counters let the driver detect convergence (no more updates).
 */
public class MindistSearchReducer extends Reducer<Text, VertexWritable, Text, VertexWritable> {

	private static final Log log = LogFactory.getLog(MindistSearchReducer.class);

	public static final String UPDATE_COUNTER = "UpdateCounter";
	public static final String SKIPPED = "SKIPPED";
	public static final String UPDATED = "UPDATED";

	// True on the first iteration (recursion depth 0), where every real vertex is initialised.
	private boolean depthOne;

	// When true, every written vertex is also logged as JSON.
	private boolean debug = false;

	/** Reads the iteration number and the debug flag from the job configuration. */
	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);
		final String recursionDepth = context.getConfiguration().get("mindist_recursion_depth");
		log.info("got recursion depth: " + recursionDepth);
		if (Integer.parseInt(recursionDepth) == 0) {
			depthOne = true;
		}

		debug = context.getConfiguration().getBoolean("mindist_DEBUG", false);
		log.info("debug mode: " + debug);
	}

	/**
	 * Updates the vertex's minimal id from the incoming messages and writes it back,
	 * activating it only when its minimal id actually decreased.
	 */
	@Override
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {

		VertexWritable realVertex = null;
		Text currentMinimalKey = null;
		//boolean foundEdges = false;

		if (depthOne) {
			// first iteration: locate the real vertex (the one carrying the edge list)
			for (VertexWritable vertex : values) {
				if (!vertex.isMessage()) {
					//log.info(String.format("found vertex with edges: %s", key.toString()));
					// clone() is required: Hadoop reuses the writable instance across the iterator
					realVertex = vertex.clone();
				}
			}

			// only messages arrived for this key: nothing to update
			if (realVertex == null) {
				context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1);
				return;
			}

			// seed the minimal id with the smallest edge target, then with the key itself if smaller
			realVertex.setActivated(true);
			realVertex.setVertexId(realVertex.getEdges().first());

			if (key.compareTo(realVertex.getVertexId()) < 0) {
				realVertex.setVertexId(key);
			}

			context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
		} else {
			// later iterations: keep the real vertex and track the minimum id among the messages
			for (VertexWritable vertex : values) {
				if (!vertex.isMessage()) {
					if (realVertex == null) {
						realVertex = vertex.clone();
					}
				} else {
					if (currentMinimalKey == null) {
						currentMinimalKey = new Text(vertex.getVertexId());
					} else {

						if (currentMinimalKey.compareTo(vertex.getVertexId()) > 0) {
							currentMinimalKey = new Text(vertex.getVertexId());
						}
					}
				}
			}

			if (realVertex == null) {
				context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1);
				return;
			}

			// adopt the smaller id and stay active; otherwise deactivate (converged for this vertex)
			if (currentMinimalKey != null && currentMinimalKey.compareTo(realVertex.getVertexId()) < 0) {
				realVertex.setVertexId(currentMinimalKey);
				realVertex.setActivated(true);
				context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
			} else {
				realVertex.setActivated(false);
			}
		}

		context.write(key, realVertex);
		if (debug) {
			log.info(realVertex.toJSON());
		}
	}

}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/HBaseToSimilarityGraphMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.hbase.KeyValue;
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.io.Text;
10

  
11
/**
12
 * Created by claudio on 14/10/15.
13
 */
14
public class HBaseToSimilarityGraphMapper extends TableMapper<Text, VertexWritable> {
15

  
16
	@Override
17
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
18

  
19
		final VertexWritable vertex = new VertexWritable();
20
		final Text realKey = new Text(keyIn.copyBytes());
21

  
22
		vertex.checkAndSetMinimalVertex(realKey);
23
		vertex.addVertex(realKey);
24

  
25
		for (KeyValue kv : value.list()) {
26

  
27
			Text tmp = new Text(kv.getQualifier());
28
			vertex.checkAndSetMinimalVertex(tmp);
29
			vertex.addVertex(tmp);
30
		}
31

  
32
		context.write(realKey, vertex);
33

  
34
		for (Text edge : vertex.getEdges()) {
35
			context.write(edge, vertex.makeMessage());
36
		}
37
	}
38

  
39
}
modules/dnet-mapreduce-jobs/tags/dnet-mapreduce-jobs-1.1.1-PROD/src/main/java/eu/dnetlib/data/mapreduce/util/XmlRecordFactory.java
1
package eu.dnetlib.data.mapreduce.util;
2

  
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Map.Entry;
8
import java.util.Set;
9
import javax.xml.transform.*;
10
import javax.xml.transform.dom.DOMSource;
11
import javax.xml.transform.stream.StreamResult;
12

  
13
import com.google.common.base.Joiner;
14
import com.google.common.base.Predicate;
15
import com.google.common.base.Splitter;
16
import com.google.common.collect.Iterables;
17
import com.google.common.collect.Lists;
18
import com.google.common.collect.Maps;
19
import com.google.common.collect.Sets;
20
import com.google.protobuf.Descriptors.EnumValueDescriptor;
21
import com.google.protobuf.Descriptors.FieldDescriptor;
22
import com.google.protobuf.GeneratedMessage;
23
import com.mycila.xmltool.XMLDoc;
24
import com.mycila.xmltool.XMLTag;
25
import eu.dnetlib.data.mapreduce.hbase.index.config.*;
26
import eu.dnetlib.data.proto.FieldTypeProtos.*;
27
import eu.dnetlib.data.proto.OafProtos.OafEntity;
28
import eu.dnetlib.data.proto.OafProtos.OafRel;
29
import eu.dnetlib.data.proto.ProjectProtos.Project;
30
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
31
import eu.dnetlib.data.proto.ResultProtos.Result;
32
import eu.dnetlib.data.proto.ResultProtos.Result.Context;
33
import eu.dnetlib.data.proto.ResultProtos.Result.ExternalReference;
34
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
35
import eu.dnetlib.data.proto.ResultProtos.Result.Journal;
36
import eu.dnetlib.data.proto.TypeProtos;
37
import eu.dnetlib.data.proto.TypeProtos.Type;
38
import org.apache.commons.lang.StringUtils;
39
import org.dom4j.Document;
40
import org.dom4j.DocumentException;
41
import org.dom4j.Element;
42
import org.dom4j.Node;
43
import org.dom4j.io.SAXReader;
44

  
45
import static eu.dnetlib.miscutils.collections.MappedCollection.listMap;
46

  
47
public class XmlRecordFactory {
48

  
49
	// private static final Log log = LogFactory.getLog(XmlRecordFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
50

  
51
	// Number of relations serialized so far per relation descriptor, used to enforce
	// the per-descriptor cap configured via LinkDescriptor.getMax() in addRelOrChild().
	private final Map<String, Integer> relCounters = Maps.newHashMap();
	// Datasource types that receive dedicated treatment in the UI (supplied by the caller).
	protected Set<String> specialDatasourceTypes;
	protected TemplateFactory templateFactory = new TemplateFactory();
	// Decoder of the main entity being serialized; null until setMainEntity() is called.
	protected OafDecoder mainEntity = null;
	// Row key / id of the main entity, cached by setMainEntity().
	protected String key = null;
	// Relations and children queued for serialization, subject to the relCounters caps.
	protected List<OafDecoder> relations = Lists.newLinkedList();
	protected List<OafDecoder> children = Lists.newLinkedList();
	protected EntityConfigTable entityConfigTable;
	protected ContextMapper contextMapper;
	protected RelClasses relClasses;
	protected String schemaLocation;
	// Flags controlling whether default field values are emitted for the main entity,
	// its relations and its children respectively.
	protected boolean entityDefaults;
	protected boolean relDefaults;
	protected boolean childDefaults;
	protected Set<String> contextes = Sets.newHashSet();
	protected List<String> extraInfo = Lists.newArrayList();
	// Per-subreltype statistics, rendered into the record by countersAsXml().
	protected Map<String, Integer> counters = Maps.newHashMap();
	// Shared XML transformer, configured in the constructor to omit the XML declaration.
	protected Transformer transformer;

	// Whitelist of Instance fields serialized when expanding instances inline.
	protected static Predicate<String> instanceFilter = new Predicate<String>() {
		final Set<String> instanceFieldFilter = Sets.newHashSet("instancetype", "hostedby", "license", "accessright", "collectedfrom", "dateofacceptance");
		@Override
		public boolean apply(final String s) {
			return instanceFieldFilter.contains(s);
		}
	};
77

  
78
	/**
	 * Creates a record factory.
	 *
	 * @param entityConfigTable          serialization configuration (field filters, link descriptors) per entity type
	 * @param contextMapper              resolver for context identifiers
	 * @param relClasses                 relation class / inverse-class lookup table
	 * @param schemaLocation             value used for the record's schema location
	 * @param entityDefaults             whether default values are emitted for the main entity's fields
	 * @param relDefaults                whether default values are emitted for relation fields
	 * @param childDefeaults             whether default values are emitted for child fields
	 * @param otherDatasourceTypesUForUI datasource types handled specially by the UI
	 * @throws TransformerConfigurationException if the XML transformer cannot be created
	 * @throws TransformerFactoryConfigurationError if no transformer factory is available
	 */
	public XmlRecordFactory(final EntityConfigTable entityConfigTable, final ContextMapper contextMapper, final RelClasses relClasses,
			final String schemaLocation, final boolean entityDefaults, final boolean relDefaults, final boolean childDefeaults, final Set<String> otherDatasourceTypesUForUI)
			throws TransformerConfigurationException, TransformerFactoryConfigurationError {
		this.entityConfigTable = entityConfigTable;
		this.contextMapper = contextMapper;
		this.relClasses = relClasses;
		this.schemaLocation = schemaLocation;
		this.entityDefaults = entityDefaults;
		this.relDefaults = relDefaults;
		this.childDefaults = childDefeaults;
		this.specialDatasourceTypes = otherDatasourceTypesUForUI;

		// records are serialized without the leading XML declaration
		transformer = TransformerFactory.newInstance().newTransformer();
		transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
	}
93

  
94
	public static String removePrefix(final String s) {
95
		if (s.contains("|")) return StringUtils.substringAfter(s, "|");
96
		return s;
97
	}
98

  
99
	public static String escapeXml(final String value) {
100
		return value.replaceAll("&", "&amp;").replaceAll("<", "&lt;").replaceAll(">", "&gt;").replaceAll("\"", "&quot;").replaceAll("'", "&apos;");
101
	}
102

  
103
	/** @return the per-descriptor relation counters used to enforce serialization caps. */
	public Map<String, Integer> getRelCounters() {
		return relCounters;
	}

	/** @return the relation class / inverse-class lookup table. */
	public RelClasses getRelClasses() {
		return relClasses;
	}

	/** @return the id (row key) of the main entity, or null if none was set. */
	public String getId() {
		return key;
	}

	/** @return true when a main entity has been set, i.e. the factory can build a record. */
	public boolean isValid() {
		return mainEntity != null;
	}

	/** Sets the main entity to serialize and caches its id. */
	public void setMainEntity(final OafDecoder mainEntity) {
		this.mainEntity = mainEntity;
		this.key = mainEntity.decodeEntity().getId();
	}

	/** Queues a relation of the main entity, subject to the per-descriptor cap. */
	public void addRelation(final Type type, final OafDecoder rel) {
		addRelOrChild(type, relations, rel);
	}

	/** Queues a child record of the main entity, subject to the per-descriptor cap. */
	public void addChild(final Type type, final OafDecoder child) {
		addRelOrChild(type, children, child);
	}
131

  
132
	/**
	 * Adds the decoded relation/child to the given list, honouring the per-descriptor
	 * cap configured via {@code LinkDescriptor.getMax()}.
	 *
	 * @param type    the type of the main entity (used to look up the link descriptor)
	 * @param list    destination list (relations or children)
	 * @param decoder the decoded relation to queue
	 */
	private void addRelOrChild(final Type type, final List<OafDecoder> list, final OafDecoder decoder) {

		final OafRel oafRel = decoder.getOafRel();
		// descriptor key: relType_subRelType_inverseRelClass
		final String rd = oafRel.getRelType().toString() + "_" + oafRel.getSubRelType() + "_" + relClasses.getInverse(oafRel.getRelClass());
		final LinkDescriptor ld = entityConfigTable.getDescriptor(type, new RelDescriptor(rd));

		if (getRelCounters().get(rd) == null) {
			getRelCounters().put(rd, 0);
		}

		// no descriptor configured for this relation: accept unconditionally
		if (ld == null) {
			list.add(decoder);
			return;
		}

		// a negative max means unbounded
		if (ld.getMax() < 0) {
			list.add(decoder);
			return;
		}

		// accept and count only while below the configured cap; excess relations are dropped
		if (getRelCounters().get(rd) < ld.getMax()) {
			getRelCounters().put(rd, getRelCounters().get(rd) + 1);
			list.add(decoder);
		}
	}
157

  
158
	/**
	 * Builds the XML record for the main entity, combining its metadata, contexts,
	 * data info, relations, children and extra info sections.
	 *
	 * @return the serialized record
	 * @throws RuntimeException wrapping any failure, with the record key in the message
	 */
	public String build() {
		try {
			final OafEntityDecoder entity = mainEntity.decodeEntity();
			// log.info("building");
			// log.info("main: " + mainEntity);
			// log.info("rel:  " + relations);
			// log.info("chi:  " + children);
			// log.info("=============");

			final Predicate<String> filter = entityConfigTable.getFilter(entity.getType());
			final List<String> metadata = decodeType(entity, filter, entityDefaults, false);

			// rels has to be processed before the contexts because they enrich the contextMap with the funding info.
			final List<String> rels = listRelations();
			metadata.addAll(buildContexts(entity.getType()));
			metadata.add(parseDataInfo(mainEntity));

			final String body = templateFactory.buildBody(entity.getType(), metadata, rels, listChildren(), extraInfo);

			return templateFactory
					.buildRecord(key, entity.getDateOfCollection(), entity.getDateOfTransformation(), schemaLocation, body, countersAsXml());
		} catch (final Throwable e) {
			// Throwable is caught on purpose: a single malformed record must surface its key
			throw new RuntimeException(String.format("error building record '%s'", this.key), e);
		}
	}
183

  
184
	private String parseDataInfo(final OafDecoder decoder) {
185
		final DataInfo dataInfo = decoder.getOaf().getDataInfo();
186

  
187
		final StringBuilder sb = new StringBuilder();
188
		sb.append("<datainfo>");
189
		sb.append(asXmlElement("inferred", dataInfo.getInferred() + "", null, null));
190
		sb.append(asXmlElement("deletedbyinference", dataInfo.getDeletedbyinference() + "", null, null));
191
		sb.append(asXmlElement("trust", dataInfo.getTrust() + "", null, null));
192
		sb.append(asXmlElement("inferenceprovenance", dataInfo.getInferenceprovenance() + "", null, null));
193
		sb.append(asXmlElement("provenanceaction", null, dataInfo.getProvenanceaction(), null));
194
		sb.append("</datainfo>");
195

  
196
		return sb.toString();
197
	}
198

  
199
	/**
	 * Serializes the metadata fields of an entity as a list of XML fragments.
	 *
	 * @param decoder      decoder of the entity to serialize
	 * @param filter       predicate selecting which fields to include
	 * @param defaults     whether default field values should be emitted
	 * @param expandingRel true when the entity is serialized inline as a relation target
	 * @return the list of XML snippets
	 */
	private List<String> decodeType(final OafEntityDecoder decoder, final Predicate<String> filter, final boolean defaults, final boolean expandingRel) {

		final List<String> metadata = Lists.newArrayList();
		metadata.addAll(listFields(decoder.getMetadata(), filter, defaults, expandingRel));
		metadata.addAll(listFields(decoder.getOafEntity(), filter, defaults, expandingRel));

		// results additionally expose the computed bestaccessright; results and projects
		// carry type-specific fields only emitted for the main entity (not inline targets)
		if ((decoder.getEntity() instanceof Result) && !expandingRel) {
			metadata.add(asXmlElement("bestaccessright", "", getBestAccessright(), null));

			metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel));
		}
		if ((decoder.getEntity() instanceof Project) && !expandingRel) {
			metadata.addAll(listFields(decoder.getEntity(), filter, defaults, expandingRel));
		}

		return metadata;
	}
216

  
217
	/**
	 * Scans the instances of the main (Result) entity and returns the best access
	 * right according to {@code LicenseComparator}, starting from UNKNOWN as the
	 * worst case.
	 */
	private Qualifier getBestAccessright() {
		Qualifier bestAccessRight = getQualifier("UNKNOWN", "not available", "dnet:access_modes");
		final LicenseComparator lc = new LicenseComparator();
		for (final Instance instance : ((Result) mainEntity.decodeEntity().getEntity()).getInstanceList()) {
			// compare > 0 means the instance's access right ranks ahead of the current best
			if (lc.compare(bestAccessRight, instance.getAccessright()) > 0) {
				bestAccessRight = instance.getAccessright();
			}
		}
		return bestAccessRight;
	}
227

  
228
	/**
	 * Builds a Qualifier from the given class id/name and scheme name.
	 * NOTE(review): the schemename argument is used for BOTH schemeid and schemename —
	 * apparently intentional for the dnet vocabularies, where the two coincide; confirm
	 * before changing.
	 */
	public Qualifier getQualifier(final String classid, final String classname, final String schemename) {
		return Qualifier.newBuilder().setClassid(classid).setClassname(classname).setSchemeid(schemename).setSchemename(schemename).build();
	}
231

  
232
	/**
	 * Serializes the queued relations of the main entity as XML fragments, skipping
	 * relations that do not involve the main entity's id. Also updates the per-subreltype
	 * counters (total / inferred / collected / claimed) rendered by countersAsXml().
	 *
	 * @return the list of serialized relation fragments
	 */
	private List<String> listRelations() {

		final List<String> rels = Lists.newArrayList();

		for (final OafDecoder decoder : this.relations) {

			final OafRel rel = decoder.getOafRel();
			final OafEntity cachedTarget = rel.getCachedTarget();
			final OafRelDecoder relDecoder = OafRelDecoder.decode(rel);

			// if (!relDecoder.getRelType().equals(RelType.personResult) || relDecoder.getRelTargetId().equals(key)) {
			// only serialize relations where the main entity is either the source or the target
			if (relDecoder.getRelSourceId().equals(key) || relDecoder.getRelTargetId().equals(key)) {

				final List<String> metadata = Lists.newArrayList();
				final TypeProtos.Type targetType = relDecoder.getTargetType(mainEntity.getEntity().getType());
				//final Set<String> relFilter = entityConfigTable.getFilter(targetType, relDecoder.getRelDescriptor());
				metadata.addAll(listFields(relDecoder.getSubRel(), entityConfigTable.getIncludeFilter(targetType, relDecoder.getRelDescriptor()), false, true));

				String semanticclass = "";
				String semanticscheme = "";

				final RelDescriptor relDescriptor = relDecoder.getRelDescriptor();

				// when the target entity payload travels with the relation, expand it inline
				if ((cachedTarget != null) && cachedTarget.isInitialized()) {

					//final Set<String> filter = entityConfigTable.getFilter(targetType, relDescriptor);
					final OafEntityDecoder d = OafEntityDecoder.decode(cachedTarget);
					metadata.addAll(decodeType(d, entityConfigTable.getIncludeFilter(targetType, relDescriptor), relDefaults, true));
					if (d.getType().equals(Type.result)) {
						for(Instance i : cachedTarget.getResult().getInstanceList()) {
							final List<String> fields = listFields(i, entityConfigTable.getIncludeFilter(targetType, relDecoder.getRelDescriptor()), false, true);
							metadata.addAll(fields);
						}
					}
				}

				final RelMetadata relMetadata = relDecoder.getRelMetadata();
				// debug
				if (relMetadata == null) {
					// System.err.println(this);
					semanticclass = semanticscheme = "UNKNOWN";
				} else {
					// the serialized class is the inverse, i.e. seen from the main entity's side
					semanticclass = relClasses.getInverse(relMetadata.getSemantics().getClassname());
					semanticscheme = relMetadata.getSemantics().getSchemename();
				}

				final String rd = relDescriptor.getSubRelType().toString();
				incrementCounter(rd);

				// classify the relation's provenance: inferred / collected (sysimport) / claimed (user)
				final DataInfo info = decoder.getOaf().getDataInfo();
				if (info.getInferred()) {
					incrementCounter(rd + "_inferred");
				} else if(StringUtils.startsWith(info.getProvenanceaction().getClassid(), "sysimport:crosswalk")) {
					incrementCounter(rd + "_collected");
				} else if(StringUtils.startsWith(info.getProvenanceaction().getClassid(), "user:")) {
					incrementCounter(rd + "_claimed");
				}

				final LinkDescriptor ld = entityConfigTable.getDescriptor(relDecoder.getTargetType(mainEntity.getEntity().getType()), relDescriptor);

				// asymmetric relations point at the target; symmetric (or unconfigured) ones at the source
				final String relId = (ld != null) && !ld.isSymmetric() ? relDecoder.getRelTargetId() : relDecoder.getRelSourceId();

				rels.add(templateFactory.getRel(targetType, relId, Sets.newHashSet(metadata), semanticclass, semanticscheme, info.getInferred(), info.getTrust(),
						info.getInferenceprovenance(), info.getProvenanceaction().getClassid()));
			}
		}
		return rels;
	}
300

  
301
	// //////////////////////////////////
302

  
303
	private List<String> listChildren() {
304

  
305
		final List<String> children = Lists.newArrayList();
306
		for (final OafDecoder decoder : this.children) {
307
			final OafEntity cachedTarget = decoder.getOafRel().getCachedTarget();
308
			addChildren(children, cachedTarget, decoder.getRelDescriptor());
309
		}
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff