Project

General

Profile

1 54546 sandro.lab
package eu.dnetlib.data.objectstore.s3;
2
3 54559 sandro.lab
import com.amazonaws.ClientConfiguration;
4
import com.amazonaws.Protocol;
5 54546 sandro.lab
import com.amazonaws.auth.AWSCredentials;
6
import com.amazonaws.auth.AWSStaticCredentialsProvider;
7
import com.amazonaws.auth.BasicAWSCredentials;
8
import com.amazonaws.client.builder.AwsClientBuilder;
9
import com.amazonaws.services.s3.AmazonS3;
10
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
11
import com.amazonaws.services.s3.model.ListObjectsV2Request;
12
import com.amazonaws.services.s3.model.ListObjectsV2Result;
13 55484 sandro.lab
import com.amazonaws.services.s3.model.S3Object;
14 54546 sandro.lab
import com.amazonaws.services.s3.model.S3ObjectSummary;
15
import com.google.common.collect.Lists;
16
import com.mongodb.client.MongoCollection;
17
import com.mongodb.client.model.Filters;
18
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
19
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
20
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
21
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
22
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
23
import eu.dnetlib.enabling.resultset.ResultSetListener;
24 56855 sandro.lab
import org.apache.commons.lang3.exception.ExceptionUtils;
25 54546 sandro.lab
import org.apache.commons.logging.Log;
26
import org.apache.commons.logging.LogFactory;
27
import org.bson.Document;
28
29 56855 sandro.lab
import java.io.ByteArrayInputStream;
30
import java.io.IOException;
31
import java.io.InputStream;
32 54546 sandro.lab
import java.nio.file.Files;
33
import java.nio.file.Path;
34 56855 sandro.lab
import java.nio.file.Paths;
35 54546 sandro.lab
import java.nio.file.StandardCopyOption;
36 56855 sandro.lab
import java.util.UUID;
37 54546 sandro.lab
import java.util.concurrent.atomic.AtomicInteger;
38
import java.util.regex.Pattern;
39
40
public class S3ObjectStore implements ObjectStore {
41
42
    //CONSTANT VARIABLE NAME
43
    private static final String S3_REGION = "eu-west-3";
44
    private static final String URI_FIELD = "uri";
45
    private static final String ID_FIELD = "id";
46
    private static final String MIME_FIELD = "mime";
47 54590 sandro.lab
    private static final String ORIGINAL_OBJECT_FIELD = "originalObject";
48
    private static final String TIMESTAMP_FIELD = "timestamp";
49
    private static final String MD5_FIELD = "md5Sum";
50
    private static final String SIZE_FIELD = "size";
51 54546 sandro.lab
52
53
    private final String id;
54
    private final String interpretation;
55
56
    private final String s3AccessKey;
57
    private final String s3SecretKey;
58
    private final String s3EndPoint;
59
    private final String objectStoreBucket;
60
61 56855 sandro.lab
    private final String basePath;
62 54546 sandro.lab
63 56855 sandro.lab
64 54546 sandro.lab
    private final MongoCollection<Document> mongoCollection;
65
66 56855 sandro.lab
//    private AmazonS3 client;
67 54546 sandro.lab
68
    private static final Log log = LogFactory.getLog(S3ObjectStore.class);
69
70 56855 sandro.lab
    /**
     * Creates an object store whose payloads live in an S3 bucket and whose metadata lives in Mongo.
     * The constructor only records its arguments; the S3 client is built later by the operations that need it.
     *
     * @param identifier        identifier of the object store
     * @param interpretation    interpretation of the object store
     * @param s3AccessKey       access key used to authenticate against S3
     * @param s3SecretKey       secret key used to authenticate against S3
     * @param s3EndPoint        endpoint of the S3 service
     * @param objectStoreBucket bucket in which the objects are stored
     * @param mongoCollection   collection in which the object metadata is stored
     * @param basePath          local directory used for temporary files
     * @throws ObjectStoreServiceException declared for callers; the visible body never throws it
     */
    public S3ObjectStore(final String identifier,
                         final String interpretation,
                         final String s3AccessKey,
                         final String s3SecretKey,
                         final String s3EndPoint,
                         final String objectStoreBucket,
                         final MongoCollection<Document> mongoCollection,
                         final String basePath) throws ObjectStoreServiceException {
        // identity of the store
        this.id = identifier;
        this.interpretation = interpretation;

        // S3 connection settings
        this.s3AccessKey = s3AccessKey;
        this.s3SecretKey = s3SecretKey;
        this.s3EndPoint = s3EndPoint;
        this.objectStoreBucket = objectStoreBucket;

        // local spool directory and metadata collection
        this.basePath = basePath;
        this.mongoCollection = mongoCollection;
    }
80
81
82
    private AmazonS3 initializeClient() throws ObjectStoreServiceException {
83
        try {
84
            final AWSCredentials credentials = new BasicAWSCredentials(this.s3AccessKey, this.s3SecretKey);
85 54559 sandro.lab
            final ClientConfiguration cfg = new ClientConfiguration().withProtocol(Protocol.HTTPS);
86 54546 sandro.lab
            final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials))
87 54559 sandro.lab
                    .withClientConfiguration(cfg)
88 54546 sandro.lab
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.s3EndPoint, S3_REGION))
89
                    .build();
90
            if (s3 == null)
91
                throw new ObjectStoreServiceException("Cannot initialize s3 client because is NULL");
92
            return s3;
93
        } catch (Throwable e) {
94
            log.error("An Error happen on initialize client ", e);
95
            throw new ObjectStoreServiceException("Cannot initialize s3 client", e);
96
        }
97
    }
98
99
100
    @Override
101
    public String getId() {
102
        return this.id;
103
    }
104
105
    @Override
106
    public String getInterpretation() {
107
        return this.interpretation;
108
    }
109
110
    @Override
111
    public int feed(Iterable<ObjectStoreRecord> iterable, boolean upsert) throws ObjectStoreServiceException {
112
        final AtomicInteger count = new AtomicInteger();
113
        iterable.forEach(objectStoreRecord -> {
114
            try {
115
                feedObjectRecord(objectStoreRecord);
116
                count.incrementAndGet();
117
            } catch (ObjectStoreServiceException e) {
118
                log.error("Error on saving file in a temporary Folder");
119
            }
120
        });
121
        return count.intValue();
122
    }
123
124
    @Override
125
    public int feedMetadataRecord(Iterable<MetadataObjectRecord> iterable, boolean b) throws ObjectStoreServiceException {
126
        final AtomicInteger count = new AtomicInteger();
127
        iterable.forEach(mor -> {
128
            final ObjectStoreRecord r = new ObjectStoreRecord();
129
            r.setInputStream(new ByteArrayInputStream(mor.getRecord().getBytes()));
130
            final ObjectStoreFile fileMetadata = new ObjectStoreFile();
131
            fileMetadata.setObjectID(mor.getId());
132
            fileMetadata.setMimeType(mor.getMime());
133
            r.setFileMetadata(fileMetadata);
134
            try {
135
                feedObjectRecord(r);
136
                count.incrementAndGet();
137
            } catch (ObjectStoreServiceException e) {
138
                log.error("Unable to store record r", e);
139
            }
140
        });
141
        return count.intValue();
142
    }
143
144
    @Override
145
    public String feedObjectRecord(ObjectStoreRecord objectStoreRecord) throws ObjectStoreServiceException {
146 56855 sandro.lab
        final AmazonS3 client = initializeClient();
147 54546 sandro.lab
        try {
148
            long start = System.currentTimeMillis();
149 56855 sandro.lab
            final Path result = saveAndGenerateMD5(objectStoreRecord.getInputStream(), basePath);
150
            log.debug("Total time to download into fileSystem " + (System.currentTimeMillis() - start));
151
            if (result!= null) {
152 54546 sandro.lab
                log.debug("Saving object with ID: " + objectStoreRecord.getFileMetadata().getObjectID() + " on s3 ");
153 56855 sandro.lab
                client.putObject(objectStoreBucket, id + "/" + objectStoreRecord.getFileMetadata().getObjectID(), result.toFile());
154 54546 sandro.lab
                log.debug("Total time to put into the ObjectStore " + (System.currentTimeMillis() - start));
155 56855 sandro.lab
                log.debug("Deleting file");
156
                boolean deleteSuccess = Files.deleteIfExists(result);
157
                log.debug("Temporary file deleting success " + deleteSuccess);
158 55484 sandro.lab
                log.debug("Saved object on s3 ");
159 56855 sandro.lab
                final S3Object s3Object = client.getObject(objectStoreBucket ,id+ "/" + objectStoreRecord.getFileMetadata().getObjectID());
160 54546 sandro.lab
                double timestamp = System.currentTimeMillis();
161
                Document metadata = new Document()
162
                        .append(ID_FIELD, objectStoreRecord.getFileMetadata().getObjectID())
163
                        .append(MIME_FIELD, objectStoreRecord.getFileMetadata().getMimeType())
164
                        .append(ORIGINAL_OBJECT_FIELD, objectStoreRecord.getFileMetadata().toJSON())
165
                        .append(TIMESTAMP_FIELD, timestamp)
166 55484 sandro.lab
                        .append(MD5_FIELD, s3Object.getObjectMetadata().getETag())
167
                        .append(SIZE_FIELD, s3Object.getObjectMetadata().getContentLength())
168 54546 sandro.lab
                        .append(URI_FIELD, String.format("s3://%s/%s/%s", objectStoreBucket, id, objectStoreRecord.getFileMetadata().getObjectID()));
169
                log.debug("saving metadata object to the collection: " + metadata.toString());
170
                start = System.currentTimeMillis();
171
                mongoCollection.insertOne(metadata);
172 55485 sandro.lab
                log.debug("Total time to save in Mongo " + (System.currentTimeMillis() - start));
173 55484 sandro.lab
174 54546 sandro.lab
            }
175 55484 sandro.lab
        } catch (Throwable e) {
176 56307 sandro.lab
            log.error("Error on put file in the objectStore", e);
177
            log.error(ExceptionUtils.getStackTrace(e));
178 54546 sandro.lab
            throw new ObjectStoreServiceException(e);
179
        }
180
        return null;
181
    }
182
183
    @Override
184
    public ResultSetListener deliver(final Long from, final Long until) throws ObjectStoreServiceException {
185
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
186
        resultSet.setMongoCollection(this.mongoCollection);
187
        resultSet.setObjectStoreID(getId());
188
        resultSet.setFromDate(from);
189
        resultSet.setUntilDate(until);
190
        return resultSet;
191
    }
192
193
    @Override
194
    public ResultSetListener deliverIds(final Iterable<String> ids) throws ObjectStoreServiceException {
195
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
196
        resultSet.setMongoCollection(this.mongoCollection);
197
        resultSet.setObjectStoreID(getId());
198
        resultSet.setRecords(Lists.newArrayList(ids));
199
        return resultSet;
200
    }
201
202
    @Override
203
    public ObjectStoreFile deliverObject(String objectId) throws ObjectStoreServiceException {
204
        Document resultQuery = this.mongoCollection.find(Filters.eq("id", objectId)).first();
205 54559 sandro.lab
        if (resultQuery!= null)
206
            return ObjectStoreS3Utility.build(resultQuery);
207
        else return null;
208 54546 sandro.lab
    }
209
210
    @Override
211
    public int getSize() throws ObjectStoreServiceException {
212
        return (int) this.mongoCollection.count();
213
    }
214
215
    @Override
216
    public void deleteObject(String objectId) throws ObjectStoreServiceException {
217
        final Document response =this.mongoCollection.findOneAndDelete(Filters.eq("id", objectId));
218
        if (response == null)
219
            throw new ObjectStoreServiceException("Error document not found with objectId: "+objectId);
220
221 56855 sandro.lab
        final AmazonS3 client = initializeClient();
222 54546 sandro.lab
223 56855 sandro.lab
        client.deleteObject(this.objectStoreBucket, String.format("%s/%s", this.id, response.get(ID_FIELD)));
224 54546 sandro.lab
    }
225
226
    @Override
227
    public String getObject(String objectId) throws ObjectStoreServiceException {
228
229
        Document response = this.mongoCollection.find(Filters.eq("id", objectId)).first();
230
        if (response == null || !response.containsKey(URI_FIELD))
231
            return null;
232
        return response.getString(URI_FIELD);
233
    }
234
235
    @Override
236
    public boolean existIDStartsWith(String startId) throws ObjectStoreServiceException {
237
        return this.mongoCollection.count(Filters.regex("id", Pattern.compile(startId))) > 0;
238
    }
239
240
    @Override
241
    public boolean dropContent() throws ObjectStoreServiceException {
242 56855 sandro.lab
        final AmazonS3 client = initializeClient();
243 54546 sandro.lab
        final ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(objectStoreBucket).withPrefix(id);
244
        ListObjectsV2Result result;
245
        do {
246 56855 sandro.lab
            result = client.listObjectsV2(req);
247 54546 sandro.lab
248
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
249
                log.debug(String.format(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()));
250 56855 sandro.lab
                client.deleteObject(objectStoreBucket, objectSummary.getKey());
251 54546 sandro.lab
                log.debug("Object Deleted");
252
            }
253
            String token = result.getNextContinuationToken();
254
            log.debug("Next Continuation Token: " + token);
255
            req.setContinuationToken(token);
256
        } while (result.isTruncated());
257
258
        this.mongoCollection.drop();
259
        return true;
260
    }
261 56855 sandro.lab
262
    private static Path saveAndGenerateMD5(final InputStream inputStream, final String basePath) throws IOException {
263
        if (inputStream == null) return null;
264
        Path tempFile = Paths.get(basePath,  UUID.randomUUID().toString());
265
        log.debug("Temp file On Default Location: " + tempFile.toString());
266
        long size = 0;
267
        try {
268
            Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING);
269
        } catch (IOException e1) {
270
            log.error(e1);
271
        }
272
        return tempFile;
273
    }
274 54546 sandro.lab
}