code.onehippo.org is currently readonly. We are migrating to code.bloomreach.com, please continue working there on Monday 14/12. See: https://docs.bloomreach.com/display/engineering/GitLab

Commit 92c914ef authored by Ard Schrijvers's avatar Ard Schrijvers

REPO-1811 Support database locking

parent 1dba0d49
......@@ -19,14 +19,14 @@ import java.lang.ref.WeakReference;
import org.onehippo.cms7.services.lock.Lock;
abstract class AbstractLock extends Lock {
public abstract class AbstractLock extends Lock {
private WeakReference<Thread> thread;
int holdCount;
public AbstractLock(final String lockKey, final String name, final long creationTime) {
super(lockKey, name, creationTime);
public AbstractLock(final String lockKey, final String lockOwner, final String lockThread, final long creationTime) {
super(lockKey, lockOwner, lockThread, creationTime);
thread = new WeakReference<>(Thread.currentThread());
holdCount = 1;
}
......@@ -47,5 +47,9 @@ abstract class AbstractLock extends Lock {
holdCount--;
}
public int getHoldCount() {
return holdCount;
}
abstract void destroy();
}
......@@ -70,6 +70,7 @@ public abstract class AbstractLockManager implements LockManager {
if (lockThread == null) {
getLogger().warn("Thread '{}' that created lock for '{}' has stopped without releasing the lock. Removing lock now",
abstractLock.getLockOwner(), key, Thread.currentThread().getName());
abstractLock.destroy();
locks.remove(key);
}
if (lockThread != Thread.currentThread()) {
......@@ -81,9 +82,11 @@ public abstract class AbstractLockManager implements LockManager {
getLogger().error("Hold count of lock should never be able to be less than 0. Core implementation issue in {}. Remove " +
"lock for {} nonetheless.",
this.getClass().getName(), key);
abstractLock.destroy();
locks.remove(key);
} else if (abstractLock.holdCount == 0) {
getLogger().debug("Remove lock '{}'", key);
abstractLock.destroy();
locks.remove(key);
} else {
getLogger().debug("Lock '{}' will not be removed since hold count is '{}'", key, abstractLock.holdCount);
......
......@@ -15,27 +15,50 @@
*/
package org.onehippo.services.lock;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.onehippo.services.lock.DbLockManager.DELETE_STATEMENT;
class DbLock extends AbstractLock {
private static final Logger log = LoggerFactory.getLogger(DbLock.class);
ResultSet dbLockResult;
private final Connection connection;
private final boolean originalAutoCommit;
private ResultSet dbLockResult;
DbLock(final String lockKey, final ResultSet dbLockSet) {
super(lockKey, Thread.currentThread().getName(), System.currentTimeMillis());
DbLock(final String lockKey, final String clusterNodeId, final Connection connection, final boolean originalAutoCommit, final ResultSet dbLockSet) {
super(lockKey, clusterNodeId, Thread.currentThread().getName(), System.currentTimeMillis());
this.connection = connection;
this.originalAutoCommit = originalAutoCommit;
this.dbLockResult = dbLockSet;
}
@Override
void destroy() {
try {
// remove the row from the database (which can be done safely because we contain the lock)
final PreparedStatement preparedSelectStatement = connection.prepareStatement(DELETE_STATEMENT);
preparedSelectStatement.setString(1, getLockKey());
preparedSelectStatement.setQueryTimeout(10);
preparedSelectStatement.execute();
connection.commit();
dbLockResult.close();
// The connection pool does not provide you with the actual Connection instance from the driver,
// but returns a wrapper. When you call 'close()' on a Connection instance from the pool,
// it will not close the driver's Connection, but instead just return the open connection to the pool
// so that it can be re-used. Hence we first have to restore the original auto commit value before closing
connection.setAutoCommit(originalAutoCommit);
connection.close();
} catch (SQLException e) {
log.error("Error while destroying DbLock", e);
}
......
......@@ -19,8 +19,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
......@@ -34,27 +32,25 @@ public class DbLockManager extends AbstractLockManager implements LockManager {
private static final Logger log = LoggerFactory.getLogger(DbLockManager.class);
final static String TABLE_NAME_LOCK = "hippolock";
final static String TABLE_NAME_LOCK_OVERVIEW = "hippolockoverview";
public final static String TABLE_NAME_LOCK = "hippolock";
public final static String TABLE_NAME_LOCK_OVERVIEW = "hippolockoverview";
final static String CREATE_LOCK_TABLE_STATEMENT = "CREATE TABLE %s (lockKey VARCHAR(256) NOT NULL)";
// TODO for oracle it must be NUMBER instead of BIGINT
final static String CREATE_LOCK_OVERVIEW_TABLE_STATEMENT = "CREATE TABLE %s (lockKey VARCHAR(256) NOT NULL, lockTime BIGINT NOT NULL)";
private DataSource dataSource;
private final Map<String, DbLock> locks = new HashMap();
public static final String LOCK_STATEMENT = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE lockKey=? FOR UPDATE NOWAIT";
public static final String INSERT_STATEMENT = "INSERT INTO " + TABLE_NAME_LOCK + " VALUES(?)";
public static final String DELETE_STATEMENT = "DELETE " + TABLE_NAME_LOCK + " WHERE lockKey=?";
private DataSource dataSource;
private String clusterNodeId;
public DbLockManager(final DataSource dataSource) {
public DbLockManager(final DataSource dataSource, final String clusterNodeId) {
this.dataSource = dataSource;
this.clusterNodeId = clusterNodeId;
DbHelper.createTableIfNeeded(dataSource, CREATE_LOCK_TABLE_STATEMENT, TABLE_NAME_LOCK);
DbHelper.createTableIfNeeded(dataSource, CREATE_LOCK_OVERVIEW_TABLE_STATEMENT, TABLE_NAME_LOCK_OVERVIEW);
try {
lock("foo");
} catch (LockException e) {
e.printStackTrace();
}
}
@Override
......@@ -64,10 +60,14 @@ public class DbLockManager extends AbstractLockManager implements LockManager {
@Override
AbstractLock createLock(final String key) throws LockException {
final String lockStatement = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE lockKey=? FOR UPDATE NOWAIT";
final String insertStatement = "INSERT INTO " + TABLE_NAME_LOCK + " VALUES(?)";
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement preparedLockStatement = connection.prepareStatement(lockStatement);
Connection connection = null;
boolean originalAutoCommit = false;
try {
connection = dataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
final PreparedStatement preparedLockStatement = connection.prepareStatement(LOCK_STATEMENT);
preparedLockStatement.setString(1, key);
preparedLockStatement.setQueryTimeout(10);
......@@ -76,32 +76,43 @@ public class DbLockManager extends AbstractLockManager implements LockManager {
if (!lockResultSet.next()) {
// entry did not yet exist, we need to add an entry first
lockResultSet.close();
final PreparedStatement preparedInsertstatement = connection.prepareStatement(insertStatement);
final PreparedStatement preparedInsertstatement = connection.prepareStatement(INSERT_STATEMENT);
preparedInsertstatement.setString(1, key);
try {
preparedInsertstatement.execute();
connection.commit();
} catch (SQLException e) {
// entry can already be created concurrently by other cluster node. We can still try to get the lock
// now
System.out.println(e);
log.debug("'{}' : Cannot created new row for key '{}' because most likely concurrently created by another " +
"cluster node. Can try to lock the row now{} ", e.toString(), key);
}
lockResultSet = preparedLockStatement.executeQuery();
if (!lockResultSet.next()) {
String msg = String.format("Unexpected : A row for '%s' was expected and if it could not be locked, an SQL Exception was " +
"expected.", key);
String msg = String.format("Illegal state : A row for '%s' was expected and if it could not be locked, " +
"an SQL Exception was expected instead of empty result set.", key);
log.error(msg);
throw new LockException(msg);
}
}
// push the lockResultSet in a cache such that it cannot be GC-ed and thus not closed as a result of being GC-ed
return new DbLock(key, lockResultSet);
// push the connection and lockResultSet in a lock object such that it cannot be GC-ed and thus not closed as a result of being GC-ed
return new DbLock(key, clusterNodeId, connection, originalAutoCommit, lockResultSet);
} catch (SQLException e) {
// TODO
log.error(e.toString());
if (connection != null) {
try {
connection.setAutoCommit(originalAutoCommit);
connection.close();
} catch (SQLException e1) {
log.error("Failed to close connection.", e);
throw new LockException(e);
}
}
if (log.isDebugEnabled()) {
log.info("Cannot lock '{}'. Most likely already locked by another cluster node.", e);
} else {
log.info("Cannot lock '{}'. Most likely already locked by another cluster node : {}", e.toString());
}
throw new LockException(e);
}
}
......
......@@ -42,7 +42,8 @@ public class LockManagerFactory {
final ConnectionHelper journalConnectionHelper = repositoryImpl.getJournalConnectionHelperAccessor().getConnectionHelper();
if (journalConnectionHelper != null) {
final DataSource dataSource = ConnectionHelperDataSourceAccessor.getDataSource(journalConnectionHelper);
return new AssertingLockManager(new DbLockManager(dataSource));
String clusterNodeId = repositoryImpl.getDescriptor("jackrabbit.cluster.id");
return new AssertingLockManager(new DbLockManager(dataSource, clusterNodeId == null ? "default" : clusterNodeId));
} else {
return new AssertingLockManager(new MemoryLockManager());
}
......
......@@ -18,7 +18,7 @@ package org.onehippo.services.lock;
class MemoryLock extends AbstractLock {
public MemoryLock(final String lockKey) {
super(lockKey, Thread.currentThread().getName(), System.currentTimeMillis());
super(lockKey, "default", Thread.currentThread().getName(), System.currentTimeMillis());
}
@Override
......
......@@ -41,7 +41,7 @@ public class MemoryLockManagerTest {
memoryLockManager.lock("123");
assertEquals(1, memoryLockManager.getLocks().size());
assertEquals("123", memoryLockManager.getLocks().iterator().next().getLockKey());
assertEquals(Thread.currentThread().getName(), memoryLockManager.getLocks().iterator().next().getLockOwner());
assertEquals(Thread.currentThread().getName(), memoryLockManager.getLocks().iterator().next().getLockThread());
assertEquals(2, ((MemoryLock)memoryLockManager.getLocks().iterator().next()).holdCount);
......@@ -135,7 +135,7 @@ public class MemoryLockManagerTest {
}
assertEquals("123", memoryLockManager.getLocks().iterator().next().getLockKey());
assertEquals(lockThread.getName(), memoryLockManager.getLocks().iterator().next().getLockOwner());
assertEquals(lockThread.getName(), memoryLockManager.getLocks().iterator().next().getLockThread());
// set the lock thread to null and make it eligible for GC ....however since the lock has not been unlocked, we
// do expect a warning
......@@ -153,7 +153,7 @@ public class MemoryLockManagerTest {
// main thread can lock again
memoryLockManager.lock("123");
assertEquals("123", memoryLockManager.getLocks().iterator().next().getLockKey());
assertEquals(Thread.currentThread().getName(), memoryLockManager.getLocks().iterator().next().getLockOwner());
assertEquals(Thread.currentThread().getName(), memoryLockManager.getLocks().iterator().next().getLockThread());
}
private boolean tryFor10Seconds(final long l) {
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2007-2013 Hippo B.V. (http://www.onehippo.com)
Copyright 2007-2017 Hippo B.V. (http://www.onehippo.com)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......@@ -90,4 +90,13 @@
<ISMLocking class="org.apache.jackrabbit.core.state.FineGrainedISMLocking"/>
</Versioning>
<Cluster id="node1">
<Journal class="org.apache.jackrabbit.core.journal.DatabaseJournal">
<param name="driver" value="org.h2.Driver" />
<param name="url" value="jdbc:h2:${rep.home}/datastore/db"/>
<param name="schemaObjectPrefix" value="journal_"/>
<param name="databaseType" value="h2"/>
</Journal>
</Cluster>
</Repository>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2012-2013 Hippo B.V. (http://www.onehippo.com)
Copyright 2012-2017 Hippo B.V. (http://www.onehippo.com)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......@@ -100,4 +100,12 @@
<param name="schemaObjectPrefix" value="" />
</DataStore>
<Cluster id="node1">
<Journal class="org.apache.jackrabbit.core.journal.DatabaseJournal">
<param name="dataSourceName" value="ds1"/>
<param name="schemaObjectPrefix" value="repository_"/>
<param name="databaseType" value="mysql"/>
</Journal>
</Cluster>
</Repository>
......@@ -156,6 +156,7 @@
<exclude>**/SchedulerTest.java</exclude>
<exclude>**/MonkeyTest.java</exclude>
<exclude>**/LockTest.java</exclude>
<exclude>**/LockManagerTest.java</exclude>
</excludes>
</configuration>
</plugin>
......@@ -196,6 +197,7 @@
<include>**/SchedulerTest.java</include>
<include>**/MonkeyTest.java</include>
<include>**/LockTest.java</include>
<include>**/LockManagerTest.java</include>
</includes>
</configuration>
</execution>
......
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.repository.locking;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import javax.jcr.Repository;
import javax.sql.DataSource;
import org.apache.jackrabbit.core.util.db.ConnectionHelperDataSourceAccessor;
import org.hippoecm.repository.impl.RepositoryDecorator;
import org.hippoecm.repository.jackrabbit.RepositoryImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onehippo.cms7.services.HippoServiceRegistry;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
import org.onehippo.repository.journal.JournalConnectionHelperAccessor;
import org.onehippo.repository.testutils.RepositoryTestCase;
import org.onehippo.services.lock.AbstractLock;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.onehippo.services.lock.DbLockManager.LOCK_STATEMENT;
import static org.onehippo.services.lock.DbLockManager.TABLE_NAME_LOCK;
public class LockManagerTest extends RepositoryTestCase {
private LockManager lockManager;
// dataSource is not null in case of cluster Db test
private DataSource dataSource;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
lockManager = HippoServiceRegistry.getService(LockManager.class);
//if (server.getRepository() instanceof Decora)
Repository repository = server.getRepository();
if (repository instanceof RepositoryDecorator) {
repository = RepositoryDecorator.unwrap(repository);
}
if (repository instanceof RepositoryImpl) {
JournalConnectionHelperAccessor journalConnectionHelperAccessor = ((RepositoryImpl)repository).getJournalConnectionHelperAccessor();
if (journalConnectionHelperAccessor != null) {
// running a cluster db test
dataSource = ConnectionHelperDataSourceAccessor.getDataSource(journalConnectionHelperAccessor.getConnectionHelper());
}
}
}
@Override
@After
public void tearDown() {
lockManager.destroy();
assertTrue(lockManager.getLocks().isEmpty());
// TODO assert db rows empty
}
@Test
public void same_thread_can_lock_same_key_multiple_times() throws Exception {
final String KEY = "123";
lockManager.lock(KEY);
assertDbRowLocked(KEY);
lockManager.lock(KEY);
assertDbRowLocked(KEY);
assertEquals(1, lockManager.getLocks().size());
assertEquals(KEY, lockManager.getLocks().iterator().next().getLockKey());
assertEquals(Thread.currentThread().getName(), lockManager.getLocks().iterator().next().getLockThread());
assertEquals(2, ((AbstractLock)lockManager.getLocks().iterator().next()).getHoldCount());
lockManager.unlock(KEY);
assertEquals(1, lockManager.getLocks().size());
assertEquals(1, ((AbstractLock)lockManager.getLocks().iterator().next()).getHoldCount());
assertDbRowLocked(KEY);
lockManager.unlock(KEY);
assertEquals(0, lockManager.getLocks().size());
assertDbRowNotLocked(KEY);
assertDbRowDoesNotExist(KEY);
}
private void assertDbRowLocked(final String key) {
assertDbRowLocked(key, true);
}
private void assertDbRowLocked(final String key, final boolean expectedLock) {
if (dataSource == null) {
// not a clustered db test
return;
}
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement preparedLockStatement = connection.prepareStatement(LOCK_STATEMENT);
preparedLockStatement.setString(1, key);
preparedLockStatement.setQueryTimeout(10);
preparedLockStatement.executeQuery();
if (expectedLock) {
fail(String.format("Database row for key '%s% should be locked", key));
}
} catch (SQLException e) {
if (!expectedLock) {
fail(String.format("Database row for key '%s% should not be locked", key));
}
}
}
private void assertDbRowNotLocked(final String key) {
assertDbRowLocked(key, false);
}
private void assertDbRowDoesNotExist(final String key) {
if (dataSource == null) {
// not a clustered db test
return;
}
final String selectStatement = "SELECT * FROM " + TABLE_NAME_LOCK + " WHERE lockKey=?";
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement preparedSelectStatement = connection.prepareStatement(selectStatement);
preparedSelectStatement.setString(1, key);
preparedSelectStatement.setQueryTimeout(10);
ResultSet resultSet = preparedSelectStatement.executeQuery();
assertFalse(String.format("There should be no database row for ", key),resultSet.next());
} catch (SQLException e) {
fail(String.format("Database row for key '%s% should not be locked", key));
}
}
@Test
public void same_thread_can_unlock_() throws Exception {
lockManager.lock("123");
assertDbRowLocked("123");
lockManager.unlock("123");
assertDbRowNotLocked("123");
assertDbRowDoesNotExist("123");
assertEquals(0, lockManager.getLocks().size());
}
@Test
public void other_thread_cannot_unlock_() throws Exception {
lockManager.lock("123");
assertDbRowLocked("123");
Thread lockThread = new Thread(() -> {
try {
lockManager.unlock("123");
} catch (LockException e) {
// expected
assertDbRowLocked("123");
}
});
lockThread.start();
lockThread.join();
assertEquals(1, lockManager.getLocks().size());
}
@Test
public void when_other_thread_contains_lock_a_lock_exception_is_thrown_on_lock_attempt() throws Exception {
lockManager.lock("123");
assertDbRowLocked("123");
try {
newSingleThreadExecutor().submit(() -> {
lockManager.lock("123");
return true;
}).get();
fail("ExecutionException excpected");
} catch (ExecutionException e) {
Throwable cause = e.getCause();
assertTrue(cause instanceof LockException);
assertDbRowLocked("123");
}
}
@Test
public void when_other_thread_contains_lock_it_cannot_be_unlocked_by_other_thread() throws Exception {
Thread lockThread = new Thread(() -> {
try {
lockManager.lock("123");
} catch (LockException e) {
fail(e.toString());
}
});
lockThread.start();
lockThread.join();
try {
lockManager.unlock("123");
assertDbRowLocked("123");
fail("Main thread should not be able to unlock");
} catch (LockException e) {
// expected
}
}
@Test
public void when_thread_containing_lock_has_ended_without_unlocking_the_lock_can_be_reclaimed_by_another_thread() throws Exception {
Thread lockThread = new Thread(() -> {
try {
lockManager.lock("123");
} catch (LockException e) {
e.printStackTrace();
}
});
lockThread.start();
lockThread.join();
try {
lockManager.lock("123");
fail("Other thread should have the lock");
} catch (LockException e) {
assertDbRowLocked("123");
}
assertEquals("123", lockManager.getLocks().iterator().next().getLockKey());
assertEquals(lockThread.getName(), lockManager.getLocks().iterator().next().getLockThread());
// set the lock thread to null and make it eligible for GC ....however since the lock has not been unlocked, we
// do expect a warning
lockThread = null;
long l = System.currentTimeMillis();
while (tryFor10Seconds(l)) {
System.gc();
if (lockManager.getLocks().size() == 0) {
break;
}
}
assertEquals(0, lockManager.getLocks().size());
assertDbRowNotLocked("123");
// main thread can lock again
lockManager.lock("123");
assertEquals("123", lockManager.getLocks().iterator().next().getLockKey());
assertEquals(Thread.currentThread().getName(), lockManager.getLocks().iterator().next().getLockThread());
}
private boolean tryFor10Seconds(final long l) {
return System.currentTimeMillis() - l < 10000;
}
@Test
public void assert_database_lock_manager_destroy_only_removes_rows_for_which_it_is_the_owner() throws Exception {
}
}
......@@ -16,3 +16,4 @@
Updating /test/foo/bar failed
Failed to acquire next trigger
Removing the lock now
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment