Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ Provides a helper to run specified database commands. This is the preferred meth
* [__adminCommand__](https://docs.mongodb.com/manual/reference/method/db.adminCommand/#db.adminCommand) -
Provides a helper to run specified database commands against the admin database

You can use `update-sql` command at command line also. When you run the update-sql command, a changelog file that includes db.runCommand s will be created.

```bash
liquibase update-sql --changelog-file=example-changelog.xml
```

<a name="connection-string"></a>
## Connection String Formats

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,13 @@ protected List<RanChangeSet> queryRanChangeSets() throws DatabaseException {
final Bson filter = new Document();
final Bson sort = Sorts.ascending(MongoRanChangeSet.Fields.orderExecuted);

return getExecutor()
.queryForList(new FindAllStatement(getDatabaseChangeLogTableName(), filter, sort), Document.class)
.stream().map(Document.class::cast).map(getConverter()::fromDocument).collect(Collectors.toList());
List<Document> ranChangesets = getExecutor().queryForList(
new FindAllStatement(getDatabaseChangeLogTableName(), filter, sort),
Document.class
);
return ranChangesets.stream()
.map(getConverter()::fromDocument)
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import liquibase.database.Database;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.executor.ExecutorService;
import liquibase.executor.LoggingExecutor;
import liquibase.ext.mongodb.database.MongoLiquibaseDatabase;
import liquibase.ext.mongodb.statement.CountCollectionByNameStatement;
import liquibase.ext.mongodb.statement.DropCollectionStatement;
Expand All @@ -46,6 +48,11 @@ public class MongoLockService extends AbstractNoSqlLockService<MongoLiquibaseDat

private final Logger log = Scope.getCurrentScope().getLog(getClass());

public static final int LOCK_OPERATION_SUCCESSFUL = 1;

public static final int LOCK_OPERATION_FAILED = 0;


@Getter
private final MongoChangeLogLockToDocumentConverter converter;

Expand All @@ -69,9 +76,17 @@ protected Boolean isLocked() throws DatabaseException {
@Override
protected int replaceLock(final boolean locked) throws DatabaseException {
try {
return getExecutor().update(
int lockResult = getExecutor().update(
new ReplaceChangeLogLockStatement(getDatabaseChangeLogLockTableName(), locked)
);

boolean loggingExecutorActive = isLoggingExecutorActive();
if (lockResult == LOCK_OPERATION_FAILED && loggingExecutorActive) {
log.info("Logging executor is active. Assumed operation ran successful.");
return LOCK_OPERATION_SUCCESSFUL;
}

return lockResult;
} catch (DatabaseException e) {
// Mongo driver does not allow to release lock if thread is interrupted
// Next code clears interrupted flag if it is happened due to of the timeout
Expand All @@ -95,8 +110,12 @@ protected List<DatabaseChangeLogLock> queryLocks() throws DatabaseException {

final SqlStatement findAllStatement = new FindAllStatement(getDatabaseChangeLogLockTableName());

return getExecutor().queryForList(findAllStatement, Document.class).stream().map(Document.class::cast)
.map(getConverter()::fromDocument).filter(MongoChangeLogLock::getLocked).collect(Collectors.toList());
List<Document> locks = getExecutor().queryForList(findAllStatement, Document.class);

return locks.stream()
.map(getConverter()::fromDocument)
.filter(MongoChangeLogLock::getLocked)
.collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -148,4 +167,12 @@ protected Logger getLogger() {
return log;
}

public boolean isLoggingExecutorActive() {
Database database = getDatabase();
final ExecutorService executorService = Scope.getCurrentScope().getSingleton(ExecutorService.class);

return executorService.executorExists("logging", database) &&
(executorService.getExecutor("logging", database) instanceof LoggingExecutor);
}

}
11 changes: 10 additions & 1 deletion src/main/java/liquibase/ext/mongodb/statement/BsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.bson.codecs.*;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -78,7 +80,14 @@ public static List<Document> orEmptyList(final String json) {
}

public static String toJson(final Document document) {
return ofNullable(document).map(Document::toJson).orElse(null);
return ofNullable(document)
.map(doc -> {
JsonWriterSettings writerSettings = JsonWriterSettings.builder()
.outputMode(JsonMode.SHELL)
.build();
return doc.toJson(writerSettings);
})
.orElse(null);
}

public static Document toCommand(final String commandName, final Object commandValue, final Document options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ public D getNoSqlDatabase() {
return (D) getDatabase();
}

public NoSqlExecutor getExecutor() throws DatabaseException {
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor(NoSqlExecutor.EXECUTOR_NAME, getDatabase());
if (executor instanceof LoggingExecutor) {
throw new DatabaseException(String.format(mongoBundle.getString("command.unsupported"), "*sql"));
}
return (NoSqlExecutor) executor ;
public Executor getExecutor() throws DatabaseException {
return Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor(NoSqlExecutor.EXECUTOR_NAME, getDatabase());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package liquibase.nosql.executor;

import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.mongodb.lockservice.AdjustChangeLogLockCollectionStatement;
import liquibase.ext.mongodb.statement.BsonUtils;
import liquibase.sql.CallableSql;
import liquibase.sql.Sql;
import liquibase.sqlgenerator.SqlGenerator;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.AbstractSqlGenerator;

import static liquibase.ext.mongodb.statement.AbstractRunCommandStatement.COMMAND_NAME;
import static liquibase.ext.mongodb.statement.AbstractRunCommandStatement.SHELL_DB_PREFIX;


public class AdjustChangeLogLockCollectionSqlGenerator extends AbstractSqlGenerator<AdjustChangeLogLockCollectionStatement> {

@Override
public ValidationErrors validate(
AdjustChangeLogLockCollectionStatement statement,
Database database,
SqlGeneratorChain<AdjustChangeLogLockCollectionStatement> sqlGeneratorChain
) {
return null;
}

@Override
public Sql[] generateSql(
AdjustChangeLogLockCollectionStatement statement,
Database database,
SqlGeneratorChain<AdjustChangeLogLockCollectionStatement> sqlGeneratorChain
) {
String sqlString = SHELL_DB_PREFIX + COMMAND_NAME
+ "("
+ BsonUtils.toJson(statement.getCommand())
+ ");";

CallableSql sql = new CallableSql(sqlString, ";", "1");
return new Sql[]{sql};
}

@Override
public int getPriority() {
return SqlGenerator.PRIORITY_DATABASE;
}

}
9 changes: 8 additions & 1 deletion src/main/java/liquibase/nosql/executor/NoSqlGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.nosql.statement.AbstractNoSqlStatement;
import liquibase.sql.CallableSql;
import liquibase.sql.Sql;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.AbstractSqlGenerator;
Expand All @@ -37,11 +38,17 @@ public ValidationErrors validate(AbstractNoSqlStatement statement, Database data

@Override
public Sql[] generateSql(AbstractNoSqlStatement statement, Database database, SqlGeneratorChain<AbstractNoSqlStatement> sqlGeneratorChain) {
return new Sql[0];
CallableSql sql = new CallableSql(statement.toString(), ";", "1");
return new Sql[] {sql};
}

@Override
public boolean generateStatementsIsVolatile(Database database) {
return false;
}

@Override
public boolean supports(AbstractNoSqlStatement statement, Database database) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package liquibase.nosql.executor;


import com.mongodb.client.model.Filters;
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.mongodb.lockservice.MongoChangeLogLock;
import liquibase.ext.mongodb.lockservice.MongoChangeLogLockToDocumentConverter;
import liquibase.ext.mongodb.lockservice.ReplaceChangeLogLockStatement;
import liquibase.ext.mongodb.statement.BsonUtils;
import liquibase.sql.CallableSql;
import liquibase.sql.Sql;
import liquibase.sqlgenerator.SqlGenerator;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.AbstractSqlGenerator;
import lombok.Getter;
import lombok.Setter;
import org.bson.BsonDocument;
import org.bson.Document;

import java.time.Clock;
import java.util.Arrays;
import java.util.Date;

import static liquibase.ext.mongodb.statement.AbstractRunCommandStatement.COMMAND_NAME;
import static liquibase.ext.mongodb.statement.AbstractRunCommandStatement.SHELL_DB_PREFIX;


public class ReplaceChangeLogLockSqlGenerator extends AbstractSqlGenerator<ReplaceChangeLogLockStatement> {

public static final int DEFAULT_LOCK_ID = 1;

/**
* Clock field in order to make it testable
*/
@Getter
@Setter
private Clock clock = Clock.systemDefaultZone();


@Override
public ValidationErrors validate(
ReplaceChangeLogLockStatement statement,
Database database,
SqlGeneratorChain<ReplaceChangeLogLockStatement> sqlGeneratorChain
) {
return null;
}

@Override
public Sql[] generateSql(
ReplaceChangeLogLockStatement statement,
Database database,
SqlGeneratorChain<ReplaceChangeLogLockStatement> sqlGeneratorChain
) {
Document updateCommand = toUpdateCommand(statement);
String sqlString = SHELL_DB_PREFIX + COMMAND_NAME
+ "("
+ BsonUtils.toJson(updateCommand)
+ ");";

CallableSql sql = new CallableSql(sqlString, ";", "1");
return new Sql[]{sql};
}

private Document toUpdateCommand(ReplaceChangeLogLockStatement statement) {
MongoChangeLogLock lockModel = new MongoChangeLogLock(
DEFAULT_LOCK_ID,
new Date(clock.instant().toEpochMilli()),
MongoChangeLogLock.formLockedBy(),
statement.isLocked()
);
Document lockDocument = new MongoChangeLogLockToDocumentConverter().toDocument(lockModel);

BsonDocument idFilter = Filters.eq(MongoChangeLogLock.Fields.id, DEFAULT_LOCK_ID).toBsonDocument();

Document docToUpdate = new Document();
docToUpdate.put("q", Document.parse(idFilter.toJson()));
docToUpdate.put("u", lockDocument);
docToUpdate.put("upsert", true);

Document updateCommand = new Document();
updateCommand.put("update", statement.getCollectionName());
updateCommand.put("updates", Arrays.asList(docToUpdate));
return updateCommand;
}

@Override
public int getPriority() {
return SqlGenerator.PRIORITY_DATABASE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,8 @@ public D getDatabase() {
return database;
}

public NoSqlExecutor getExecutor() throws DatabaseException {
Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor(NoSqlExecutor.EXECUTOR_NAME, getDatabase());
if (executor instanceof LoggingExecutor) {
throw new DatabaseException(String.format(mongoBundle.getString("command.unsupported"), "*sql"));
}
return (NoSqlExecutor) executor ;
public Executor getExecutor() throws DatabaseException {
return Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor(NoSqlExecutor.EXECUTOR_NAME, getDatabase());
}

@Override
Expand Down Expand Up @@ -165,6 +161,7 @@ public boolean acquireLock() throws LockException {
if (isLocked()) {
return false;
} else {
getExecutor().comment("Lock Database");
getLogger().info("Lock Database");

final int rowsUpdated = replaceLock(true);
Expand Down Expand Up @@ -204,6 +201,7 @@ public void releaseLock() throws LockException {
try {
if (hasDatabaseChangeLogLockTable()) {

getExecutor().comment("Release Database Lock");
getLogger().info("Release Database Lock");

database.rollback();
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
liquibase.nosql.executor.NoSqlGenerator
liquibase.nosql.executor.NoSqlGenerator
liquibase.nosql.executor.ReplaceChangeLogLockSqlGenerator
liquibase.nosql.executor.AdjustChangeLogLockCollectionSqlGenerator
10 changes: 6 additions & 4 deletions src/test/java/liquibase/ext/AbstractMongoIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,27 @@
import liquibase.Scope;
import liquibase.changelog.ChangeLogHistoryServiceFactory;
import liquibase.database.DatabaseFactory;
import liquibase.executor.Executor;
import liquibase.executor.ExecutorService;
import liquibase.ext.mongodb.database.MongoConnection;
import liquibase.ext.mongodb.database.MongoLiquibaseDatabase;
import liquibase.ext.mongodb.statement.DropAllCollectionsStatement;
import liquibase.lockservice.LockServiceFactory;
import liquibase.nosql.executor.NoSqlExecutor;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;

import static liquibase.ext.mongodb.TestUtils.*;
import static liquibase.ext.mongodb.TestUtils.DB_CONNECTION_PATH;
import static liquibase.ext.mongodb.TestUtils.PROPERTY_FILE;
import static liquibase.ext.mongodb.TestUtils.loadProperty;
import static liquibase.nosql.executor.NoSqlExecutor.EXECUTOR_NAME;

@TestInstance(TestInstance.Lifecycle.PER_METHOD)
public abstract class AbstractMongoIntegrationTest {

protected MongoConnection connection;
protected NoSqlExecutor executor;
protected Executor executor;
protected MongoLiquibaseDatabase database;
protected MongoDatabase mongoDatabase;

Expand All @@ -56,7 +58,7 @@ protected void setUpEach() {
database = (MongoLiquibaseDatabase) DatabaseFactory.getInstance().openDatabase(url, null, null, null , null);
connection = (MongoConnection) database.getConnection();
mongoDatabase = connection.getMongoDatabase();
executor = (NoSqlExecutor) Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor(EXECUTOR_NAME, database);
executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor(EXECUTOR_NAME, database);
deleteContainers();
}

Expand Down
Loading
Loading