Title: | A Small Message Queue for Parallel Processes |
Description: | This queue is a data structure that lets parallel processes send and receive messages, and it can help coordinate the work of complicated parallel tasks. Processes can push new messages to the queue, pop old messages, and obtain a log of all the messages ever pushed. File locking preserves the integrity of the data even when multiple processes access the queue simultaneously. |
Version: | 0.2.4 |
License: | MIT + file LICENSE |
URL: | https://github.com/wlandau/txtq |
BugReports: | https://github.com/wlandau/txtq/issues |
Imports: | base64url, filelock (≥ 1.0.2), R6 |
Suggests: | parallel, purrr, testthat (≥ 2.1.0) |
Encoding: | UTF-8 |
Language: | en-US |
RoxygenNote: | 7.1.1 |
NeedsCompilation: | no |
Packaged: | 2021-03-27 18:34:28 UTC; c240390 |
Author: | William Michael Landau |
Maintainer: | William Michael Landau <will.landau@gmail.com> |
Repository: | CRAN |
Date/Publication: | 2021-03-27 19:00:02 UTC |
txtq-package
Description
The txtq package is a small message queue for R.
Author(s)
William Michael Landau will.landau@gmail.com
References
https://github.com/wlandau/txtq
Examples
# See ?txtq for examples.
R6 class for txtq objects
Description
See the txtq() function for full documentation and usage.
Methods
Public methods
Method new()
Initialize a txtq.
Usage
R6_txtq$new(path, use_lock_file = TRUE)
Arguments
path
Character string giving the file path of the queue. The txtq() function creates a folder at this path to store the messages.
use_lock_file
Logical, whether to use a lock file for blocking operations. Should only be FALSE in specialized use cases with no parallel computing (for example, when a txtq is used as a database and accessed by only one process.)
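The single-process case mentioned for use_lock_file = FALSE might look like the minimal sketch below. It assumes the txtq package is installed. The key/value convention (titles as keys, messages as values) and the helper lookup() are hypothetical, introduced only for illustration, and are not part of the package.

```r
library(txtq)

# Single process only: skip the lock file, as described above.
db <- txtq(tempfile(), use_lock_file = FALSE)

# Hypothetical convention: treat titles as keys and messages as values.
db$push(title = c("alpha", "beta"), message = c("1", "2"))

# Hypothetical helper: look up the values stored under a key in the full log.
lookup <- function(queue, key) {
  entries <- queue$log()
  as.character(entries$message[entries$title == key])
}

value <- lookup(db, "beta")
n_entries <- db$total()

# Remove the files once the database is no longer needed.
db$destroy()
```

Because nothing is popped in this sketch, log() and list() return the same entries.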
Method path()
Get the txtq path.
Usage
R6_txtq$path()
Method count()
Get the number of messages currently in the queue (pushed but not yet popped).
Usage
R6_txtq$count()
Method total()
Get the number of messages in the database, including messages that have already been popped.
Usage
R6_txtq$total()
Method empty()
Detect whether the txtq is empty.
Usage
R6_txtq$empty()
Method log()
List the whole database, including messages that have already been popped.
Usage
R6_txtq$log()
Method list()
List messages currently in the queue without removing them.
Usage
R6_txtq$list(n = -1)
Arguments
n
Number of messages to list. The default, -1, lists all of them.
Method pop()
Pop messages: return and remove the first messages that were added.
Usage
R6_txtq$pop(n = 1)
Arguments
n
Number of messages to pop. Use -1 to pop all the messages at once.
Method push()
Push messages.
Usage
R6_txtq$push(title, message)
Arguments
title
Character vector of message titles.
message
Character vector of message contents.
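Since push() accepts only character vectors, other data must be converted to strings by the sender and parsed by the receiver. The sketch below shows one possible convention for a numeric vector. It assumes the txtq package is installed; the comma-separated encoding is an illustrative choice, not something the package prescribes.

```r
library(txtq)

q <- txtq(tempfile())

# Numbers must be converted to a character string before pushing.
x <- c(2, 4, 8)
q$push(title = "numbers", message = paste(x, collapse = ","))

# The receiver pops the message and parses the string back into numbers.
msg <- q$pop(1)
y <- as.numeric(strsplit(as.character(msg$message), ",", fixed = TRUE)[[1]])

q$destroy()
```

Any reversible text encoding would serve the same purpose; a delimiter-based one is only practical when the values cannot contain the delimiter.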
Method reset()
Reset the txtq, removing all messages from the queue.
Usage
R6_txtq$reset()
Method clean()
Clean the txtq: remove the popped messages so the database file does not grow too large.
Usage
R6_txtq$clean()
Method destroy()
Destroy the txtq, deleting the queue's files altogether.
Usage
R6_txtq$destroy()
Method validate()
Validate the txtq, checking whether the queue is corrupted.
Usage
R6_txtq$validate()
Method import()
Import another txtq.
Usage
R6_txtq$import(queue)
Arguments
queue
External txtq to import.
Method clone()
The objects of this class are cloneable with this method.
Usage
R6_txtq$clone(deep = FALSE)
Arguments
deep
Whether to make a deep clone.
See Also
txtq
Create a message queue.
Description
See the README at https://github.com/wlandau/txtq and the examples in this help file for instructions.
Usage
txtq(path, use_lock_file = TRUE)
Arguments
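path
Character string giving the file path of the queue. The txtq() function creates a folder at this path to store the messages.
use_lock_file
Logical, whether to use a lock file for blocking operations. Should only be FALSE in specialized use cases with no parallel computing (for example, when a txtq is used as a database and accessed by only one process.)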
NFS
As an interprocess communication tool, txtq relies on the filelock package to prevent race conditions. Unfortunately, filelock cannot prevent race conditions on network file systems (NFS), which means neither can txtq. In other words, on certain common kinds of clusters, txtq cannot reliably manage interprocess communication for processes on different computers. However, it can still serve as a low-tech replacement for a simple non-threadsafe database.
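By contrast, processes on the same machine sharing a local file system are the case the locking is designed for. The following is a minimal sketch of that situation using the parallel package (listed under Suggests). It assumes txtq is installed and that tempfile() points to local storage rather than NFS; the worker function push_result() is a hypothetical name used only here.

```r
library(txtq)
library(parallel)

path <- tempfile()  # Assumed to be on a local disk, not a network file system.
q <- txtq(path)

# Hypothetical worker function: each process opens the same queue by its path
# and pushes one message. It returns NULL so no queue object is sent back.
push_result <- function(i, path) {
  worker_queue <- txtq::txtq(path)
  worker_queue$push(title = paste0("worker", i), message = as.character(i^2))
  NULL
}

# Two local worker processes write to the queue.
cl <- makePSOCKcluster(2)
invisible(clusterApply(cl, 1:2, push_result, path = path))
stopCluster(cl)

# The main process sees the messages pushed by both workers.
n_messages <- q$count()
messages <- sort(as.character(q$list()$message))
q$destroy()
```

The order in which the two messages arrive depends on scheduling, which is why the sketch sorts them before use.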
Examples
path <- tempfile() # Define a path to your queue.
q <- txtq(path) # Create a new queue or recover an existing one.
q$validate() # Check if the queue is corrupted.
list.files(q$path()) # The queue lives in this folder.
q$list() # You have not pushed any messages yet.
# Let's say two parallel processes (A and B) are sharing this queue.
# Process A sends Process B some messages.
# You can only send character vectors.
q$push(title = "Hello", message = "process B.")
q$push(
  title = c("Calculate", "Calculate"),
  message = c("sqrt(4)", "sqrt(16)")
)
q$push(title = "Send back", message = "the sum.")
# See your queued messages.
# The `time` is a formatted character string from `Sys.time()`
# indicating when the message was pushed.
q$list()
q$count() # Number of messages in the queue.
q$total() # Number of messages that were ever queued.
q$empty()
# Now, let's assume process B comes online. It can consume
# some messages, locking the queue so process A does not
# mess up the data.
q$pop(2) # Return and remove the first messages that were added.
# With those messages popped, we are farther along in the queue.
q$list()
q$count() # Number of messages in the queue.
q$list(1) # You can specify the number of messages to list.
# But you still have a log of all the messages that were ever pushed.
q$log()
q$total() # Number of messages that were ever queued.
# q$pop() with no arguments just pops one message.
# Call pop(-1) to pop all the messages at once.
q$pop()
# There are more instructions.
q$pop()
# Let's say Process B follows the instructions and sends
# the results back to Process A.
q$push(title = "Results", message = as.character(sqrt(4) + sqrt(16)))
# Process A now has access to the results.
q$pop()
# Clean out the popped messages
# so the database file does not grow too large.
q$push(title = "not", message = "popped")
q$count()
q$total()
q$list()
q$log()
q$clean()
q$count()
q$total()
q$list()
q$log()
# Optionally remove all messages from the queue.
q$reset()
q$count()
q$total()
q$list()
q$log()
# Destroy the queue's files altogether.
q$destroy()
# This whole time, the queue was locked when either Process A
# or Process B accessed it. That way, the data stays correct
# no matter who is accessing/modifying the queue and when.
#
# You can import a `txtq` into another `txtq`.
# The unpopped messages are grouped together
# and sorted by timestamp.
# Same goes for the popped messages.
q_from <- txtq(tempfile())
q_to <- txtq(tempfile())
q_from$push(title = "from", message = "popped")
q_from$push(title = "from", message = "unpopped")
q_to$push(title = "to", message = "popped")
q_to$push(title = "to", message = "unpopped")
q_from$pop()
q_to$pop()
q_to$import(q_from)
q_to$list()
q_to$log()
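The role of Process B in the examples above, popping instructions and acting on them, could be automated with a small consumer loop. The sketch below is one way to do it under stated assumptions: txtq is installed, the "Calculate" title marks messages whose text is an R expression (a convention borrowed from the example, not a package feature), and the sender is trusted, since the loop evaluates message text as code.

```r
library(txtq)

q <- txtq(tempfile())
q$push(
  title = c("Calculate", "Calculate", "Note"),
  message = c("sqrt(4)", "sqrt(16)", "not an expression")
)

# Pop one message at a time until the queue is empty.
results <- numeric(0)
while (!q$empty()) {
  msg <- q$pop(1)
  # Only evaluate messages whose title marks them as calculations.
  if (identical(as.character(msg$title), "Calculate")) {
    results <- c(results, eval(parse(text = as.character(msg$message))))
  }
}

total <- sum(results)
q$destroy()
```

Popping one message per iteration keeps each lock short, at the cost of more file accesses than a single pop(-1).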