1
|
package eu.dnetlib.iis.common.cache;
|
2
|
|
3
|
import java.io.File;
|
4
|
import java.io.FileOutputStream;
|
5
|
import java.io.IOException;
|
6
|
import java.io.InputStreamReader;
|
7
|
import java.io.OutputStream;
|
8
|
import java.io.OutputStreamWriter;
|
9
|
import java.util.Collections;
|
10
|
import java.util.Map;
|
11
|
import java.util.Properties;
|
12
|
|
13
|
import org.apache.hadoop.conf.Configuration;
|
14
|
import org.apache.hadoop.fs.FSDataInputStream;
|
15
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
16
|
import org.apache.hadoop.fs.FileSystem;
|
17
|
import org.apache.hadoop.fs.Path;
|
18
|
|
19
|
import com.google.gson.Gson;
|
20
|
import com.google.gson.stream.JsonWriter;
|
21
|
|
22
|
import eu.dnetlib.iis.common.FsShellPermissions;
|
23
|
import eu.dnetlib.iis.core.java.PortBindings;
|
24
|
import eu.dnetlib.iis.core.java.porttype.PortType;
|
25
|
|
26
|
/**
|
27
|
* CacheMetadata managing process.
|
28
|
*
|
29
|
* @author mhorst
|
30
|
*
|
31
|
*/
|
32
|
public class CacheMetadataManagingProcess implements eu.dnetlib.iis.core.java.Process {
|
33
|
|
34
|
public static final String OUTPUT_PROPERTY_CACHE_ID = "cache_id";
|
35
|
|
36
|
public static final String PARAM_CACHE_DIR = "default_cache_location";
|
37
|
|
38
|
public static final String PARAM_MODE = "mode";
|
39
|
|
40
|
public static final String PARAM_ID = "id";
|
41
|
|
42
|
public static final String MODE_READ_CURRENT_ID = "read_current_id";
|
43
|
|
44
|
public static final String MODE_GENERATE_NEW_ID = "generate_new_id";
|
45
|
|
46
|
public static final String MODE_WRITE_ID = "write_id";
|
47
|
|
48
|
public static final String DEFAULT_METAFILE_NAME = "meta.json";
|
49
|
|
50
|
public static final int CACHE_ID_PADDING_LENGTH = 6;
|
51
|
|
52
|
public static final String NON_EXISTING_CACHE_ID = "$UNDEFINED$";
|
53
|
|
54
|
public static final String OOZIE_ACTION_OUTPUT_FILENAME = "oozie.action.output.properties";
|
55
|
|
56
|
public static final String DEFAULT_ENCODING = "UTF-8";
|
57
|
|
58
|
public class CacheMeta {
|
59
|
protected String currentCacheId;
|
60
|
// TODO handle this property in a safe way!
|
61
|
// protected boolean isCacheUnderConstruction;
|
62
|
|
63
|
public String getCurrentCacheId() {
|
64
|
return currentCacheId;
|
65
|
}
|
66
|
|
67
|
public void setCurrentCacheId(String currentCacheId) {
|
68
|
this.currentCacheId = currentCacheId;
|
69
|
}
|
70
|
}
|
71
|
|
72
|
@Override
|
73
|
public Map<String, PortType> getInputPorts() {
|
74
|
return Collections.emptyMap();
|
75
|
}
|
76
|
|
77
|
@Override
|
78
|
public Map<String, PortType> getOutputPorts() {
|
79
|
return Collections.emptyMap();
|
80
|
}
|
81
|
|
82
|
protected String getExistingCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
|
83
|
if (parameters.containsKey(PARAM_CACHE_DIR)) {
|
84
|
FileSystem fs = FileSystem.get(conf);
|
85
|
Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR),
|
86
|
DEFAULT_METAFILE_NAME);
|
87
|
if (fs.exists(cacheFilePath)) {
|
88
|
FSDataInputStream inputStream = fs.open(cacheFilePath);
|
89
|
InputStreamReader reader = new InputStreamReader(
|
90
|
inputStream, DEFAULT_ENCODING);
|
91
|
try {
|
92
|
Gson gson = new Gson();
|
93
|
CacheMeta cacheMeta = gson.fromJson(reader, CacheMeta.class);
|
94
|
return cacheMeta.getCurrentCacheId();
|
95
|
} finally {
|
96
|
reader.close();
|
97
|
inputStream.close();
|
98
|
}
|
99
|
} else {
|
100
|
// cache does not exist yet
|
101
|
return NON_EXISTING_CACHE_ID;
|
102
|
}
|
103
|
} else {
|
104
|
throw new RuntimeException("cache directory location not provided! "
|
105
|
+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
|
106
|
}
|
107
|
}
|
108
|
|
109
|
protected String generateNewCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
|
110
|
if (parameters.containsKey(PARAM_CACHE_DIR)) {
|
111
|
FileSystem fs = FileSystem.get(conf);
|
112
|
Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR),
|
113
|
DEFAULT_METAFILE_NAME);
|
114
|
CacheMeta cachedMeta = null;
|
115
|
if (fs.exists(cacheFilePath)) {
|
116
|
FSDataInputStream inputStream = fs.open(cacheFilePath);
|
117
|
InputStreamReader reader = new InputStreamReader(
|
118
|
inputStream, DEFAULT_ENCODING);
|
119
|
try {
|
120
|
Gson gson = new Gson();
|
121
|
cachedMeta = gson.fromJson(reader, CacheMeta.class);
|
122
|
} finally {
|
123
|
reader.close();
|
124
|
inputStream.close();
|
125
|
}
|
126
|
}
|
127
|
if (cachedMeta!=null) {
|
128
|
int currentIndex = convertCacheIdToInt(cachedMeta.getCurrentCacheId());
|
129
|
return convertIntToCacheId(currentIndex+1);
|
130
|
} else {
|
131
|
// initializing cache meta
|
132
|
return convertIntToCacheId(1);
|
133
|
}
|
134
|
} else {
|
135
|
throw new RuntimeException("cache directory location not provided! "
|
136
|
+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
|
137
|
}
|
138
|
}
|
139
|
|
140
|
protected void writeCacheId(Configuration conf, Map<String, String> parameters) throws IOException {
|
141
|
if (parameters.containsKey(PARAM_CACHE_DIR)) {
|
142
|
if (parameters.containsKey(PARAM_ID)) {
|
143
|
String cacheId = parameters.get(PARAM_ID);
|
144
|
FileSystem fs = FileSystem.get(conf);
|
145
|
Path cacheFilePath = new Path(parameters.get(PARAM_CACHE_DIR),
|
146
|
DEFAULT_METAFILE_NAME);
|
147
|
CacheMeta cachedMeta = null;
|
148
|
if (fs.exists(cacheFilePath)) {
|
149
|
FSDataInputStream inputStream = fs.open(cacheFilePath);
|
150
|
InputStreamReader reader = new InputStreamReader(
|
151
|
inputStream, DEFAULT_ENCODING);
|
152
|
try {
|
153
|
Gson gson = new Gson();
|
154
|
cachedMeta = gson.fromJson(reader, CacheMeta.class);
|
155
|
} finally {
|
156
|
reader.close();
|
157
|
inputStream.close();
|
158
|
}
|
159
|
}
|
160
|
// writing new id
|
161
|
if (cachedMeta==null) {
|
162
|
cachedMeta = new CacheMeta();
|
163
|
}
|
164
|
cachedMeta.setCurrentCacheId(cacheId);
|
165
|
|
166
|
Gson gson = new Gson();
|
167
|
FSDataOutputStream outputStream = fs.create(cacheFilePath, true);
|
168
|
JsonWriter writer = new JsonWriter(
|
169
|
new OutputStreamWriter(outputStream, DEFAULT_ENCODING));
|
170
|
try {
|
171
|
gson.toJson(cachedMeta, CacheMeta.class, writer);
|
172
|
} finally {
|
173
|
writer.close();
|
174
|
outputStream.close();
|
175
|
// changing file permission to +rw to allow writing for different users
|
176
|
FsShellPermissions.changePermissions(fs,
|
177
|
conf,
|
178
|
FsShellPermissions.Op.CHMOD,
|
179
|
false, "0666", cacheFilePath.toString());
|
180
|
}
|
181
|
} else {
|
182
|
throw new RuntimeException("unable to write new cache id in meta.json file, "
|
183
|
+ "no '" + PARAM_ID + "' input parameter provied!");
|
184
|
}
|
185
|
} else {
|
186
|
throw new RuntimeException("cache directory location not provided! "
|
187
|
+ "'" + PARAM_CACHE_DIR + "' parameter is missing!");
|
188
|
}
|
189
|
}
|
190
|
|
191
|
protected static int convertCacheIdToInt(String cacheId) {
|
192
|
StringBuffer strBuff = new StringBuffer(cacheId);
|
193
|
while (true) {
|
194
|
if (strBuff.charAt(0)=='0') {
|
195
|
strBuff.deleteCharAt(0);
|
196
|
} else {
|
197
|
break;
|
198
|
}
|
199
|
}
|
200
|
return Integer.parseInt(strBuff.toString());
|
201
|
}
|
202
|
|
203
|
protected static String convertIntToCacheId(int cacheIndex) {
|
204
|
StringBuffer strBuff = new StringBuffer(String.valueOf(cacheIndex));
|
205
|
while(strBuff.length()<CACHE_ID_PADDING_LENGTH) {
|
206
|
strBuff.insert(0, '0');
|
207
|
}
|
208
|
return strBuff.toString();
|
209
|
}
|
210
|
|
211
|
@Override
|
212
|
public void run(PortBindings portBindings, Configuration conf,
|
213
|
Map<String, String> parameters) throws Exception {
|
214
|
String mode = parameters.get(PARAM_MODE);
|
215
|
File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
|
216
|
Properties props = new Properties();
|
217
|
if (MODE_READ_CURRENT_ID.equals(mode)) {
|
218
|
props.setProperty(OUTPUT_PROPERTY_CACHE_ID,
|
219
|
getExistingCacheId(conf, parameters));
|
220
|
} else if (MODE_GENERATE_NEW_ID.equals(mode)) {
|
221
|
props.setProperty(OUTPUT_PROPERTY_CACHE_ID,
|
222
|
generateNewCacheId(conf, parameters));
|
223
|
} else if (MODE_WRITE_ID.equals(mode)) {
|
224
|
writeCacheId(conf, parameters);
|
225
|
} else {
|
226
|
throw new RuntimeException("unsupported mode: " + mode);
|
227
|
}
|
228
|
OutputStream os = new FileOutputStream(file);
|
229
|
try {
|
230
|
props.store(os, "");
|
231
|
} finally {
|
232
|
os.close();
|
233
|
}
|
234
|
}
|
235
|
|
236
|
}
|