code.onehippo.org is currently readonly. We are migrating to code.bloomreach.com, please continue working there on Monday 14/12. See: https://docs.bloomreach.com/display/engineering/GitLab

Commit 5ab1e55f authored by Ard Schrijvers's avatar Ard Schrijvers

REPO-1811 Extract abstract test and add an 'abort' test

parent d3d25849
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.repository.lock;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.jcr.Repository;
import javax.sql.DataSource;
import org.apache.jackrabbit.core.util.db.ConnectionHelperDataSourceAccessor;
import org.hippoecm.repository.impl.RepositoryDecorator;
import org.hippoecm.repository.jackrabbit.RepositoryImpl;
import org.junit.After;
import org.junit.Before;
import org.onehippo.cms7.services.HippoServiceRegistry;
import org.onehippo.cms7.services.lock.LockException;
import org.onehippo.cms7.services.lock.LockManager;
import org.onehippo.repository.journal.JournalConnectionHelperAccessor;
import org.onehippo.repository.testutils.RepositoryTestCase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.onehippo.repository.lock.db.DbLockManager.CREATE_STATEMENT;
import static org.onehippo.repository.lock.db.DbLockManager.SELECT_STATEMENT;
public abstract class AbstractLockManagerTest extends RepositoryTestCase {
protected final String CLUSTER_NODE_ID = "node1";
protected InternalLockManager lockManager;
// dataSource is not null in case of cluster Db test
protected DataSource dataSource;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
lockManager = (InternalLockManager)HippoServiceRegistry.getService(LockManager.class);
Repository repository = server.getRepository();
if (repository instanceof RepositoryDecorator) {
repository = RepositoryDecorator.unwrap(repository);
}
if (repository instanceof RepositoryImpl) {
JournalConnectionHelperAccessor journalConnectionHelperAccessor = ((RepositoryImpl)repository).getJournalConnectionHelperAccessor();
if (journalConnectionHelperAccessor.getConnectionHelper() != null) {
// running a cluster db test
dataSource = ConnectionHelperDataSourceAccessor.getDataSource(journalConnectionHelperAccessor.getConnectionHelper());
}
}
}
@Override
@After
public void tearDown() throws Exception {
lockManager.clear();
// DELETE ALL ROWS if there are any present
if (dataSource != null) {
Connection connection = null;
boolean originalAutoCommit = false;
try {
connection = dataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
final PreparedStatement deleteStatement = connection.prepareStatement("DELETE FROM hippo_lock");
deleteStatement.execute();
} catch (SQLException e) {
fail("Failed to delete rows : " + e.toString());
} finally {
close(connection, originalAutoCommit);
}
}
super.tearDown();
}
private void close(final Connection connection, final boolean originalAutoCommit) {
if (connection == null) {
return;
}
try {
connection.setAutoCommit(originalAutoCommit);
connection.close();
} catch (SQLException e) {
log.error("Failed to close connection.", e);
}
}
protected void dbRowAssertion(final String key, final String expectedStatus) throws SQLException {
dbRowAssertion(key, expectedStatus, null, null);
}
protected void dbRowAssertion(final String key, final String expectedStatus, final String lockOwnerExpectation, final String lockThreadExpectation) throws SQLException {
if (dataSource == null) {
// not a clustered db test
return;
}
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement selectStatement = connection.prepareStatement(SELECT_STATEMENT);
selectStatement.setString(1, key);
ResultSet resultSet = selectStatement.executeQuery();
if (resultSet.next()) {
assertEquals(expectedStatus, resultSet.getString("status"));
if (lockOwnerExpectation != null) {
assertEquals(lockOwnerExpectation, resultSet.getString("lockOwner"));
}
if (lockThreadExpectation != null) {
assertEquals(lockThreadExpectation, resultSet.getString("lockThread"));
}
} else {
fail(String.format("A row with lockKey '%s' should exist", key));
}
}
}
protected void addManualLockToDatabase(final String key, final String clusterNodeId,
final String threadName, final int refreshRateSeconds) throws LockException {
if (dataSource != null) {
Connection connection = null;
boolean originalAutoCommit = false;
try {
connection = dataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
final PreparedStatement createStatement = connection.prepareStatement(CREATE_STATEMENT);
connection.setAutoCommit(true);
createStatement.setString(1, key);
createStatement.setString(2, clusterNodeId);
createStatement.setString(3, threadName);
long lockTime = System.currentTimeMillis();
createStatement.setLong(4, lockTime);
createStatement.setLong(5, refreshRateSeconds);
createStatement.setLong(6, lockTime + refreshRateSeconds * 1000);
try {
createStatement.execute();
} catch (SQLException e) {
throw new LockException(String.format("Cannot create lock row for '{}'", key), e);
}
} catch (SQLException e) {
fail("Failed to delete rows : " + e.toString());
} finally {
close(connection, originalAutoCommit);
}
}
}
}
/*
* Copyright 2017 Hippo B.V. (http://www.onehippo.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onehippo.repository.lock;
import java.sql.SQLException;
import org.junit.Test;
import org.onehippo.cms7.services.lock.LockException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class LockManagerAbortTest extends AbstractLockManagerTest {
@Test
public void any_thread_can_request_abort_which_results_in_an_interrupt_for_thread_holding_lock() throws Exception {
final String key = "123";
final LockRunnable runnable = new LockRunnable(key, true);
final Thread lockThread = new Thread(runnable);
lockThread.start();
// give lockThread time to lock
Thread.sleep(100);
// now abort with main thread : This should signal directly an interrupt to the Thread that has the lock because
// the Lock is kept in the same JVM
lockManager.abort("123");
// although keepalive still true, the Thread should still have stopped because of the interrupt
lockThread.join();
assertTrue("lockThread should be interrupted", runnable.interrupted);
if (runnable.msgExceptionPair != null) {
fail(runnable.msgExceptionPair.msg + " : " + runnable.msgExceptionPair.e.toString());
}
// AFTER the interrupt, the database record should be in state 'FREE' since LockRunnable invoked #unlock
dbRowAssertion(key, "FREE");
}
@Test
public void fake_other_cluster_node_has_set_abort() throws Exception {
// by manually in the database setting status 'ABORT' is the same as if it is done on a different cluster node
}
private class LockRunnable implements Runnable {
private String key;
private volatile boolean keepAlive;
private boolean interrupted = false;
private MsgExceptionPair msgExceptionPair;
LockRunnable(final String key , final boolean keepAlive) {
this.key = key;
this.keepAlive = keepAlive;
}
@Override
public void run() {
try {
lockManager.lock(key);
while (keepAlive) {
Thread.sleep(25);
}
} catch (LockException e) {
msgExceptionPair = new MsgExceptionPair("LockException", e);
} catch (InterruptedException e) {
try {
// The DB entry is either in status 'ABORT' or in status 'RUNNING'
boolean locked = lockManager.isLocked(key);
if (!locked) {
msgExceptionPair = new MsgExceptionPair("Key should be in state ABORT or RUNNING", new IllegalStateException());
}
// because of the call above to lockManager.isLocked which is synchronized, the #abort invocation by the main
// thread must have finished (because synchronized on same object) and thus the database record must now
// be 'ABORT'
try {
dbRowAssertion(key, "ABORT");
} catch (SQLException e1) {
msgExceptionPair = new MsgExceptionPair("SQL Exception", e);
}
interrupted = true;
lockManager.unlock(key);
try {
dbRowAssertion(key, "FREE");
} catch (SQLException e1) {
msgExceptionPair = new MsgExceptionPair("SQL Exception", e);
}
} catch (LockException e1) {
msgExceptionPair = new MsgExceptionPair("After interruption, the current Thread holding the lock should be able to unlock", e1);
}
}
}
}
private static class MsgExceptionPair {
private String msg;
private Exception e;
public MsgExceptionPair(final String msg, final Exception e) {
this.msg = msg;
this.e = e;
}
}
}
......@@ -50,68 +50,8 @@ import static org.onehippo.repository.lock.db.DbLockManager.CREATE_STATEMENT;
import static org.onehippo.repository.lock.db.DbLockManager.SELECT_STATEMENT;
import static org.onehippo.repository.lock.db.DbLockManager.TABLE_NAME_LOCK;
public class LockManagerTest extends RepositoryTestCase {
public class LockManagerBasicTest extends AbstractLockManagerTest {
private InternalLockManager lockManager;
// dataSource is not null in case of cluster Db test
private DataSource dataSource;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
lockManager = (InternalLockManager)HippoServiceRegistry.getService(LockManager.class);
Repository repository = server.getRepository();
if (repository instanceof RepositoryDecorator) {
repository = RepositoryDecorator.unwrap(repository);
}
if (repository instanceof RepositoryImpl) {
JournalConnectionHelperAccessor journalConnectionHelperAccessor = ((RepositoryImpl)repository).getJournalConnectionHelperAccessor();
if (journalConnectionHelperAccessor.getConnectionHelper() != null) {
// running a cluster db test
dataSource = ConnectionHelperDataSourceAccessor.getDataSource(journalConnectionHelperAccessor.getConnectionHelper());
}
}
}
@Override
@After
public void tearDown() throws Exception {
lockManager.clear();
// DELETE ALL ROWS if there are any present
if (dataSource != null) {
Connection connection = null;
boolean originalAutoCommit = false;
try {
connection = dataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
final PreparedStatement deleteStatement = connection.prepareStatement("DELETE FROM hippo_lock");
deleteStatement.execute();
} catch (SQLException e) {
fail("Failed to delete rows : " + e.toString());
} finally {
close(connection, originalAutoCommit);
}
}
super.tearDown();
}
private void close(final Connection connection, final boolean originalAutoCommit) {
if (connection == null) {
return;
}
try {
connection.setAutoCommit(originalAutoCommit);
connection.close();
} catch (SQLException e) {
log.error("Failed to close connection.", e);
}
}
@Test
public void general_single_threaded_lock_interaction() throws Exception {
......@@ -146,34 +86,6 @@ public class LockManagerTest extends RepositoryTestCase {
dbRowAssertion(key, "RUNNING");
}
private void dbRowAssertion(final String key, final String expectedStatus) throws SQLException {
dbRowAssertion(key, expectedStatus, null, null);
}
private void dbRowAssertion(final String key, final String expectedStatus, final String lockOwnerExpectation, final String lockThreadExpectation) throws SQLException {
if (dataSource == null) {
// not a clustered db test
return;
}
try (Connection connection = dataSource.getConnection()) {
final PreparedStatement selectStatement = connection.prepareStatement(SELECT_STATEMENT);
selectStatement.setString(1, key);
ResultSet resultSet = selectStatement.executeQuery();
if (resultSet.next()) {
assertEquals(expectedStatus, resultSet.getString("status"));
if (lockOwnerExpectation != null) {
assertEquals(lockOwnerExpectation, resultSet.getString("lockOwner"));
}
if (lockThreadExpectation != null) {
assertEquals(lockThreadExpectation, resultSet.getString("lockThread"));
}
} else {
fail(String.format("A row with lockKey '%s' should exist", key));
}
}
}
private void assertDbRowDoesExist(final String key) throws SQLException {
if (dataSource == null) {
// not a clustered db test
......@@ -246,10 +158,11 @@ public class LockManagerTest extends RepositoryTestCase {
}
}
class LockRunnable implements Runnable {
protected class LockRunnable implements Runnable {
private String key;
private volatile boolean keepAlive;
private Exception e;
LockRunnable(final String key , final boolean keepAlive) {
this.key = key;
......@@ -264,12 +177,7 @@ public class LockManagerTest extends RepositoryTestCase {
Thread.sleep(25);
}
} catch (LockException | InterruptedException e) {
try {
dbRowAssertion(key, "RUNNING");
} catch (SQLException e1) {
fail(e1.toString());
}
fail(e.toString());
this.e = e;
}
}
}
......@@ -284,7 +192,7 @@ public class LockManagerTest extends RepositoryTestCase {
// give lockThread time to lock
Thread.sleep(100);
dbRowAssertion(key, "RUNNING", "node1", lockThread.getName());
dbRowAssertion(key, "RUNNING", CLUSTER_NODE_ID, lockThread.getName());
try {
lockManager.unlock(key);
......@@ -299,6 +207,9 @@ public class LockManagerTest extends RepositoryTestCase {
// after the thread is finished, the lock manager should have no locks any more
lockThread.join();
if (runnable.e != null) {
fail(runnable.e.toString());
}
}
......@@ -325,6 +236,10 @@ public class LockManagerTest extends RepositoryTestCase {
runnable.keepAlive = false;
lockThread.join();
if (runnable.e != null) {
fail(runnable.e.toString());
}
assertEquals(0, lockManager.getLocks().size());
dbRowAssertion(key, "FREE");
// main thread can lock again
......@@ -374,37 +289,6 @@ public class LockManagerTest extends RepositoryTestCase {
}
}
private void addManualLockToDatabase(final String key, final String clusterNodeId,
final String threadName, final int refreshRateSeconds) throws LockException {
if (dataSource != null) {
Connection connection = null;
boolean originalAutoCommit = false;
try {
connection = dataSource.getConnection();
originalAutoCommit = connection.getAutoCommit();
final PreparedStatement createStatement = connection.prepareStatement(CREATE_STATEMENT);
connection.setAutoCommit(true);
createStatement.setString(1, key);
createStatement.setString(2, clusterNodeId);
createStatement.setString(3, threadName);
long lockTime = System.currentTimeMillis();
createStatement.setLong(4, lockTime);
createStatement.setLong(5, refreshRateSeconds);
createStatement.setLong(6, lockTime + refreshRateSeconds * 1000);
try {
createStatement.execute();
} catch (SQLException e) {
throw new LockException(String.format("Cannot create lock row for '{}'", key), e);
}
} catch (SQLException e) {
fail("Failed to delete rows : " + e.toString());
} finally {
close(connection, originalAutoCommit);
}
}
}
@Test
public void get_locks_returns_also_locks_owned_by_other_cluster_node_in_case_of_clustered_setup() throws Exception {
lockManager.lock("a");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment