21 |
21 |
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
|
22 |
22 |
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
|
23 |
23 |
import eu.dnetlib.enabling.resultset.ResultSetListener;
|
24 |
|
import org.apache.commons.lang3.StringUtils;
|
25 |
|
import org.apache.commons.lang3.tuple.ImmutableTriple;
|
26 |
|
import org.apache.commons.lang3.tuple.Triple;
|
|
24 |
import org.apache.commons.lang3.exception.ExceptionUtils;
|
27 |
25 |
import org.apache.commons.logging.Log;
|
28 |
26 |
import org.apache.commons.logging.LogFactory;
|
29 |
27 |
import org.bson.Document;
|
30 |
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
31 |
28 |
|
32 |
|
|
33 |
|
import java.io.*;
|
|
29 |
import java.io.ByteArrayInputStream;
|
|
30 |
import java.io.IOException;
|
|
31 |
import java.io.InputStream;
|
34 |
32 |
import java.nio.file.Files;
|
35 |
33 |
import java.nio.file.Path;
|
|
34 |
import java.nio.file.Paths;
|
36 |
35 |
import java.nio.file.StandardCopyOption;
|
|
36 |
import java.util.UUID;
|
37 |
37 |
import java.util.concurrent.atomic.AtomicInteger;
|
38 |
38 |
import java.util.regex.Pattern;
|
39 |
39 |
|
... | ... | |
58 |
58 |
private final String s3EndPoint;
|
59 |
59 |
private final String objectStoreBucket;
|
60 |
60 |
|
|
61 |
private final String basePath;
|
61 |
62 |
|
|
63 |
|
62 |
64 |
private final MongoCollection<Document> mongoCollection;
|
63 |
65 |
|
64 |
|
private AmazonS3 client;
|
|
66 |
// private AmazonS3 client;
|
65 |
67 |
|
66 |
68 |
private static final Log log = LogFactory.getLog(S3ObjectStore.class);
|
67 |
69 |
|
68 |
|
public S3ObjectStore(final String identifier, final String interpretation, final String s3AccessKey, final String s3SecretKey, final String s3EndPoint, final String objectStoreBucket, final MongoCollection<Document> mongoCollection) throws ObjectStoreServiceException {
|
|
70 |
public S3ObjectStore(final String identifier, final String interpretation, final String s3AccessKey, final String s3SecretKey, final String s3EndPoint, final String objectStoreBucket, final MongoCollection<Document> mongoCollection, final String basePath) throws ObjectStoreServiceException {
|
69 |
71 |
this.id = identifier;
|
70 |
72 |
this.interpretation = interpretation;
|
71 |
73 |
this.s3AccessKey = s3AccessKey;
|
... | ... | |
73 |
75 |
this.mongoCollection = mongoCollection;
|
74 |
76 |
this.s3EndPoint = s3EndPoint;
|
75 |
77 |
this.objectStoreBucket = objectStoreBucket;
|
|
78 |
this.basePath = basePath;
|
76 |
79 |
}
|
77 |
80 |
|
78 |
81 |
|
... | ... | |
140 |
143 |
|
141 |
144 |
@Override
|
142 |
145 |
public String feedObjectRecord(ObjectStoreRecord objectStoreRecord) throws ObjectStoreServiceException {
|
143 |
|
if (client == null)
|
144 |
|
this.client = initializeClient();
|
|
146 |
final AmazonS3 client = initializeClient();
|
145 |
147 |
try {
|
146 |
148 |
long start = System.currentTimeMillis();
|
147 |
|
if (objectStoreRecord!= null && objectStoreRecord.getInputStream()!= null ) {
|
|
149 |
final Path result = saveAndGenerateMD5(objectStoreRecord.getInputStream(), basePath);
|
|
150 |
log.debug("Total time to download into fileSystem " + (System.currentTimeMillis() - start));
|
|
151 |
if (result!= null) {
|
148 |
152 |
log.debug("Saving object with ID: " + objectStoreRecord.getFileMetadata().getObjectID() + " on s3 ");
|
149 |
|
this.client.putObject(objectStoreBucket, id + "/" + objectStoreRecord.getFileMetadata().getObjectID(), objectStoreRecord.getInputStream(), null);
|
150 |
|
final S3Object s3Object = this.client.getObject(objectStoreBucket ,id+ "/" + objectStoreRecord.getFileMetadata().getObjectID());
|
|
153 |
client.putObject(objectStoreBucket, id + "/" + objectStoreRecord.getFileMetadata().getObjectID(), result.toFile());
|
151 |
154 |
log.debug("Total time to put into the ObjectStore " + (System.currentTimeMillis() - start));
|
|
155 |
log.debug("Deleting file");
|
|
156 |
boolean deleteSuccess = Files.deleteIfExists(result);
|
|
157 |
log.debug("Temporary file deleting success " + deleteSuccess);
|
152 |
158 |
log.debug("Saved object on s3 ");
|
|
159 |
final S3Object s3Object = client.getObject(objectStoreBucket ,id+ "/" + objectStoreRecord.getFileMetadata().getObjectID());
|
153 |
160 |
double timestamp = System.currentTimeMillis();
|
154 |
161 |
Document metadata = new Document()
|
155 |
162 |
.append(ID_FIELD, objectStoreRecord.getFileMetadata().getObjectID())
|
... | ... | |
169 |
176 |
log.error("Error on put file in the objectStore", e);
|
170 |
177 |
log.error(ExceptionUtils.getStackTrace(e));
|
171 |
178 |
throw new ObjectStoreServiceException(e);
|
172 |
|
|
173 |
179 |
}
|
174 |
180 |
return null;
|
175 |
181 |
}
|
... | ... | |
212 |
218 |
if (response == null)
|
213 |
219 |
throw new ObjectStoreServiceException("Error document not found with objectId: "+objectId);
|
214 |
220 |
|
215 |
|
if (this.client == null)
|
216 |
|
this.client = initializeClient();
|
|
221 |
final AmazonS3 client = initializeClient();
|
217 |
222 |
|
218 |
|
this.client.deleteObject(this.objectStoreBucket, String.format("%s/%s", this.id, response.get(ID_FIELD)));
|
|
223 |
client.deleteObject(this.objectStoreBucket, String.format("%s/%s", this.id, response.get(ID_FIELD)));
|
219 |
224 |
}
|
220 |
225 |
|
221 |
226 |
@Override
|
... | ... | |
234 |
239 |
|
235 |
240 |
@Override
|
236 |
241 |
public boolean dropContent() throws ObjectStoreServiceException {
|
237 |
|
if (this.client == null) {
|
238 |
|
this.client = initializeClient();
|
239 |
|
}
|
|
242 |
final AmazonS3 client = initializeClient();
|
240 |
243 |
final ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(objectStoreBucket).withPrefix(id);
|
241 |
244 |
ListObjectsV2Result result;
|
242 |
245 |
do {
|
243 |
|
result = this.client.listObjectsV2(req);
|
|
246 |
result = client.listObjectsV2(req);
|
244 |
247 |
|
245 |
248 |
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
|
246 |
249 |
log.debug(String.format(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()));
|
247 |
|
this.client.deleteObject(objectStoreBucket, objectSummary.getKey());
|
|
250 |
client.deleteObject(objectStoreBucket, objectSummary.getKey());
|
248 |
251 |
log.debug("Object Deleted");
|
249 |
252 |
}
|
250 |
253 |
String token = result.getNextContinuationToken();
|
... | ... | |
255 |
258 |
this.mongoCollection.drop();
|
256 |
259 |
return true;
|
257 |
260 |
}
|
|
261 |
|
|
262 |
private static Path saveAndGenerateMD5(final InputStream inputStream, final String basePath) throws IOException {
|
|
263 |
if (inputStream == null) return null;
|
|
264 |
Path tempFile = Paths.get(basePath, UUID.randomUUID().toString());
|
|
265 |
log.debug("Temp file On Default Location: " + tempFile.toString());
|
|
266 |
long size = 0;
|
|
267 |
try {
|
|
268 |
Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING);
|
|
269 |
} catch (IOException e1) {
|
|
270 |
log.error(e1);
|
|
271 |
}
|
|
272 |
return tempFile;
|
|
273 |
}
|
258 |
274 |
}
|
reverted storage on s3, we save before on a predefined path and then on s3, because storing directly on s3 not works well