Project

General

Profile

1
package eu.dnetlib.data.objectstore.s3;
2

    
3
import com.amazonaws.ClientConfiguration;
4
import com.amazonaws.Protocol;
5
import com.amazonaws.auth.AWSCredentials;
6
import com.amazonaws.auth.AWSStaticCredentialsProvider;
7
import com.amazonaws.auth.BasicAWSCredentials;
8
import com.amazonaws.client.builder.AwsClientBuilder;
9
import com.amazonaws.services.s3.AmazonS3;
10
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
11
import com.amazonaws.services.s3.model.ListObjectsV2Request;
12
import com.amazonaws.services.s3.model.ListObjectsV2Result;
13
import com.amazonaws.services.s3.model.S3Object;
14
import com.amazonaws.services.s3.model.S3ObjectSummary;
15
import com.google.common.collect.Lists;
16
import com.mongodb.client.MongoCollection;
17
import com.mongodb.client.model.Filters;
18
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
19
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
20
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
21
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
22
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
23
import eu.dnetlib.enabling.resultset.ResultSetListener;
24
import org.apache.commons.lang3.StringUtils;
25
import org.apache.commons.lang3.tuple.ImmutableTriple;
26
import org.apache.commons.lang3.tuple.Triple;
27
import org.apache.commons.logging.Log;
28
import org.apache.commons.logging.LogFactory;
29
import org.bson.Document;
30
import org.apache.commons.lang3.exception.ExceptionUtils;
31

    
32

    
33
import java.io.*;
34
import java.nio.file.Files;
35
import java.nio.file.Path;
36
import java.nio.file.StandardCopyOption;
37
import java.util.concurrent.atomic.AtomicInteger;
38
import java.util.regex.Pattern;
39

    
40
public class S3ObjectStore implements ObjectStore {
41

    
42
    //CONSTANT VARIABLE NAME
43
    private static final String S3_REGION = "eu-west-3";
44
    private static final String URI_FIELD = "uri";
45
    private static final String ID_FIELD = "id";
46
    private static final String MIME_FIELD = "mime";
47
    private static final String ORIGINAL_OBJECT_FIELD = "originalObject";
48
    private static final String TIMESTAMP_FIELD = "timestamp";
49
    private static final String MD5_FIELD = "md5Sum";
50
    private static final String SIZE_FIELD = "size";
51

    
52

    
53
    private final String id;
54
    private final String interpretation;
55

    
56
    private final String s3AccessKey;
57
    private final String s3SecretKey;
58
    private final String s3EndPoint;
59
    private final String objectStoreBucket;
60

    
61

    
62
    private final MongoCollection<Document> mongoCollection;
63

    
64
    private AmazonS3 client;
65

    
66
    private static final Log log = LogFactory.getLog(S3ObjectStore.class);
67

    
68
    public S3ObjectStore(final String identifier, final String interpretation, final String s3AccessKey, final String s3SecretKey, final String s3EndPoint, final String objectStoreBucket, final MongoCollection<Document> mongoCollection) throws ObjectStoreServiceException {
69
        this.id = identifier;
70
        this.interpretation = interpretation;
71
        this.s3AccessKey = s3AccessKey;
72
        this.s3SecretKey = s3SecretKey;
73
        this.mongoCollection = mongoCollection;
74
        this.s3EndPoint = s3EndPoint;
75
        this.objectStoreBucket = objectStoreBucket;
76
    }
77

    
78

    
79
    private AmazonS3 initializeClient() throws ObjectStoreServiceException {
80
        try {
81
            final AWSCredentials credentials = new BasicAWSCredentials(this.s3AccessKey, this.s3SecretKey);
82
            final ClientConfiguration cfg = new ClientConfiguration().withProtocol(Protocol.HTTPS);
83
            final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials))
84
                    .withClientConfiguration(cfg)
85
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.s3EndPoint, S3_REGION))
86
                    .build();
87
            if (s3 == null)
88
                throw new ObjectStoreServiceException("Cannot initialize s3 client because is NULL");
89
            return s3;
90
        } catch (Throwable e) {
91
            log.error("An Error happen on initialize client ", e);
92
            throw new ObjectStoreServiceException("Cannot initialize s3 client", e);
93
        }
94
    }
95

    
96

    
97
    @Override
98
    public String getId() {
99
        return this.id;
100
    }
101

    
102
    @Override
103
    public String getInterpretation() {
104
        return this.interpretation;
105
    }
106

    
107
    @Override
108
    public int feed(Iterable<ObjectStoreRecord> iterable, boolean upsert) throws ObjectStoreServiceException {
109
        final AtomicInteger count = new AtomicInteger();
110
        iterable.forEach(objectStoreRecord -> {
111
            try {
112
                feedObjectRecord(objectStoreRecord);
113
                count.incrementAndGet();
114
            } catch (ObjectStoreServiceException e) {
115
                log.error("Error on saving file in a temporary Folder");
116
            }
117
        });
118
        return count.intValue();
119
    }
120

    
121
    @Override
122
    public int feedMetadataRecord(Iterable<MetadataObjectRecord> iterable, boolean b) throws ObjectStoreServiceException {
123
        final AtomicInteger count = new AtomicInteger();
124
        iterable.forEach(mor -> {
125
            final ObjectStoreRecord r = new ObjectStoreRecord();
126
            r.setInputStream(new ByteArrayInputStream(mor.getRecord().getBytes()));
127
            final ObjectStoreFile fileMetadata = new ObjectStoreFile();
128
            fileMetadata.setObjectID(mor.getId());
129
            fileMetadata.setMimeType(mor.getMime());
130
            r.setFileMetadata(fileMetadata);
131
            try {
132
                feedObjectRecord(r);
133
                count.incrementAndGet();
134
            } catch (ObjectStoreServiceException e) {
135
                log.error("Unable to store record r", e);
136
            }
137
        });
138
        return count.intValue();
139
    }
140

    
141
    @Override
142
    public String feedObjectRecord(ObjectStoreRecord objectStoreRecord) throws ObjectStoreServiceException {
143
        if (client == null)
144
            this.client = initializeClient();
145
        try {
146
            long start = System.currentTimeMillis();
147
            if (objectStoreRecord!= null && objectStoreRecord.getInputStream()!= null ) {
148
                log.debug("Saving object with ID: " + objectStoreRecord.getFileMetadata().getObjectID() + " on s3 ");
149
                this.client.putObject(objectStoreBucket, id + "/" + objectStoreRecord.getFileMetadata().getObjectID(), objectStoreRecord.getInputStream(), null);
150
                final S3Object s3Object = this.client.getObject(objectStoreBucket ,id+ "/" + objectStoreRecord.getFileMetadata().getObjectID());
151
                log.debug("Total time to put into the ObjectStore " + (System.currentTimeMillis() - start));
152
                log.debug("Saved object on s3 ");
153
                double timestamp = System.currentTimeMillis();
154
                Document metadata = new Document()
155
                        .append(ID_FIELD, objectStoreRecord.getFileMetadata().getObjectID())
156
                        .append(MIME_FIELD, objectStoreRecord.getFileMetadata().getMimeType())
157
                        .append(ORIGINAL_OBJECT_FIELD, objectStoreRecord.getFileMetadata().toJSON())
158
                        .append(TIMESTAMP_FIELD, timestamp)
159
                        .append(MD5_FIELD, s3Object.getObjectMetadata().getETag())
160
                        .append(SIZE_FIELD, s3Object.getObjectMetadata().getContentLength())
161
                        .append(URI_FIELD, String.format("s3://%s/%s/%s", objectStoreBucket, id, objectStoreRecord.getFileMetadata().getObjectID()));
162
                log.debug("saving metadata object to the collection: " + metadata.toString());
163
                start = System.currentTimeMillis();
164
                mongoCollection.insertOne(metadata);
165
                log.debug("Total time to save in Mongo " + (System.currentTimeMillis() - start));
166

    
167
            }
168
        } catch (Throwable e) {
169
            log.error("Error on put file in the objectStore", e);
170
            log.error(ExceptionUtils.getStackTrace(e));
171
            throw new ObjectStoreServiceException(e);
172

    
173
        }
174
        return null;
175
    }
176

    
177
    @Override
178
    public ResultSetListener deliver(final Long from, final Long until) throws ObjectStoreServiceException {
179
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
180
        resultSet.setMongoCollection(this.mongoCollection);
181
        resultSet.setObjectStoreID(getId());
182
        resultSet.setFromDate(from);
183
        resultSet.setUntilDate(until);
184
        return resultSet;
185
    }
186

    
187
    @Override
188
    public ResultSetListener deliverIds(final Iterable<String> ids) throws ObjectStoreServiceException {
189
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
190
        resultSet.setMongoCollection(this.mongoCollection);
191
        resultSet.setObjectStoreID(getId());
192
        resultSet.setRecords(Lists.newArrayList(ids));
193
        return resultSet;
194
    }
195

    
196
    @Override
197
    public ObjectStoreFile deliverObject(String objectId) throws ObjectStoreServiceException {
198
        Document resultQuery = this.mongoCollection.find(Filters.eq("id", objectId)).first();
199
        if (resultQuery!= null)
200
            return ObjectStoreS3Utility.build(resultQuery);
201
        else return null;
202
    }
203

    
204
    @Override
205
    public int getSize() throws ObjectStoreServiceException {
206
        return (int) this.mongoCollection.count();
207
    }
208

    
209
    @Override
210
    public void deleteObject(String objectId) throws ObjectStoreServiceException {
211
        final Document response =this.mongoCollection.findOneAndDelete(Filters.eq("id", objectId));
212
        if (response == null)
213
            throw new ObjectStoreServiceException("Error document not found with objectId: "+objectId);
214

    
215
        if (this.client == null)
216
            this.client = initializeClient();
217

    
218
        this.client.deleteObject(this.objectStoreBucket, String.format("%s/%s", this.id, response.get(ID_FIELD)));
219
    }
220

    
221
    @Override
222
    public String getObject(String objectId) throws ObjectStoreServiceException {
223

    
224
        Document response = this.mongoCollection.find(Filters.eq("id", objectId)).first();
225
        if (response == null || !response.containsKey(URI_FIELD))
226
            return null;
227
        return response.getString(URI_FIELD);
228
    }
229

    
230
    @Override
231
    public boolean existIDStartsWith(String startId) throws ObjectStoreServiceException {
232
        return this.mongoCollection.count(Filters.regex("id", Pattern.compile(startId))) > 0;
233
    }
234

    
235
    @Override
236
    public boolean dropContent() throws ObjectStoreServiceException {
237
        if (this.client == null) {
238
            this.client = initializeClient();
239
        }
240
        final ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(objectStoreBucket).withPrefix(id);
241
        ListObjectsV2Result result;
242
        do {
243
            result = this.client.listObjectsV2(req);
244

    
245
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
246
                log.debug(String.format(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()));
247
                this.client.deleteObject(objectStoreBucket, objectSummary.getKey());
248
                log.debug("Object Deleted");
249
            }
250
            String token = result.getNextContinuationToken();
251
            log.debug("Next Continuation Token: " + token);
252
            req.setContinuationToken(token);
253
        } while (result.isTruncated());
254

    
255
        this.mongoCollection.drop();
256
        return true;
257
    }
258
}
(4-4/6)