使用 crontab 安排 python 脚本

2023-12-19

我有一个 bash 脚本,我正在尝试使用 cron 作业运行它。我正在尝试在我的 ubuntu 服务器上运行 cron 作业。我希望它每天 8 小时 UTC 运行。 bash 脚本激活 conda python 虚拟环境并运行 python 脚本。该脚本应该提取数据并将其加载到 mysql 数据库中。我还在整个 python 脚本中记录了日志。昨晚数据库中没有显示新数据,也没有创建新日志。下面我展示了 crontab 中的内容以及stocks_etl.sh 脚本中的内容。有谁知道问题可能是什么以及如何解决它?

须藤 crontab -e

crontab 显示

0 8 * * * /mnt/data/sda/user_storage/stocks_etl.sh

stock_etl.sh

#!/bin/bash
source activate py36
python /mnt/data/sda/user_storage/stocks_etl.py

更新#3:

当我在 ubuntu 服务器上的命令行中运行此命令时,它工作正常

bash ~/etl_scripts/stocks_etl.bashrc

当我使用同一用户在 crontab 中运行它时,它会抛出以下错误

error:

Started stocks_etl.bash
Thu Feb 25 05:20:01 UTC 2021
/home/user/etl_scripts/stocks_etl.bashrc: line 5: activate: No such file or directory
Traceback (most recent call last):
  File "/home/user/etl_scripts/stocks_etl.py", line 4, in <module>
    import numpy as np
ImportError: No module named numpy

这是 bashrc 文件:

#!/bin/bash -l
echo 'Started stocks_etl.bash'
date +'%a %b %e %H:%M:%S %Z %Y'


source activate py36
python ~/etl_scripts/stocks_etl.py

就像当我在 crontab 中运行它时,它找不到 conda,它只是使用未安装 numpy 的基本 python 安装运行它。有人知道问题可能是什么吗?您能建议如何解决它吗?

更新#2: 现在我已经对文件运行 chmod 777,当 crontab 执行时,我收到以下错误。就像 conda 虚拟环境没有被激活,它只是尝试使用基本的 python 安装来运行它

error:

/mnt/data/sda/user_storage/etl_scripts/stocks_etl.sh: line 2: activate: No such file or directory
Traceback (most recent call last):
  File "/mnt/data/sda/user_storage/etl_scripts/stocks_etl.py", line 1, in <module>
    import numpy as np
ImportError: No module named numpy

update:

stocks_etl.py

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from yahoofinancials import YahooFinancials

import pymysql

import datetime
import logging

import time

import glob

from sqlalchemy import create_engine

import os

import datetime


# helper functions



# function for creating error logs
# Note: function not currently working, doesn't recognize logger

def error_logger(path):
    
    # adding a timestamp to logname
    ts=str(datetime.datetime.now().isoformat())
    
    # logging.basicConfig(filename='example.log',level=logging.DEBUG)
    logging.basicConfig(filename=path+ts+'.log', level=logging.DEBUG, 
                        format='%(asctime)s %(levelname)s %(name)s %(message)s')

    logger=logging.getLogger(__name__)


# function to query mysql db and return dataframe of results
def mysql_query(user,password,database,host,query):
    
    connection = pymysql.connect(user=user, password=password, database=database, host=host)


    try:
        with connection.cursor() as cursor:
            query = query


        df = pd.read_sql(query, connection)
        
        logging.info('query succeeded: '+query)
        
#     finally:
        connection.close()
        
        logging.info('close connection mysql')

    except Exception as err:
        
        logger.error('query failed: '+query+' got error: '+str(err))
        
        return df
        
    pass

    
        
    


# function to download OHLC stock data

def download_stocks(Ticker_list,start_date,end_date,time_interval,path):
    
    
    # get data for stocks in Ticker_list and save as csv

    failed_list=[]
    passed_list=[]

    Ticker_list = Ticker_list

    for x in range(len(Ticker_list)):


        try:

            yahoo_financials = YahooFinancials(Ticker_list[x])
            # data = yahoo_financials.get_historical_price_data('2019-01-01', '2019-09-30', time_interval='daily')
            data = yahoo_financials.get_historical_price_data(start_date, end_date, time_interval=time_interval)

            prices_df=pd.DataFrame(data[Ticker_list[x]]['prices'])

            prices_df=prices_df[['adjclose', 'close', 'formatted_date', 'high', 'low', 'open',
                   'volume']]

            prices_df['date']=prices_df['formatted_date']

            prices_df=prices_df[['date','adjclose', 'close', 'high', 'low', 'open',
                   'volume']]

            prices_df['Ticker']=Ticker_list[x]

            prices_df.to_csv(path+Ticker_list[x]+'.csv')

            passed_list.append(Ticker_list[x])

            logging.info('downloaded: '+Ticker_list[x])

            time.sleep(1)

        except Exception as err:

            failed_list.append(Ticker_list[x])
            logger.error('tried download: '+Ticker_list[x]+' got error: '+str(err))

        pass
        

# function read csv in and append to one dataframe

def stock_dataframe(path):    

    try:
        path = path
        all_files = glob.glob(path + "/*.csv")

        li = []

        for filename in all_files:
            df = pd.read_csv(filename, index_col=None, header=0)
            li.append(df)

        frame = pd.concat(li, axis=0, ignore_index=True)

        frame=frame[['date', 'adjclose', 'close', 'high', 'low', 'open',
               'volume', 'Ticker']]

        return frame
    
        logging.info('created stock dataframe')
        
    except Exception as err:

            logger.error('stock dataframe create failed got error: '+str(err))
            
    pass


# write dataframe to mysql db

def write_dataframe(username, password, host, schema,dataframe,table,if_exists,index):
    
    try:
        
        from sqlalchemy import create_engine
        
        # connection = pymysql.connect(user='user', password='psswd', database='sandbox', host='xxxxx')

        engine = create_engine("mysql+pymysql://"+str(username)+":"+str(password)+"@"+str(host)+"/"+str(schema))
        # engine = create_engine("mysql+mysqldb://user:"+'psswd'+"@xxxxx/sandbox")
        dataframe.to_sql(con=engine, name=table, if_exists=if_exists, index=index)
        
        logging.info('write_dataframe succeeded')
        
    except Exception as err:

            logger.error('write_dataframe failed got error: '+str(err))
            
    pass




# to do

# - create directory with datetime prefix as part of path
# - add step that checks max date in current table
# - only pull data later than max date in current table
# - check max date in current derived table
# - only pull data later than current date from source table


def etl_pipeline(table_var):


    i=table_var

    max_date_query="""select max(date) as max_date from """+i+""""""

    try:
        
        max_date_df=mysql_query(user='user',
                            password='psswd',
                            database='stocks',
                            host='xxxxx',
                            query=max_date_query)
            
        logging.info('max_date succeeded: '+i)
            
    except Exception as err:

            logger.error('max_date failed: '+i)

    pass
        


    # In[8]:

    try:
        # get max date
        max_date=max_date_df.astype(str)['max_date'][0]


        # create directory

        base_path='/mnt/data/sda/user_storage/stock_data_downloads/'

        # get current_date
        current_date=datetime.datetime.today().strftime('%Y-%m-%d')

        directory_path=base_path+i+'/'+current_date

        # create directory for downloading new stocks in to
        os.mkdir(directory_path)

        logging.info('create directory succeeded: '+i)

    except Exception as err:

            logger.error('create directory failed: '+i)

    pass


    # In[9]:


    # getting ticker symbols

    ticker_query="""select distinct ticker as ticker from """+i+""""""

    try:
        
        tickers_df=mysql_query(user='user',
                            password='psswd',
                            database='stocks',
                            host='xxxxx',
                            query=ticker_query)
            
        logging.info('get tickers succeeded: '+i)
            
    except Exception as err:

            logger.error('get tickers failed: '+i)

    pass


    # In[12]:


    # get ticker symbols 
    stocks=tickers_df.ticker.tolist()


    # download stocks
    # Note: must add '/' to end of path
    # '2019-01-01', '2021-01-01', time_interval='daily'
    download_stocks(Ticker_list=stocks,
                    start_date=max_date,
                    end_date=current_date,
                    time_interval='daily',
                    path=directory_path+'/')


    # In[70]:


    # directory_path


    # In[13]:


    # create dataframe
    stocks_df=stock_dataframe(path=directory_path)

    # trav_stocks_df.head()


    # In[14]:





    # create mysql table
    write_dataframe(username='user', 
                    password='psswd', 
                    host='xxxxx', 
                    schema='stocks',
                    dataframe=stocks_df,
                    table=i,
                    if_exists='append',
                    index=False)


    # In[15]:


    # creating additional avg annual returns

    try:
        
        query="""select ticker, avg(annual_returns) as avg_annual_returns from (
        select ticker,date, ( -1 +
                a.adjclose / max(a.adjclose) over (partition by ticker 
                                             order by date
                                             range between interval 365 day preceding and interval 365 day preceding
                                            ) 
               ) as annual_returns              
        from """+i+""" a
        ) b where annual_returns is not null
        group by ticker"""

        df=mysql_query(user='user',password='psswd',database='stocks',host='xxxxx',query=query)

        logging.info('etl succeeded: '+i+'_returns')

    except Exception as err:

            logger.error('etl failed: '+i+'_returns')

    pass


    # In[16]:


    # adding additional avg annual returns to table

    # create mysql table
    write_dataframe(username='user', 
                    password='psswd', 
                    host='xxxxx', 
                    schema='stocks',
                    dataframe=df,
                    table=i+'_returns',
                    if_exists='replace',
                    index=False)
    
    
# start logging

# adding a timestamp to logname
ts=str(datetime.datetime.now().isoformat())  

# logging.basicConfig(filename='example.log',level=logging.DEBUG)
logging.basicConfig(filename='/mnt/data/sda/user_storage/logs/etl_scripts/'+ts+'.log', level=logging.DEBUG, 
                    format='%(asctime)s %(levelname)s %(name)s %(message)s')

logger=logging.getLogger(__name__)


    
table_list=['trav_stocks','s_and_p','american_mutual_funds']

for j in table_list:
    
    try:
        
        etl_pipeline(j)
        
        logging.info('etl_pipeline succeeded: '+j)
        
    except Exception as err:

            logger.error('etl_pipeline failed: '+j)

    pass

update:

我将文件更改为 .bash 文件,并将其中的代码更改为

#!/bin/bash -l
echo ''
'Started stocks_etl.bash'
date +'%a %b %e %H:%M:%S %Z %Y'


source /home/user/anaconda3/envs/py36/bin/activate 
conda activate py36
python ~/etl_scripts/stocks_etl.py

现在我在 crontab 中运行时收到以下错误

error:

/home/user/etl_scripts/stocks_etl.bash: line 3: Started stocks_etl.bash: command not found
Fri Feb 26 16:28:01 UTC 2021
/home/user/etl_scripts/stocks_etl.bash: line 7: /home/user/anaconda3/envs/py36/bin/activate: No such file or directory
/home/user/etl_scripts/stocks_etl.bash: line 8: conda: command not found
Traceback (most recent call last):
  File "/home/user/etl_scripts/stocks_etl.py", line 4, in <module>
    import numpy as np
ImportError: No module named numpy

update:

code:

#!/bin/bash
echo ''
'Started stocks_etl.bash'
date +'%a %b %e %H:%M:%S %Z %Y'


/home/user/anaconda3 run -n py36 python ~/user/etl_scripts/stocks_etl.py

error:

/home/user/etl_scripts/stocks_etl.bash: line 3: Started stocks_etl.bash: command not found
Fri Feb 26 16:43:01 UTC 2021
/home/user/etl_scripts/stocks_etl.bash: line 7: /home/user/anaconda3: Is a directory

First, source activate语法几年前已被弃用(你的 Conda 实例有多老了?) - 你应该使用conda activate。其次,Conda shell 命令作为采购的一部分加载到 shell 中.bashrc or .bash_profile。所以至少,你需要包括-l在舍邦和

#!/bin/bash -l
conda activate py36
python /mnt/data/sda/user_storage/stocks_etl.py

您可能需要做一些额外的事情来确保.bashrc它的来源是正确的(例如,它以什么用户身份运行?)。

请注意,Conda 还具有conda run用于在 envs 内执行命令的命令,我认为应该首选:

#!/bin/bash -l
conda run -n py36 python /mnt/data/sda/user_storage/stocks_etl.py

后一种形式也应该在没有 Conda 初始化的情况下工作,但提供了完整的路径conda入口点:

#!/bin/bash

# change to match where your `conda` location
/home/user/anaconda3/condabin/conda run -n py36 python /mnt/data/sda/user_storage/stocks_etl.py
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 crontab 安排 python 脚本 的相关文章

随机推荐

  • 当自动递增列用完时会发生什么?

    考虑一个带有自动增量列的简单表 如下所示 CREATE TABLE foo fooid bigint unsigned NOT NULL auto increment snipped other columns PRIMARY KEY fo
  • 除了静态库本身之外,停止 cmake target_link_libraries 链接静态库的两个目标文件

    我尝试使用 cmake ninja msvc 在 Windows 上构建一个相当大的共享库 它由子文件夹中的多个静态库组成 那么一个根CMakeLists txt好像 project sharedlib CXX include CMAKE
  • 如何检查计费协议是否有效或取消?

    我在 PayPal 中使用参考交易 API 不确定计费时计费协议是否有效 我想提前了解计费协议是否被取消 我想知道计费协议取消电话的回拨电话 要检查计费协议的状态 您可以使用 BAUpdate API NVP METHOD BillAgre
  • R 中 N 个元素与 q 个元素的组合

    I have N 6元素和q 3元素符号为0 1 2 我想创建的所有向量N 6元素与2元素等于0 2元素等于1 and 2元素等于2在所有可能的位置 这些向量的数量等于combn 6 2 combn 4 2 combn 2 2 90 这是构
  • qt中的“morph into”有什么用?

    在qt gui编辑器中 任何gui组件都可以变形为某种类型的其他gui组件 但实际上 这个选项的实际用途是什么 可以动态完成吗 如果是的话那么这样做的好处是什么 实际用途是快速将小部件转换为其他类似的小部件 假设您有一个包含一些小部件的组框
  • Azure 无效 AccessToken

    我正在尝试使用 Microsoft Azure Management Resources 库来管理一些 Azure 资源 我已在 Azure AD 中注册了应用程序 并授予了它所有权限 我获取了它的 ApplicationId 和 Secr
  • 链接器文件中输入和输出部分之间的区别?

    虽然在生成的二进制或 ELF 文件的上下文中可以清楚什么是节 但文档中的许多地方 与所使用的编译器无关 将它们称为输入或输出节 它们之间有什么区别 链接器使用目标文件 以及可能的共享库 并输出 可执行文件或共享库 输入目标文件由命名的 部分
  • 在 Python 2.4 中处理上下文类

    我正在尝试使用 python daemon 模块 它提供 daemon DaemonContext 类来正确地守护脚本 虽然我主要针对 Python 2 6 但我想保持对版本 2 4 的向后兼容性 Python 2 5 支持从以下位置导入上
  • 创建一个接受对象但不接受数组的通用函数

    我想创建一个满足以下条件的通用 TypeScript 函数 f a 1 success f undefined success f should fail the type check f 1 2 should fail the type
  • ImageMagick PATH 无法被knitr 中的engine = "tikz" 识别

    我正在尝试编译 TikZ 图形knitr 我正在使用可用的示例here https github com yihui knitr examples blob master 058 engine tikz Rmd 我专门尝试从 Rstudio
  • 如何防止任务管理器杀死我的程序?

    有什么方法可以保护我的 Delphi 应用程序不被 Windows 任务管理器 或其他类似 Process Explorer 杀死 我认为 Windows 消息可以做到这一点 通过执行挂钩并拦截 TerminateProcess 消息 我想
  • 根据年份而不是最小值或最大值重置序列

    创建一个将根据年份重置的序列 考虑以 000000001 开头的 9 位数字序列 最大值为 999999999 出租日期为 30 12 2017 seq 为 000012849 因此 当日期为 01 01 2018 时 我希望 seq 为
  • WebSocket 在 django 通用中间件中生成错误

    我正在使用 Django gevent socketio Haproxy 所有套接字功能都工作正常 但在后台它会生成如下所示的错误 似乎 Haproxy 的套接字标头具有非常有限的属性 其中不包含 status code 因此它失败并发送错
  • 如何存储每个上下文而不是每个程序的 OpenGL 程序对象的制服?

    我正在开发多线程 OpenGL 合成引擎 我有一组在多个上下文之间共享的着色器 出于性能原因 我想避免为每个线程编译每个着色器程序的单独实例 但是 如果多个线程碰巧使用同一个程序对象 并且我尝试在每个线程上设置不同的统一值 则统一值会混淆
  • 导入/导出项目首选项

    遇到一点问题 由于我们使用的源代码控制设置 每个错误 增强都在新分支中进行 这很好 但是在 Eclipse 中 我们需要为每个分支设置一个新项目 有没有什么方法可以轻松地为项目设置默认首选项 或者在 Eclipse 中导入 导出项目首选项
  • 使用Maven和Spring实现依赖倒置原则

    根据这篇维基百科文章 实现依赖倒置原则 https en wikipedia org wiki Dependency inversion principle DIP implementations可以通过两种方式完成 在单独的包中对低级组件
  • Python 2.5 中的相对导入

    我知道在 Python 中存在很多关于相同导入问题的问题 但似乎没有人能够提供正确用法的清晰示例 假设我们有一个包mypackage有两个模块foo and bar 里面foo我们需要能够访问bar 因为我们还在开发中 mypackage不
  • 如何在login.do提交时将表单密码数据隐藏在Chrome开发者工具网络面板中?

    当正常登录时以简单的html表单提交 Chrome 开发者工具 网络 面板显示这样的表单数据 但在 Facebook Google 等主要网站上 他们不会像这样在网络面板中显示表单数据 怎样才能做到这一点呢 有没有例子或者教程 我们无法隐藏
  • 粗俗的代码。国际奥委会的救援

    In question https stackoverflow com questions 871405 why do i need an ioc container as opposed to straightforward di cod
  • 使用 crontab 安排 python 脚本

    我有一个 bash 脚本 我正在尝试使用 cron 作业运行它 我正在尝试在我的 ubuntu 服务器上运行 cron 作业 我希望它每天 8 小时 UTC 运行 bash 脚本激活 conda python 虚拟环境并运行 python