~woffs/haskell-amqp-utils

850b4e3cdb52e31df01629132f1e062cd391758f — Frank Doepper 8 months ago fee9e40 0.6.3.1
release 0.6.3.1

- avoid setCurrentDirectory which is not thread-safe
- start watch before initial scan
- put filename without directory into filename header
M ChangeLog.md => ChangeLog.md +6 -0
@@ 1,5 1,11 @@
# Revision history for haskell-amqp-utils

## 0.6.3.1  -- 2021-09-08

* avoid setCurrentDirectory which is not thread-safe
* start watch before initial scan
* put filename without directory into filename header

## 0.6.3.0  -- 2021-09-06

* watch multiple dirs in hotfolder mode

M Network/AMQP/Utils/Helpers.hs => Network/AMQP/Utils/Helpers.hs +4 -3
@@ 1,4 1,5 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}

module Network.AMQP.Utils.Helpers where



@@ 422,9 423,9 @@ sleepingBeauty =
    return

-- | extract first input file in case only one is needed
firstInputFile :: [(FilePath,String,String)] -> BS.ByteString
firstInputFile []          = BS.pack "-"
firstInputFile ((x,_,_):_) = BS.pack x
firstInputFile :: [(RawFilePath,String,String)] -> RawFilePath
firstInputFile []          = "-"
firstInputFile ((x,_,_):_) = x

-- | read RawFilePath to Lazy ByteString
readFileRawLazy :: RawFilePath -> IO BL.ByteString

M Network/AMQP/Utils/Options.hs => Network/AMQP/Utils/Options.hs +15 -12
@@ 1,18 1,21 @@
{-# LANGUAGE OverloadedStrings #-}

module Network.AMQP.Utils.Options where

import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Char8            as BS
import           Data.Default.Class
import           Data.Int              (Int64)
import qualified Data.Map              as M
import           Data.Int                         (Int64)
import qualified Data.Map                         as M
import           Data.Maybe
import           Data.Text             (Text, pack)
import           Data.Version          (showVersion)
import           Data.Word             (Word16)
import           Data.Text                        (Text, pack)
import           Data.Version                     (showVersion)
import           Data.Word                        (Word16)
import           Network.AMQP
import           Network.AMQP.Types
import           Network.Socket        (PortNumber)
import           Paths_amqp_utils      (version)
import           Network.Socket                   (PortNumber)
import           Paths_amqp_utils                 (version)
import           System.Console.GetOpt
import           System.FilePath.Posix.ByteString (RawFilePath)

portnumber :: Args -> PortNumber
portnumber a


@@ 43,7 46,7 @@ data Args =
    , additionalArgs  :: [String]
    , connectionName  :: Maybe String
    , tmpQName        :: String
    , inputFiles      :: [(FilePath,String,String)]
    , inputFiles      :: [(RawFilePath,String,String)]
    , outputFile      :: String
    , lineMode        :: Bool
    , confirm         :: Bool


@@ 70,7 73,7 @@ data Args =
    , simple          :: Bool
    , cleanupTmpFile  :: Bool
    , removeSentFile  :: Bool
    , moveSentFileTo  :: Maybe FilePath
    , moveSentFileTo  :: Maybe RawFilePath
    , initialScan     :: Bool
    }



@@ 199,7 202,7 @@ allOptions =
    , Option
        ['f']
        ["inputfile"]
        (ReqArg (\s o -> o {inputFiles = (s,currentExchange o,rKey o):(inputFiles o)}) "INPUTFILE")
        (ReqArg (\s o -> o {inputFiles = (BS.pack s,currentExchange o,rKey o):(inputFiles o)}) "INPUTFILE")
        ("Message input file (default: <stdin>)"))
  , ( "p"
    , Option


@@ 316,7 319,7 @@ allOptions =
    , Option
        ['u']
        ["remove", "move"]
        (OptArg (\s o -> o {removeSentFile = True, moveSentFileTo = s}) "DIR")
        (OptArg (\s o -> o {removeSentFile = True, moveSentFileTo = fmap BS.pack s}) "DIR")
        ("Remove (or move to DIR) sent file in hotfolder mode"))
  , ( "a"
    , Option

M agitprop.hs => agitprop.hs +33 -34
@@ 22,7 22,6 @@ import           Network.AMQP.Utils.Connection
import           Network.AMQP.Utils.Helpers
import           Network.AMQP.Utils.Options
import           Paths_amqp_utils                 (version)
import           System.Directory
import           System.Environment
import           System.Exit
import           System.FilePath.Posix.ByteString


@@ 55,10 54,12 @@ main = do
  let publishOneMsg =
        publishOneMsg' chan args {removeSentFile = removeSentFile args && isDir}
  if isDir
    then printparam "initial scan" (initialScan args) >>
         if isNothing (moveSentFileTo args)
           then printparam "remove sent file" (removeSentFile args)
           else printparam "move sent file to" (moveSentFileTo args)
    then do
      printparam "hotfolder mode" True
      printparam "initial scan" (initialScan args)
      if isNothing (moveSentFileTo args)
        then printparam "remove sent file" (removeSentFile args)
        else printparam "move sent file to" (moveSentFileTo args)
    else printparam
           "input file"
           [ inputFile'


@@ 74,7 75,7 @@ main = do
         sleepingBeauty >>= printparam "exception"
         forM_ wds (\(wd,folder) -> do
           removeWatch wd
           hr $ "END watching " ++ folder
           printparam "END watching" folder
           )
#else
         X.throw (X.ErrorCall "ERROR: watching a directory is only supported in Linux")


@@ 86,8 87,8 @@ main = do
             then BL.getContents
             else readFileRawLazy inputFile'
         if (lineMode args)
           then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing Nothing) (BL.lines messageFile)
           else publishOneMsg (currentExchange args) (rKey args) Nothing (Just (inputFile')) messageFile
           then mapM_ (publishOneMsg (currentExchange args) (rKey args) Nothing) (BL.lines messageFile)
           else publishOneMsg (currentExchange args) (rKey args) (Just (inputFile')) messageFile
         hr "END sending")
    exceptionHandler
  -- all done. wait and close.


@@ 100,24 101,23 @@ main = do
-- | watch a hotfolder
watchHotfolder ::
     Args
  -> (String -> String -> Maybe FilePath -> Maybe RawFilePath -> BL.ByteString -> IO ())
  -> (FilePath, String, String)
  -> IO (WatchDescriptor,String)
  -> (String -> String -> Maybe RawFilePath -> BL.ByteString -> IO ())
  -> (RawFilePath, String, String)
  -> IO (WatchDescriptor, RawFilePath)
watchHotfolder args publishOneMsg (folder, exchange, rkey) = do
  printparam "hotfolder" folder
  setCurrentDirectory folder
  if (initialScan args)
   then RD.listDirectory "." >>=
        mapM_ (\fn -> handleFile (publishOneMsg exchange rkey (Just folder)) (suffix args) (Just folder) fn)
   else return ()
  inotify <- initINotify
  wd <-
   addWatch
     inotify
     [CloseWrite, MoveIn]
     (BS.pack folder)
     (handleEvent (publishOneMsg exchange rkey (Just folder)) (suffix args) (Just folder))
  hr $ "BEGIN watching " ++ folder
     folder
     (handleEvent (publishOneMsg exchange rkey) (suffix args) folder)
  hr "BEGIN watching"
  if (initialScan args)
   then RD.listDirectory folder >>=
        mapM_ (\fn -> handleFile (publishOneMsg exchange rkey) (suffix args) (folder </> fn))
   else return ()
  return (wd,folder)
#endif



@@ 143,25 143,25 @@ confirmCallback (deliveryTag, isAll, ackType) =
#if linux_HOST_OS
-- | Hotfolder event handler
handleEvent ::
     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> Maybe FilePath -> Event -> IO ()
     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> RawFilePath -> Event -> IO ()
-- just handle closewrite and movedin events
handleEvent func suffixes folder (Closed False (Just fileName) True) =
  handleFile func suffixes folder fileName
  handleFile func suffixes (folder </> fileName)
handleEvent func suffixes folder (MovedIn False fileName _) =
  handleFile func suffixes folder fileName
  handleFile func suffixes (folder </> fileName)
handleEvent _ _ _ _ = return ()

-- | Hotfolder file handler
handleFile ::
     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> Maybe FilePath -> RawFilePath -> IO ()
handleFile func suffixes@(_:_) folder fileName =
     (Maybe RawFilePath -> BL.ByteString -> IO ()) -> [BS.ByteString] -> RawFilePath -> IO ()
handleFile func suffixes@(_:_) fileName =
  if (any (flip BS.isSuffixOf fileName) suffixes) && not ("." `BS.isPrefixOf` fileName)
    then handleFile func [] folder fileName
    then handleFile func [] fileName
    else return ()
handleFile func [] folder fileName =
handleFile func [] fileName =
  X.catch
    (mapM_ setCurrentDirectory folder >> readFileRawLazy fileName >>= func (Just fileName))
    (\e -> printparam "exception in handleFile" (e :: X.IOException))
    (readFileRawLazy fileName >>= func (Just fileName))
    (\e -> printparam "exception while processing" fileName >> printparam "exception in handleFile" (e :: X.IOException))
#endif

-- | Publish one message with our settings


@@ 170,12 170,11 @@ publishOneMsg' ::
  -> Args
  -> String
  -> String
  -> Maybe FilePath
  -> Maybe RawFilePath
  -> BL.ByteString
  -> IO ()
publishOneMsg' chan a exchange rkey folder fn content = do
  printparam "sending" [fmap BS.pack folder, fn]
publishOneMsg' chan a exchange rkey fn content = do
  printparam "sending" fn
  (mtype, mencoding) <-
    if (magic a)
      then do


@@ 207,10 206,10 @@ publishOneMsg' chan a exchange rkey folder fn content = do
      , msgPriority = prio a
      , msgCorrelationID = corrid a
      , msgExpiration = msgexp a
      , msgHeaders = substheader (fnheader a) fn $ msgheader a
      , msgHeaders = substheader (fnheader a) (fmap takeFileName fn) $ msgheader a
      } >>=
    printparam "sent"
  removeSentFileIfRequested (removeSentFile a) (fmap BS.pack (moveSentFileTo a)) fn
  removeSentFileIfRequested (removeSentFile a) (moveSentFileTo a) fn
  where
    substheader ::
         [String] -> Maybe BS.ByteString -> Maybe FieldTable -> Maybe FieldTable


@@ 223,7 222,7 @@ publishOneMsg' chan a exchange rkey folder fn content = do
      printparam "removing" fname >> RD.removeFile fname
    removeSentFileIfRequested True (Just path) (Just fname) =
      printparam "moving" [fname,"to",path] >>
      F.rename fname (replaceDirectory fname path)
      F.rename fname (replaceDirectory fname ((takeDirectory fname) </> path))
    addheader' :: Maybe FieldTable -> String -> BS.ByteString -> Maybe FieldTable
    addheader' Nothing k v =
      Just $ FieldTable $ M.singleton (T.pack k) (FVString v)

M amqp-utils.cabal => amqp-utils.cabal +1 -1
@@ 1,6 1,6 @@
name:                amqp-utils

version:             0.6.3.0
version:             0.6.3.1

synopsis:            AMQP toolset for the command line


M amqp-utils.nix => amqp-utils.nix +1 -1
@@ 6,7 6,7 @@

mkDerivation {
  pname = "amqp-utils";
  version = "0.6.3.0";
  version = "0.6.3.1";
  src = ./.;
  isLibrary = false;
  isExecutable = true;

M debian/changelog => debian/changelog +8 -0
@@ 1,3 1,11 @@
haskell-amqp-utils (0.6.3.1) unstable; urgency=medium

  * avoid setCurrentDirectory which is not thread-safe
  * start watch before initial scan
  * put filename without directory into filename header

 -- Frank Doepper <fd@taz.de>  Wed, 08 Sep 2021 12:06:49 +0200

haskell-amqp-utils (0.6.3.0) unstable; urgency=medium

  * watch multiple dirs in hotfolder mode