KX Community

Find answers, ask questions, and connect with our KX Community around the world.
KX Community Guidelines

Home Forums PyKX PyKX with Prefect Re: PyKX with Prefect

  • rocuinneagain

    Member
    June 27, 2023 at 12:00 am

    I have tested this, and PyKX 1.6.0 allows the import to continue after this error.

     

    This runs for me:

    from prefect import flow
    from prefect.deployments import Deployment
    import os
    # Set QARGS before the pykx import that follows; "--unlicensed" requests
    # PyKX's unlicensed mode (see the licensed-mode variant further down).
    os.environ["QARGS"] = "--unlicensed"
    
    import pykx as kx
    
    def setup():
        """Open the module-level q connection ``conn`` used by the flow.

        A short-lived "master" connection to localhost:5000 is queried for the
        current user and for the port to use; the long-lived global ``conn`` is
        then opened against that port and its user is printed as a sanity check.
        """
        global conn
        print("testing pykx")
        # The context manager closes the master connection even if one of the
        # queries raises, so the socket is not leaked on failure.
        with kx.SyncQConnection(host='localhost', port=5000, timeout=3.0) as master_conn:
            user = master_conn('.z.u')
            print('User = ' + user.py())
            # The q expression is the literal 5000, so this returns 5000.
            port = master_conn('5000')
            print('Port = ' + str(port.py()))
        conn = kx.SyncQConnection(host='localhost', port=port.py())
        user = conn('.z.u')
        print('User = ' + user.py())
    
    @flow()
    def test_pykx():
        """Prefect flow: connect to q via ``setup()`` and initialise q-side variables."""
        # get_run_logger is not imported at the top of the script; import it
        # here so the flow does not fail with a NameError.
        from prefect import get_run_logger

        setup()
        logger = get_run_logger()
        logger.info("Initializing initial arguments")
        # Both statements assign variables inside the q process; the values
        # returned to Python are not needed here.
        conn('dailyDates:(.z.d-7)+til 7')
        conn('inLogFile:`')
    
    
    def deploy():
        """Build a Prefect deployment for the ``test_pykx`` flow and apply it."""
        build_kwargs = {
            "flow": test_pykx,
            "name": "prefect-example-deployment",
        }
        Deployment.build_from_flow(**build_kwargs).apply()
    
    if __name__ == "__main__":
        # Build and apply the deployment only when run as a script, not on import.
        deploy()

     

    If you want to run in licensed mode, you need to move the import of pykx inside the function that uses it to avoid a nosocket error:

    from prefect import flow
    from prefect.deployments import Deployment
    
    def setup():
        """Import pykx and open the module-level q connection ``conn``.

        pykx is imported inside this function rather than at module import
        time. A short-lived "master" connection to localhost:5000 is queried
        for the current user and for the port to use; the global ``conn`` is
        then opened against that port and its user is printed.
        """
        import pykx as kx

        global conn
        print("testing pykx")
        # The context manager closes the master connection even if one of the
        # queries raises, so the socket is not leaked on failure.
        with kx.SyncQConnection(host='localhost', port=5000, timeout=3.0) as master_conn:
            user = master_conn('.z.u')
            print('User = ' + user.py())
            # The q expression is the literal 5000, so this returns 5000.
            port = master_conn('5000')
            print('Port = ' + str(port.py()))
        conn = kx.SyncQConnection(host='localhost', port=port.py())
        user = conn('.z.u')
        print('User = ' + user.py())
    
    @flow()
    def test_pykx():
        """Prefect flow: connect to q via ``setup()`` and initialise q-side variables."""
        # get_run_logger is not imported at the top of the script; import it
        # here so the flow does not fail with a NameError.
        from prefect import get_run_logger

        setup()
        logger = get_run_logger()
        logger.info("Initializing initial arguments")
        # Both statements assign variables inside the q process; the values
        # returned to Python are not needed here.
        conn('dailyDates:(.z.d-7)+til 7')
        conn('inLogFile:`')
    
    
    def deploy():
        """Create the deployment for the ``test_pykx`` flow, then apply it."""
        deployment_name = "prefect-example-deployment"
        built = Deployment.build_from_flow(flow=test_pykx, name=deployment_name)
        built.apply()
    
    if __name__ == "__main__":
        # Build and apply the deployment only when run as a script, not on import.
        deploy()