Project

General

Profile

1
package eu.dnetlib.iis.common.cache;
2

    
3
import java.io.File;
4
import java.io.FileOutputStream;
5
import java.io.IOException;
6
import java.io.InputStreamReader;
7
import java.io.OutputStream;
8
import java.io.OutputStreamWriter;
9
import java.util.Collections;
10
import java.util.Map;
11
import java.util.Properties;
12

    
13
import org.apache.hadoop.conf.Configuration;
14
import org.apache.hadoop.fs.FSDataInputStream;
15
import org.apache.hadoop.fs.FSDataOutputStream;
16
import org.apache.hadoop.fs.FileSystem;
17
import org.apache.hadoop.fs.Path;
18

    
19
import com.google.gson.Gson;
20
import com.google.gson.stream.JsonWriter;
21

    
22
import eu.dnetlib.iis.common.FsShellPermissions;
23
import eu.dnetlib.iis.core.java.PortBindings;
24
import eu.dnetlib.iis.core.java.porttype.PortType;
25

    
26
/**
27
 * CacheMetadata managing process.
28
 * 
29
 * @author mhorst
30
 *
31
 */
32
public class CacheMetadataManagingProcess implements eu.dnetlib.iis.core.java.Process {
33

    
34
	public static final String OUTPUT_PROPERTY_CACHE_ID = "cache_id";
35
	
36
	public static final String PARAM_CACHE_DIR = "default_cache_location";
37
	
38
	public static final String PARAM_MODE = "mode";
39
	
40
	public static final String PARAM_ID = "id";
41
	
42
	public static final String MODE_READ_CURRENT_ID = "read_current_id";
43
	
44
	public static final String MODE_GENERATE_NEW_ID = "generate_new_id";
45
	
46
	public static final String MODE_WRITE_ID = "write_id";
47
	
48
	public static final String DEFAULT_METAFILE_NAME = "meta.json";
49
	
50
	public static final int CACHE_ID_PADDING_LENGTH = 6;
51
	
52
	public static final String NON_EXISTING_CACHE_ID = "$UNDEFINED$"; 
53

    
54
	public static final String OOZIE_ACTION_OUTPUT_FILENAME = "oozie.action.output.properties";
55
	
56
	public static final String DEFAULT_ENCODING = "UTF-8";
57
	
58
	public class CacheMeta {
59
		protected String currentCacheId;
60
//		TODO handle this property in a safe way!
61
//		protected boolean isCacheUnderConstruction;
62

    
63
		public String getCurrentCacheId() {
64
			return currentCacheId;
65
		}
66

    
67
		public void setCurrentCacheId(String currentCacheId) {
68
			this.currentCacheId = currentCacheId;
69
		}
70
	}
71
	
72
	@Override
73
	public Map<String, PortType> getInputPorts() {
74
		return Collections.emptyMap();
75
	}
76

    
77
	@Override
78
	public Map<String, PortType> getOutputPorts() {
79
		return Collections.emptyMap();
80
	}
81

    
82
	protected String getExistingCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
83
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
84
			FileSystem fs = FileSystem.get(conf);
85
			Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
86
					DEFAULT_METAFILE_NAME);
87
			if (fs.exists(cacheFilePath)) {
88
				FSDataInputStream inputStream = fs.open(cacheFilePath);
89
				InputStreamReader reader = new InputStreamReader(
90
						inputStream, DEFAULT_ENCODING);
91
				try {
92
					Gson gson = new Gson();
93
					CacheMeta cacheMeta = gson.fromJson(reader, CacheMeta.class);
94
					return cacheMeta.getCurrentCacheId();
95
				} finally {
96
					reader.close();
97
					inputStream.close();
98
				}
99
			} else {
100
//				cache does not exist yet
101
				return NON_EXISTING_CACHE_ID;
102
			}
103
		} else {
104
			throw new RuntimeException("cache directory location not provided! "
105
					+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
106
		}
107
	}
108
	
109
	protected String generateNewCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
110
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
111
			FileSystem fs = FileSystem.get(conf);
112
			Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
113
					DEFAULT_METAFILE_NAME);
114
			CacheMeta cachedMeta = null;
115
			if (fs.exists(cacheFilePath)) {
116
				FSDataInputStream inputStream = fs.open(cacheFilePath);
117
				InputStreamReader reader = new InputStreamReader(
118
						inputStream, DEFAULT_ENCODING);
119
				try {
120
					Gson gson = new Gson();
121
					cachedMeta = gson.fromJson(reader, CacheMeta.class);
122
				} finally {
123
					reader.close();
124
					inputStream.close();
125
				}
126
			}
127
			if (cachedMeta!=null) {
128
				int currentIndex = convertCacheIdToInt(cachedMeta.getCurrentCacheId());
129
				return convertIntToCacheId(currentIndex+1);
130
			} else {
131
//				initializing cache meta
132
				return convertIntToCacheId(1);
133
			}
134
		} else {
135
			throw new RuntimeException("cache directory location not provided! "
136
					+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
137
		}
138
	}
139
	
140
	protected void writeCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
141
		if (parameters.containsKey(PARAM_CACHE_DIR)) {
142
			if (parameters.containsKey(PARAM_ID)) {
143
				String cacheId = parameters.get(PARAM_ID);
144
				FileSystem fs = FileSystem.get(conf);
145
				Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR), 
146
						DEFAULT_METAFILE_NAME);
147
				CacheMeta cachedMeta = null;
148
				if (fs.exists(cacheFilePath)) {
149
					FSDataInputStream inputStream = fs.open(cacheFilePath);
150
					InputStreamReader reader = new InputStreamReader(
151
							inputStream, DEFAULT_ENCODING);
152
					try {
153
						Gson gson = new Gson();
154
						cachedMeta = gson.fromJson(reader, CacheMeta.class);
155
					} finally {
156
						reader.close();
157
						inputStream.close();
158
					}
159
				}
160
//				writing new id
161
				if (cachedMeta==null) {
162
					cachedMeta = new CacheMeta();
163
				}
164
				cachedMeta.setCurrentCacheId(cacheId);
165
				
166
				Gson gson = new Gson();
167
				FSDataOutputStream outputStream = fs.create(cacheFilePath, true);
168
				JsonWriter writer = new JsonWriter(
169
						new OutputStreamWriter(outputStream, DEFAULT_ENCODING));
170
				try {
171
					gson.toJson(cachedMeta, CacheMeta.class, writer);
172
				} finally {
173
					writer.close();
174
					outputStream.close();
175
//					changing file permission to +rw to allow writing for different users
176
					FsShellPermissions.changePermissions(fs, 
177
							conf, 
178
							FsShellPermissions.Op.CHMOD, 
179
							false, "0666", cacheFilePath.toString());
180
				}
181
			} else {
182
				throw new RuntimeException("unable to write new cache id in meta.json file, "
183
						+ "no '" + PARAM_ID + "' input parameter provied!");
184
			}
185
		} else {
186
			throw new RuntimeException("cache directory location not provided! "
187
					+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
188
		}
189
	}
190
	
191
	protected static int convertCacheIdToInt(String cacheId) {
192
		StringBuffer strBuff = new StringBuffer(cacheId);
193
		while (true) {
194
			if (strBuff.charAt(0)=='0') {
195
				strBuff.deleteCharAt(0);
196
			} else {
197
				break;
198
			}
199
		}
200
		return Integer.parseInt(strBuff.toString());
201
	}
202
	
203
	protected static String convertIntToCacheId(int cacheIndex) {
204
		StringBuffer strBuff = new StringBuffer(String.valueOf(cacheIndex));
205
		while(strBuff.length()<CACHE_ID_PADDING_LENGTH) {
206
			strBuff.insert(0, '0');
207
		}
208
		return strBuff.toString();
209
	}
210
	
211
	@Override
212
	public void run(PortBindings portBindings, Configuration conf,
213
			Map<String, String> parameters) throws Exception {
214
		String mode = parameters.get(PARAM_MODE);
215
		File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
216
        Properties props = new Properties();
217
		if (MODE_READ_CURRENT_ID.equals(mode)) {
218
			props.setProperty(OUTPUT_PROPERTY_CACHE_ID, 
219
	        		getExistingCacheId(conf, parameters));
220
		} else if (MODE_GENERATE_NEW_ID.equals(mode)) {
221
			props.setProperty(OUTPUT_PROPERTY_CACHE_ID, 
222
	        		generateNewCacheId(conf, parameters));
223
		} else if (MODE_WRITE_ID.equals(mode)) {
224
			writeCacheId(conf, parameters);
225
		} else {
226
			throw new RuntimeException("unsupported mode: " + mode);	
227
		}
228
        OutputStream os = new FileOutputStream(file);
229
        try {
230
        	props.store(os, "");	
231
        } finally {
232
        	os.close();	
233
        }	
234
	}
235
	
236
}
    (1-1/1)