KX Community


  • PyKX with Prefect

    Posted by cleung on June 22, 2023 at 12:00 am

    Hi There,

    We’re trying to integrate our KDB Processes with Prefect by running PyKX via an IPC port. We are running flows/tasks via Prefect that contain PyKX libraries to establish a connection. A snippet of the code can be found below:

    import os 
    import time 
    import subprocess 
    from prefect import flow, task, get_run_logger 
    from prefect_shell import ShellOperation 
    import pykx as kx 
    def setup(): 
         token = subprocess.Popen('\\ccl\data\extlib\KDBSecToken.exe', stdout=subprocess.PIPE).communicate()[0].decode('utf-8') 
         masterConn = kx.SyncQConnection( host='v-kdbr-01', port=5000, username='cleung', password=token, timeout=3.0 ) 
         user = masterConn('.z.u') 
         print('User = ' + user.py()) 
         port = masterConn('getProcessClient[`prefect_testing_base;`pykx_test]') 
         print('Port = ' + str(port.py())) 
         masterConn.close() 
         global conn 
         conn = kx.SyncQConnection( host='v-kdbr-01', port=port.py(), username='cleung', password=token) 
         user = conn('.z.u') 
         print('User = ' + user.py()) 
    
    @flow() 
    def ibes_pykx():
         setup()
         logger = get_run_logger()
         logger.info("Initializing initial arguments") 
         dailyDates = conn('dailyDates: .dt.drb[.dt.shiftdateb[exec max date from QModelD;-3]; exec max date from QModelD];') 
         logFile = conn('inLogFile:`') 
         updateIBESTickers_result = updateIBESTickers.submit() 
         saveRecRevAI260_result = saveRecRevAI260.submit(wait_for=[updateIBESTickers_result])
    

    This works fine when run from the console (in Visual Studio Code). However, when we try to deploy it, we keep getting the error below. A deployment in Prefect essentially builds the function so that it can be used from the Prefect UI.

    Script at './Script/ibes_pykx.py' encountered an exception: ValueError('signal only works in main thread')

    We're not sure what to do, as it seems like an OS-level issue. So far, I am confident that it is the

    import pykx as kx

    line that is causing it. However, I'm unsure what the best approach is to solve this.

    Thanks so much for the help. I know it's a pretty niche area, and any advice would be GREATLY appreciated.

  • 2 Replies
  • rocuinneagain

    Member
    June 27, 2023 at 12:00 am

    I have tested this, and PyKX 1.6.0 will allow the import to continue after this error.
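
    For context, the ValueError itself comes from Python's standard library: signal.signal() may only be called from the main thread. The sketch below reproduces that behaviour with the standard library alone. The helper name try_register is purely illustrative, and the idea that importing pykx registers signal handlers while Prefect loads the script off the main thread is an assumption inferred from the error message, not something confirmed in this thread.

```python
import signal
import threading


def try_register(results):
    """Try to install a SIGINT handler and record what happened."""
    try:
        # default_int_handler is Python's usual SIGINT handler, so this
        # does not change behaviour when it succeeds.
        signal.signal(signal.SIGINT, signal.default_int_handler)
        results.append("ok")
    except ValueError as exc:
        # Raised whenever signal.signal() is called off the main thread.
        results.append(str(exc))


# Called directly from the thread that runs this script.
direct_results = []
try_register(direct_results)

# Called from a worker thread: registration is refused.
worker_results = []
worker = threading.Thread(target=try_register, args=(worker_results,))
worker.start()
worker.join()

print("direct:", direct_results[0])
print("worker:", worker_results[0])
```

    Any library that installs signal handlers at import time will hit the same restriction if it is first imported from a worker thread.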

     

    This runs for me:

    from prefect import flow, get_run_logger
    from prefect.deployments import Deployment
    import os
    os.environ["QARGS"] = "--unlicensed"
    
    import pykx as kx
    
    def setup():
        print("testing pykx")
        masterConn = kx.SyncQConnection( host='localhost', port=5000, timeout=3.0 )
        user = masterConn('.z.u')
        print('User = ' + user.py())
        port = masterConn('5000')
        print('Port = ' + str(port.py()))
        masterConn.close()
        global conn
        conn = kx.SyncQConnection( host='localhost', port=port.py())
        user = conn('.z.u')
        print('User = ' + user.py())
    
    @flow()
    def test_pykx():
        setup()
        logger = get_run_logger()
        logger.info("Initializing initial arguments")
        dailyDates = conn('dailyDates:(.z.d-7)+til 7')
        logFile = conn('inLogFile:`')
    
    
    def deploy():
        deployment = Deployment.build_from_flow(
            flow=test_pykx,
            name="prefect-example-deployment"
        )
        deployment.apply()
    
    if __name__ == "__main__":
        deploy()

     

    If you want to run in licensed mode, you need to move the import of pykx inside the function that uses it to avoid a nosocket error:

    from prefect import flow, get_run_logger
    from prefect.deployments import Deployment
    
    def setup():
        import pykx as kx
        print("testing pykx")
        masterConn = kx.SyncQConnection( host='localhost', port=5000, timeout=3.0 )
        user = masterConn('.z.u')
        print('User = ' + user.py())
        port = masterConn('5000')
        print('Port = ' + str(port.py()))
        masterConn.close()
        global conn
        conn = kx.SyncQConnection( host='localhost', port=port.py())
        user = conn('.z.u')
        print('User = ' + user.py())
    
    @flow()
    def test_pykx():
        setup()
        logger = get_run_logger()
        logger.info("Initializing initial arguments")
        dailyDates = conn('dailyDates:(.z.d-7)+til 7')
        logFile = conn('inLogFile:`')
    
    
    def deploy():
        deployment = Deployment.build_from_flow(
            flow=test_pykx,
            name="prefect-example-deployment"
        )
        deployment.apply()
    
    if __name__ == "__main__":
        deploy()

     

     

  • rocuinneagain

    Member
    April 19, 2024 at 12:13 pm

    We added some functionality for this in PyKX 2.3.0:

    https://code.kx.com/pykx/2.4/release-notes/changelog.html#pykx-230

    If you enable beta features as well as PyKX threading, all calls into q from any Python thread will be run as if they came from the main thread. This allows multithreaded Python programs to use IPC connections in licensed mode.

    You can enable this functionality like this:

    import os
    os.environ['PYKX_THREADING'] = '1'
    os.environ['PYKX_BETA_FEATURES'] = '1'
    import pykx as kx
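
    The general idea of running calls from many threads as if they came from a single thread can be sketched with the standard library alone. This is not how PyKX implements it. CallFunnel, work and caller are hypothetical names used only to illustrate the pattern: worker threads hand their calls to one dedicated thread through a queue and wait for the result.

```python
import queue
import threading
from concurrent.futures import Future


class CallFunnel:
    """Runs every submitted callable on one dedicated thread (illustrative only)."""

    def __init__(self):
        self._tasks = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._tasks.get()
            if item is None:  # sentinel: stop the dedicated thread
                break
            func, args, future = item
            try:
                future.set_result(func(*args))
            except Exception as exc:
                future.set_exception(exc)

    def call(self, func, *args):
        """Queue the call and block until the dedicated thread has run it."""
        future = Future()
        self._tasks.put((func, args, future))
        return future.result()

    def shutdown(self):
        """Stop the dedicated thread once all queued calls are done."""
        self._tasks.put(None)
        self._thread.join()


funnel = CallFunnel()
executing_threads = set()


def work(n):
    # Record which thread actually executed the call.
    executing_threads.add(threading.get_ident())
    return n * 2


def caller(n, out):
    out[n] = funnel.call(work, n)


results = {}
threads = [threading.Thread(target=caller, args=(n, results)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
funnel.shutdown()

print(results, len(executing_threads))
```

    Four threads make the calls, but only one thread ever executes them. The dedicated thread also has to be stopped explicitly at the end, which is the role shutdown() plays in this sketch.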

    You will also want to ensure that kx.shutdown_thread() is called when the script finishes. The safest way to do this is within a try/finally block like this:

    if __name__ == '__main__':
        try:
            main()
        finally:
            kx.shutdown_thread()
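
    As a small illustration of why the finally clause is the safe place for the cleanup call, the sketch below uses a stand-in function instead of the real kx.shutdown_thread() (fake_shutdown and the failing main are hypothetical) and shows that the cleanup still runs when main() raises.

```python
events = []


def main():
    events.append("main started")
    raise RuntimeError("flow failed")


def fake_shutdown():
    # Hypothetical stand-in for kx.shutdown_thread()
    events.append("shutdown called")


def run():
    try:
        main()
    finally:
        # Runs whether main() returns normally or raises.
        fake_shutdown()


try:
    run()
except RuntimeError:
    events.append("error propagated")

print(events)
```

    The exception still reaches the caller, so failures are not hidden. The cleanup simply runs first.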

    More information about this functionality and an example can be found within our documentation:

    https://code.kx.com/pykx/2.4/examples/threaded_execution/threading.html
