27 March 2021

Create a Watchdog in Python

What is Watchdog in Python?

Watchdog is a Python module that monitors file system changes. As the name suggests, it observes a given directory and can notify you when a file is created or changed.

Step 1: Import some Stuff

import os
import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

Step 2: Create the event handler


if __name__ == "__main__":
    # patterns = ["*"] # the file patterns we want to handle
    patterns = ["*.csv"] # choose only csv files
    ignore_patterns = None
    ignore_directories = True
    case_sensitive = True
    # Create the event handler
    my_event_handler = PatternMatchingEventHandler(patterns, ignore_patterns, ignore_directories, case_sensitive)

Here I have created a variable named patterns and set it to "*.csv". The idea is to keep tabs on all csv files. If I set it to "*" instead of "*.csv", the handler will watch all files.
  • ignore_patterns contains the patterns we don't want to handle.
  • ignore_directories is a boolean; we set it to True when we don't want to watch for changes in directories.
  • case_sensitive, when set to True, makes the patterns case sensitive.
my_event_handler is the object that will be notified when something happens to the file system. In general, such a script is written to watch for new or modified files of any type, such as jpg, xml or csv.

Step 3: Handle the Events

Next we need to write the code we want to run when the events are raised. So let us create four different functions, one each for when a file is created, deleted, modified or moved.

# Handle the events
def on_created(event):
    ''' Executed when a file or a directory is created.'''
    print("Created Event: {}".format(event.src_path))

def on_deleted(event):
    ''' Executed when a file or directory is deleted. '''
    print("Deleted Event: {}".format(event.src_path))

def on_modified(event):
    '''Executed when a file or a directory is modified. '''
    print("Modified Event: {}".format(event.src_path))

def on_moved(event):
    ''' Executed when a file or directory is moved. '''
    print("File Moved Event - from source: {} to destination: {}".format(event.src_path, event.dest_path))

Each of the functions above receives the event object as its first parameter, and every event object has 3 attributes (a moved event additionally carries dest_path):
  • event_type: modified/created/moved/deleted
  • is_directory: True/False
  • src_path: path/to/observed/file
In our example we are just using the print function, but we could run any other code we want here.
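To make those attributes concrete, here is a tiny runnable sketch. FakeEvent is my own stand-in that mimics the three attributes a real watchdog event exposes (it is not part of the watchdog API), so the snippet runs without any actual file system event:

```python
from collections import namedtuple

# Stand-in mimicking the attributes a real watchdog event exposes.
# (FakeEvent is illustrative only; real handlers receive watchdog's event objects.)
FakeEvent = namedtuple("FakeEvent", ["event_type", "is_directory", "src_path"])

def describe(event):
    """Build a log line from the three attributes every event carries."""
    kind = "directory" if event.is_directory else "file"
    return "{} {}: {}".format(event.event_type.capitalize(), kind, event.src_path)

print(describe(FakeEvent("created", False, "data/AAPL.csv")))  # Created file: data/AAPL.csv
```

A single describe function like this could even replace the four separate print calls above.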

Step 4: Specify which functions the handler should call

Here we tell the handler which function to call when each corresponding event is raised.

    # Attach the above functions to the handler.
    my_event_handler.on_created = on_created
    my_event_handler.on_deleted = on_deleted
    my_event_handler.on_modified = on_modified
    my_event_handler.on_moved = on_moved

Step 5: Choose the directory that needs to be observed


    # Choose the directory which needs to be observed
    full_path = os.path.realpath(__file__)
    current_directory = os.path.dirname(full_path)
    symbols_data_dir = os.path.join(current_directory, "RTD_from_IB")

Step 6: Create an observer


Why an observer? Because we need another object to watch our file system. It is the Observer that monitors the file system for any change and then dispatches the event to the event handler.

    # Create an Observer and call schedule method
    # path = "." # . represents the current directory
    path = symbols_data_dir # use only the directory where we have stock data
    go_recursively = True
    my_observer = Observer()
    my_observer.schedule(my_event_handler, path, recursive=go_recursively)

Step 7: Start the observer


    # Start the observer thread
    my_observer.start()
    try:
        while True:
            # Set the thread sleep time
            time.sleep(1)
    except KeyboardInterrupt:
        my_observer.stop()
    my_observer.join()

Step 8: Run the program

And what do we get when we run the program, with my list of csv files holding OHLCV stock data?


Now if you watch carefully, you will notice that Watchdog triggers the modified event twice, and on some occasions even thrice. One possible solution is to use pyinotify instead of watchdog, but the problem with pyinotify is that it works only on Linux. Here is the Stackoverflow query on that. The inotify FAQ explains it thus:

The IN_MODIFY event is emitted on a file content change (e.g. via the write() syscall) while IN_CLOSE_WRITE occurs on closing the changed file. It means each change operation causes one IN_MODIFY event (it may occur many times during manipulations with an open file) whereas IN_CLOSE_WRITE is emitted only once (on closing the file).
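Another way to cope with the duplicate notifications, while staying on watchdog, is to debounce events inside the handler: ignore a second event for the same path if it arrives within a short window. The Debouncer class and the one-second window below are my own illustration, not part of watchdog:

```python
import time

class Debouncer:
    """Drop repeat events for the same path arriving within `wait` seconds."""
    def __init__(self, wait=1.0):
        self.wait = wait
        self._last_seen = {}  # path -> timestamp of the last accepted event

    def should_handle(self, path, now=None):
        now = time.monotonic() if now is None else now
        last = self._last_seen.get(path)
        if last is not None and now - last < self.wait:
            return False  # duplicate inside the window: drop it
        self._last_seen[path] = now
        return True

# Inside on_modified one would guard the real work with something like:
#     if debouncer.should_handle(event.src_path):
#         ...handle the change...
```

The window length is a trade-off: too short and the duplicates slip through; too long and genuine back-to-back changes get swallowed.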

The workaround I used was to rename the file after writing it, and then use the on_moved event in watchdog to keep track of the renaming. The on_moved event fires in two cases:
  • files that are moved
  • files that are renamed
Yep, surprised that it tracks renamed files too? Check This.


Windows, however, can throw an error on renaming if the destination file already exists. This Stackoverflow post gives us the workaround.

This is my re-written piece of code: 

                # write to csv (this fragment assumes import os and import shutil at the top of the script)
                full_path = os.path.realpath(__file__)
                current_directory = os.path.dirname(full_path)
                src_directory = current_directory + "\\temp_data\\"
                dest_directory = current_directory + "\\RTD_from_IB\\"
                # write to a temporary directory
                src_file_name = symbol + '_Minute' + '.csv'
                src_file_with_path = os.path.join(src_directory, src_file_name)
                df.to_csv(src_file_with_path, encoding='utf-8', index=False)

                # The move is necessitated so that any other program that continually reads
                # the csv files from the destination directory (RTD_from_IB) sees the new
                # modified date only after the entire write is done (and NOT while the
                # writing is still in progress).
                dest_file_name = symbol + '_Minute' + '.csv'
                dest_file_with_path = os.path.join(dest_directory, dest_file_name)
                shutil.move(src_file_with_path, dest_file_with_path)
                # Rename the file so that on_moved event on watchdog module can capture the change in name
                renamed_file_name = symbol + '.csv'
                renamed_file_with_path = os.path.join(dest_directory, renamed_file_name)
                try:
                    os.rename(dest_file_with_path, renamed_file_with_path)
                except WindowsError: # On Windows os.rename won't replace the destination file if it exists. You have to remove it first
                    os.remove(renamed_file_with_path)
                    os.rename(dest_file_with_path, renamed_file_with_path)


Now we can see that the event for each stock is triggered only once, and that happens when there is a change in the file name.


Step 9: Using watchdog in production - as a daemon thread

To actually make watchdog useful, you should wrap this up as a daemon or an upstart script that can run indefinitely. Let's first understand what a daemon thread is.

For that, let us first take a look at an example of a non-daemon thread.

# This is an example of a non-daemon thread
import threading
import time

def work_a():
    print('Start of thread :{}'.format(threading.current_thread().name))
    time.sleep(3)
    print('End of thread :{}'.format(threading.current_thread().name))

def work_b():
    print('Start of thread :{}'.format(threading.current_thread().name))
    print('End of thread :{}'.format(threading.current_thread().name))

a = threading.Thread(target=work_a, name='Thread-a')
b = threading.Thread(target=work_b, name='Thread-b')

a.start()
b.start()

When we run the above program, we will see output like this:

Both threads run to completion; then the main thread exits and the program terminates.

Now let us look at a daemon thread.

# This is an example of a daemon thread
import threading
import time

def work_a():
    print('Start of thread :{}'.format(threading.current_thread().name))
    time.sleep(3)
    print('End of thread :{}'.format(threading.current_thread().name))

def work_b():
    print('Start of thread :{}'.format(threading.current_thread().name))
    print('End of thread :{}'.format(threading.current_thread().name))

a = threading.Thread(target=work_a, name='Thread-a', daemon=True)
b = threading.Thread(target=work_b, name='Thread-b')

a.start()
b.start()

Pay attention to the extra argument daemon=True while creating Thread a. This is how we mark a thread as a daemon thread. The image below shows the output when we run this program.

Notice that the program exits even though the daemon thread was still running.

In many big projects, some threads exist just to do background tasks like sending data or performing periodic garbage collection. This can be done by a non-daemon thread, but then the main thread has to keep track of it manually. If we use a daemon thread instead, the main thread can completely forget about the task: it will either complete or be killed when the program exits.

A note of caution: a daemon thread is best used for non-critical tasks, because it may be killed mid-task when the program exits. That said, if we print a timestamp every time the daemon thread processes a task and keep track of those print statements, a daemon thread can be as effective as a normal thread. Daemon threads are therefore useful for services where there may not be an easy way to interrupt the thread.

Waiting for the daemon thread to exit using join() gives it a chance to produce its "End of thread" message.

# This is an example of a daemon thread with join
import threading
import time

def work_a():
    print('Start of thread :{}'.format(threading.current_thread().name))
    time.sleep(3)
    print('End of thread :{}'.format(threading.current_thread().name))

def work_b():
    print('Start of thread :{}'.format(threading.current_thread().name))
    print('End of thread :{}'.format(threading.current_thread().name))

a = threading.Thread(target=work_a, name='Thread-a', daemon=True)
b = threading.Thread(target=work_b, name='Thread-b')

a.start()
b.start()
a.join()
b.join()

This is the output when we run the program

By default, join() blocks indefinitely. It is also possible to pass a timeout argument (a float representing the number of seconds to wait for the thread to become inactive). If the thread does not complete within the timeout period, join() returns anyway.

# This is an example of a daemon thread with join and timeout
import threading
import time

def work_a():
    print('Start of thread :{}'.format(threading.current_thread().name))
    time.sleep(3)
    print('End of thread :{}'.format(threading.current_thread().name))

def work_b():
    print('Start of thread :{}'.format(threading.current_thread().name))
    print('End of thread :{}'.format(threading.current_thread().name))

a = threading.Thread(target=work_a, name='Thread-a', daemon=True)
b = threading.Thread(target=work_b, name='Thread-b')

a.start()
b.start()

a.join(2)
print('Is a alive ?', a.is_alive())
b.join()

Since the timeout is 2 seconds while the sleep time of thread a is 3, the thread is still alive when join() returns.
This is our output.
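Tying the two halves of this post together, here is a minimal sketch of running a monitoring loop on a daemon thread with a clean shutdown signal. The monitor_loop function is a stand-in for the observer loop from Step 7, and the names stop_flag, monitor_loop and the timings are my own illustration (watchdog's Observer is itself a Thread subclass, so the same daemon flag can be set on it before start()):

```python
import threading
import time

stop_flag = threading.Event()

def monitor_loop(results):
    # Stand-in for the observer polling loop from Step 7; a real version
    # would start my_observer here and stop it when stop_flag is set.
    while not stop_flag.is_set():
        results.append("tick")          # placeholder for real event handling
        stop_flag.wait(0.05)            # sleep, but wake early on shutdown

results = []
# daemon=True: this thread will not keep the interpreter alive on exit
worker = threading.Thread(target=monitor_loop, args=(results,),
                          name="watchdog-worker", daemon=True)
worker.start()
time.sleep(0.2)        # the main thread does its own work here
stop_flag.set()        # ask the worker to finish cleanly
worker.join(timeout=1) # give it a chance to wind down before we exit
print("ticks recorded:", len(results))
```

Using an Event for shutdown instead of killing the daemon outright means the worker gets to finish its current iteration, which matters if it is mid-way through writing a file.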


