Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Obtain Bitcask write locks using system call flock/2 #257

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
.eunit/*
deps/*
deps
ebin
priv/*.so
*.o
*.beam
*~
.local_dialyzer_plt
log/
erl_crash.dump
19 changes: 18 additions & 1 deletion c_src/bitcask_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// bitcask: Eric Brewer-inspired key/value store
//
// Copyright (c) 2010 Basho Technologies, Inc. All Rights Reserved.
// Copyright (c) 2018 Workday, Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary? We had a chat about this recently (I'll copy you in via mail.)

//
// This file is provided to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file
Expand All @@ -22,6 +23,7 @@
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <stdint.h>
#include <time.h>
Expand Down Expand Up @@ -306,6 +308,7 @@ static ERL_NIF_TERM ATOM_ILT_CREATE_ERROR; /* Iteration lock thread creation err
static ERL_NIF_TERM ATOM_ITERATION_IN_PROCESS;
static ERL_NIF_TERM ATOM_ITERATION_NOT_PERMITTED;
static ERL_NIF_TERM ATOM_ITERATION_NOT_STARTED;
static ERL_NIF_TERM ATOM_LOCKED;
static ERL_NIF_TERM ATOM_LOCK_NOT_WRITABLE;
static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_NOT_READY;
Expand Down Expand Up @@ -2191,13 +2194,26 @@ ERL_NIF_TERM bitcask_nifs_lock_acquire(ErlNifEnv* env, int argc, const ERL_NIF_T
// Use O_SYNC (in addition to other flags) to ensure that when we write
// data to the lock file it is immediately (or nearly) available to any
// other reading processes
flags = O_CREAT | O_EXCL | O_RDWR | O_SYNC;
flags = O_CREAT | O_RDWR | O_SYNC;
}

// Try to open the lock file -- allocate a resource if all goes well.
int fd = open(filename, flags, 0600);
if (fd > -1)
{
if (is_write_lock)
{
// write locks require that the file is locked not just in the same process
// but for all processes on the OS, so require an exclusive write lock.
// this lock will be released when the file handle is closed.
// LOCK_EX: an exclusive lock, LOCK_NB: non-blocking (don't block a nif!)
int flock_result = flock(fd, LOCK_EX | LOCK_NB);
if (flock_result != 0)
{
close(fd);
return enif_make_tuple2(env, ATOM_ERROR, ATOM_LOCKED);
}
}
// Successfully opened the file -- setup a resource to track the FD.
unsigned int filename_sz = strlen(filename) + 1;
bitcask_lock_handle* handle = enif_alloc_resource_compat(env, bitcask_lock_RESOURCE,
Expand Down Expand Up @@ -3036,6 +3052,7 @@ static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOM_ITERATION_NOT_PERMITTED = enif_make_atom(env, "iteration_not_permitted");
ATOM_ITERATION_NOT_STARTED = enif_make_atom(env, "iteration_not_started");
ATOM_LOCK_NOT_WRITABLE = enif_make_atom(env, "lock_not_writable");
ATOM_LOCKED = enif_make_atom(env, "locked");
ATOM_NOT_FOUND = enif_make_atom(env, "not_found");
ATOM_NOT_READY = enif_make_atom(env, "not_ready");
ATOM_OK = enif_make_atom(env, "ok");
Expand Down
13 changes: 11 additions & 2 deletions src/bitcask.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
%% bitcask: Eric Brewer-inspired key/value store
%%
%% Copyright (c) 2010 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2018 Workday, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
Expand Down Expand Up @@ -138,9 +139,17 @@ open(Dirname, Opts) ->
%% If the lock file is not stale, we'll continue initializing
%% and loading anyway: if later someone tries to write
%% something, that someone will get a write_locked exception.
_ = bitcask_lockops:delete_stale_lock(write, Dirname),
case bitcask_lockops:try_write_lock_acquisition(Dirname) of
ok ->
ok;
not_stale ->
error_logger:error_msg(
"Attempted to obtain the Bitcask write lock for '~s' but another "
"Bitcask DB has the lock! Continuing but all write operations will fail.", [Dirname])
end,
fresh;
false -> undefined
false ->
undefined
end,

%% Get the max file size parameter from opts
Expand Down
65 changes: 14 additions & 51 deletions src/bitcask_lockops.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
%% bitcask: Eric Brewer-inspired key/value store
%%
%% Copyright (c) 2010 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2018 Workday, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
Expand All @@ -23,9 +24,12 @@

-export([acquire/2,
release/1,
delete_stale_lock/2,
read_activefile/2,
try_write_lock_acquisition/1,
write_activefile/2]).
-ifdef(TEST).
-export([lock_filename/2]).
-endif.

-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
Expand All @@ -38,7 +42,8 @@
-spec acquire(Type::lock_types(), Dirname::string()) -> {ok, reference()} | {error, any()}.
acquire(Type, Dirname) ->
LockFilename = lock_filename(Type, Dirname),
case bitcask_nifs:lock_acquire(LockFilename, 1) of
IsWriteLock = 1,
case bitcask_nifs:lock_acquire(LockFilename, IsWriteLock) of
{ok, Lock} ->
%% Successfully acquired our lock. Update the file with our PID.
case bitcask_nifs:lock_writedata(Lock, iolist_to_binary([os:getpid(), " \n"])) of
Expand All @@ -47,16 +52,6 @@ acquire(Type, Dirname) ->
{error, _} = Else ->
Else
end;
{error, eexist} ->
%% Lock file already exists, but may be stale. Delete it if it's stale
%% and try to acquire again
case delete_stale_lock(LockFilename) of
ok ->
acquire(Type, Dirname);
not_stale ->
{error, locked}
end;

{error, Reason} ->
{error, Reason}
end.
Expand Down Expand Up @@ -92,9 +87,6 @@ write_activefile(Lock, ActiveFilename) ->
Contents = iolist_to_binary([os:getpid(), " ", ActiveFilename, "\n"]),
bitcask_nifs:lock_writedata(Lock, Contents).

delete_stale_lock(Type, Dirname) ->
delete_stale_lock(lock_filename(Type,Dirname)).

%% ===================================================================
%% Internal functions
%% ===================================================================
Expand All @@ -118,50 +110,21 @@ read_lock_data(Lock) ->
{error, Reason}
end.

os_pid_exists(Pid) ->
%% Use kill -0 trick to determine if a process exists. This _should_ be
%% portable across all unix variants we are interested in.
[] == os:cmd(io_lib:format("kill -0 ~s", [Pid])).


delete_stale_lock(Filename) ->
%% Open the lock for read-only access. We do this to avoid race-conditions
%% with other O/S processes that are attempting the same task. Opening a
%% fd and holding it open until AFTER the unlink ensures that the file we
%% initially read is the same one we are deleting.
case bitcask_nifs:lock_acquire(Filename, 0) of
%% Attempt to obtain the file lock across OS processes, within the span of the call
%% to check if the file is already in use.
try_write_lock_acquisition(Filename) ->
IsWriteLock = 1,
case bitcask_nifs:lock_acquire(lock_filename(write, Filename), IsWriteLock) of
{ok, Lock} ->
try
case read_lock_data(Lock) of
{ok, OsPid, _LockedFilename} ->
case os_pid_exists(OsPid) of
true ->
%% The lock IS NOT stale, so we can't delete it.
not_stale;
false ->
%% The lock IS stale; delete the file.
_ = file:delete(Filename),
ok
end;

{error, Reason} ->
error_logger:error_msg("Failed to read lock data from ~s: ~p\n",
[Filename, Reason]),
not_stale
end
after
bitcask_nifs:lock_release(Lock)
end;

bitcask_nifs:lock_release(Lock);
{error, enoent} ->
%% Failed to open the lock for reading, but only because it doesn't exist
%% any longer. Treat this as a successful delete; the lock file existed
%% when we started.
ok;

{error, Reason} ->
%% Failed to open the lock for reading due to other errors.
error_logger:error_msg("Failed to open lock file ~s: ~p\n",
[Filename, Reason]),
not_stale
end.
end.
88 changes: 88 additions & 0 deletions test/bitcask_lockops_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2018 Workday, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(bitcask_lockops_tests).

-include_lib("eunit/include/eunit.hrl").

-export([bitcask_locker_vm_main/0]).

lock_cannot_be_obtained_on_already_locked_file_within_same_os_process_test() ->
Dir = "/tmp",
Filename = bitcask_lockops:lock_filename(write,Dir),
ok = file_delete(Filename),
ok = file:write_file(Filename, ""),
{ok, _Lock} = bitcask_lockops:acquire(write, Dir),
?assertEqual(
{error, locked},
bitcask_lockops:acquire(write, Dir)
).

lock_can_be_obtained_on_already_locked_file_is_unlocked_test() ->
Dir = "/tmp",
Filename = bitcask_lockops:lock_filename(write,Dir),
ok = file_delete(Filename),
ok = file:write_file(Filename, ""),
{ok, Lock} = bitcask_lockops:acquire(write, Dir),
bitcask_lockops:release(Lock),
?assertMatch(
{ok, _},
bitcask_lockops:acquire(write, Dir)
).

lock_cannot_be_obtained_on_already_locked_file_across_os_process_test() ->
os:cmd("mkdir -p /tmp/lockbitcask"),
os:cmd("rm -rf /tmp/lockbitcask/*"),
%% start another erlang vm that will open the DB and obtain a write
%% lock, then, when we try to obtain a write lock on the current vm
%% it should fail.
Eval1 =
"erl -pa ebin -pa deps/*/ebin -noinput -noshell "
"-eval \"bitcask_lockops_tests:bitcask_locker_vm_main().\"",
spawn_link(
fun() ->
Output = os:cmd(Eval1),
?debugFmt("\nCMD OUTPUT: ~s", [Output])
end),
timer:sleep(1000),
DB = bitcask:open("/tmp/lockbitcask",[read_write]),
?assertEqual(
{error,{error,locked}},
bitcask:put(DB, <<"k">>, <<"v">>)
).

%% entry function for another vm to lock a bitcask DB
bitcask_locker_vm_main() ->
case (catch bitcask:open("/tmp/lockbitcask", [read_write,{open_timeout,1000}])) of
{error, Error} ->
io:format("ERROR: ~p", [Error]);
DB ->
%% DB is locked on the first write operation
ok = bitcask:put(DB, <<"k">>, <<"v">>),
io:format('~p',[os:getpid()]),
timer:sleep(3000),
erlang:halt()
end.

file_delete(Filename) ->
case file:delete(Filename) of
ok -> ok;
{error,enoent} -> ok
end.