Project

General

Profile

« Previous | Next » 

Revision 47551

introducing dnet45 version of IIS CDH4 legacy libs

View differences:

modules/icm-iis-common/trunk/pom.xml.disabled
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3
	<parent>
4
    		<groupId>eu.dnetlib</groupId>
5
	        <artifactId>dnet-hadoop-parent</artifactId>
6
            <version>1.0.0-SNAPSHOT</version>
7
	</parent>
8
	<modelVersion>4.0.0</modelVersion>
9
	<artifactId>icm-iis-common</artifactId>
10
	<packaging>jar</packaging>
11
	<version>1.0.1-SNAPSHOT</version>
12

  
13
	<scm>
14
	  <developerConnection>
15
	    scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/icm-iis-common/trunk
16
	  </developerConnection>
17
	</scm>
18
	
19
	<properties>
20
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
21
	</properties>
22
	<dependencies>
23
		<dependency>
24
			<groupId>eu.dnetlib</groupId>
25
			<artifactId>icm-iis-core</artifactId>
26
			<version>[1.0.0,2.0.0)</version>
27
		</dependency>
28
		<dependency>
29
            <groupId>eu.dnetlib</groupId>
30
            <artifactId>icm-iis-schemas</artifactId>
31
            <version>[1.0.0,2.0.0)</version>
32
        </dependency>
33
		<dependency>
34
			<groupId>eu.dnetlib</groupId>
35
			<artifactId>dnet-openaire-data-protos</artifactId>
36
			<version>[3.0.0, 4.0.0)</version>
37
		</dependency>
38
		<dependency>
39
			<groupId>org.apache.oozie</groupId>
40
			<artifactId>oozie-core</artifactId>
41
			<version>${iis.oozie.version}</version>
42
			<scope>provided</scope>
43
		</dependency>
44
        <dependency>
45
            <groupId>org.apache.hadoop</groupId>
46
            <artifactId>hadoop-core</artifactId>
47
            <version>${iis.hadoop.core.version}</version>
48
            <scope>provided</scope>
49
        </dependency>
50
        <dependency>
51
            <groupId>org.apache.hadoop</groupId>
52
            <artifactId>hadoop-common</artifactId>
53
            <version>${iis.hadoop.common.version}</version>
54
            <scope>provided</scope>
55
        </dependency>
56
        <dependency>
57
            <groupId>org.apache.avro</groupId>
58
            <artifactId>avro</artifactId>
59
            <version>${iis.avro.version}</version>
60
        </dependency>
61
        <dependency>
62
            <groupId>org.apache.avro</groupId>
63
            <artifactId>avro-mapred</artifactId>
64
            <version>${iis.avro.version}</version>
65
            <classifier>hadoop2</classifier>
66
        </dependency>
67
        <dependency>
68
        	<!-- required by caching mechanism -->
69
            <groupId>com.google.code.gson</groupId>
70
            <artifactId>gson</artifactId>
71
            <version>${google.gson.version}</version>
72
        </dependency>
73
        <dependency>
74
        	<!-- required by caching mechanism for setting chmod -->
75
            <groupId>org.springframework</groupId>
76
            <artifactId>spring-beans</artifactId>
77
            <version>${spring.version}</version>
78
        </dependency>
79
    	<dependency>
80
			<groupId>com.thoughtworks.xstream</groupId>
81
			<artifactId>xstream</artifactId>
82
			<version>1.4.7</version>
83
		</dependency>
84
	</dependencies>
85
	<build>
86
	</build>
87
	<repositories>
88
		<repository>
89
			<id>cloudera</id>
90
			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
91
			<releases>
92
				<enabled>true</enabled>
93
			</releases>
94
			<snapshots>
95
				<enabled>false</enabled>
96
			</snapshots>
97
		</repository>
98
  </repositories>
99
</project>
0 100

  
modules/icm-iis-common/trunk/src/test/java/eu/dnetlib/iis/common/model/extrainfo/citations/AlphaNumericCitationComparatorTest.java
1
package eu.dnetlib.iis.common.model.extrainfo.citations;
2

  
3
import static org.junit.Assert.assertFalse;
4
import static org.junit.Assert.assertTrue;
5

  
6
import java.util.ArrayList;
7
import java.util.Iterator;
8
import java.util.SortedSet;
9
import java.util.TreeSet;
10

  
11
import org.junit.Test;
12

  
13
public class AlphaNumericCitationComparatorTest {
14

  
15
	@Test
16
	public void testSorting() throws Exception {
17
		SortedSet<BlobCitationEntry> citations = new TreeSet<BlobCitationEntry>();
18
		
19
		BlobCitationEntry c1 = new BlobCitationEntry("[1] test");
20
		BlobCitationEntry c2 = new BlobCitationEntry("[10] test");
21
		BlobCitationEntry c3 = new BlobCitationEntry("[2] test");
22
		BlobCitationEntry c4 = new BlobCitationEntry("[1] test");
23
		c4.setIdentifiers(new ArrayList<TypedId>());
24
		c4.getIdentifiers().add(new TypedId("1", "openaire", 0.9f));
25
		citations.add(c4);
26
		citations.add(c3);
27
		citations.add(c2);
28
		citations.add(c1);
29
		Iterator<BlobCitationEntry> citationsIt = citations.iterator();
30
		assertTrue(c1==citationsIt.next());
31
		assertTrue(c4==citationsIt.next());
32
		assertTrue(c3==citationsIt.next());
33
		assertTrue(c2==citationsIt.next());
34
		assertFalse(citationsIt.hasNext());
35
	}
36
	
37
	@Test
38
	public void testSortingWithNulls() throws Exception {
39
		SortedSet<BlobCitationEntry> citations = new TreeSet<BlobCitationEntry>();
40
		BlobCitationEntry c2 = new BlobCitationEntry("[10] test");
41
		BlobCitationEntry c3 = new BlobCitationEntry(null);
42
		citations.add(c3);
43
		citations.add(c2);
44
		Iterator<BlobCitationEntry> citationsIt = citations.iterator();
45
		assertTrue(c2==citationsIt.next());
46
		assertTrue(c3==citationsIt.next());
47
	}
48
	
49
	@Test
50
	public void testSortingWithDuplicates() throws Exception {
51
		SortedSet<BlobCitationEntry> citations = new TreeSet<BlobCitationEntry>();
52
		
53
		BlobCitationEntry c1 = new BlobCitationEntry("[1] test");
54
		BlobCitationEntry c2 = new BlobCitationEntry("[1] test");
55
		BlobCitationEntry c3 = new BlobCitationEntry("[2] test");
56
		BlobCitationEntry c4 = new BlobCitationEntry("[2] test");
57
		BlobCitationEntry c5 = new BlobCitationEntry("[2] test");
58
		c3.setIdentifiers(new ArrayList<TypedId>());
59
		c3.getIdentifiers().add(new TypedId("1", "openaire", 0.9f));
60
		c4.setIdentifiers(new ArrayList<TypedId>());
61
		c4.getIdentifiers().add(new TypedId("1", "openaire", 0.9f));
62
		c5.setIdentifiers(new ArrayList<TypedId>());
63
		c5.getIdentifiers().add(new TypedId("2", "openaire", 0.9f));
64
		citations.add(c5);
65
		citations.add(c4);
66
		citations.add(c3);
67
		citations.add(c2);
68
		citations.add(c1);
69
		Iterator<BlobCitationEntry> citationsIt = citations.iterator();
70
		assertTrue(c2==citationsIt.next());
71
		assertTrue(c5==citationsIt.next());
72
		assertTrue(c4==citationsIt.next());
73
		assertFalse(citationsIt.hasNext());
74
	}
75
}
0 76

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/model/extrainfo/converter/CitationsExtraInfoConverter.java
1
package eu.dnetlib.iis.common.model.extrainfo.converter;
2

  
3
import java.util.SortedSet;
4

  
5
import eu.dnetlib.iis.common.model.extrainfo.citations.BlobCitationEntry;
6

  
7

  
8
/**
9
 * {@link BlobCitationEntry} based avro to xml converter.
10
 * @author mhorst
11
 *
12
 */
13
public class CitationsExtraInfoConverter extends AbstractExtraInfoConverter<SortedSet<BlobCitationEntry>> {
14

  
15
	public CitationsExtraInfoConverter() {
16
		xstream.processAnnotations(BlobCitationEntry.class);
17
		xstream.alias("citations", SortedSet.class);
18
	}
19

  
20
	@SuppressWarnings("unchecked")
21
	@Override
22
	public SortedSet<BlobCitationEntry> deserialize(String source) throws UnsupportedOperationException {
23
		return (SortedSet<BlobCitationEntry>) xstream.fromXML(source);
24
	}
25
}
0 26

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/model/extrainfo/converter/ExtraInfoConverter.java
1
package eu.dnetlib.iis.common.model.extrainfo.converter;
2

  
3
/**
 * Contract of a converter translating extra info objects to XML and back.
 *
 * @author mhorst
 *
 * @param <T> type of the object being converted
 */
public interface ExtraInfoConverter<T> {

	/**
	 * Produces the XML form of the given object.
	 *
	 * @param object object to be written as XML
	 * @return XML representation of the object
	 */
	String serialize(T object);

	/**
	 * Reads an extra info object back from its textual form.
	 *
	 * @param source textual representation to be read
	 * @return deserialized object
	 * @throws UnsupportedOperationException when the implementation does not support deserialization
	 */
	T deserialize(String source) throws UnsupportedOperationException;
}
0 26

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/model/extrainfo/converter/AbstractExtraInfoConverter.java
1
package eu.dnetlib.iis.common.model.extrainfo.converter;
2

  
3
import java.io.StringWriter;
4

  
5
import com.thoughtworks.xstream.XStream;
6
import com.thoughtworks.xstream.converters.extended.NamedMapConverter;
7
import com.thoughtworks.xstream.io.xml.DomDriver;
8
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
9

  
10
/**
11
 * Abstract xml converter.
12
 * @author mhorst
13
 *
14
 */
15
public abstract class AbstractExtraInfoConverter<T> implements ExtraInfoConverter<T> {
16

  
17
	protected final XStream xstream;
18
	
19
	public AbstractExtraInfoConverter() {
20
		xstream = new XStream(new DomDriver());
21
		xstream.setMode(XStream.NO_REFERENCES);
22
		
23
//		removing class attribute because no unmarshalling is required
24
		xstream.aliasSystemAttribute(null, "class");
25
//		changing the way maps are generated
26
		NamedMapConverter namedMapConverter = new NamedMapConverter(
27
				xstream.getMapper(),"entry","key",String.class,"value",Integer.class,
28
				true, true, xstream.getConverterLookup());
29
		xstream.registerConverter(namedMapConverter);
30
	
31
	}
32
	
33
	@Override
34
	public String serialize(T object) {
35
		StringWriter sw = new StringWriter();
36
//		xstream.marshal(object, new CompactWriter(sw));
37
		xstream.marshal(object, new PrettyPrintWriter(sw));
38
		return sw.toString();
39
	}
40
	
41
	@Override
42
	public T deserialize(String source) throws UnsupportedOperationException {
43
		throw new UnsupportedOperationException("deserialization is unsupported");
44
	}
45

  
46
}
0 47

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/model/extrainfo/ExtraInfoConstants.java
1
package eu.dnetlib.iis.common.model.extrainfo;
2

  
3
/**
 * String constants of the ExtraInfo InformationSpace model.
 *
 * @author mhorst
 */
public class ExtraInfoConstants {

//	extra info names of statistics
	public static final String NAME_PROJECT_STATISTICS = "project statistics";
	public static final String NAME_RESULT_STATISTICS = "result statistics";
	public static final String NAME_AUTHOR_STATISTICS = "author statistics";

//	extra info name of citations
	public static final String NAME_CITATIONS = "result citations";

//	extra info typologies
	public static final String TYPOLOGY_CITATIONS = "citations";
	public static final String TYPOLOGY_STATISTICS = "statistics";

//	citation identifier type
	public static final String CITATION_TYPE_OPENAIRE = "openaire";
}
0 21

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/model/extrainfo/citations/TypedId.java
1
package eu.dnetlib.iis.common.model.extrainfo.citations;
2

  
3
import com.thoughtworks.xstream.annotations.XStreamAlias;
4
import com.thoughtworks.xstream.annotations.XStreamAsAttribute;
5

  
6
/**
7
 * Typed identifier.
8
 * @author mhorst
9
 *
10
 */
11
@XStreamAlias("id")
12
public class TypedId {
13
	
14
	@XStreamAsAttribute
15
	String value;
16
	@XStreamAsAttribute
17
	String type;
18
	@XStreamAsAttribute
19
	float confidenceLevel;
20
	
21
	public TypedId(String value, String type,
22
			float confidenceLevel) {
23
		this.value = value;
24
		this.type = type;
25
		this.confidenceLevel = confidenceLevel;
26
	}
27
	
28
	public TypedId() {
29
		super();
30
	}
31

  
32
	public String getValue() {
33
		return value;
34
	}
35

  
36
	public void setValue(String value) {
37
		this.value = value;
38
	}
39
	
40
	public String getType() {
41
		return type;
42
	}
43

  
44
	public void setType(String type) {
45
		this.type = type;
46
	}
47

  
48
	public float getConfidenceLevel() {
49
		return confidenceLevel;
50
	}
51

  
52
	public void setConfidenceLevel(float confidenceLevel) {
53
		this.confidenceLevel = confidenceLevel;
54
	}
55

  
56
	@Override
57
	public int hashCode() {
58
		final int prime = 31;
59
		int result = 1;
60
		result = prime * result + Float.floatToIntBits(confidenceLevel);
61
		result = prime * result + ((type == null) ? 0 : type.hashCode());
62
		result = prime * result + ((value == null) ? 0 : value.hashCode());
63
		return result;
64
	}
65

  
66
	@Override
67
	public boolean equals(Object obj) {
68
		if (this == obj)
69
			return true;
70
		if (obj == null)
71
			return false;
72
		if (getClass() != obj.getClass())
73
			return false;
74
		TypedId other = (TypedId) obj;
75
		if (Float.floatToIntBits(confidenceLevel) != Float
76
				.floatToIntBits(other.confidenceLevel))
77
			return false;
78
		if (type == null) {
79
			if (other.type != null)
80
				return false;
81
		} else if (!type.equals(other.type))
82
			return false;
83
		if (value == null) {
84
			if (other.value != null)
85
				return false;
86
		} else if (!value.equals(other.value))
87
			return false;
88
		return true;
89
	}
90
}
0 91

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/model/extrainfo/citations/BlobCitationEntry.java
1
package eu.dnetlib.iis.common.model.extrainfo.citations;
2

  
3
import java.text.DecimalFormatSymbols;
4
import java.util.List;
5
import java.util.Locale;
6
import java.util.regex.Matcher;
7
import java.util.regex.Pattern;
8

  
9
import com.thoughtworks.xstream.annotations.XStreamAlias;
10
import com.thoughtworks.xstream.annotations.XStreamAsAttribute;
11
import com.thoughtworks.xstream.annotations.XStreamImplicit;
12
import com.thoughtworks.xstream.annotations.XStreamOmitField;
13

  
14
/**
15
 * Comparable citation entry.
16
 * @author mhorst
17
 *
18
 */
19
@XStreamAlias("citation")
20
public class BlobCitationEntry implements Comparable<BlobCitationEntry> {
21

  
22
	/**
23
	 * Citation position.
24
	 */
25
	@XStreamAsAttribute
26
	protected int position;
27
	
28
	/**
29
	 * Raw citation text.
30
	 */
31
	protected String rawText;
32
	
33
	/**
34
	 * Matched publications identifiers.
35
	 */
36
	@XStreamImplicit
37
	protected List<TypedId> identifiers;
38

  
39
	@XStreamOmitField
40
	private final static Pattern alphaNumChunkPattern = Pattern.compile("(\\d+\\" + 
41
			new DecimalFormatSymbols(Locale.getDefault()).getDecimalSeparator() + "\\d+)|(\\d+)|(\\D+)");;
42
	
43
	
44
	public BlobCitationEntry() {
45
		super();
46
	}
47

  
48
	public BlobCitationEntry(String rawText) {
49
		this.rawText = rawText;
50
	}
51
	
52
	public String getRawText() {
53
		return rawText;
54
	}
55

  
56
	public void setRawText(String rawText) {
57
		this.rawText = rawText;
58
	}
59

  
60
	public List<TypedId> getIdentifiers() {
61
		return identifiers;
62
	}
63

  
64
	public void setIdentifiers(List<TypedId> identifiers) {
65
		this.identifiers = identifiers;
66
	}
67

  
68
	@Override
69
	public int hashCode() {
70
		final int prime = 31;
71
		int result = 1;
72
		result = prime * result
73
				+ ((identifiers == null) ? 0 : identifiers.hashCode());
74
		result = prime * result + position;
75
		result = prime * result + ((rawText == null) ? 0 : rawText.hashCode());
76
		return result;
77
	}
78

  
79
	@Override
80
	public boolean equals(Object obj) {
81
		if (this == obj)
82
			return true;
83
		if (obj == null)
84
			return false;
85
		if (getClass() != obj.getClass())
86
			return false;
87
		BlobCitationEntry other = (BlobCitationEntry) obj;
88
		if (identifiers == null) {
89
			if (other.identifiers != null)
90
				return false;
91
		} else if (!identifiers.equals(other.identifiers))
92
			return false;
93
		if (position != other.position)
94
			return false;
95
		if (rawText == null) {
96
			if (other.rawText != null)
97
				return false;
98
		} else if (!rawText.equals(other.rawText))
99
			return false;
100
		return true;
101
	}
102

  
103
	@Override
104
	public int compareTo(BlobCitationEntry c2) {
105
		if (c2!=null) {
106
			if (this.position>c2.position) {
107
				return 1;
108
			} else if (this.position<c2.position) {
109
				return -1;
110
			} else {
111
				   if (this.getRawText()!=null) {
112
						if (c2.getRawText()!=null) {
113
							int textCompareResult = compareText(this.getRawText(), c2.getRawText());
114
							if (textCompareResult==0) {
115
								return compareIdentifiers(this.getIdentifiers(), c2.getIdentifiers());
116
							} else {
117
								return textCompareResult;	
118
							}
119
						} else {
120
							return -1;
121
						}
122
					} else {
123
						if (c2.getRawText()!=null) {
124
							return 1;
125
						} else {
126
							return compareIdentifiers(this.getIdentifiers(), c2.getIdentifiers());
127
						}
128
					}   
129
				
130
			}
131
		} else {
132
			if (this.getRawText()!=null) {
133
				return -1;	
134
			} else {
135
				if (this.getIdentifiers()!=null) {
136
					return -1;
137
				} else {
138
//					should we check position in any way?
139
					return 0;
140
				}
141
			}
142
		}
143
	}
144

  
145
	private int compareIdentifiers(List<TypedId> ids1, List<TypedId> ids2) {
146
		   if (ids2!=null) {
147
			   if (ids1!=null) {
148
				   if (ids1.equals(ids2)) {
149
					   return 0;
150
				   } else {
151
//					   no matter what value, order by ids is irrelevant
152
//					   we have to return non-zero value to prevent treating objects as the same
153
					   return 1;
154
				   }
155
			   } else {
156
				   return -1;
157
			   }
158
			} else {
159
				if (ids1!=null) {
160
					return 1;
161
				} else {
162
					return 0;
163
				}
164
			}
165
	   }
166
	   
167
	   private int compareText(String s1, String s2) {
168
	      int compareValue = 0;
169
	      Matcher s1ChunkMatcher = alphaNumChunkPattern.matcher(s1);
170
	      Matcher s2ChunkMatcher = alphaNumChunkPattern.matcher(s2);
171
	      String s1ChunkValue = null;
172
	      String s2ChunkValue = null;
173
	      while (s1ChunkMatcher.find() && s2ChunkMatcher.find() && compareValue == 0) {
174
	         s1ChunkValue = s1ChunkMatcher.group();
175
	         s2ChunkValue = s2ChunkMatcher.group();
176
	         try {
177
	            // compare double values - ints get converted to doubles. Eg. 100 = 100.0
178
	            Double s1Double = Double.valueOf(s1ChunkValue);
179
	            Double s2Double = Double.valueOf(s2ChunkValue);
180
	            compareValue = s1Double.compareTo(s2Double);
181
	         } catch (NumberFormatException e) {
182
	            // not a number, use string comparison.
183
	            compareValue = s1ChunkValue.compareTo(s2ChunkValue);
184
	         }
185
	         // if they are equal thus far, but one has more left, it should come after the one that doesn't.
186
	         if (compareValue == 0) {
187
	            if (s1ChunkMatcher.hitEnd() && !s2ChunkMatcher.hitEnd()) {
188
	               compareValue = -1;
189
	            } else if (!s1ChunkMatcher.hitEnd() && s2ChunkMatcher.hitEnd()) {
190
	               compareValue = 1;
191
	            }
192
	         }
193
	      }
194
	      return compareValue;
195
	   }
196

  
197
	/**
198
	 * @return the position
199
	 */
200
	public int getPosition() {
201
		return position;
202
	}
203

  
204
	/**
205
	 * @param position the position to set
206
	 */
207
	public void setPosition(int position) {
208
		this.position = position;
209
	}
210
}
0 211

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/protobuf/AvroToProtoBufConverter.java
1
package eu.dnetlib.iis.common.protobuf;
2

  
3
import com.google.protobuf.Message;
4
import org.apache.avro.generic.IndexedRecord;
5

  
6
/**
7
 * @author Mateusz Fedoryszak (m.fedoryszak@icm.edu.pl)
8
 */
9
public interface AvroToProtoBufConverter<IN extends IndexedRecord, OUT extends Message> {
10
    public String convertIntoKey(IN datum);
11
    public OUT convertIntoValue(IN datum);
12
}
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/protobuf/AvroToProtoBufOneToOneMapper.java
1
package eu.dnetlib.iis.common.protobuf;
2

  
3
import com.google.protobuf.Message;
4
import org.apache.avro.generic.IndexedRecord;
5
import org.apache.avro.mapred.AvroKey;
6
import org.apache.hadoop.io.BytesWritable;
7
import org.apache.hadoop.io.NullWritable;
8
import org.apache.hadoop.io.Text;
9
import org.apache.hadoop.mapreduce.Mapper;
10
import org.apache.log4j.Logger;
11

  
12
import java.io.IOException;
13

  
14
/**
15
 * @author Mateusz Fedoryszak (m.fedoryszak@icm.edu.pl)
16
 */
17
public class AvroToProtoBufOneToOneMapper<IN extends IndexedRecord, OUT extends Message>
18
        extends Mapper<AvroKey<IN>, NullWritable, Text, BytesWritable> {
19
    private static final String CONVERTER_CLASS_PROPERTY = "converter_class";
20
    private final Logger log = Logger.getLogger(AvroToProtoBufOneToOneMapper.class);
21

  
22
    private final Text keyWritable = new Text();
23
    private final BytesWritable valueWritable = new BytesWritable();
24
    private AvroToProtoBufConverter<IN, OUT> converter;
25

  
26
    @SuppressWarnings("unchecked")
27
    @Override
28
    public void setup(Context context) throws IOException, InterruptedException {
29
        Class<?> converterClass = context.getConfiguration().getClass(CONVERTER_CLASS_PROPERTY, null);
30

  
31
        if (converterClass == null) {
32
            throw new IOException("Please specify " + CONVERTER_CLASS_PROPERTY);
33
        }
34

  
35
        try {
36
            converter = (AvroToProtoBufConverter<IN, OUT>) converterClass.newInstance();
37
        } catch (ClassCastException e) {
38
            throw new IOException(
39
                    "Class specified in " + CONVERTER_CLASS_PROPERTY + " doesn't implement AvroToProtoBufConverter");
40
        } catch (Exception e) {
41
            throw new IOException(
42
                    "Could not instantiate specified AvroToProtoBufConverter class, " + converterClass, e);
43
        }
44
    }
45

  
46
    @Override
47
    public void map(AvroKey<IN> avro, NullWritable ignore, Context context)
48
            throws IOException, InterruptedException {
49
        String key = null;
50
        try {
51
            key = converter.convertIntoKey(avro.datum());
52
            keyWritable.set(key);
53

  
54
            byte[] value = converter.convertIntoValue(avro.datum()).toByteArray();
55
            valueWritable.set(value, 0, value.length);
56

  
57
            context.write(keyWritable, valueWritable);
58
        } catch (Exception e) {
59
            log.error("Error" + (key != null ? " while processing  " + key : ""), e);
60
        }
61
    }
62
}
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/utils/EmptyDatastoreVerifierProcess.java
1
package eu.dnetlib.iis.common.utils;
2

  
3
import java.io.File;
4
import java.io.FileOutputStream;
5
import java.io.OutputStream;
6
import java.util.Collections;
7
import java.util.HashMap;
8
import java.util.Map;
9
import java.util.Properties;
10

  
11
import org.apache.hadoop.conf.Configuration;
12
import org.apache.hadoop.fs.FileSystem;
13

  
14
import eu.dnetlib.iis.core.java.PortBindings;
15
import eu.dnetlib.iis.core.java.Ports;
16
import eu.dnetlib.iis.core.java.Process;
17
import eu.dnetlib.iis.core.java.io.CloseableIterator;
18
import eu.dnetlib.iis.core.java.io.DataStore;
19
import eu.dnetlib.iis.core.java.io.FileSystemPath;
20
import eu.dnetlib.iis.core.java.porttype.AnyPortType;
21
import eu.dnetlib.iis.core.java.porttype.PortType;
22

  
23
/**
24
 * Simple process verifying whether given datastore is empty.
25
 * @author mhorst
26
 *
27
 */
28
public class EmptyDatastoreVerifierProcess implements Process {
29

  
30
	public static final String INPUT_PORT_NAME = "input";
31
	
32
	public static final String OOZIE_ACTION_OUTPUT_FILENAME = "oozie.action.output.properties";
33
	
34
	public static final String DEFAULT_ENCODING = "UTF-8";
35
	
36
	public static final String OUTPUT_PROPERTY_IS_EMPTY = "isEmpty";
37
	
38
	/**
39
	 * Ports handled by this module.
40
	 */
41
	private static final Ports ports;
42
	
43
	static {
44
//		preparing ports
45
		Map<String, PortType> input = new HashMap<String, PortType>();
46
		input.put(INPUT_PORT_NAME, new AnyPortType());
47
		Map<String, PortType> output = Collections.emptyMap();
48
		ports = new Ports(input, output);
49
	}
50
	
51
	/**
52
	 * Returns ports in a static way.
53
	 * @return
54
	 */
55
	static protected Ports getStaticPorts() {
56
		return ports;
57
	}
58
	
59
	@Override
60
	public Map<String, PortType> getInputPorts() {
61
		return getStaticPorts().getInput();
62
	}
63

  
64
	@Override
65
	public Map<String, PortType> getOutputPorts() {
66
		return getStaticPorts().getOutput();
67
	}
68

  
69
	@Override
70
	public void run(PortBindings portBindings, Configuration conf,
71
			Map<String, String> parameters) throws Exception {
72
		CloseableIterator<?> closeableIt = DataStore.getReader(
73
				new FileSystemPath(FileSystem.get(conf), 
74
						portBindings.getInput().get(INPUT_PORT_NAME)));
75
		try {
76
			File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
77
	        Properties props = new Properties();
78
			props.setProperty(OUTPUT_PROPERTY_IS_EMPTY, 
79
	        		Boolean.toString(!closeableIt.hasNext()));	
80
			OutputStream os = new FileOutputStream(file);
81
	        try {
82
		        	props.store(os, "");	
83
	        } finally {
84
	        	os.close();	
85
	        }	
86
		} finally {
87
			closeableIt.close();
88
		}
89
	}
90

  
91
}
0 92

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/WorkflowRuntimeParameters.java
1
package eu.dnetlib.iis.common;
2

  
3
/**
 * Holder of constants related to workflow runtime parameters.
 * The class is abstract and has a private constructor, so it cannot be
 * instantiated or subclassed from outside.
 */
public abstract class WorkflowRuntimeParameters {

	/** Default delimiter of CSV values. */
	public static final char DEFAULT_CSV_DELIMITER = ',';

	/** Non-empty marker of an undefined value. */
	public static final String UNDEFINED_NONEMPTY_VALUE = "$UNDEFINED$";

	private WorkflowRuntimeParameters() {}

}
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/hbase/HBaseConstants.java
1
package eu.dnetlib.iis.common.hbase;
2

  
3
import java.io.UnsupportedEncodingException;
4

  
5
import eu.dnetlib.data.proto.TypeProtos.Type;
6

  
7
/**
8
 * HBase constants
9
 * @author mhorst
10
 *
11
 */
12
public final class HBaseConstants {
13

  
14
	public static final float CONFIDENCE_TO_TRUST_LEVEL_FACTOR = 0.9f;
15
	
16
	public static final String STATIC_FIELDS_ENCODING_UTF8 = "utf-8";
17
	
18
	public static final char ROW_PREFIX_SEPARATOR = '|';
19
	
20
	public static final String ID_NAMESPACE_SEPARATOR = "::";
21
	public static final String CLASSIFICATION_HIERARCHY_SEPARATOR = ID_NAMESPACE_SEPARATOR;
22
	public static final String INFERENCE_PROVENANCE_SEPARATOR = ID_NAMESPACE_SEPARATOR;
23
	
24
	public static final byte[] ROW_PREFIX_RESULT;
25
	public static final byte[] ROW_PREFIX_PROJECT;
26
	public static final byte[] ROW_PREFIX_PERSON;
27
	
28
	public static final byte[] QUALIFIER_BODY;
29
	
30
	public static final String SEMANTIC_CLASS_MAIN_TITLE = "main title";
31
	public static final String SEMANTIC_CLASS_PUBLICATION = "publication";
32
	public static final String SEMANTIC_CLASS_UNKNOWN = "UNKNOWN";
33
	public static final String SEMANTIC_CLASS_IIS = "iis";
34
	
35
	public static final String SEMANTIC_SCHEME_DNET_PERSON_ROLES = "dnet:personroles";
36
	public static final String SEMANTIC_SCHEME_DNET_RELATIONS_RESULT_RESULT = "dnet:result_result_relations";
37
	public static final String SEMANTIC_SCHEME_DNET_RELATIONS_RESULT_PROJECT = "dnet:result_project_relations";
38
	
39
	public static final String SEMANTIC_SCHEME_DNET_TITLE_TYPOLOGIES = "dnet:title_typologies";
40
	public static final String SEMANTIC_SCHEME_DNET_RESULT_TYPOLOGIES = "dnet:result_typologies";
41
	public static final String SEMANTIC_SCHEME_DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
42
	public static final String SEMANTIC_SCHEME_DNET_LANGUAGES = "dnet:languages";
43
	public static final String SEMANTIC_SCHEME_DNET_PID_TYPES = "dnet:pid_types";
44
	public static final String SEMANTIC_SCHEME_DNET_CLASSIFICATION_TAXONOMIES = "dnet:subject_classification_typologies";
45
	
46
//	resultResult citation and similarity related
47
	public static final String SEMANTIC_SCHEME_DNET_DATASET_PUBLICATION_RELS = "dnet:dataset_publication_rels";
48
	
49
	public static final String SEMANTIC_CLASS_TAXONOMIES_ARXIV = "arxiv";
50
	public static final String SEMANTIC_CLASS_TAXONOMIES_WOS = "wos";
51
	public static final String SEMANTIC_CLASS_TAXONOMIES_DDC = "ddc";
52
	public static final String SEMANTIC_CLASS_TAXONOMIES_MESHEUROPMC = "mesheuropmc";
53
	public static final String SEMANTIC_CLASS_TAXONOMIES_ACM = "acm";
54
	
55
	public static final String EXTERNAL_ID_TYPE_INSTANCE_URL = "dnet:instance-url";
56
	public static final String EXTERNAL_ID_TYPE_UNKNOWN = "unknown";
57
	
58
//	publication types class ids
59
	public static final String SEMANTIC_CLASS_INSTANCE_TYPE_ARTICLE = "0001";
60
	public static final String SEMANTIC_CLASS_INSTANCE_TYPE_DATASET = "0021";
61
	
62
	static {
63
		try {
64
			ROW_PREFIX_RESULT = "50|".getBytes(STATIC_FIELDS_ENCODING_UTF8);
65
			ROW_PREFIX_PROJECT = "40|".getBytes(STATIC_FIELDS_ENCODING_UTF8);
66
			ROW_PREFIX_PERSON = "30|".getBytes(STATIC_FIELDS_ENCODING_UTF8);
67
			
68
			QUALIFIER_BODY = "body".getBytes(STATIC_FIELDS_ENCODING_UTF8);
69

  
70
		} catch (UnsupportedEncodingException e) {
71
			throw new RuntimeException(e);
72
		}
73
	}
74
	
75
	/**
76
	 * Returns collumn family name for given entity type.
77
	 * @param entityType
78
	 * @return collumn family name for given entity type
79
	 */
80
	public static byte[] getCollumnFamily(Type entityType) {
81
		try {
82
			return entityType.name().getBytes(STATIC_FIELDS_ENCODING_UTF8);
83
		} catch (UnsupportedEncodingException e) {
84
			throw new RuntimeException(e);
85
		}
86
	}
87
	
88
	private HBaseConstants() {}
89
}
0 90

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/ByteArrayUtils.java
1
package eu.dnetlib.iis.common;
2

  
3
/**
 * Byte array utility class.
 * @author mhorst
 *
 */
public class ByteArrayUtils {

	/**
	 * Does this byte array begin with match array content?
	 * @param source Byte array to examine
	 * @param match Byte array to locate in <code>source</code>
	 * @return true If the starting bytes are equal
	 */
	public static boolean startsWith(byte[] source, byte[] match) {
		return startsWith(source, 0, match);
	}

	/**
	 * Does this byte array contain match array content at given offset?
	 * An offset outside of the <code>source</code> array (negative or greater
	 * than its length) never matches, not even for an empty <code>match</code>.
	 * @param source Byte array to examine
	 * @param offset An offset into the <code>source</code> array
	 * @param match Byte array to locate in <code>source</code>
	 * @return true If the bytes starting at offset are equal to match
	 */
	public static boolean startsWith(byte[] source, int offset, byte[] match) {
//		a negative offset used to end up with ArrayIndexOutOfBoundsException (or true for
//		an empty match) while an offset past the end gave false; both are rejected the same way now
		if (offset < 0 || offset > source.length) {
			return false;
		}
		if (match.length > (source.length - offset)) {
			return false;
		}
		for (int i = 0; i < match.length; i++) {
			if (source[offset + i] != match[i]) {
				return false;
			}
		}
		return true;
	}

}
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/cache/CacheMetadataManagingProcess.java
1
package eu.dnetlib.iis.common.cache;
2

  
3
import java.io.File;
4
import java.io.FileOutputStream;
5
import java.io.IOException;
6
import java.io.InputStreamReader;
7
import java.io.OutputStream;
8
import java.io.OutputStreamWriter;
9
import java.util.Collections;
10
import java.util.Map;
11
import java.util.Properties;
12

  
13
import org.apache.hadoop.conf.Configuration;
14
import org.apache.hadoop.fs.FSDataInputStream;
15
import org.apache.hadoop.fs.FSDataOutputStream;
16
import org.apache.hadoop.fs.FileSystem;
17
import org.apache.hadoop.fs.Path;
18

  
19
import com.google.gson.Gson;
20
import com.google.gson.stream.JsonWriter;
21

  
22
import eu.dnetlib.iis.common.FsShellPermissions;
23
import eu.dnetlib.iis.core.java.PortBindings;
24
import eu.dnetlib.iis.core.java.porttype.PortType;
25

  
26
/**
27
 * CacheMetadata managing process.
28
 * 
29
 * @author mhorst
30
 *
31
 */
32
public class CacheMetadataManagingProcess implements eu.dnetlib.iis.core.java.Process {
33

  
34
	public static final String OUTPUT_PROPERTY_CACHE_ID = "cache_id";
35
	
36
	public static final String PARAM_CACHE_DIR = "default_cache_location";
37
	
38
	public static final String PARAM_MODE = "mode";
39
	
40
	public static final String PARAM_ID = "id";
41
	
42
	public static final String MODE_READ_CURRENT_ID = "read_current_id";
43
	
44
	public static final String MODE_GENERATE_NEW_ID = "generate_new_id";
45
	
46
	public static final String MODE_WRITE_ID = "write_id";
47
	
48
	public static final String DEFAULT_METAFILE_NAME = "meta.json";
49
	
50
	public static final int CACHE_ID_PADDING_LENGTH = 6;
51
	
52
	public static final String NON_EXISTING_CACHE_ID = "$UNDEFINED$"; 
53

  
54
	public static final String OOZIE_ACTION_OUTPUT_FILENAME = "oozie.action.output.properties";
55
	
56
	public static final String DEFAULT_ENCODING = "UTF-8";
57
	
58
	public class CacheMeta {
59
		protected String currentCacheId;
60
//		TODO handle this property in a safe way!
61
//		protected boolean isCacheUnderConstruction;
62

  
63
		public String getCurrentCacheId() {
64
			return currentCacheId;
65
		}
66

  
67
		public void setCurrentCacheId(String currentCacheId) {
68
			this.currentCacheId = currentCacheId;
69
		}
70
	}
71
	
72
	@Override
73
	public Map<String, PortType> getInputPorts() {
74
		return Collections.emptyMap();
75
	}
76

  
77
	@Override
78
	public Map<String, PortType> getOutputPorts() {
79
		return Collections.emptyMap();
80
	}
81

  
82
	protected String getExistingCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
83
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
84
			FileSystem fs = FileSystem.get(conf);
85
			Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
86
					DEFAULT_METAFILE_NAME);
87
			if (fs.exists(cacheFilePath)) {
88
				FSDataInputStream inputStream = fs.open(cacheFilePath);
89
				InputStreamReader reader = new InputStreamReader(
90
						inputStream, DEFAULT_ENCODING);
91
				try {
92
					Gson gson = new Gson();
93
					CacheMeta cacheMeta = gson.fromJson(reader, CacheMeta.class);
94
					return cacheMeta.getCurrentCacheId();
95
				} finally {
96
					reader.close();
97
					inputStream.close();
98
				}
99
			} else {
100
//				cache does not exist yet
101
				return NON_EXISTING_CACHE_ID;
102
			}
103
		} else {
104
			throw new RuntimeException("cache directory location not provided! "
105
					+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
106
		}
107
	}
108
	
109
	protected String generateNewCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
110
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
111
			FileSystem fs = FileSystem.get(conf);
112
			Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
113
					DEFAULT_METAFILE_NAME);
114
			CacheMeta cachedMeta = null;
115
			if (fs.exists(cacheFilePath)) {
116
				FSDataInputStream inputStream = fs.open(cacheFilePath);
117
				InputStreamReader reader = new InputStreamReader(
118
						inputStream, DEFAULT_ENCODING);
119
				try {
120
					Gson gson = new Gson();
121
					cachedMeta = gson.fromJson(reader, CacheMeta.class);
122
				} finally {
123
					reader.close();
124
					inputStream.close();
125
				}
126
			}
127
			if (cachedMeta!=null) {
128
				int currentIndex = convertCacheIdToInt(cachedMeta.getCurrentCacheId());
129
				return convertIntToCacheId(currentIndex+1);
130
			} else {
131
//				initializing cache meta
132
				return convertIntToCacheId(1);
133
			}
134
		} else {
135
			throw new RuntimeException("cache directory location not provided! "
136
					+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
137
		}
138
	}
139
	
140
	protected void writeCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
141
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
142
			if (parameters.containsKey(PARAM_ID)) {
143
				String cacheId = parameters.get(PARAM_ID);
144
				FileSystem fs = FileSystem.get(conf);
145
				Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
146
						DEFAULT_METAFILE_NAME);
147
				CacheMeta cachedMeta = null;
148
				if (fs.exists(cacheFilePath)) {
149
					FSDataInputStream inputStream = fs.open(cacheFilePath);
150
					InputStreamReader reader = new InputStreamReader(
151
							inputStream, DEFAULT_ENCODING);
152
					try {
153
						Gson gson = new Gson();
154
						cachedMeta = gson.fromJson(reader, CacheMeta.class);
155
					} finally {
156
						reader.close();
157
						inputStream.close();
158
					}
159
				}
160
//				writing new id
161
				if (cachedMeta==null) {
162
					cachedMeta = new CacheMeta();
163
				}
164
				cachedMeta.setCurrentCacheId(cacheId);
165
				
166
				Gson gson = new Gson();
167
				FSDataOutputStream outputStream = fs.create(cacheFilePath, true);
168
				JsonWriter writer = new JsonWriter(
169
						new OutputStreamWriter(outputStream, DEFAULT_ENCODING));
170
				try {
171
					gson.toJson(cachedMeta, CacheMeta.class, writer);
172
				} finally {
173
					writer.close();
174
					outputStream.close();
175
//					changing file permission to +rw to allow writing for different users
176
					FsShellPermissions.changePermissions(fs, 
177
							conf, 
178
							FsShellPermissions.Op.CHMOD, 
179
							false, "0666", cacheFilePath.toString());
180
				}
181
			} else {
182
				throw new RuntimeException("unable to write new cache id in meta.json file, "
183
						+ "no '" + PARAM_ID + "' input parameter provied!");
184
			}
185
		} else {
186
			throw new RuntimeException("cache directory location not provided! "
187
					+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
188
		}
189
	}
190
	
191
	protected static int convertCacheIdToInt(String cacheId) {
192
		StringBuffer strBuff = new StringBuffer(cacheId);
193
		while (true) {
194
			if (strBuff.charAt(0)=='0') {
195
				strBuff.deleteCharAt(0);
196
			} else {
197
				break;
198
			}
199
		}
200
		return Integer.parseInt(strBuff.toString());
201
	}
202
	
203
	protected static String convertIntToCacheId(int cacheIndex) {
204
		StringBuffer strBuff = new StringBuffer(String.valueOf(cacheIndex));
205
		while(strBuff.length()<CACHE_ID_PADDING_LENGTH) {
206
			strBuff.insert(0, '0');
207
		}
208
		return strBuff.toString();
209
	}
210
	
211
	@Override
212
	public void run(PortBindings portBindings, Configuration conf,
213
			Map<String, String> parameters) throws Exception {
214
		String mode = parameters.get(PARAM_MODE);
215
		File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
216
        Properties props = new Properties();
217
		if (MODE_READ_CURRENT_ID.equals(mode)) {
218
			props.setProperty(OUTPUT_PROPERTY_CACHE_ID, 
219
	        		getExistingCacheId(conf, parameters));
220
		} else if (MODE_GENERATE_NEW_ID.equals(mode)) {
221
			props.setProperty(OUTPUT_PROPERTY_CACHE_ID, 
222
	        		generateNewCacheId(conf, parameters));
223
		} else if (MODE_WRITE_ID.equals(mode)) {
224
			writeCacheId(conf, parameters);
225
		} else {
226
			throw new RuntimeException("unsupported mode: " + mode);	
227
		}
228
        OutputStream os = new FileOutputStream(file);
229
        try {
230
        	props.store(os, "");	
231
        } finally {
232
        	os.close();	
233
        }	
234
	}
235
	
236
}
0 237

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/affiliation/AffiliationBuilder.java
1
package eu.dnetlib.iis.common.affiliation;
2

  
3
import java.util.List;
4

  
5
import org.jdom.Element;
6

  
7
import eu.dnetlib.iis.metadataextraction.schemas.Affiliation;
8

  
9
/**
10
 * Affiliation builder.
11
 * @author mhorst
12
 *
13
 */
14
public class AffiliationBuilder {
15

  
16
	
17
	/**
18
	 * Creates affiliation object based on node.
19
	 * @param node
20
	 * @return affiliation object
21
	 */
22
	public static Affiliation build(Element node) {
23
		String affId = node.getAttributeValue("id");
24
        String country = node.getChildText("country");
25
        String countryCode = null;
26
        if (node.getChild("country") != null) {
27
            countryCode = node.getChild("country").getAttributeValue("country");
28
        }
29
        String address = node.getChildText("addr-line");
30
        StringBuilder sb = new StringBuilder();
31
        List<?> institutions = node.getChildren("institution");
32
        for (Object institution : institutions) {
33
            if (!sb.toString().isEmpty()) {
34
                sb.append(", ");
35
            }
36
            sb.append(((Element) institution).getTextTrim());
37
        }
38
        String institution = sb.toString().replaceFirst(", $", "");
39
        if (institution.isEmpty()) {
40
            institution = null;
41
        }
42
        
43
        return Affiliation.newBuilder()
44
                .setRawText(node.getValue().trim().replaceFirst(affId, "").trim())
45
                .setOrganization(institution!=null?institution.trim():null)
46
                .setAddress(address!=null?address.trim():null)
47
                .setCountryName(country!=null?country.trim():null)
48
                .setCountryCode(countryCode)
49
                .build();
50
	}
51
	
52
}
0 53

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/oozie/property/ConditionalPropertySetter.java
1
package eu.dnetlib.iis.common.oozie.property;
2

  
3
import java.io.File;
4
import java.io.FileOutputStream;
5
import java.io.OutputStream;
6
import java.util.Collections;
7
import java.util.Map;
8
import java.util.Properties;
9

  
10
import org.apache.hadoop.conf.Configuration;
11

  
12
import eu.dnetlib.iis.core.java.PortBindings;
13
import eu.dnetlib.iis.core.java.Process;
14
import eu.dnetlib.iis.core.java.porttype.PortType;
15

  
16
/**
17
 * This process is a solution for setting dynamic properties in oozie workflow definition.
18
 * 
19
 * Expects three parameters to be provided: the first 'condition' parameter is boolean value 
20
 * based on which either first 'inCaseOfTrue' or second 'elseCase' parameter value is set as 
21
 * the 'result' property.
22
 *  
23
 * This can be understood as the: 
24
 * 
25
 * condition ? inCaseOfTrue : elseCase
26
 * 
27
 * java syntax equivalent.
28
 * 
29
 * @author mhorst
30
 *
31
 */
32
public class ConditionalPropertySetter implements Process {
33

  
34
	public static final String PARAM_CONDITION = "condition";
35
	public static final String PARAM_INCASEOFTRUE = "inCaseOfTrue";
36
	public static final String PARAM_ELSECASE = "elseCase";
37
	
38
	public static final String OUTPUT_PROPERTY_RESULT = "result";
39
	
40
	public static final String OOZIE_ACTION_OUTPUT_FILENAME = "oozie.action.output.properties";
41
	
42
	@Override
43
	public Map<String, PortType> getInputPorts() {
44
		return Collections.emptyMap();
45
	}
46

  
47
	@Override
48
	public Map<String, PortType> getOutputPorts() {
49
		return Collections.emptyMap();
50
	}
51

  
52
	@Override
53
	public void run(PortBindings portBindings, Configuration conf,
54
			Map<String, String> parameters) throws Exception {
55

  
56
		String condition = parameters.get(PARAM_CONDITION);
57
		if (condition == null) {
58
			throw new RuntimeException("unable to make decision: " + 
59
					PARAM_CONDITION + " parameter was not set!");
60
		}
61

  
62
		Properties props = new Properties();
63
        props.setProperty(OUTPUT_PROPERTY_RESULT, 
64
        		Boolean.parseBoolean(condition)?
65
        				parameters.get(PARAM_INCASEOFTRUE):
66
        					parameters.get(PARAM_ELSECASE));
67
        OutputStream os = new FileOutputStream(
68
        		new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME)));
69
        try {
70
        	props.store(os, "");	
71
        } finally {
72
        	os.close();	
73
        }
74

  
75
	}
76

  
77
}
0 78

  
modules/icm-iis-common/trunk/src/main/java/eu/dnetlib/iis/common/oozie/property/JobPropertiesDumperProcess.java
1
package eu.dnetlib.iis.common.oozie.property;
2

  
3
import java.util.Collections;
4
import java.util.HashMap;
5
import java.util.Map;
6
import java.util.Properties;
7

  
8
import org.apache.hadoop.conf.Configuration;
9
import org.apache.hadoop.fs.FSDataOutputStream;
10
import org.apache.hadoop.fs.FileSystem;
11
import org.apache.hadoop.fs.Path;
12

  
13
import eu.dnetlib.iis.core.java.PortBindings;
14
import eu.dnetlib.iis.core.java.porttype.AnyPortType;
15
import eu.dnetlib.iis.core.java.porttype.PortType;
16

  
17
/**
18
 * Job properties dumper process. 
19
 * Writes all properties to job.properties file stored in output directory provided as input parameter.
20
 * 
21
 * @author mhorst
22
 *
23
 */
24
public class JobPropertiesDumperProcess implements eu.dnetlib.iis.core.java.Process {
25

  
26
	public static final String OUTPUT_PORT = "output";
27

  
28
	/** Output ports declared by this process: {@link #OUTPUT_PORT} bound to an {@link AnyPortType}. */
	private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>();

	// static initializer: the map is shared by all instances, so it is filled once
	// when the class is loaded rather than on every instantiation
	static {
		outputPorts.put(OUTPUT_PORT, new AnyPortType());
	}
33
	
34
	@Override
35
	public void run(PortBindings portBindings, Configuration conf,
36
			Map<String, String> parameters) throws Exception {
37
        Properties props = new Properties();
38
        for(Map.Entry<String,String> property : conf) {
39
        	props.setProperty(property.getKey(), 
40
        			property.getValue());
41
        }
42
		FileSystem fs = FileSystem.get(conf);
43
        FSDataOutputStream outputStream = fs.create(new Path(
44
        				portBindings.getOutput().get(OUTPUT_PORT),
45
        				"job.properties"), true);
46
        try {
47
        	props.store(outputStream, "");	
48
        } finally {
49
        	outputStream.close();	
50
        }	
51
	}
52

  
53
	@Override
54
	public Map<String, PortType> getInputPorts() {
55
		return Collections.emptyMap();
56
	}
57

  
58
	@Override
59
	public Map<String, PortType> getOutputPorts() {
60
		return outputPorts;
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff